Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,15 @@ public Task InitializeAsync()
}

[Collection("sqlite")]
public class LocalSqliteBackedTransportCompliance : TransportCompliance<LocalSqliteBackedFixture>;
public class LocalSqliteBackedTransportCompliance : TransportCompliance<LocalSqliteBackedFixture>
{
// The inherited test verifies "from one node to another" delivery, which
// assumes a true multi-node broker transport (Rabbit, Azure SB, ...). The
// SQLite "transport" is a single-host local in-process queue with SQLite
// durability — there is no second node. The single-host send/receive path
// is already covered by other tests in this fixture (can_send_and_wait,
// can_request_reply, tags_the_envelope_with_the_source, etc.), so skipping
// this case loses no unique coverage.
[Fact(Skip = "Not meaningful for SQLite local transport: there is no second node.")]
public override Task can_send_from_one_node_to_another_by_destination() => Task.CompletedTask;
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,10 @@ public async Task scheduled_messages_are_processed_in_tenant_files()
await endpoint.SendAsync(red, new DeliveryOptions { TenantId = "red", ScheduleDelay = 2.Seconds() });
await endpoint.SendAsync(blue, new DeliveryOptions { TenantId = "blue", ScheduleDelay = 2.Seconds() });

await Task.Delay(300.Milliseconds());

_tracker.Received.Any(x => x.Id == red.Id).ShouldBeFalse();
_tracker.Received.Any(x => x.Id == blue.Id).ShouldBeFalse();

// Intentionally no "shouldn't be delivered yet" check here. ScheduleDelay
// promises "no earlier than T+2s", not "exactly at T+2s" — asserting the
// negative was racy and coupled the test to polling cadence. The contract
// we care about is that the messages eventually arrive at the right tenant.
var allReceived = await Poll(30.Seconds(), () =>
_tracker.Received.Any(x => x.Id == red.Id) && _tracker.Received.Any(x => x.Id == blue.Id));

Expand Down
160 changes: 160 additions & 0 deletions src/Persistence/SqliteTests/Transport/sqlite_advisory_lock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Wolverine;
using Wolverine.Persistence.Durability;
using Wolverine.Sqlite;

namespace SqliteTests.Transport;

[Collection("sqlite")]
public class sqlite_advisory_lock : SqliteContext, IAsyncLifetime
{
private SqliteTestDatabase _db = null!;

public Task InitializeAsync()
{
_db = Servers.CreateDatabase("sqlite_advisory_lock");
return Task.CompletedTask;
}

public Task DisposeAsync()
{
_db.Dispose();
return Task.CompletedTask;
}

[Fact]
public async Task try_attain_is_idempotent()
{
// Regression test for the bug where calling TryAttainLockAsync twice for
// the same lockId on the same instance returned false the second time
// (because INSERT OR IGNORE no-ops on the existing row).
using var host = await CreateHostAsync(_db.ConnectionString);
var store = (SqliteMessageStore)host.Services.GetRequiredService<IMessageStore>();

(await store.AdvisoryLock.TryAttainLockAsync(4242, default)).ShouldBeTrue();
(await store.AdvisoryLock.TryAttainLockAsync(4242, default)).ShouldBeTrue();
store.AdvisoryLock.HasLock(4242).ShouldBeTrue();

await store.AdvisoryLock.ReleaseLockAsync(4242);
store.AdvisoryLock.HasLock(4242).ShouldBeFalse();
}

[Fact]
public async Task release_actually_deletes_the_row()
{
// The base ReleaseLockAsync was a no-op for SQLite (its doc comment
// assumed session-scoped engine locks). The override now delegates to
// SqliteAdvisoryLock which deletes the row.
using var host = await CreateHostAsync(_db.ConnectionString);
var store = (SqliteMessageStore)host.Services.GetRequiredService<IMessageStore>();

await store.AdvisoryLock.TryAttainLockAsync(7777, default);
await store.AdvisoryLock.ReleaseLockAsync(7777);

await using var conn = new SqliteConnection(_db.ConnectionString);
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "select count(*) from wolverine_locks where lock_id = 7777";
((long)(await cmd.ExecuteScalarAsync())!).ShouldBe(0);
}

[Fact]
public async Task stale_row_is_reaped_on_attempt()
{
// A holder that died without releasing leaves a row no peer would ever
// clean up. The TTL sweep on TryAttainLockAsync deletes rows older than
// the configured TTL before the INSERT OR IGNORE.
using var host = await CreateHostAsync(_db.ConnectionString); // creates wolverine_locks

await using (var seed = new SqliteConnection(_db.ConnectionString))
{
await seed.OpenAsync();
await using var cmd = seed.CreateCommand();
cmd.CommandText = "INSERT INTO wolverine_locks (lock_id, acquired_at) VALUES ($id, $when)";
cmd.Parameters.AddWithValue("$id", 9001);
// Pre-date by 10s; TTL is 1s in this test
cmd.Parameters.AddWithValue("$when",
DateTime.UtcNow.AddSeconds(-10).ToString("yyyy-MM-dd HH:mm:ss"));
await cmd.ExecuteNonQueryAsync();
}

var dataSource = new Weasel.Sqlite.SqliteDataSource(_db.ConnectionString);
await using var lockA = new SqliteAdvisoryLock(dataSource, NullLogger.Instance,
"test", TimeSpan.FromSeconds(1));

(await lockA.TryAttainLockAsync(9001, default)).ShouldBeTrue();
}

[Fact]
public async Task live_holder_is_not_stolen_after_ttl_thanks_to_heartbeat()
{
// Holder A keeps re-attempting (as the production polling loops do).
// The heartbeat advances acquired_at on every re-attempt, so even
// after the TTL window has elapsed several times over, holder B
// cannot acquire the lock.
using var host = await CreateHostAsync(_db.ConnectionString);

var dataSource = new Weasel.Sqlite.SqliteDataSource(_db.ConnectionString);
await using var holderA = new SqliteAdvisoryLock(dataSource, NullLogger.Instance,
"A", TimeSpan.FromSeconds(1));
await using var holderB = new SqliteAdvisoryLock(dataSource, NullLogger.Instance,
"B", TimeSpan.FromSeconds(1));

(await holderA.TryAttainLockAsync(9100, default)).ShouldBeTrue();

// Beat the heartbeat across more than 2× TTL while B repeatedly tries
for (var i = 0; i < 6; i++)
{
await Task.Delay(500);
(await holderA.TryAttainLockAsync(9100, default)).ShouldBeTrue(); // heartbeat tick
(await holderB.TryAttainLockAsync(9100, default)).ShouldBeFalse(); // never steals
}
}

[Fact]
public async Task heartbeat_advances_acquired_at_on_reattempt()
{
using var host = await CreateHostAsync(_db.ConnectionString);
var store = (SqliteMessageStore)host.Services.GetRequiredService<IMessageStore>();

(await store.AdvisoryLock.TryAttainLockAsync(9200, default)).ShouldBeTrue();
var firstAcquired = await readAcquiredAtAsync(_db.ConnectionString, 9200);

await Task.Delay(TimeSpan.FromSeconds(1.2));

(await store.AdvisoryLock.TryAttainLockAsync(9200, default)).ShouldBeTrue();
var secondAcquired = await readAcquiredAtAsync(_db.ConnectionString, 9200);

secondAcquired.ShouldBeGreaterThan(firstAcquired);

await store.AdvisoryLock.ReleaseLockAsync(9200);
}

private static async Task<DateTime> readAcquiredAtAsync(string connectionString, int lockId)
{
await using var conn = new SqliteConnection(connectionString);
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "SELECT acquired_at FROM wolverine_locks WHERE lock_id = $id";
cmd.Parameters.AddWithValue("$id", lockId);
var raw = (string)(await cmd.ExecuteScalarAsync())!;
return DateTime.SpecifyKind(
DateTime.ParseExact(raw, "yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture),
DateTimeKind.Utc);
}

private static async Task<IHost> CreateHostAsync(string connectionString)
{
return await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithSqlite(connectionString);
opts.Discovery.DisableConventionalDiscovery();
})
.StartAsync();
}
}
83 changes: 83 additions & 0 deletions src/Persistence/SqliteTests/Transport/sqlite_migration_lock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Persistence.Durability;
using Wolverine.Sqlite;

