Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/Persistence/SqliteTests/SqliteTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
</PropertyGroup>

<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App"/>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Microsoft.NET.Test.Sdk" />
<PackageReference Include="GitHubActionsTestLogger" PrivateAssets="All" />
<PackageReference Include="xunit"/>
<PackageReference Include="xunit" />
<PackageReference Include="xunit.runner.visualstudio">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand All @@ -18,13 +18,13 @@
<ItemGroup>
<PackageReference Include="NSubstitute" />
<PackageReference Include="Shouldly" />
<ProjectReference Include="..\..\Persistence\Wolverine.RDBMS\Wolverine.RDBMS.csproj"/>
<ProjectReference Include="..\Wolverine.Sqlite\Wolverine.Sqlite.csproj"/>
<ProjectReference Include="..\..\Persistence\Wolverine.RDBMS\Wolverine.RDBMS.csproj" />
<ProjectReference Include="..\Wolverine.Sqlite\Wolverine.Sqlite.csproj" />
<ProjectReference Include="..\..\Testing\Wolverine.ComplianceTests\Wolverine.ComplianceTests.csproj" />
</ItemGroup>

<ItemGroup>
<Content Include="$(SolutionDir)xunit.runner.json" CopyToOutputDirectory="PreserveNewest"/>
<Content Include="$(SolutionDir)xunit.runner.json" CopyToOutputDirectory="PreserveNewest" />
</ItemGroup>


Expand Down
172 changes: 172 additions & 0 deletions src/Persistence/SqliteTests/Transport/advisory_lock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
using Microsoft.Extensions.DependencyInjection;
using System.Reflection;
using Weasel.Core;
using Wolverine.Persistence.Durability;
using Wolverine.RDBMS;
using Wolverine.Sqlite;

namespace SqliteTests.Transport;

