diff --git a/src/Persistence/SqliteTests/SqliteTests.csproj b/src/Persistence/SqliteTests/SqliteTests.csproj index e58abf0ca..e3f8e6897 100644 --- a/src/Persistence/SqliteTests/SqliteTests.csproj +++ b/src/Persistence/SqliteTests/SqliteTests.csproj @@ -5,10 +5,10 @@ - + - + all runtime; build; native; contentfiles; analyzers; buildtransitive @@ -18,13 +18,13 @@ - - + + - + diff --git a/src/Persistence/SqliteTests/Transport/advisory_lock.cs b/src/Persistence/SqliteTests/Transport/advisory_lock.cs new file mode 100644 index 000000000..7be06bb05 --- /dev/null +++ b/src/Persistence/SqliteTests/Transport/advisory_lock.cs @@ -0,0 +1,172 @@ +using Microsoft.Extensions.DependencyInjection; +using System.Reflection; +using Weasel.Core; +using Wolverine.Persistence.Durability; +using Wolverine.RDBMS; +using Wolverine.Sqlite; + +namespace SqliteTests.Transport; + +[Collection("sqlite")] +public class advisory_lock : SqliteContext, IAsyncLifetime +{ + private SqliteTestDatabase _main = null!; + private SqliteTestDatabase _green = null!; + private IHost _host1 = null!; + private IHost _host2 = null!; + + public async Task InitializeAsync() + { + _main = Servers.CreateDatabase("sqlite_store_lock_main"); + _green = Servers.CreateDatabase("sqlite_store_lock_green"); + _host1 = CreateHost(); + _host2 = CreateHost(); + await Task.WhenAll(_host1.StartAsync(), _host2.StartAsync()); + } + + public async Task DisposeAsync() + { + _host1.Dispose(); + _host2.Dispose(); + _main.Dispose(); + _green.Dispose(); + } + + [Theory] + [InlineData(null)] + [InlineData("green")] + public async Task should_not_attain_lock_already_attained_by_another_host(string? tenantId) + { + var advisoryLock1 = await GetAdvisoryLockAsync(_host1, tenantId); + (await advisoryLock1.TryAttainLockAsync(69, default)).ShouldBeTrue(); + advisoryLock1.HasLock(69).ShouldBeTrue(); + + var advisoryLock2 = await GetAdvisoryLockAsync(_host2, tenantId); + (await advisoryLock2.TryAttainLockAsync(69, default)).ShouldBeFalse(); + advisoryLock2.HasLock(69).ShouldBeFalse(); + } + + [Theory] + [InlineData(null)] + [InlineData("green")] + public async Task should_attain_lock_released_by_another_host(string? tenantId) + { + var advisoryLock1 = await GetAdvisoryLockAsync(_host1, tenantId); + (await advisoryLock1.TryAttainLockAsync(69, default)).ShouldBeTrue(); + advisoryLock1.HasLock(69).ShouldBeTrue(); + await advisoryLock1.ReleaseLockAsync(69); + advisoryLock1.HasLock(69).ShouldBeFalse(); + + var advisoryLock2 = await GetAdvisoryLockAsync(_host2, tenantId); + + (await advisoryLock2.TryAttainLockAsync(69, default)).ShouldBeTrue(); + advisoryLock2.HasLock(69).ShouldBeTrue(); + } + + [Theory] + [InlineData(null)] + [InlineData("green")] + public async Task should_attain_lock_not_attained_by_another_host(string? tenantId) + { + var advisoryLock1 = await GetAdvisoryLockAsync(_host1, tenantId); + (await advisoryLock1.TryAttainLockAsync(69, default)).ShouldBeTrue(); + advisoryLock1.HasLock(69).ShouldBeTrue(); + + var advisoryLock2 = await GetAdvisoryLockAsync(_host2, tenantId); + (await advisoryLock2.TryAttainLockAsync(70, default)).ShouldBeTrue(); + advisoryLock2.HasLock(70).ShouldBeTrue(); + } + + [Fact] + public async Task should_attain_lock_per_tenant() + { + var advisoryLock1 = await GetAdvisoryLockAsync(_host1); + (await advisoryLock1.TryAttainLockAsync(69, default)).ShouldBeTrue(); + advisoryLock1.HasLock(69).ShouldBeTrue(); + + var advisoryLock2 = await GetAdvisoryLockAsync(_host2, "green"); + (await advisoryLock2.TryAttainLockAsync(69, default)).ShouldBeTrue(); + advisoryLock2.HasLock(69).ShouldBeTrue(); + } + + [Fact] + public async Task should_attain_lock_twice() + { + var advisoryLock1 = await GetAdvisoryLockAsync(_host1); + (await advisoryLock1.TryAttainLockAsync(69, default)).ShouldBeTrue(); + advisoryLock1.HasLock(69).ShouldBeTrue(); + + (await advisoryLock1.TryAttainLockAsync(69, default)).ShouldBeTrue(); + advisoryLock1.HasLock(69).ShouldBeTrue(); + } + + [Fact] + public async Task should_attain_lock_when_previous_owner_is_disposed_without_releasing() + { + var advisoryLock1 = await GetAdvisoryLockAsync(_host1); + (await advisoryLock1.TryAttainLockAsync(69, default)).ShouldBeTrue(); + + _host1.Dispose(); + + var advisoryLock2 = await GetAdvisoryLockAsync(_host2); + (await advisoryLock2.TryAttainLockAsync(69, default)).ShouldBeTrue(); + advisoryLock2.HasLock(69).ShouldBeTrue(); + } + + [Fact] + public async Task should_not_keep_migration_lock_after_applying_migration_on_start() + { + var dbSettings1 = _host1.Services.GetRequiredService(); + var dbSettings2 = _host2.Services.GetRequiredService(); + dbSettings1.MigrationLockId.ShouldBe(dbSettings2.MigrationLockId); + var migrationLockId = dbSettings1.MigrationLockId; + + var advisoryLock1 = await GetAdvisoryLockAsync(_host1); + var advisoryLock2 = await GetAdvisoryLockAsync(_host2); + + advisoryLock1.HasLock(migrationLockId).ShouldBeFalse(); + advisoryLock2.HasLock(migrationLockId).ShouldBeFalse(); + } + + [Fact] // This test is for demonstration - not expected behavior + public async Task should_not_attain_lock_when_previous_owner_crashes_without_releasing() + { + var store = await GetStoreAsync(_host1); + await store.AdvisoryLock.TryAttainLockAsync(69, default); + var prop = typeof(SqliteMessageStore).GetProperty( + nameof(SqliteMessageStore.AdvisoryLock), + BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic); + prop?.SetValue(store, null); + _host1.Dispose(); // advisory lock can't be released because of the simulated crash + + var store2 = await GetStoreAsync(_host2); + (await store2.AdvisoryLock.TryAttainLockAsync(69, default)).ShouldBeFalse(); // stale lock hurts + store2.AdvisoryLock.HasLock(69).ShouldBeFalse(); + } + + private static async Task GetStoreAsync(IHost host, string? tenantId = null) + { + var multitenantedStore = (MultiTenantedMessageStore)host.Services.GetRequiredService(); + var store = await multitenantedStore.GetDatabaseAsync(tenantId); + return (SqliteMessageStore)store; + } + + private static async Task GetAdvisoryLockAsync(IHost host, string? tenantId = null) + { + var store = await GetStoreAsync(host, tenantId); + return store.AdvisoryLock; + } + + private IHost CreateHost() + { + return Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.PersistMessagesWithSqlite(_main.ConnectionString) + .RegisterStaticTenants(tenants => tenants.Register("green", _green.ConnectionString)); + + opts.Discovery.DisableConventionalDiscovery(); + }) + .Build(); + } +} diff --git a/src/Persistence/SqliteTests/Transport/multi_tenancy_with_multiple_files.cs b/src/Persistence/SqliteTests/Transport/multi_tenancy_with_multiple_files.cs index 6d931696b..277ba9d11 100644 --- a/src/Persistence/SqliteTests/Transport/multi_tenancy_with_multiple_files.cs +++ b/src/Persistence/SqliteTests/Transport/multi_tenancy_with_multiple_files.cs @@ -1,10 +1,6 @@ using System.Collections.Concurrent; using JasperFx.Core; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Shouldly; -using Wolverine; -using Wolverine.Persistence; using Wolverine.Sqlite; namespace SqliteTests.Transport; diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Polling.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Polling.cs index 0e2325340..4ffae2260 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Polling.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Polling.cs @@ -65,7 +65,7 @@ await deleteMany(tx, envelopes.Select(x => x.Id).ToArray(), externalTable.TableN /// /// Releases a previously-acquired session-scoped advisory lock. Default - /// implementation is a no-op for providers (e.g., SQLite) where the lock + /// implementation is a no-op for providers where the lock /// is automatically released when the connection closes. /// protected virtual Task ReleaseLockAsync(int lockId, T connection, CancellationToken token) diff --git a/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs b/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs index 66f1259d2..48704296b 100644 --- a/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs +++ b/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs @@ -1,9 +1,9 @@ +using JasperFx; +using Microsoft.Extensions.Logging; using System.Data; using System.Data.Common; -using Microsoft.Extensions.Logging; using Weasel.Core; -using Wolverine.RDBMS; -using Wolverine.Runtime; +using Weasel.Sqlite; namespace Wolverine.Sqlite; @@ -11,8 +11,9 @@ internal class SqliteAdvisoryLock : IAdvisoryLock { private readonly DbDataSource _dataSource; private readonly ILogger _logger; - private readonly string _databaseName; - private readonly List _locks = new(); + private readonly string _databaseName; + private readonly HashSet _locks = []; + private bool _hasLocksTable; private DbConnection? _conn; public SqliteAdvisoryLock(DbDataSource dataSource, ILogger logger, string databaseName) @@ -89,17 +90,17 @@ public async Task TryAttainLockAsync(int lockId, CancellationToken token) { // SQLite doesn't have advisory locks like PostgreSQL // We'll use a simple table-based lock approach - var result = await _conn.CreateCommand("INSERT OR IGNORE INTO wolverine_locks (lock_id, acquired_at) VALUES (@lockId, datetime('now'))") + await CreateLocksTableIfMissing(_conn, token); + var result = await _conn.CreateCommand( + "INSERT OR IGNORE INTO wolverine_locks (lock_id, acquired_at) " + + "VALUES (@lockId, datetime('now'))") .With("lockId", lockId) .ExecuteNonQueryAsync(token); if (result > 0) - { _locks.Add(lockId); - return true; - } - return false; + return HasLock(lockId); } catch (Exception ex) { @@ -108,6 +109,26 @@ public async Task TryAttainLockAsync(int lockId, CancellationToken token) } } + private async Task CreateLocksTableIfMissing(DbConnection connection, CancellationToken token) + { + if (_hasLocksTable) + return; + + var table = new Weasel.Sqlite.Tables.Table(new SqliteObjectName("wolverine_locks")); + table.AddColumn("lock_id", "INTEGER").AsPrimaryKey(); + table.AddColumn("acquired_at", "TEXT").NotNull(); + table.AddColumn("owner_id", "INTEGER"); + + var migration = await SchemaMigration.DetermineAsync(connection, default(CancellationToken), table); + if (migration.Difference != SchemaPatchDifference.None) + { + await new SqliteMigrator() + .ApplyAllAsync(connection, migration, AutoCreate.CreateOrUpdate, ct: token); + } + + _hasLocksTable = true; + } + public async Task ReleaseLockAsync(int lockId) { if (!_locks.Contains(lockId)) @@ -128,7 +149,7 @@ await _conn.CreateCommand("DELETE FROM wolverine_locks WHERE lock_id = @lockId") .ExecuteNonQueryAsync(); _locks.Remove(lockId); - if (!_locks.Any()) + if (_locks.Count == 0) { await _conn.CloseAsync().ConfigureAwait(false); await _conn.DisposeAsync().ConfigureAwait(false); @@ -151,12 +172,7 @@ public async ValueTask DisposeAsync() try { foreach (var lockId in _locks.ToList()) - { await ReleaseLockAsync(lockId); - } - - await _conn.CloseAsync().ConfigureAwait(false); - await _conn.DisposeAsync().ConfigureAwait(false); } catch (Exception e) { @@ -166,9 +182,7 @@ public async ValueTask DisposeAsync() finally { if (_conn != null) - { await _conn.DisposeAsync().ConfigureAwait(false); - } } } } diff --git a/src/Persistence/Wolverine.Sqlite/SqliteMessageStore.cs b/src/Persistence/Wolverine.Sqlite/SqliteMessageStore.cs index bd2dfcfb0..6c65074f7 100644 --- a/src/Persistence/Wolverine.Sqlite/SqliteMessageStore.cs +++ b/src/Persistence/Wolverine.Sqlite/SqliteMessageStore.cs @@ -7,7 +7,6 @@ using Microsoft.Data.Sqlite; using Microsoft.Extensions.Logging; using Weasel.Core; -using Weasel.Core.Migrations; using Weasel.Sqlite; using Wolverine.Logging; using Wolverine.Persistence.Durability; @@ -18,7 +17,6 @@ using Wolverine.Runtime.Agents; using Wolverine.Sqlite.Schema; using Wolverine.Sqlite.Sagas; -using Wolverine.Sqlite.Util; using Wolverine.Transports; using DbCommandBuilder = Weasel.Core.DbCommandBuilder; @@ -152,20 +150,14 @@ protected override Task deleteMany(DbTransaction tx, Guid[] ids, DbObjectName ta .ExecuteNonQueryAsync(); } - protected override async Task TryAttainLockAsync(int lockId, SqliteConnection connection, CancellationToken token) + protected override Task TryAttainLockAsync(int lockId, SqliteConnection _, CancellationToken token) { - // SQLite uses BEGIN EXCLUSIVE TRANSACTION for locking - // We'll use a simple advisory lock table approach - try - { - await connection.CreateCommand($"INSERT OR IGNORE INTO wolverine_locks (lock_id, acquired_at) VALUES ({lockId}, datetime('now'))") - .ExecuteNonQueryAsync(token); - return true; - } - catch - { - return false; - } + return AdvisoryLock.TryAttainLockAsync(lockId, token); + } + + protected override Task ReleaseLockAsync(int lockId, SqliteConnection _, CancellationToken token) + { + return AdvisoryLock.ReleaseLockAsync(lockId); } protected override DbCommand buildFetchSql(SqliteConnection conn, DbObjectName tableName, string[] columnNames, int maxRecords) @@ -462,12 +454,6 @@ public override IEnumerable AllObjects() restrictionTable.AddColumn("type", "TEXT").NotNull(); restrictionTable.AddColumn("node", "INTEGER").NotNull().DefaultValue(0); yield return restrictionTable; - - // Advisory lock table for SQLite - var lockTable = new Weasel.Sqlite.Tables.Table(new SqliteObjectName("wolverine_locks")); - lockTable.AddColumn("lock_id", "INTEGER").AsPrimaryKey(); - lockTable.AddColumn("acquired_at", "TEXT").NotNull(); - yield return lockTable; } foreach (var table in _otherTables)