diff --git a/src/Persistence/SqliteTests/LocalSqliteBackedTransportCompliance.cs b/src/Persistence/SqliteTests/LocalSqliteBackedTransportCompliance.cs index e4e1c69ea..3c6f5323d 100644 --- a/src/Persistence/SqliteTests/LocalSqliteBackedTransportCompliance.cs +++ b/src/Persistence/SqliteTests/LocalSqliteBackedTransportCompliance.cs @@ -33,4 +33,15 @@ public Task InitializeAsync() } [Collection("sqlite")] -public class LocalSqliteBackedTransportCompliance : TransportCompliance; +public class LocalSqliteBackedTransportCompliance : TransportCompliance +{ + // The inherited test verifies "from one node to another" delivery, which + // assumes a true multi-node broker transport (Rabbit, Azure SB, ...). The + // SQLite "transport" is a single-host local in-process queue with SQLite + // durability — there is no second node. The single-host send/receive path + // is already covered by other tests in this fixture (can_send_and_wait, + // can_request_reply, tags_the_envelope_with_the_source, etc.), so skipping + // this case loses no unique coverage. + [Fact(Skip = "Not meaningful for SQLite local transport: there is no second node.")] + public override Task can_send_from_one_node_to_another_by_destination() => Task.CompletedTask; +} diff --git a/src/Persistence/SqliteTests/Transport/multi_tenancy_with_multiple_files.cs b/src/Persistence/SqliteTests/Transport/multi_tenancy_with_multiple_files.cs index 442df0fa6..c6cb1722b 100644 --- a/src/Persistence/SqliteTests/Transport/multi_tenancy_with_multiple_files.cs +++ b/src/Persistence/SqliteTests/Transport/multi_tenancy_with_multiple_files.cs @@ -106,11 +106,10 @@ public async Task scheduled_messages_are_processed_in_tenant_files() await endpoint.SendAsync(red, new DeliveryOptions { TenantId = "red", ScheduleDelay = 2.Seconds() }); await endpoint.SendAsync(blue, new DeliveryOptions { TenantId = "blue", ScheduleDelay = 2.Seconds() }); - await Task.Delay(300.Milliseconds()); - - _tracker.Received.Any(x => x.Id == red.Id).ShouldBeFalse(); - _tracker.Received.Any(x => x.Id == blue.Id).ShouldBeFalse(); - + // Intentionally no "shouldn't be delivered yet" check here. ScheduleDelay + // promises "no earlier than T+2s", not "exactly at T+2s" — asserting the + // negative was racy and coupled the test to polling cadence. The contract + // we care about is that the messages eventually arrive at the right tenant. var allReceived = await Poll(30.Seconds(), () => _tracker.Received.Any(x => x.Id == red.Id) && _tracker.Received.Any(x => x.Id == blue.Id)); diff --git a/src/Persistence/SqliteTests/Transport/sqlite_advisory_lock.cs b/src/Persistence/SqliteTests/Transport/sqlite_advisory_lock.cs new file mode 100644 index 000000000..73c026ffc --- /dev/null +++ b/src/Persistence/SqliteTests/Transport/sqlite_advisory_lock.cs @@ -0,0 +1,160 @@ +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Wolverine; +using Wolverine.Persistence.Durability; +using Wolverine.Sqlite; + +namespace SqliteTests.Transport; + +[Collection("sqlite")] +public class sqlite_advisory_lock : SqliteContext, IAsyncLifetime +{ + private SqliteTestDatabase _db = null!; + + public Task InitializeAsync() + { + _db = Servers.CreateDatabase("sqlite_advisory_lock"); + return Task.CompletedTask; + } + + public Task DisposeAsync() + { + _db.Dispose(); + return Task.CompletedTask; + } + + [Fact] + public async Task try_attain_is_idempotent() + { + // Regression test for the bug where calling TryAttainLockAsync twice for + // the same lockId on the same instance returned false the second time + // (because INSERT OR IGNORE no-ops on the existing row). + using var host = await CreateHostAsync(_db.ConnectionString); + var store = (SqliteMessageStore)host.Services.GetRequiredService(); + + (await store.AdvisoryLock.TryAttainLockAsync(4242, default)).ShouldBeTrue(); + (await store.AdvisoryLock.TryAttainLockAsync(4242, default)).ShouldBeTrue(); + store.AdvisoryLock.HasLock(4242).ShouldBeTrue(); + + await store.AdvisoryLock.ReleaseLockAsync(4242); + store.AdvisoryLock.HasLock(4242).ShouldBeFalse(); + } + + [Fact] + public async Task release_actually_deletes_the_row() + { + // The base ReleaseLockAsync was a no-op for SQLite (its doc comment + // assumed session-scoped engine locks). The override now delegates to + // SqliteAdvisoryLock which deletes the row. + using var host = await CreateHostAsync(_db.ConnectionString); + var store = (SqliteMessageStore)host.Services.GetRequiredService(); + + await store.AdvisoryLock.TryAttainLockAsync(7777, default); + await store.AdvisoryLock.ReleaseLockAsync(7777); + + await using var conn = new SqliteConnection(_db.ConnectionString); + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "select count(*) from wolverine_locks where lock_id = 7777"; + ((long)(await cmd.ExecuteScalarAsync())!).ShouldBe(0); + } + + [Fact] + public async Task stale_row_is_reaped_on_attempt() + { + // A holder that died without releasing leaves a row no peer would ever + // clean up. The TTL sweep on TryAttainLockAsync deletes rows older than + // the configured TTL before the INSERT OR IGNORE. + using var host = await CreateHostAsync(_db.ConnectionString); // creates wolverine_locks + + await using (var seed = new SqliteConnection(_db.ConnectionString)) + { + await seed.OpenAsync(); + await using var cmd = seed.CreateCommand(); + cmd.CommandText = "INSERT INTO wolverine_locks (lock_id, acquired_at) VALUES ($id, $when)"; + cmd.Parameters.AddWithValue("$id", 9001); + // Pre-date by 10s; TTL is 1s in this test + cmd.Parameters.AddWithValue("$when", + DateTime.UtcNow.AddSeconds(-10).ToString("yyyy-MM-dd HH:mm:ss")); + await cmd.ExecuteNonQueryAsync(); + } + + var dataSource = new Weasel.Sqlite.SqliteDataSource(_db.ConnectionString); + await using var lockA = new SqliteAdvisoryLock(dataSource, NullLogger.Instance, + "test", TimeSpan.FromSeconds(1)); + + (await lockA.TryAttainLockAsync(9001, default)).ShouldBeTrue(); + } + + [Fact] + public async Task live_holder_is_not_stolen_after_ttl_thanks_to_heartbeat() + { + // Holder A keeps re-attempting (as the production polling loops do). + // The heartbeat advances acquired_at on every re-attempt, so even + // after the TTL window has elapsed several times over, holder B + // cannot acquire the lock. + using var host = await CreateHostAsync(_db.ConnectionString); + + var dataSource = new Weasel.Sqlite.SqliteDataSource(_db.ConnectionString); + await using var holderA = new SqliteAdvisoryLock(dataSource, NullLogger.Instance, + "A", TimeSpan.FromSeconds(1)); + await using var holderB = new SqliteAdvisoryLock(dataSource, NullLogger.Instance, + "B", TimeSpan.FromSeconds(1)); + + (await holderA.TryAttainLockAsync(9100, default)).ShouldBeTrue(); + + // Beat the heartbeat across more than 2× TTL while B repeatedly tries + for (var i = 0; i < 6; i++) + { + await Task.Delay(500); + (await holderA.TryAttainLockAsync(9100, default)).ShouldBeTrue(); // heartbeat tick + (await holderB.TryAttainLockAsync(9100, default)).ShouldBeFalse(); // never steals + } + } + + [Fact] + public async Task heartbeat_advances_acquired_at_on_reattempt() + { + using var host = await CreateHostAsync(_db.ConnectionString); + var store = (SqliteMessageStore)host.Services.GetRequiredService(); + + (await store.AdvisoryLock.TryAttainLockAsync(9200, default)).ShouldBeTrue(); + var firstAcquired = await readAcquiredAtAsync(_db.ConnectionString, 9200); + + await Task.Delay(TimeSpan.FromSeconds(1.2)); + + (await store.AdvisoryLock.TryAttainLockAsync(9200, default)).ShouldBeTrue(); + var secondAcquired = await readAcquiredAtAsync(_db.ConnectionString, 9200); + + secondAcquired.ShouldBeGreaterThan(firstAcquired); + + await store.AdvisoryLock.ReleaseLockAsync(9200); + } + + private static async Task readAcquiredAtAsync(string connectionString, int lockId) + { + await using var conn = new SqliteConnection(connectionString); + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT acquired_at FROM wolverine_locks WHERE lock_id = $id"; + cmd.Parameters.AddWithValue("$id", lockId); + var raw = (string)(await cmd.ExecuteScalarAsync())!; + return DateTime.SpecifyKind( + DateTime.ParseExact(raw, "yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture), + DateTimeKind.Utc); + } + + private static async Task CreateHostAsync(string connectionString) + { + return await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.PersistMessagesWithSqlite(connectionString); + opts.Discovery.DisableConventionalDiscovery(); + }) + .StartAsync(); + } +} diff --git a/src/Persistence/SqliteTests/Transport/sqlite_migration_lock.cs b/src/Persistence/SqliteTests/Transport/sqlite_migration_lock.cs new file mode 100644 index 000000000..e6d570329 --- /dev/null +++ b/src/Persistence/SqliteTests/Transport/sqlite_migration_lock.cs @@ -0,0 +1,83 @@ +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Persistence.Durability; +using Wolverine.Sqlite; + +namespace SqliteTests.Transport; + +[Collection("sqlite")] +public class sqlite_migration_lock : SqliteContext, IAsyncLifetime +{ + private SqliteTestDatabase _db = null!; + + public Task InitializeAsync() + { + _db = Servers.CreateDatabase("sqlite_migration_lock"); + return Task.CompletedTask; + } + + public Task DisposeAsync() + { + _db.Dispose(); + return Task.CompletedTask; + } + + [Fact] + public async Task migrate_async_does_not_leave_a_row_in_wolverine_locks() + { + // The whole point of switching the migration lock to BEGIN EXCLUSIVE: it + // can't deposit rows in wolverine_locks (because the table is created by + // the same migration). After startup the table exists but holds no rows + // for the migration lockId. + using var host = await CreateHostAsync(_db.ConnectionString); + + var store = (SqliteMessageStore)host.Services.GetRequiredService(); + var migrationLockId = store.Settings.MigrationLockId; + + await using var conn = new SqliteConnection(_db.ConnectionString); + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "select count(*) from wolverine_locks where lock_id = $id"; + cmd.Parameters.AddWithValue("$id", migrationLockId); + var count = (long)(await cmd.ExecuteScalarAsync())!; + count.ShouldBe(0); + } + + [Fact] + public async Task two_hosts_can_start_concurrently_against_the_same_file() + { + // Without the BEGIN EXCLUSIVE migration lock, the second startup hits + // the chicken-and-egg (wolverine_locks doesn't exist yet) and burns + // ~5.5s of failed lock retries. With BEGIN EXCLUSIVE, one waits, the + // other proceeds, and both reach Started without errors. + var startup = Task.WhenAll( + CreateHostAsync(_db.ConnectionString), + CreateHostAsync(_db.ConnectionString)); + + var hosts = await startup.WaitAsync(TimeSpan.FromSeconds(15)); + try + { + hosts.ShouldNotBeNull(); + hosts.Length.ShouldBe(2); + } + finally + { + foreach (var h in hosts) await h.StopAsync(); + foreach (var h in hosts) h.Dispose(); + } + } + + private static async Task CreateHostAsync(string connectionString) + { + return await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.PersistMessagesWithSqlite(connectionString); + opts.Discovery.DisableConventionalDiscovery(); + }) + .StartAsync(); + } +} diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Admin.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Admin.cs index aed123576..f5533db6e 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Admin.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Admin.cs @@ -74,7 +74,7 @@ public async Task MigrateAsync() { try { - await ReleaseLockAsync(lockId, typedConn, _cancellation); + await releaseMigrationLockAsync(lockId, typedConn, _cancellation); } catch { @@ -112,8 +112,13 @@ public async Task MigrateAsync() /// false if not acquired after the retry budget — in which case another process /// is presumably finishing the migration; we proceed and let our own SchemaMigration /// detect "no changes" as a no-op. + /// + /// Providers whose advisory-lock primitive depends on schema that is itself part + /// of the migration (e.g., SQLite's row-based lock on wolverine_locks) + /// should override this with a primitive that does not depend on the schema — + /// for example, SQLite's BEGIN EXCLUSIVE. /// - private async Task acquireMigrationLockAsync(int lockId, T conn, CancellationToken token) + protected virtual async Task acquireMigrationLockAsync(int lockId, T conn, CancellationToken token) { const int maxAttempts = 10; for (var attempt = 0; attempt < maxAttempts; attempt++) @@ -130,6 +135,17 @@ private async Task acquireMigrationLockAsync(int lockId, T conn, Cancellat return false; } + /// + /// Release the lock previously acquired by . + /// Default implementation delegates to the polling-lock release path; providers + /// that override with a different primitive + /// (e.g., a transaction) must override this too. + /// + protected virtual Task releaseMigrationLockAsync(int lockId, T conn, CancellationToken token) + { + return ReleaseLockAsync(lockId, conn, token); + } + public async Task> AllIncomingAsync() { return await CreateCommand( diff --git a/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs b/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs index 66f1259d2..e0ba5c03d 100644 --- a/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs +++ b/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs @@ -9,17 +9,37 @@ namespace Wolverine.Sqlite; internal class SqliteAdvisoryLock : IAdvisoryLock { + // wolverine_locks rows are not bound to the writing connection (unlike the + // BEGIN EXCLUSIVE migration lock), so a hard-killed holder leaves a row + // that no peer would ever reap. Pair a TTL sweep on each attempt with a + // heartbeat refresh of acquired_at on each re-attempt by the live holder: + // - Live holders re-attain on every poll tick (HealthCheckPollingTime, + // ScheduledJobPollingTime), which advances acquired_at well inside TTL. + // - A dead holder stops refreshing; peers reap the row once it ages past + // TTL on a subsequent attempt. + // TTL must be > 2× the slowest poll cadence using this lock. Default 2m + // accommodates the 10s heartbeat default with healthy headroom for GC + // pauses, slow recovery cycles, or temporary I/O stalls. + internal static readonly TimeSpan DefaultLockTtl = TimeSpan.FromMinutes(2); + private readonly DbDataSource _dataSource; private readonly ILogger _logger; private readonly string _databaseName; + private readonly TimeSpan _lockTtl; private readonly List _locks = new(); private DbConnection? _conn; public SqliteAdvisoryLock(DbDataSource dataSource, ILogger logger, string databaseName) + : this(dataSource, logger, databaseName, DefaultLockTtl) + { + } + + internal SqliteAdvisoryLock(DbDataSource dataSource, ILogger logger, string databaseName, TimeSpan lockTtl) { _dataSource = dataSource; _logger = logger; _databaseName = databaseName; + _lockTtl = lockTtl; } public bool HasLock(int lockId) @@ -62,6 +82,15 @@ public bool HasLock(int lockId) public async Task TryAttainLockAsync(int lockId, CancellationToken token) { + // Idempotent: if we already hold this lock and the connection is healthy, + // re-attempting must report success. The previous implementation would run + // INSERT OR IGNORE again, get result==0, and falsely return false. + if (HasLock(lockId)) + { + await refreshHeartbeatAsync(lockId, token).ConfigureAwait(false); + return true; + } + if (_conn == null) { _conn = await _dataSource.OpenConnectionAsync(token).ConfigureAwait(false); @@ -87,8 +116,22 @@ public async Task TryAttainLockAsync(int lockId, CancellationToken token) try { - // SQLite doesn't have advisory locks like PostgreSQL - // We'll use a simple table-based lock approach + // SQLite doesn't have advisory locks like PostgreSQL. + // We use a row in wolverine_locks; the table is created by the message + // store's normal schema migration. The migration lock itself uses + // BEGIN EXCLUSIVE (see SqliteMessageStore.acquireMigrationLockAsync) so + // there is no chicken-and-egg between this table and migration. + // + // Stale-row sweep: if a previous holder died without releasing, its + // row would block all peers forever. Reap rows whose acquired_at is + // older than TTL before attempting INSERT OR IGNORE. Live holders + // refresh acquired_at on every re-attempt, so they're never reaped. + await _conn.CreateCommand( + "DELETE FROM wolverine_locks WHERE lock_id = @lockId AND acquired_at < @cutoff") + .With("lockId", lockId) + .With("cutoff", DateTime.UtcNow.Subtract(_lockTtl).ToString("yyyy-MM-dd HH:mm:ss")) + .ExecuteNonQueryAsync(token); + var result = await _conn.CreateCommand("INSERT OR IGNORE INTO wolverine_locks (lock_id, acquired_at) VALUES (@lockId, datetime('now'))") .With("lockId", lockId) .ExecuteNonQueryAsync(token); @@ -108,6 +151,25 @@ public async Task TryAttainLockAsync(int lockId, CancellationToken token) } } + private async Task refreshHeartbeatAsync(int lockId, CancellationToken token) + { + if (_conn == null) return; + + try + { + await _conn.CreateCommand( + "UPDATE wolverine_locks SET acquired_at = datetime('now') WHERE lock_id = @lockId") + .With("lockId", lockId) + .ExecuteNonQueryAsync(token); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Failed to refresh advisory-lock heartbeat for {LockId} on database {Database}; lock may be reaped if the failure persists past TTL", + lockId, _databaseName); + } + } + public async Task ReleaseLockAsync(int lockId) { if (!_locks.Contains(lockId)) @@ -155,8 +217,10 @@ public async ValueTask DisposeAsync() await ReleaseLockAsync(lockId); } - await _conn.CloseAsync().ConfigureAwait(false); - await _conn.DisposeAsync().ConfigureAwait(false); + // ReleaseLockAsync nulls _conn once the last lock is released. The + // finally block below handles disposal in both paths (released-all + // vs released-some); calling Close/Dispose here as well caused a + // NullReferenceException on the all-released path. } catch (Exception e) { diff --git a/src/Persistence/Wolverine.Sqlite/SqliteMessageStore.cs b/src/Persistence/Wolverine.Sqlite/SqliteMessageStore.cs index 4502f62a8..8a80ef95e 100644 --- a/src/Persistence/Wolverine.Sqlite/SqliteMessageStore.cs +++ b/src/Persistence/Wolverine.Sqlite/SqliteMessageStore.cs @@ -161,19 +161,59 @@ protected override Task deleteMany(DbTransaction tx, Guid[] ids, DbObjectName ta .ExecuteNonQueryAsync(); } - protected override async Task TryAttainLockAsync(int lockId, SqliteConnection connection, CancellationToken token) + // Polling lock: delegate to the AdvisoryLock instance. The previous override here + // ran INSERT OR IGNORE and returned true unconditionally, which falsely reported + // "lock acquired" whenever another row already held the slot. SqliteAdvisoryLock + // checks the affected row count. + protected override Task TryAttainLockAsync(int lockId, SqliteConnection connection, CancellationToken token) + { + return AdvisoryLock.TryAttainLockAsync(lockId, token); + } + + protected override Task ReleaseLockAsync(int lockId, SqliteConnection connection, CancellationToken token) + { + return AdvisoryLock.ReleaseLockAsync(lockId); + } + + // Migration lock: SQLite's row-based wolverine_locks scheme can't be used here, + // because the table itself is created by the migration the lock is supposed to + // serialize. Use BEGIN EXCLUSIVE TRANSACTION instead — it doesn't depend on any + // schema and is automatically released when the connection closes (so process + // crashes during migration don't leave stale locks). + protected override async Task acquireMigrationLockAsync(int lockId, SqliteConnection conn, CancellationToken token) + { + const int maxAttempts = 10; + for (var attempt = 0; attempt < maxAttempts; attempt++) + { + try + { + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "BEGIN EXCLUSIVE TRANSACTION"; + await cmd.ExecuteNonQueryAsync(token); + return true; + } + catch (SqliteException ex) when (ex.SqliteErrorCode == 5 /* SQLITE_BUSY */ + || ex.SqliteErrorCode == 6 /* SQLITE_LOCKED */) + { + await Task.Delay(TimeSpan.FromMilliseconds(100 * (attempt + 1)), token); + } + } + + return false; + } + + protected override async Task releaseMigrationLockAsync(int lockId, SqliteConnection conn, CancellationToken token) { - // SQLite uses BEGIN EXCLUSIVE TRANSACTION for locking - // We'll use a simple advisory lock table approach try { - await connection.CreateCommand($"INSERT OR IGNORE INTO wolverine_locks (lock_id, acquired_at) VALUES ({lockId}, datetime('now'))") - .ExecuteNonQueryAsync(token); - return true; + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "COMMIT"; + await cmd.ExecuteNonQueryAsync(token); } catch { - return false; + // Best-effort. Closing the connection rolls back the transaction + // without ill effect — the migration itself succeeded. } } diff --git a/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs b/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs index e1b1e5165..f02aea963 100644 --- a/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs +++ b/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs @@ -238,7 +238,7 @@ public virtual async Task can_apply_requeue_mechanics() } [Fact] - public async Task can_send_from_one_node_to_another_by_destination() + public virtual async Task can_send_from_one_node_to_another_by_destination() { var session = await theSender.TrackActivity(Fixture.DefaultTimeout) .AlsoTrack(theReceiver)