[Collection("sqlite")]
public class advisory_lock : SqliteContext, IAsyncLifetime
{
private SqliteTestDatabase _main = null!;
private SqliteTestDatabase _green = null!;
private IHost _host1 = null!;
private IHost _host2 = null!;

public async Task InitializeAsync()
{
_main = Servers.CreateDatabase("sqlite_store_lock_main");
_green = Servers.CreateDatabase("sqlite_store_lock_green");
_host1 = CreateHost();
_host2 = CreateHost();
await Task.WhenAll(_host1.StartAsync(), _host2.StartAsync());
}

public async Task DisposeAsync()
{
_host1.Dispose();
_host2.Dispose();
_main.Dispose();
_green.Dispose();
}

[Theory]
[InlineData(null)]
[InlineData("green")]
public async Task should_not_attain_lock_already_attained_by_another_host(string? tenantId)
{
var advisoryLock1 = await GetAdvisoryLockAsync(_host1, tenantId);
(await advisoryLock1.TryAttainLockAsync(69, default)).ShouldBeTrue();
advisoryLock1.HasLock(69).ShouldBeTrue();

var advisoryLock2 = await GetAdvisoryLockAsync(_host2, tenantId);
(await advisoryLock2.TryAttainLockAsync(69, default)).ShouldBeFalse();
advisoryLock2.HasLock(69).ShouldBeFalse();
}

[Theory]
[InlineData(null)]
[InlineData("green")]
public async Task should_attain_lock_released_by_another_host(string? tenantId)
{
var advisoryLock1 = await GetAdvisoryLockAsync(_host1, tenantId);
(await advisoryLock1.TryAttainLockAsync(69, default)).ShouldBeTrue();
advisoryLock1.HasLock(69).ShouldBeTrue();
await advisoryLock1.ReleaseLockAsync(69);
advisoryLock1.HasLock(69).ShouldBeFalse();

var advisoryLock2 = await GetAdvisoryLockAsync(_host2, tenantId);

(await advisoryLock2.TryAttainLockAsync(69, default)).ShouldBeTrue();
advisoryLock2.HasLock(69).ShouldBeTrue();
}

[Theory]
[InlineData(null)]
[InlineData("green")]
public async Task should_attain_lock_not_attained_by_another_host(string? tenantId)
{
var advisoryLock1 = await GetAdvisoryLockAsync(_host1, tenantId);
(await advisoryLock1.TryAttainLockAsync(69, default)).ShouldBeTrue();
advisoryLock1.HasLock(69).ShouldBeTrue();

var advisoryLock2 = await GetAdvisoryLockAsync(_host2, tenantId);
(await advisoryLock2.TryAttainLockAsync(70, default)).ShouldBeTrue();
advisoryLock2.HasLock(70).ShouldBeTrue();
}

[Fact]
public async Task should_attain_lock_per_tenant()
{
var advisoryLock1 = await GetAdvisoryLockAsync(_host1);
(await advisoryLock1.TryAttainLockAsync(69, default)).ShouldBeTrue();
advisoryLock1.HasLock(69).ShouldBeTrue();

var advisoryLock2 = await GetAdvisoryLockAsync(_host2, "green");
(await advisoryLock2.TryAttainLockAsync(69, default)).ShouldBeTrue();
advisoryLock2.HasLock(69).ShouldBeTrue();
}

[Fact]
public async Task should_attain_lock_twice()
{
var advisoryLock1 = await GetAdvisoryLockAsync(_host1);
(await advisoryLock1.TryAttainLockAsync(69, default)).ShouldBeTrue();
advisoryLock1.HasLock(69).ShouldBeTrue();

(await advisoryLock1.TryAttainLockAsync(69, default)).ShouldBeTrue();
advisoryLock1.HasLock(69).ShouldBeTrue();
}

[Fact]
public async Task should_attain_lock_when_previous_owner_is_disposed_without_releasing()
{
var advisoryLock1 = await GetAdvisoryLockAsync(_host1);
(await advisoryLock1.TryAttainLockAsync(69, default)).ShouldBeTrue();

_host1.Dispose();

var advisoryLock2 = await GetAdvisoryLockAsync(_host2);
(await advisoryLock2.TryAttainLockAsync(69, default)).ShouldBeTrue();
advisoryLock2.HasLock(69).ShouldBeTrue();
}

[Fact]
public async Task should_not_keep_migration_lock_after_applying_migration_on_start()
{
var dbSettings1 = _host1.Services.GetRequiredService<DatabaseSettings>();
var dbSettings2 = _host2.Services.GetRequiredService<DatabaseSettings>();
dbSettings1.MigrationLockId.ShouldBe(dbSettings2.MigrationLockId);
var migrationLockId = dbSettings1.MigrationLockId;

var advisoryLock1 = await GetAdvisoryLockAsync(_host1);
var advisoryLock2 = await GetAdvisoryLockAsync(_host2);

advisoryLock1.HasLock(migrationLockId).ShouldBeFalse();
advisoryLock2.HasLock(migrationLockId).ShouldBeFalse();
}

[Fact] // This test is for demonstration - not expected behavior
public async Task should_not_attain_lock_when_previous_owner_crashes_without_releasing()
{
var store = await GetStoreAsync(_host1);
await store.AdvisoryLock.TryAttainLockAsync(69, default);
var prop = typeof(SqliteMessageStore).GetProperty(
nameof(SqliteMessageStore.AdvisoryLock),
BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
prop?.SetValue(store, null);
_host1.Dispose(); // advisory lock can't be released because of the simulated crash

var store2 = await GetStoreAsync(_host2);
(await store2.AdvisoryLock.TryAttainLockAsync(69, default)).ShouldBeFalse(); // stale lock hurts
store2.AdvisoryLock.HasLock(69).ShouldBeFalse();
}

private static async Task<SqliteMessageStore> GetStoreAsync(IHost host, string? tenantId = null)
{
var multitenantedStore = (MultiTenantedMessageStore)host.Services.GetRequiredService<IMessageStore>();
var store = await multitenantedStore.GetDatabaseAsync(tenantId);
return (SqliteMessageStore)store;
}

private static async Task<IAdvisoryLock> GetAdvisoryLockAsync(IHost host, string? tenantId = null)
{
var store = await GetStoreAsync(host, tenantId);
return store.AdvisoryLock;
}

private IHost CreateHost()
{
return Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithSqlite(_main.ConnectionString)
.RegisterStaticTenants(tenants => tenants.Register("green", _green.ConnectionString));

opts.Discovery.DisableConventionalDiscovery();
})
.Build();
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
using System.Collections.Concurrent;
using JasperFx.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Persistence;
using Wolverine.Sqlite;

namespace SqliteTests.Transport;
Expand Down
2 changes: 1 addition & 1 deletion src/Persistence/Wolverine.RDBMS/MessageDatabase.Polling.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ await deleteMany(tx, envelopes.Select(x => x.Id).ToArray(), externalTable.TableN

/// <summary>
/// Releases a previously-acquired session-scoped advisory lock. Default
/// implementation is a no-op for providers (e.g., SQLite) where the lock
/// implementation is a no-op for providers where the lock
/// is automatically released when the connection closes.
/// </summary>
protected virtual Task ReleaseLockAsync(int lockId, T connection, CancellationToken token)
Expand Down
50 changes: 32 additions & 18 deletions src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
using JasperFx;
using Microsoft.Extensions.Logging;
using System.Data;
using System.Data.Common;
using Microsoft.Extensions.Logging;
using Weasel.Core;
using Wolverine.RDBMS;
using Wolverine.Runtime;
using Weasel.Sqlite;

namespace Wolverine.Sqlite;

internal class SqliteAdvisoryLock : IAdvisoryLock
{
private readonly DbDataSource _dataSource;
private readonly ILogger _logger;
private readonly string _databaseName;
private readonly List<int> _locks = new();
private readonly string _databaseName;
private readonly HashSet<int> _locks = [];
private bool _hasLocksTable;
private DbConnection? _conn;

public SqliteAdvisoryLock(DbDataSource dataSource, ILogger logger, string databaseName)
Expand Down Expand Up @@ -89,17 +90,17 @@ public async Task<bool> TryAttainLockAsync(int lockId, CancellationToken token)
{
// SQLite doesn't have advisory locks like PostgreSQL
// We'll use a simple table-based lock approach
var result = await _conn.CreateCommand("INSERT OR IGNORE INTO wolverine_locks (lock_id, acquired_at) VALUES (@lockId, datetime('now'))")
await CreateLocksTableIfMissing(_conn, token);
var result = await _conn.CreateCommand(
"INSERT OR IGNORE INTO wolverine_locks (lock_id, acquired_at) " +
"VALUES (@lockId, datetime('now'))")
.With("lockId", lockId)
.ExecuteNonQueryAsync(token);

if (result > 0)
{
_locks.Add(lockId);
return true;
}

return false;
return HasLock(lockId);
}
catch (Exception ex)
{
Expand All @@ -108,6 +109,26 @@ public async Task<bool> TryAttainLockAsync(int lockId, CancellationToken token)
}
}

private async Task CreateLocksTableIfMissing(DbConnection connection, CancellationToken token)
{
if (_hasLocksTable)
return;

var table = new Weasel.Sqlite.Tables.Table(new SqliteObjectName("wolverine_locks"));
table.AddColumn("lock_id", "INTEGER").AsPrimaryKey();
table.AddColumn("acquired_at", "TEXT").NotNull();
table.AddColumn("owner_id", "INTEGER");

var migration = await SchemaMigration.DetermineAsync(connection, default(CancellationToken), table);
if (migration.Difference != SchemaPatchDifference.None)
{
await new SqliteMigrator()
.ApplyAllAsync(connection, migration, AutoCreate.CreateOrUpdate, ct: token);
}

_hasLocksTable = true;
}

public async Task ReleaseLockAsync(int lockId)
{
if (!_locks.Contains(lockId))
Expand All @@ -128,7 +149,7 @@ await _conn.CreateCommand("DELETE FROM wolverine_locks WHERE lock_id = @lockId")
.ExecuteNonQueryAsync();
_locks.Remove(lockId);

if (!_locks.Any())
if (_locks.Count == 0)
{
await _conn.CloseAsync().ConfigureAwait(false);
await _conn.DisposeAsync().ConfigureAwait(false);
Expand All @@ -151,12 +172,7 @@ public async ValueTask DisposeAsync()
try
{
foreach (var lockId in _locks.ToList())
{
await ReleaseLockAsync(lockId);
}

await _conn.CloseAsync().ConfigureAwait(false);
await _conn.DisposeAsync().ConfigureAwait(false);
}
catch (Exception e)
{
Expand All @@ -166,9 +182,7 @@ public async ValueTask DisposeAsync()
finally
{
if (_conn != null)
{
await _conn.DisposeAsync().ConfigureAwait(false);
}
}
}
}
28 changes: 7 additions & 21 deletions src/Persistence/Wolverine.Sqlite/SqliteMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging;
using Weasel.Core;
using Weasel.Core.Migrations;
using Weasel.Sqlite;
using Wolverine.Logging;
using Wolverine.Persistence.Durability;
Expand All @@ -18,7 +17,6 @@
using Wolverine.Runtime.Agents;
using Wolverine.Sqlite.Schema;
using Wolverine.Sqlite.Sagas;
using Wolverine.Sqlite.Util;
using Wolverine.Transports;
using DbCommandBuilder = Weasel.Core.DbCommandBuilder;

Expand Down Expand Up @@ -152,20 +150,14 @@ protected override Task deleteMany(DbTransaction tx, Guid[] ids, DbObjectName ta
.ExecuteNonQueryAsync();
}

protected override async Task<bool> TryAttainLockAsync(int lockId, SqliteConnection connection, CancellationToken token)
protected override Task<bool> TryAttainLockAsync(int lockId, SqliteConnection _, CancellationToken token)
{
// SQLite uses BEGIN EXCLUSIVE TRANSACTION for locking
// We'll use a simple advisory lock table approach
try
{
await connection.CreateCommand($"INSERT OR IGNORE INTO wolverine_locks (lock_id, acquired_at) VALUES ({lockId}, datetime('now'))")
.ExecuteNonQueryAsync(token);
return true;
}
catch
{
return false;
}
return AdvisoryLock.TryAttainLockAsync(lockId, token);
}

protected override Task ReleaseLockAsync(int lockId, SqliteConnection _, CancellationToken token)
{
return AdvisoryLock.ReleaseLockAsync(lockId);
}

protected override DbCommand buildFetchSql(SqliteConnection conn, DbObjectName tableName, string[] columnNames, int maxRecords)
Expand Down Expand Up @@ -462,12 +454,6 @@ public override IEnumerable<ISchemaObject> AllObjects()
restrictionTable.AddColumn("type", "TEXT").NotNull();
restrictionTable.AddColumn("node", "INTEGER").NotNull().DefaultValue(0);
yield return restrictionTable;

// Advisory lock table for SQLite
var lockTable = new Weasel.Sqlite.Tables.Table(new SqliteObjectName("wolverine_locks"));
lockTable.AddColumn("lock_id", "INTEGER").AsPrimaryKey();
lockTable.AddColumn("acquired_at", "TEXT").NotNull();
yield return lockTable;
}

foreach (var table in _otherTables)
Expand Down
Loading