namespace SqliteTests.Transport;

[Collection("sqlite")]
public class sqlite_migration_lock : SqliteContext, IAsyncLifetime
{
private SqliteTestDatabase _db = null!;

public Task InitializeAsync()
{
_db = Servers.CreateDatabase("sqlite_migration_lock");
return Task.CompletedTask;
}

public Task DisposeAsync()
{
_db.Dispose();
return Task.CompletedTask;
}

[Fact]
public async Task migrate_async_does_not_leave_a_row_in_wolverine_locks()
{
// The whole point of switching the migration lock to BEGIN EXCLUSIVE: it
// can't deposit rows in wolverine_locks (because the table is created by
// the same migration). After startup the table exists but holds no rows
// for the migration lockId.
using var host = await CreateHostAsync(_db.ConnectionString);

var store = (SqliteMessageStore)host.Services.GetRequiredService<IMessageStore>();
var migrationLockId = store.Settings.MigrationLockId;

await using var conn = new SqliteConnection(_db.ConnectionString);
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "select count(*) from wolverine_locks where lock_id = $id";
cmd.Parameters.AddWithValue("$id", migrationLockId);
var count = (long)(await cmd.ExecuteScalarAsync())!;
count.ShouldBe(0);
}

[Fact]
public async Task two_hosts_can_start_concurrently_against_the_same_file()
{
// Without the BEGIN EXCLUSIVE migration lock, the second startup hits
// the chicken-and-egg (wolverine_locks doesn't exist yet) and burns
// ~5.5s of failed lock retries. With BEGIN EXCLUSIVE, one waits, the
// other proceeds, and both reach Started without errors.
var startup = Task.WhenAll(
CreateHostAsync(_db.ConnectionString),
CreateHostAsync(_db.ConnectionString));

var hosts = await startup.WaitAsync(TimeSpan.FromSeconds(15));
try
{
hosts.ShouldNotBeNull();
hosts.Length.ShouldBe(2);
}
finally
{
foreach (var h in hosts) await h.StopAsync();
foreach (var h in hosts) h.Dispose();
}
}

private static async Task<IHost> CreateHostAsync(string connectionString)
{
return await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithSqlite(connectionString);
opts.Discovery.DisableConventionalDiscovery();
})
.StartAsync();
}
}
20 changes: 18 additions & 2 deletions src/Persistence/Wolverine.RDBMS/MessageDatabase.Admin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public async Task MigrateAsync()
{
try
{
await ReleaseLockAsync(lockId, typedConn, _cancellation);
await releaseMigrationLockAsync(lockId, typedConn, _cancellation);
}
catch
{
Expand Down Expand Up @@ -112,8 +112,13 @@ public async Task MigrateAsync()
/// false if not acquired after the retry budget — in which case another process
/// is presumably finishing the migration; we proceed and let our own SchemaMigration
/// detect "no changes" as a no-op.
///
/// Providers whose advisory-lock primitive depends on schema that is itself part
/// of the migration (e.g., SQLite's row-based lock on <c>wolverine_locks</c>)
/// should override this with a primitive that does not depend on the schema —
/// for example, SQLite's <c>BEGIN EXCLUSIVE</c>.
/// </summary>
private async Task<bool> acquireMigrationLockAsync(int lockId, T conn, CancellationToken token)
protected virtual async Task<bool> acquireMigrationLockAsync(int lockId, T conn, CancellationToken token)
{
const int maxAttempts = 10;
for (var attempt = 0; attempt < maxAttempts; attempt++)
Expand All @@ -130,6 +135,17 @@ private async Task<bool> acquireMigrationLockAsync(int lockId, T conn, Cancellat
return false;
}

/// <summary>
/// Release the lock previously acquired by <see cref="acquireMigrationLockAsync"/>.
/// Default implementation delegates to the polling-lock release path; providers
/// that override <see cref="acquireMigrationLockAsync"/> with a different primitive
/// (e.g., a transaction) must override this too.
/// </summary>
protected virtual Task releaseMigrationLockAsync(int lockId, T conn, CancellationToken token)
{
return ReleaseLockAsync(lockId, conn, token);
}

public async Task<IReadOnlyList<Envelope>> AllIncomingAsync()
{
return await CreateCommand(
Expand Down
Loading
Loading