diff --git a/src/Persistence/SqlServerTests/MultiTenancy/MultiTenancyContext.cs b/src/Persistence/SqlServerTests/MultiTenancy/MultiTenancyContext.cs new file mode 100644 index 000000000..84dcb3326 --- /dev/null +++ b/src/Persistence/SqlServerTests/MultiTenancy/MultiTenancyContext.cs @@ -0,0 +1,83 @@ +using IntegrationTests; +using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Hosting; +using Weasel.SqlServer; +using Weasel.SqlServer.Tables; +using Wolverine; +using Wolverine.RDBMS; + +namespace SqlServerTests.MultiTenancy; + +public abstract class MultiTenancyContext : SqlServerContext, IAsyncLifetime +{ + protected IHost theHost; + protected string tenant1ConnectionString; + protected string tenant2ConnectionString; + protected string tenant3ConnectionString; + + public new async Task InitializeAsync() + { + await using var conn = new SqlConnection(Servers.SqlServerConnectionString); + await conn.OpenAsync(); + + tenant1ConnectionString = await CreateDatabaseIfNotExists(conn, "db1"); + tenant2ConnectionString = await CreateDatabaseIfNotExists(conn, "db2"); + tenant3ConnectionString = await CreateDatabaseIfNotExists(conn, "db3"); + + await cleanItems(tenant1ConnectionString); + await cleanItems(tenant2ConnectionString); + await cleanItems(tenant3ConnectionString); + + theHost = await Host.CreateDefaultBuilder() + .UseWolverine(opts => { configureWolverine(opts); }).StartAsync(); + + await onStartup(); + } + + protected abstract void configureWolverine(WolverineOptions opts); + + protected virtual Task onStartup() => Task.CompletedTask; + + public new async Task DisposeAsync() + { + await theHost.StopAsync(); + } + + private async Task CreateDatabaseIfNotExists(SqlConnection conn, string databaseName) + { + var builder = new SqlConnectionStringBuilder(Servers.SqlServerConnectionString); + + var exists = await DatabaseExistsAsync(conn, databaseName); + if (!exists) + { + await conn.CreateCommand($"CREATE DATABASE [{databaseName}]").ExecuteNonQueryAsync(); + } + + builder.InitialCatalog = databaseName; + + return builder.ConnectionString; + } + + private static async Task DatabaseExistsAsync(SqlConnection conn, string databaseName) + { + var cmd = conn.CreateCommand($"SELECT DB_ID(@databaseName)"); + cmd.Parameters.AddWithValue("@databaseName", databaseName); + var result = await cmd.ExecuteScalarAsync(); + return result != null && result != DBNull.Value; + } + + private async Task cleanItems(string connectionString) + { + var table = new Table("items"); + table.AddColumn("Id").AsPrimaryKey(); + table.AddColumn("Name"); + + await using var conn = new SqlConnection(connectionString); + await conn.OpenAsync(); + + await table.ApplyChangesAsync(conn); + + await conn.CreateCommand("delete from items").ExecuteNonQueryAsync(); + await conn.CloseAsync(); + } +} diff --git a/src/Persistence/SqlServerTests/MultiTenancy/static_multi_tenancy.cs b/src/Persistence/SqlServerTests/MultiTenancy/static_multi_tenancy.cs new file mode 100644 index 000000000..d1a0f10a1 --- /dev/null +++ b/src/Persistence/SqlServerTests/MultiTenancy/static_multi_tenancy.cs @@ -0,0 +1,159 @@ +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Core.Reflection; +using JasperFx.Resources; +using Microsoft.Extensions.DependencyInjection; +using Shouldly; +using Weasel.Core.CommandLine; +using Weasel.Core.Migrations; +using Wolverine; +using Wolverine.Persistence.Durability; +using Wolverine.SqlServer; +using Wolverine.SqlServer.Persistence; +using Wolverine.RDBMS; +using Xunit.Abstractions; + +namespace SqlServerTests.MultiTenancy; + +public class static_multi_tenancy : MultiTenancyContext +{ + private readonly ITestOutputHelper _output; + + public static_multi_tenancy(ITestOutputHelper output) + { + _output = output; + } + + protected override void configureWolverine(WolverineOptions opts) + { + opts.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString, "static_multi_tenancy2") + .RegisterStaticTenants(tenants => + { + tenants.Register("red", tenant1ConnectionString); + tenants.Register("blue", tenant2ConnectionString); + tenants.Register("green", tenant3ConnectionString); + }); + + opts.Services.AddResourceSetupOnStartup(); + + opts.AddSagaType("blues"); + opts.AddSagaType("reds"); + } + + [Fact] + public async Task registers_a_multi_tenanted_message_store() + { + var store = theHost.Services.GetRequiredService() + .ShouldBeOfType(); + + store.Main.Describe().DatabaseName.ShouldBe("master"); + + (await store.Source.FindAsync("red")).Describe().DatabaseName.ShouldBe("db1"); + (await store.Source.FindAsync("blue")).Describe().DatabaseName.ShouldBe("db2"); + (await store.Source.FindAsync("green")).Describe().DatabaseName.ShouldBe("db3"); + } + + [Fact] + public async Task exposes_every_database_in_all_active() + { + var store = theHost.Services.GetRequiredService() + .ShouldBeOfType(); + + var all = store.ActiveDatabases(); + all.Count.ShouldBe(4); + } + + [Fact] + public async Task all_databases_are_exposed_to_weasel() + { + var databases = await new WeaselInput().FilterDatabases(theHost); + + var store = theHost.Services.GetRequiredService() + .ShouldBeOfType(); + + databases.ShouldContain((IDatabase)store.Main); + databases.ShouldContain((IDatabase)await store.Source.FindAsync("red")); + databases.ShouldContain((IDatabase)await store.Source.FindAsync("blue")); + databases.ShouldContain((IDatabase)await store.Source.FindAsync("green")); + } + + [Fact] + public async Task the_main_database_tables_include_node_persistence() + { + var store = theHost.Services.GetRequiredService() + .ShouldBeOfType(); + var tables = await store.Main.As().SchemaTables(); + + var expected = @" +static_multi_tenancy2.blues +static_multi_tenancy2.reds +static_multi_tenancy2.wolverine_agent_restrictions +static_multi_tenancy2.wolverine_control_queue +static_multi_tenancy2.wolverine_dead_letters +static_multi_tenancy2.wolverine_incoming_envelopes +static_multi_tenancy2.wolverine_node_assignments +static_multi_tenancy2.wolverine_node_records +static_multi_tenancy2.wolverine_nodes +static_multi_tenancy2.wolverine_outgoing_envelopes +".ReadLines().Where(x => x.IsNotEmpty()).ToArray(); + + tables.OrderBy(x => x.QualifiedName).Select(x => x.QualifiedName).ToArray() + .ShouldBe(expected); + } + + [Fact] + public async Task the_tenant_databases_have_only_envelope_and_saga_tables() + { + var store = theHost.Services.GetRequiredService() + .ShouldBeOfType(); + + await store.Source.RefreshAsync(); + + var expected = @" +static_multi_tenancy2.blues +static_multi_tenancy2.reds +static_multi_tenancy2.wolverine_dead_letters +static_multi_tenancy2.wolverine_incoming_envelopes +static_multi_tenancy2.wolverine_outgoing_envelopes +".ReadLines().Where(x => x.IsNotEmpty()).ToArray(); + + foreach (var tenantId in new string[] { "red", "blue", "green" }) + { + var messageStore = await store.Source.FindAsync(tenantId); + var tables = await messageStore.As().SchemaTables(); + + tables.OrderBy(x => x.QualifiedName).Select(x => x.QualifiedName).ToArray() + .ShouldBe(expected); + } + } + + [Fact] + public async Task have_all_the_correct_durability_agents() + { + var store = theHost.Services.GetRequiredService() + .ShouldBeOfType(); + + var expected = @" +wolverinedb://sqlserver/localhost/master/static_multi_tenancy2 +wolverinedb://sqlserver/localhost/db1/static_multi_tenancy2 +wolverinedb://sqlserver/localhost/db3/static_multi_tenancy2 +wolverinedb://sqlserver/localhost/db2/static_multi_tenancy2 +".ReadLines().Where(x => x.IsNotEmpty()).Select(x => new Uri(x)).OrderBy(x => x.ToString()).ToArray(); + + + var agents = await store.AllKnownAgentsAsync(); + agents.OrderBy(x => x.ToString()).ToArray().ShouldBe(expected); + } +} + +public class RedSaga : Saga +{ + public Guid Id { get; set; } + public string Name { get; set; } +} + +public class BlueSaga : Saga +{ + public Guid Id { get; set; } + public string Name { get; set; } +} diff --git a/src/Persistence/SqlServerTests/Transport/inbox_outbox_usage.cs b/src/Persistence/SqlServerTests/Transport/inbox_outbox_usage.cs index e0edb7b9d..4871cf7c6 100644 --- a/src/Persistence/SqlServerTests/Transport/inbox_outbox_usage.cs +++ b/src/Persistence/SqlServerTests/Transport/inbox_outbox_usage.cs @@ -35,7 +35,7 @@ public async Task InitializeAsync() [Fact] public async Task cascaded_response_with_outbox() { - var tracked = await _host.TrackActivity().WaitForMessageToBeReceivedAt(_host).InvokeMessageAndWaitAsync(new SqlServerPing("first")); + var tracked = await _host.TrackActivity().Timeout(15.Seconds()).WaitForMessageToBeReceivedAt(_host).InvokeMessageAndWaitAsync(new SqlServerPing("first")); tracked.FindSingleTrackedMessageOfType() .Name.ShouldBe("first"); diff --git a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs index 5a32204f1..9da9a3d6e 100644 --- a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs +++ b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs @@ -80,6 +80,40 @@ protected override void writeMessageIdArrayQueryList(DbCommandBuilder builder, G } } + /// + /// Fetch a list of the existing tables in the database filtered by schemas + /// + /// + /// + // TODO -- get this moved to Weasel. Shouldn't be here, but Claude brute forced it + public async Task> SchemaTables(CancellationToken ct = default) + { + var schemaNames = AllSchemaNames(); + + await using var conn = CreateConnection(); + await conn.OpenAsync(ct).ConfigureAwait(false); + + var tables = new List(); + foreach (var schemaName in schemaNames) + { + var sql = $"SELECT table_schema, table_name FROM information_schema.tables WHERE table_schema = @schema"; + var cmd = conn.CreateCommand(sql).With("schema", schemaName); + + await using var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false); + while (await reader.ReadAsync(ct).ConfigureAwait(false)) + { + var schema = reader.GetString(0); + var name = reader.GetString(1); + tables.Add(new DbObjectName(schema, name)); + } + + await reader.CloseAsync().ConfigureAwait(false); + } + + await conn.CloseAsync().ConfigureAwait(false); + return tables; + } + protected override INodeAgentPersistence? buildNodeStorage(DatabaseSettings databaseSettings, DbDataSource dataSource) { diff --git a/src/Persistence/Wolverine.SqlServer/Transport/ISqlServerQueueSender.cs b/src/Persistence/Wolverine.SqlServer/Transport/ISqlServerQueueSender.cs new file mode 100644 index 000000000..41fb8bea4 --- /dev/null +++ b/src/Persistence/Wolverine.SqlServer/Transport/ISqlServerQueueSender.cs @@ -0,0 +1,8 @@ +using Wolverine.Transports.Sending; + +namespace Wolverine.SqlServer.Transport; + +internal interface ISqlServerQueueSender : ISender +{ + Task ScheduleRetryAsync(Envelope envelope, CancellationToken cancellationToken); +} diff --git a/src/Persistence/Wolverine.SqlServer/Transport/MultiTenantedQueueListener.cs b/src/Persistence/Wolverine.SqlServer/Transport/MultiTenantedQueueListener.cs new file mode 100644 index 000000000..243341da3 --- /dev/null +++ b/src/Persistence/Wolverine.SqlServer/Transport/MultiTenantedQueueListener.cs @@ -0,0 +1,115 @@ +using ImTools; +using JasperFx.Core; +using Microsoft.Extensions.Logging; +using Wolverine.Runtime; +using Wolverine.SqlServer.Persistence; +using Wolverine.Transports; +using MultiTenantedMessageStore = Wolverine.Persistence.Durability.MultiTenantedMessageStore; + +namespace Wolverine.SqlServer.Transport; + +public class MultiTenantedQueueListener : IListener +{ + private readonly ILogger _logger; + private readonly SqlServerQueue _queue; + private readonly MultiTenantedMessageStore _stores; + private readonly IWolverineRuntime _runtime; + private readonly IReceiver _receiver; + private readonly CancellationTokenSource _cancellation; + + private ImHashMap _listeners = ImHashMap.Empty; + private Task? _activator; + + public MultiTenantedQueueListener(ILogger logger, SqlServerQueue queue, MultiTenantedMessageStore stores, + IWolverineRuntime runtime, IReceiver receiver) + { + _logger = logger; + _queue = queue; + _stores = stores; + _runtime = runtime; + _receiver = receiver; + + Address = _queue.Uri; + + _cancellation = CancellationTokenSource.CreateLinkedTokenSource(runtime.Cancellation); + } + + public async Task StartAsync() + { + if (_queue.Parent.Storage != null) + { + await startListening(_queue.Parent.Storage); + } + + foreach (var store in _stores.Source.AllActive().OfType()) + { + await startListening(store); + } + + _activator = Task.Run(async () => + { + while (!_cancellation.IsCancellationRequested) + { + await Task.Delay(_runtime.Options.Durability.TenantCheckPeriod, _cancellation.Token); + + await _stores.Source.RefreshAsync(); + var databases = _stores.Source.AllActive(); + foreach (var store in databases.OfType()) + { + if (!_listeners.Contains(store.Name)) + { + await startListening(store); + } + } + } + }, _cancellation.Token); + } + + private async Task startListening(SqlServerMessageStore store) + { + var listener = new SqlServerQueueListener(_queue, _runtime, _receiver, store.Settings.ConnectionString, store.Name); + _listeners = _listeners.AddOrUpdate(store.Name, listener); + await listener.StartAsync(); + + _logger.LogInformation("Started message listening for SQL Server queue {QueueName} on database {Database}", + _queue.Name, store.Name); + } + + public IHandlerPipeline? Pipeline => _receiver.Pipeline; + + ValueTask IChannelCallback.CompleteAsync(Envelope envelope) + { + return new ValueTask(); + } + + ValueTask IChannelCallback.DeferAsync(Envelope envelope) + { + return new ValueTask(); + } + + public async ValueTask DisposeAsync() + { + _cancellation.Cancel(); + _activator?.SafeDispose(); + foreach (var entry in _listeners.Enumerate()) + { + await entry.Value.DisposeAsync(); + } + } + + public Uri Address { get; set; } + + public async ValueTask StopAsync() + { + _cancellation.Cancel(); + foreach (var entry in _listeners.Enumerate()) + { + await entry.Value.StopAsync(); + } + } + + public bool IsListeningToDatabase(string databaseName) + { + return _listeners.Contains(databaseName); + } +} diff --git a/src/Persistence/Wolverine.SqlServer/Transport/MultiTenantedQueueSender.cs b/src/Persistence/Wolverine.SqlServer/Transport/MultiTenantedQueueSender.cs new file mode 100644 index 000000000..9ed66a6e5 --- /dev/null +++ b/src/Persistence/Wolverine.SqlServer/Transport/MultiTenantedQueueSender.cs @@ -0,0 +1,96 @@ +using ImTools; +using JasperFx.Core; +using Wolverine.RDBMS; +using Wolverine.SqlServer.Persistence; +using MultiTenantedMessageStore = Wolverine.Persistence.Durability.MultiTenantedMessageStore; + +namespace Wolverine.SqlServer.Transport; + +internal class MultiTenantedQueueSender : ISqlServerQueueSender, IAsyncDisposable +{ + private readonly SqlServerQueue _queue; + private readonly SqlServerQueueSender _master; + private readonly MultiTenantedMessageStore _stores; + private ImHashMap _byDatabase = ImHashMap.Empty; + private readonly SemaphoreSlim _lock = new(1); + private readonly CancellationTokenSource _cancellation = new(); + + public MultiTenantedQueueSender(SqlServerQueue queue, MultiTenantedMessageStore stores) + { + _queue = queue; + _master = new SqlServerQueueSender(queue); + _stores = stores; + + Destination = _queue.Uri; + } + + public bool SupportsNativeScheduledSend => true; + public Uri Destination { get; } + + public Task PingAsync() + { + return _master.PingAsync(); + } + + public async ValueTask SendAsync(Envelope envelope) + { + var sender = await resolveSender(envelope); + await sender.SendAsync(envelope); + } + + public async Task ScheduleRetryAsync(Envelope envelope, CancellationToken cancellationToken) + { + var sender = await resolveSender(envelope); + await sender.ScheduleRetryAsync(envelope, cancellationToken); + } + + private async ValueTask resolveSender(Envelope envelope) + { + if (envelope.TenantId.IsEmpty() || envelope.TenantId == "*DEFAULT*") + { + return _master; + } + + if (_byDatabase.TryFind(envelope.TenantId, out var sender)) + { + return sender; + } + + await _lock.WaitAsync(_cancellation.Token); + try + { + var database = (IMessageDatabase)await _stores.GetDatabaseAsync(envelope.TenantId); + if (_byDatabase.TryFind(database.Name, out sender)) + { + // This indicates that the database has been encountered before, + // but does not have the same name as the tenant id. This is possible + // in multi-level multi-tenancy + } + else + { + var sqlServerStore = (SqlServerMessageStore)database; + sender = new SqlServerQueueSender(_queue, sqlServerStore.Settings.ConnectionString, database.Name); + _byDatabase = _byDatabase.AddOrUpdate(database.Name, sender); + + if (_queue.Parent.AutoProvision) + { + await _queue.EnsureSchemaExists(database.Name, sqlServerStore.Settings.ConnectionString); + } + } + + _byDatabase = _byDatabase.AddOrUpdate(envelope.TenantId, sender); + } + finally + { + _lock.Release(); + } + + return sender; + } + + public ValueTask DisposeAsync() + { + _cancellation.Cancel(); + return new ValueTask(); + } +} diff --git a/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueue.cs b/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueue.cs index dbd520e11..e227547e2 100644 --- a/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueue.cs +++ b/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueue.cs @@ -1,33 +1,39 @@ +using ImTools; using JasperFx.Core; using Microsoft.Data.SqlClient; using Microsoft.Extensions.Logging; +using Weasel.Core; using Weasel.SqlServer; using Weasel.SqlServer.Tables; using Wolverine.Configuration; using Wolverine.RDBMS; using Wolverine.Runtime; using Wolverine.Runtime.Serialization; +using Wolverine.SqlServer.Persistence; using Wolverine.Transports; using Wolverine.Transports.Sending; -using Weasel.Core; namespace Wolverine.SqlServer.Transport; public class SqlServerQueue : Endpoint, IBrokerQueue, IDatabaseBackedEndpoint { + internal static Uri ToUri(string name, string? databaseName) + { + return databaseName.IsEmpty() + ? new Uri($"{SqlServerTransport.ProtocolName}://{name}") + : new Uri($"{SqlServerTransport.ProtocolName}://{name}/{databaseName}"); + } + private readonly string _queueTableName; private readonly string _scheduledTableName; private bool _hasInitialized; - private readonly string _writeDirectlyToQueueTableSql; - private readonly string _writeDirectlyToTheScheduledTable; - private readonly string _moveFromOutgoingToQueueSql; - private readonly string _moveFromOutgoingToScheduledSql; - private readonly string _moveScheduledToReadyQueueSql; - private readonly string _deleteExpiredSql; - private readonly string _tryPopMessagesDirectlySql; - private readonly string _tryPopMessagesToInboxSql; - - public SqlServerQueue(string name, SqlServerTransport parent, EndpointRole role = EndpointRole.Application) : base(new Uri($"{SqlServerTransport.ProtocolName}://{name}"), role) + private ISqlServerQueueSender? _sender; + private ImHashMap _checkedDatabases = ImHashMap.Empty; + private readonly Lazy _queueTable; + private readonly Lazy _scheduledTable; + + public SqlServerQueue(string name, SqlServerTransport parent, EndpointRole role = EndpointRole.Application, + string? databaseName = null) : base(ToUri(name, databaseName), role) { Parent = parent; _queueTableName = $"wolverine_queue_{name}"; @@ -37,93 +43,18 @@ public SqlServerQueue(string name, SqlServerTransport parent, EndpointRole role Name = name; EndpointName = name; - QueueTable = new QueueTable(Parent, _queueTableName); - ScheduledTable = new ScheduledMessageTable(Parent, _scheduledTableName); - - _writeDirectlyToQueueTableSql = $@"insert into {QueueTable.Identifier} ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.KeepUntil}) values (@id, @body, @type, @expires)"; - - _writeDirectlyToTheScheduledTable = $@" -merge {ScheduledTable.Identifier} as target -using (values (@id, @body, @type, @expires, @time)) as source ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.KeepUntil}, {DatabaseConstants.ExecutionTime}) -on target.id = @id -WHEN MATCHED THEN UPDATE set target.body = @body, @time = @time -WHEN NOT MATCHED THEN INSERT ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.KeepUntil}, {DatabaseConstants.ExecutionTime}) values (source.{DatabaseConstants.Id}, source.{DatabaseConstants.Body}, source.{DatabaseConstants.MessageType}, source.{DatabaseConstants.KeepUntil}, source.{DatabaseConstants.ExecutionTime}); -"; - - _moveFromOutgoingToQueueSql = $@" -INSERT into {QueueTable.Identifier} ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.KeepUntil}) -SELECT {DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.DeliverBy} -FROM - {Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} -WHERE {DatabaseConstants.Id} = @id; -DELETE FROM {Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} WHERE {DatabaseConstants.Id} = @id; -"; - - _moveFromOutgoingToScheduledSql = $@" -INSERT into {ScheduledTable.Identifier} ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.ExecutionTime}, {DatabaseConstants.KeepUntil}) -SELECT {DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, @time, {DatabaseConstants.DeliverBy} -FROM - {Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} -WHERE {DatabaseConstants.Id} = @id; -DELETE FROM {Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} WHERE {DatabaseConstants.Id} = @id; -"; - - _moveScheduledToReadyQueueSql = $@" -select id, body, message_type, keep_until into #temp_move_{Name} -FROM {ScheduledTable.Identifier} WITH (UPDLOCK, READPAST, ROWLOCK) -WHERE {DatabaseConstants.ExecutionTime} <= SYSDATETIMEOFFSET() AND ID NOT IN (select id from {QueueTable.Identifier}) -ORDER BY {ScheduledTable.Identifier}.timestamp; -delete from {ScheduledTable.Identifier} where id in (select id from #temp_move_{Name}); -INSERT INTO {QueueTable.Identifier} -(id, body, message_type, keep_until) - SELECT id, body, message_type, keep_until FROM #temp_move_{Name}; -select count(*) from #temp_move_{Name} -"; - - _deleteExpiredSql = $"delete from {QueueTable.Identifier} where {DatabaseConstants.KeepUntil} IS NOT NULL and {DatabaseConstants.KeepUntil} <= SYSDATETIMEOFFSET();delete from {ScheduledTable.Identifier} where {DatabaseConstants.KeepUntil} IS NOT NULL and {DatabaseConstants.KeepUntil} <= SYSDATETIMEOFFSET()"; - - _tryPopMessagesDirectlySql = $@" -DECLARE @NOCOUNT VARCHAR(3) = 'OFF'; -IF ( (512 & @@OPTIONS) = 512 ) SET @NOCOUNT = 'ON'; -SET NOCOUNT ON; - -WITH message AS ( - SELECT TOP(@count) {DatabaseConstants.Body}, {DatabaseConstants.KeepUntil} - FROM {QueueTable.Identifier} WITH (UPDLOCK, READPAST, ROWLOCK) - ORDER BY {QueueTable.Identifier}.timestamp) -DELETE FROM message -OUTPUT - deleted.{DatabaseConstants.Body}; - -IF (@NOCOUNT = 'ON') SET NOCOUNT ON; -IF (@NOCOUNT = 'OFF') SET NOCOUNT OFF;"; - - _tryPopMessagesToInboxSql = $@" -DECLARE @NOCOUNT VARCHAR(3) = 'OFF'; -IF ( (512 & @@OPTIONS) = 512 ) SET @NOCOUNT = 'ON'; -SET NOCOUNT ON; - -delete FROM {QueueTable.Identifier} WITH (UPDLOCK, READPAST, ROWLOCK) where id in (select id from {Parent.MessageStorageSchemaName}.{DatabaseConstants.IncomingTable}); -select top(@count) id, body, message_type, keep_until into #temp_pop_{Name} -FROM {QueueTable.Identifier} WITH (UPDLOCK, READPAST, ROWLOCK) -ORDER BY {QueueTable.Identifier}.timestamp; -delete from {QueueTable.Identifier} where id in (select id from #temp_pop_{Name}); -INSERT INTO {Parent.MessageStorageSchemaName}.{DatabaseConstants.IncomingTable} -(id, status, owner_id, body, message_type, received_at, keep_until) - SELECT id, 'Incoming', @node, body, message_type, '{Uri}', keep_until FROM #temp_pop_{Name}; -select body from #temp_pop_{Name}; - -IF (@NOCOUNT = 'ON') SET NOCOUNT ON; -IF (@NOCOUNT = 'OFF') SET NOCOUNT OFF;"; + // Gotta be lazy so the schema names get set + _queueTable = new Lazy(() => new QueueTable(Parent, _queueTableName)); + _scheduledTable = new Lazy(() => new ScheduledMessageTable(Parent, _scheduledTableName)); } public string Name { get; } internal SqlServerTransport Parent { get; } - internal Table QueueTable { get; private set; } + internal Table QueueTable => _queueTable.Value; - internal Table ScheduledTable { get; private set; } + internal Table ScheduledTable => _scheduledTable.Value; protected override bool supportsMode(EndpointMode mode) { @@ -136,15 +67,46 @@ protected override bool supportsMode(EndpointMode mode) /// public int MaximumMessagesToReceive { get; set; } = 20; - public override ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver) + public override async ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver) { + if (Parent.AutoProvision) + { + await SetupAsync(runtime.LoggerFactory.CreateLogger()); + } + + if (Parent.Databases != null) + { + var mtListener = new MultiTenantedQueueListener( + runtime.LoggerFactory.CreateLogger(), this, Parent.Databases, runtime, + receiver); + + await mtListener.StartAsync(); + return mtListener; + } + var listener = new SqlServerQueueListener(this, runtime, receiver); - return ValueTask.FromResult(listener); + await listener.StartAsync(); + return listener; + } + + private void buildSenderIfMissing() + { + if (_sender != null) return; + + if (Parent.Databases != null) + { + _sender = new MultiTenantedQueueSender(this, Parent.Databases); + } + else + { + _sender = new SqlServerQueueSender(this); + } } protected override ISender CreateSender(IWolverineRuntime runtime) { - return new SqlServerQueueSender(this); + buildSenderIfMissing(); + return _sender!; } public override async ValueTask InitializeAsync(ILogger logger) @@ -167,22 +129,48 @@ public override async ValueTask InitializeAsync(ILogger logger) _hasInitialized = true; } - public async ValueTask PurgeAsync(ILogger logger) + public ValueTask SendAsync(Envelope envelope) { - await using var conn = new SqlConnection(Parent.Settings.ConnectionString); - await conn.OpenAsync(); + buildSenderIfMissing(); + return _sender!.SendAsync(envelope); + } - try + private async ValueTask forEveryDatabase(Func action) + { + if (Parent.Databases != null) { - await conn.CreateCommand($"delete from {QueueTable.Identifier}").ExecuteNonQueryAsync(); - await conn.CreateCommand($"delete from {ScheduledTable.Identifier}").ExecuteNonQueryAsync(); + // Multi-tenant mode - iterate over all tenant databases + foreach (var database in Parent.Databases.ActiveDatabases().OfType()) + { + await action(database.Settings.ConnectionString, database.Identifier); + } } - finally + else { - await conn.CloseAsync(); + // Single-tenant mode - use the transport's connection string + await action(Parent.Settings.ConnectionString, Parent.Settings.SchemaName ?? "wolverine"); } } + public ValueTask PurgeAsync(ILogger logger) + { + return forEveryDatabase(async (connectionString, _) => + { + await using var conn = new SqlConnection(connectionString); + await conn.OpenAsync(); + + try + { + await conn.CreateCommand($"delete from {QueueTable.Identifier}").ExecuteNonQueryAsync(); + await conn.CreateCommand($"delete from {ScheduledTable.Identifier}").ExecuteNonQueryAsync(); + } + finally + { + await conn.CloseAsync(); + } + }); + } + public async ValueTask> GetAttributesAsync() { var count = await CountAsync(); @@ -194,48 +182,218 @@ public async ValueTask> GetAttributesAsync() public async ValueTask CheckAsync() { - await using var conn = new SqlConnection(Parent.Settings.ConnectionString); - await conn.OpenAsync(); - - try + var returnValue = true; + await forEveryDatabase(async (connectionString, _) => { - var queueDelta = await QueueTable.FindDeltaAsync(conn); - if (queueDelta.HasChanges()) return false; + await using var conn = new SqlConnection(connectionString); + await conn.OpenAsync(); - var scheduledDelta = await ScheduledTable.FindDeltaAsync(conn); + try + { + var queueDelta = await QueueTable.FindDeltaAsync(conn); + if (queueDelta.HasChanges()) + { + returnValue = false; + return; + } - return !scheduledDelta.HasChanges(); - } - finally - { - await conn.CloseAsync(); - } + var scheduledDelta = await ScheduledTable.FindDeltaAsync(conn); + + returnValue = returnValue && !scheduledDelta.HasChanges(); + } + finally + { + await conn.CloseAsync(); + } + }); + + return returnValue; } public async ValueTask TeardownAsync(ILogger logger) { - await using var conn = new SqlConnection(Parent.Settings.ConnectionString); - await conn.OpenAsync(); + await forEveryDatabase(async (connectionString, _) => + { + await using var conn = new SqlConnection(connectionString); + await conn.OpenAsync(); - await QueueTable.Drop(conn); - await ScheduledTable.Drop(conn); + await QueueTable.Drop(conn); + await ScheduledTable.Drop(conn); - await conn.CloseAsync(); + await conn.CloseAsync(); + }); } public async ValueTask SetupAsync(ILogger logger) { - await using var conn = new SqlConnection(Parent.Settings.ConnectionString); + await forEveryDatabase(async (connectionString, identifier) => + { + await EnsureSchemaExists(identifier, connectionString); + }); + } + + internal async Task EnsureSchemaExists(string identifier, string connectionString) + { + if (_checkedDatabases.Contains(identifier)) return; + + await using var conn = new SqlConnection(connectionString); await conn.OpenAsync(); await QueueTable.ApplyChangesAsync(conn); await ScheduledTable.ApplyChangesAsync(conn); await conn.CloseAsync(); + + _checkedDatabases = _checkedDatabases.AddOrUpdate(identifier, true); + } + + public async Task CountAsync() + { + var count = 0L; + await forEveryDatabase(async (connectionString, _) => + { + await using var conn = new SqlConnection(connectionString); + await conn.OpenAsync(); + + try + { + count += (int)await conn.CreateCommand($"select count(*) from {QueueTable.Identifier}") + .ExecuteScalarAsync(); + } + finally + { + await conn.CloseAsync(); + } + }); + + return count; + } + + public async Task ScheduledCountAsync() + { + var count = 0L; + await forEveryDatabase(async (connectionString, _) => + { + await using var conn = new SqlConnection(connectionString); + await conn.OpenAsync(); + + try + { + count += (int)await conn.CreateCommand($"select count(*) from {ScheduledTable.Identifier}") + .ExecuteScalarAsync(); + } + finally + { + await conn.CloseAsync(); + } + }); + + return count; + } + + public Task ScheduleRetryAsync(Envelope envelope, CancellationToken cancellation) + { + buildSenderIfMissing(); + return _sender!.ScheduleRetryAsync(envelope, cancellation); + } + + #region Test helper methods - These provide backward compatibility for tests + + private string? _writeDirectlyToQueueTableSql; + private string? _writeDirectlyToTheScheduledTable; + private string? _moveFromOutgoingToQueueSql; + private string? _moveFromOutgoingToScheduledSql; + private string? _moveScheduledToReadyQueueSql; + private string? _deleteExpiredSql; + private string? _tryPopMessagesDirectlySql; + private string? _tryPopMessagesToInboxSql; + + private void buildTestSqlIfMissing() + { + if (_writeDirectlyToQueueTableSql != null) return; + + _writeDirectlyToQueueTableSql = + $@"insert into {QueueTable.Identifier} ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.KeepUntil}) values (@id, @body, @type, @expires)"; + + _writeDirectlyToTheScheduledTable = $@" +merge {ScheduledTable.Identifier} as target +using (values (@id, @body, @type, @expires, @time)) as source ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.KeepUntil}, {DatabaseConstants.ExecutionTime}) +on target.id = @id +WHEN MATCHED THEN UPDATE set target.body = @body, target.{DatabaseConstants.ExecutionTime} = @time +WHEN NOT MATCHED THEN INSERT ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.KeepUntil}, {DatabaseConstants.ExecutionTime}) values (source.{DatabaseConstants.Id}, source.{DatabaseConstants.Body}, source.{DatabaseConstants.MessageType}, source.{DatabaseConstants.KeepUntil}, source.{DatabaseConstants.ExecutionTime}); +"; + + _moveFromOutgoingToQueueSql = $@" +INSERT into {QueueTable.Identifier} ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.KeepUntil}) +SELECT {DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.DeliverBy} +FROM + {Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} +WHERE {DatabaseConstants.Id} = @id; +DELETE FROM {Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} WHERE {DatabaseConstants.Id} = @id; +"; + + _moveFromOutgoingToScheduledSql = $@" +INSERT into {ScheduledTable.Identifier} ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.ExecutionTime}, {DatabaseConstants.KeepUntil}) +SELECT {DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, @time, {DatabaseConstants.DeliverBy} +FROM + {Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} +WHERE {DatabaseConstants.Id} = @id; +DELETE FROM {Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} WHERE {DatabaseConstants.Id} = @id; +"; + + _moveScheduledToReadyQueueSql = $@" +select id, body, message_type, keep_until into #temp_move_{Name} +FROM {ScheduledTable.Identifier} WITH (UPDLOCK, READPAST, ROWLOCK) +WHERE {DatabaseConstants.ExecutionTime} <= SYSDATETIMEOFFSET() AND ID NOT IN (select id from {QueueTable.Identifier}) +ORDER BY {ScheduledTable.Identifier}.timestamp; +delete from {ScheduledTable.Identifier} where id in (select id from #temp_move_{Name}); +INSERT INTO {QueueTable.Identifier} +(id, body, message_type, keep_until) + SELECT id, body, message_type, keep_until FROM #temp_move_{Name}; +select count(*) from #temp_move_{Name} +"; + + _deleteExpiredSql = + $"delete from {QueueTable.Identifier} where {DatabaseConstants.KeepUntil} IS NOT NULL and {DatabaseConstants.KeepUntil} <= SYSDATETIMEOFFSET();delete from {ScheduledTable.Identifier} where {DatabaseConstants.KeepUntil} IS NOT NULL and {DatabaseConstants.KeepUntil} <= SYSDATETIMEOFFSET()"; + + _tryPopMessagesDirectlySql = $@" +DECLARE @NOCOUNT VARCHAR(3) = 'OFF'; +IF ( (512 & @@OPTIONS) = 512 ) SET @NOCOUNT = 'ON'; +SET NOCOUNT ON; + +WITH message AS ( + SELECT TOP(@count) {DatabaseConstants.Body}, {DatabaseConstants.KeepUntil} + FROM {QueueTable.Identifier} WITH (UPDLOCK, READPAST, ROWLOCK) + ORDER BY {QueueTable.Identifier}.timestamp) +DELETE FROM message +OUTPUT + deleted.{DatabaseConstants.Body}; + +IF (@NOCOUNT = 'ON') SET NOCOUNT ON; +IF (@NOCOUNT = 'OFF') SET NOCOUNT OFF;"; + + _tryPopMessagesToInboxSql = $@" +DECLARE @NOCOUNT VARCHAR(3) = 'OFF'; +IF ( (512 & @@OPTIONS) = 512 ) SET @NOCOUNT = 'ON'; +SET NOCOUNT ON; + +delete FROM {QueueTable.Identifier} WITH (UPDLOCK, READPAST, ROWLOCK) where id in (select id from {Parent.MessageStorageSchemaName}.{DatabaseConstants.IncomingTable}); +select top(@count) id, body, message_type, keep_until into #temp_pop_{Name} +FROM {QueueTable.Identifier} WITH (UPDLOCK, READPAST, ROWLOCK) +ORDER BY {QueueTable.Identifier}.timestamp; +delete from {QueueTable.Identifier} where id in (select id from #temp_pop_{Name}); +INSERT INTO {Parent.MessageStorageSchemaName}.{DatabaseConstants.IncomingTable} +(id, status, owner_id, body, message_type, received_at, keep_until) + SELECT id, 'Incoming', @node, body, message_type, '{Uri}', keep_until FROM #temp_pop_{Name}; +select body from #temp_pop_{Name}; + +IF (@NOCOUNT = 'ON') SET NOCOUNT ON; +IF (@NOCOUNT = 'OFF') SET NOCOUNT OFF;"; } public async Task SendAsync(Envelope envelope, CancellationToken cancellationToken) { + buildTestSqlIfMissing(); await using var conn = new SqlConnection(Parent.Settings.ConnectionString); if (envelope.IsScheduledForLater(DateTimeOffset.UtcNow)) @@ -246,12 +404,14 @@ public async Task SendAsync(Envelope envelope, CancellationToken cancellationTok { try { - await conn.CreateCommand(_writeDirectlyToQueueTableSql) + await conn.OpenAsync(cancellationToken); + await conn.CreateCommand(_writeDirectlyToQueueTableSql!) .With("id", envelope.Id) .With("body", EnvelopeSerializer.Serialize(envelope)) .With("type", envelope.MessageType) .With("expires", envelope.DeliverBy) - .ExecuteOnce(cancellationToken); + .ExecuteNonQueryAsync(cancellationToken); + await conn.CloseAsync(); } catch (SqlException e) { @@ -264,48 +424,34 @@ await conn.CreateCommand(_writeDirectlyToQueueTableSql) private async Task scheduleMessageAsync(Envelope envelope, CancellationToken cancellationToken, SqlConnection conn) { - await conn.CreateCommand(_writeDirectlyToTheScheduledTable) + buildTestSqlIfMissing(); + await conn.OpenAsync(cancellationToken); + await conn.CreateCommand(_writeDirectlyToTheScheduledTable!) .With("id", envelope.Id) .With("body", EnvelopeSerializer.Serialize(envelope)) .With("type", envelope.MessageType) .With("expires", envelope.DeliverBy) .With("time", envelope.ScheduledTime) - .ExecuteOnce(cancellationToken); + .ExecuteNonQueryAsync(cancellationToken); + await conn.CloseAsync(); } public async Task ScheduleMessageAsync(Envelope envelope, CancellationToken cancellationToken) { await using var conn = new SqlConnection(Parent.Settings.ConnectionString); await scheduleMessageAsync(envelope, cancellationToken, conn); - await conn.CloseAsync(); - } - - public async Task ScheduleRetryAsync(Envelope envelope, CancellationToken cancellationToken) - { - await using var conn = new SqlConnection(Parent.Settings.ConnectionString); - await conn.CreateCommand($"delete from {Parent.MessageStorageSchemaName}.{DatabaseConstants.IncomingTable} where id = @id;" + _writeDirectlyToTheScheduledTable) - .With("id", envelope.Id) - .With("body", EnvelopeSerializer.Serialize(envelope)) - .With("type", envelope.MessageType) - .With("expires", envelope.DeliverBy) - .With("time", envelope.ScheduledTime) - .ExecuteOnce(cancellationToken); - - - var tx = conn.BeginTransactionAsync(cancellationToken); - await scheduleMessageAsync(envelope, cancellationToken, conn); - await conn.CloseAsync(); } public async Task MoveFromOutgoingToQueueAsync(Envelope envelope, CancellationToken cancellationToken) { + buildTestSqlIfMissing(); await using var conn = new SqlConnection(Parent.Settings.ConnectionString); await conn.OpenAsync(cancellationToken); try { - var count = await conn.CreateCommand(_moveFromOutgoingToQueueSql) + var count = await conn.CreateCommand(_moveFromOutgoingToQueueSql!) .With("id", envelope.Id) .ExecuteNonQueryAsync(cancellationToken); @@ -323,6 +469,7 @@ public async Task MoveFromOutgoingToQueueAsync(Envelope envelope, CancellationTo public async Task MoveFromOutgoingToScheduledAsync(Envelope envelope, CancellationToken cancellationToken) { + buildTestSqlIfMissing(); if (!envelope.ScheduledTime.HasValue) throw new InvalidOperationException("This envelope has no scheduled time"); @@ -331,7 +478,7 @@ public async Task MoveFromOutgoingToScheduledAsync(Envelope envelope, Cancellati await conn.OpenAsync(cancellationToken); try { - var count = await conn.CreateCommand(_moveFromOutgoingToScheduledSql) + var count = await conn.CreateCommand(_moveFromOutgoingToScheduledSql!) .With("id", envelope.Id) .With("time", envelope.ScheduledTime!.Value) .ExecuteNonQueryAsync(cancellationToken); @@ -343,12 +490,13 @@ public async Task MoveFromOutgoingToScheduledAsync(Envelope envelope, Cancellati if (e.Message.ContainsIgnoreCase("Violation of PRIMARY KEY constraint")) { await conn.CreateCommand( - $"delete * from {Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} where id = @id") + $"delete from {Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} where id = @id") .With("id", envelope.Id) .ExecuteNonQueryAsync(cancellationToken); return; } + throw; } @@ -357,10 +505,11 @@ await conn.CreateCommand( public async Task MoveScheduledToReadyQueueAsync(CancellationToken cancellationToken) { + buildTestSqlIfMissing(); await using var conn = new SqlConnection(Parent.Settings.ConnectionString); await conn.OpenAsync(cancellationToken); - var count = (int)await conn.CreateCommand(_moveScheduledToReadyQueueSql) + var count = (int)await conn.CreateCommand(_moveScheduledToReadyQueueSql!) .ExecuteScalarAsync(cancellationToken); await conn.CloseAsync(); @@ -370,33 +519,22 @@ public async Task MoveScheduledToReadyQueueAsync(CancellationToken cancella public async Task DeleteExpiredAsync(CancellationToken cancellationToken) { + buildTestSqlIfMissing(); await using var conn = new SqlConnection(Parent.Settings.ConnectionString); - await conn.CreateCommand(_deleteExpiredSql) - .ExecuteOnce(cancellationToken); - } - - public async Task CountAsync() - { - await using var conn = new SqlConnection(Parent.Settings.ConnectionString); - await conn.OpenAsync(); - return (int)await conn.CreateCommand($"select count(*) from {QueueTable.Identifier}").ExecuteScalarAsync(); - } - - public async Task ScheduledCountAsync() - { - await using var conn = new SqlConnection(Parent.Settings.ConnectionString); - await conn.OpenAsync(); - return (int)await conn.CreateCommand($"select count(*) from {ScheduledTable.Identifier}").ExecuteScalarAsync(); + await conn.OpenAsync(cancellationToken); + await conn.CreateCommand(_deleteExpiredSql!).ExecuteNonQueryAsync(cancellationToken); + await conn.CloseAsync(); } public async Task> TryPopAsync(int count, ILogger logger, CancellationToken cancellationToken) { + buildTestSqlIfMissing(); await using var conn = new SqlConnection(Parent.Settings.ConnectionString); await conn.OpenAsync(cancellationToken); return await conn - .CreateCommand(_tryPopMessagesDirectlySql) + .CreateCommand(_tryPopMessagesDirectlySql!) .With("count", count) .FetchListAsync(async reader => { @@ -407,21 +545,22 @@ public async Task> TryPopAsync(int count, ILogger logger } catch (Exception e) { - logger.LogError(e, "Error trying to deserialize Envelope data in Sql Transport Queue {Queue}, discarding", Name); + logger.LogError(e, + "Error trying to deserialize Envelope data in Sql Transport Queue {Queue}, discarding", Name); return Envelope.ForPing(Uri); // just a stand in } }, cancellation: cancellationToken); - } public async Task> TryPopDurablyAsync(int count, DurabilitySettings settings, ILogger logger, CancellationToken cancellationToken) { + buildTestSqlIfMissing(); await using var conn = new SqlConnection(Parent.Settings.ConnectionString); await conn.OpenAsync(cancellationToken); return await conn - .CreateCommand(_tryPopMessagesToInboxSql) + .CreateCommand(_tryPopMessagesToInboxSql!) .With("count", count) .With("node", settings.AssignedNodeNumber) .FetchListAsync(async reader => @@ -433,9 +572,12 @@ public async Task> TryPopDurablyAsync(int count, Durabil } catch (Exception e) { - logger.LogError(e, "Error trying to deserialize Envelope data in Sql Transport Queue {Queue}, discarding", Name); + logger.LogError(e, + "Error trying to deserialize Envelope data in Sql Transport Queue {Queue}, discarding", Name); return Envelope.ForPing(Uri); // just a stand in } }, cancellation: cancellationToken); } -} \ No newline at end of file + + #endregion +} diff --git a/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueueListener.cs b/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueueListener.cs index 517964092..622f95916 100644 --- a/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueueListener.cs +++ b/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueueListener.cs @@ -1,7 +1,12 @@ using JasperFx.Core; +using Microsoft.Data.SqlClient; using Microsoft.Extensions.Logging; +using Weasel.Core; +using Weasel.SqlServer; using Wolverine.Configuration; +using Wolverine.RDBMS; using Wolverine.Runtime; +using Wolverine.Runtime.Serialization; using Wolverine.Transports; namespace Wolverine.SqlServer.Transport; @@ -11,20 +16,87 @@ internal class SqlServerQueueListener : IListener private readonly CancellationTokenSource _cancellation = new(); private readonly SqlServerQueue _queue; private readonly IReceiver _receiver; + private readonly string _connectionString; + private readonly string? _databaseName; private readonly ILogger _logger; - private readonly Task _task; + private Task? _task; private readonly DurabilitySettings _settings; - private readonly Task _scheduledTask; + private Task? _scheduledTask; + private readonly SqlServerQueueSender _sender; + private readonly string _tryPopMessagesDirectlySql; + private readonly string _tryPopMessagesToInboxSql; + private readonly string _moveScheduledToReadyQueueSql; + private readonly string _deleteExpiredSql; public SqlServerQueueListener(SqlServerQueue queue, IWolverineRuntime runtime, IReceiver receiver) + : this(queue, runtime, receiver, queue.Parent.Settings.ConnectionString, null) { + } + + public SqlServerQueueListener(SqlServerQueue queue, IWolverineRuntime runtime, IReceiver receiver, + string connectionString, string? databaseName) + { + Address = SqlServerQueue.ToUri(queue.Name, databaseName); _queue = queue; _receiver = receiver; + _connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString)); + _databaseName = databaseName; _logger = runtime.LoggerFactory.CreateLogger(); _settings = runtime.DurabilitySettings; - _task = Task.Run(listenForMessagesAsync, _cancellation.Token); - _scheduledTask = Task.Run(lookForScheduledMessagesAsync, _cancellation.Token); + _sender = new SqlServerQueueSender(queue, connectionString, databaseName); + + var queueTableIdentifier = queue.QueueTable.Identifier; + var scheduledTableIdentifier = queue.ScheduledTable.Identifier; + + _tryPopMessagesDirectlySql = $@" +DECLARE @NOCOUNT VARCHAR(3) = 'OFF'; +IF ( (512 & @@OPTIONS) = 512 ) SET @NOCOUNT = 'ON'; +SET NOCOUNT ON; + +WITH message AS ( + SELECT TOP(@count) {DatabaseConstants.Body}, {DatabaseConstants.KeepUntil} + FROM {queueTableIdentifier} WITH (UPDLOCK, READPAST, ROWLOCK) + ORDER BY {queueTableIdentifier}.timestamp) +DELETE FROM message +OUTPUT + deleted.{DatabaseConstants.Body}; + +IF (@NOCOUNT = 'ON') SET NOCOUNT ON; +IF (@NOCOUNT = 'OFF') SET NOCOUNT OFF;"; + + _tryPopMessagesToInboxSql = $@" +DECLARE @NOCOUNT VARCHAR(3) = 'OFF'; +IF ( (512 & @@OPTIONS) = 512 ) SET @NOCOUNT = 'ON'; +SET NOCOUNT ON; + +delete FROM {queueTableIdentifier} WITH (UPDLOCK, READPAST, ROWLOCK) where id in (select id from {queue.Parent.MessageStorageSchemaName}.{DatabaseConstants.IncomingTable}); +select top(@count) id, body, message_type, keep_until into #temp_pop_{queue.Name} +FROM {queueTableIdentifier} WITH (UPDLOCK, READPAST, ROWLOCK) +ORDER BY {queueTableIdentifier}.timestamp; +delete from {queueTableIdentifier} where id in (select id from #temp_pop_{queue.Name}); +INSERT INTO {queue.Parent.MessageStorageSchemaName}.{DatabaseConstants.IncomingTable} +(id, status, owner_id, body, message_type, received_at, keep_until) + SELECT id, 'Incoming', @node, body, message_type, '{Address}', keep_until FROM #temp_pop_{queue.Name}; +select body from #temp_pop_{queue.Name}; + +IF (@NOCOUNT = 'ON') SET NOCOUNT ON; +IF (@NOCOUNT = 'OFF') SET NOCOUNT OFF;"; + + _moveScheduledToReadyQueueSql = $@" +select id, body, message_type, keep_until into #temp_move_{queue.Name} +FROM {scheduledTableIdentifier} WITH (UPDLOCK, READPAST, ROWLOCK) +WHERE {DatabaseConstants.ExecutionTime} <= SYSDATETIMEOFFSET() AND ID NOT IN (select id from {queueTableIdentifier}) +ORDER BY {scheduledTableIdentifier}.timestamp; +delete from {scheduledTableIdentifier} where id in (select id from #temp_move_{queue.Name}); +INSERT INTO {queueTableIdentifier} +(id, body, message_type, keep_until) + SELECT id, body, message_type, keep_until FROM #temp_move_{queue.Name}; +select count(*) from #temp_move_{queue.Name} +"; + + _deleteExpiredSql = + $"delete from {queueTableIdentifier} where {DatabaseConstants.KeepUntil} IS NOT NULL and {DatabaseConstants.KeepUntil} <= SYSDATETIMEOFFSET();delete from {scheduledTableIdentifier} where {DatabaseConstants.KeepUntil} IS NOT NULL and {DatabaseConstants.KeepUntil} <= SYSDATETIMEOFFSET()"; } public IHandlerPipeline? Pipeline => _receiver.Pipeline; @@ -36,20 +108,36 @@ public ValueTask CompleteAsync(Envelope envelope) public async ValueTask DeferAsync(Envelope envelope) { - await _queue.SendAsync(envelope, _cancellation.Token); + await _sender.SendAsync(envelope, _cancellation.Token); } - public async ValueTask DisposeAsync() + public ValueTask DisposeAsync() { - await _cancellation.CancelAsync(); - _task.SafeDispose(); - _scheduledTask.SafeDispose(); + _cancellation.Cancel(); + _task?.SafeDispose(); + _scheduledTask?.SafeDispose(); + return ValueTask.CompletedTask; } - public Uri Address => _queue.Uri; - public async ValueTask StopAsync() + public Uri Address { get; } + + public ValueTask StopAsync() { - await _cancellation.CancelAsync(); + _cancellation.Cancel(); + _task?.SafeDispose(); + _scheduledTask?.SafeDispose(); + return ValueTask.CompletedTask; + } + + public async Task StartAsync() + { + if (_queue.Parent.AutoProvision) + { + await _queue.EnsureSchemaExists(_databaseName ?? string.Empty, _connectionString); + } + + _task = Task.Run(listenForMessagesAsync, _cancellation.Token); + _scheduledTask = Task.Run(lookForScheduledMessagesAsync, _cancellation.Token); } private async Task lookForScheduledMessagesAsync() @@ -64,18 +152,19 @@ private async Task lookForScheduledMessagesAsync() { try { - var count = await _queue.MoveScheduledToReadyQueueAsync(_cancellation.Token); + var count = await MoveScheduledToReadyQueueAsync(_cancellation.Token); if (count > 0) { - _logger.LogInformation("Propagated {Number} scheduled messages to Sql Server-backed queue {Queue}", count, _queue.Name); + _logger.LogInformation( + "Propagated {Number} scheduled messages to Sql Server-backed queue {Queue}", count, + _queue.Name); } - await _queue.DeleteExpiredAsync(CancellationToken.None); + await DeleteExpiredAsync(CancellationToken.None); failedCount = 0; await Task.Delay(_settings.ScheduledJobPollingTime); - } catch (Exception e) { @@ -95,6 +184,27 @@ private async Task lookForScheduledMessagesAsync() } } + public async Task MoveScheduledToReadyQueueAsync(CancellationToken cancellationToken) + { + await using var conn = new SqlConnection(_connectionString); + await conn.OpenAsync(cancellationToken); + + var count = (int)await conn.CreateCommand(_moveScheduledToReadyQueueSql) + .ExecuteScalarAsync(cancellationToken); + + await conn.CloseAsync(); + + return count; + } + + public async Task DeleteExpiredAsync(CancellationToken cancellationToken) + { + await using var conn = new SqlConnection(_connectionString); + await conn.OpenAsync(cancellationToken); + await conn.CreateCommand(_deleteExpiredSql).ExecuteNonQueryAsync(cancellationToken); + await conn.CloseAsync(); + } + private async Task listenForMessagesAsync() { var failedCount = 0; @@ -103,11 +213,10 @@ private async Task listenForMessagesAsync() { try { - var messages = _queue.Mode == EndpointMode.Durable - ? await _queue.TryPopDurablyAsync(_queue.MaximumMessagesToReceive, _settings, _logger, + ? await TryPopDurablyAsync(_queue.MaximumMessagesToReceive, _settings, _logger, _cancellation.Token) - : await _queue.TryPopAsync(_queue.MaximumMessagesToReceive, _logger, _cancellation.Token); + : await TryPopAsync(_queue.MaximumMessagesToReceive, _logger, _cancellation.Token); failedCount = 0; @@ -138,4 +247,57 @@ private async Task listenForMessagesAsync() } } } -} \ No newline at end of file + + public async Task> TryPopAsync(int count, ILogger logger, + CancellationToken cancellationToken) + { + await using var conn = new SqlConnection(_connectionString); + await conn.OpenAsync(cancellationToken); + + return await conn + .CreateCommand(_tryPopMessagesDirectlySql) + .With("count", count) + .FetchListAsync(async reader => + { + var data = await reader.GetFieldValueAsync(0, cancellationToken); + try + { + return EnvelopeSerializer.Deserialize(data); + } + catch (Exception e) + { + logger.LogError(e, + "Error trying to deserialize Envelope data in Sql Transport Queue {Queue}, discarding", + _queue.Name); + return Envelope.ForPing(Address); // just a stand in + } + }, cancellationToken); + } + + public async Task> TryPopDurablyAsync(int count, DurabilitySettings settings, + ILogger logger, CancellationToken cancellationToken) + { + await using var conn = new SqlConnection(_connectionString); + await conn.OpenAsync(cancellationToken); + + return await conn + .CreateCommand(_tryPopMessagesToInboxSql) + .With("count", count) + .With("node", settings.AssignedNodeNumber) + .FetchListAsync(async reader => + { + var data = await reader.GetFieldValueAsync(0, cancellationToken); + try + { + return EnvelopeSerializer.Deserialize(data); + } + catch (Exception e) + { + logger.LogError(e, + "Error trying to deserialize Envelope data in Sql Transport Queue {Queue}, discarding", + _queue.Name); + return Envelope.ForPing(Address); // just a stand in + } + }, cancellationToken); + } +} diff --git a/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueueSender.cs b/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueueSender.cs index e30082ab9..af1df0e51 100644 --- a/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueueSender.cs +++ b/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueueSender.cs @@ -1,19 +1,73 @@ +using JasperFx.Core; +using Microsoft.Data.SqlClient; +using Weasel.SqlServer; using Wolverine.Configuration; -using Wolverine.Transports.Sending; +using Wolverine.RDBMS; +using Wolverine.Runtime.Serialization; namespace Wolverine.SqlServer.Transport; -internal class SqlServerQueueSender : ISender +internal class SqlServerQueueSender : ISqlServerQueueSender { private readonly SqlServerQueue _queue; + private readonly string _connectionString; - public SqlServerQueueSender(SqlServerQueue queue) + private readonly string _moveFromOutgoingToQueueSql; + private readonly string _moveFromOutgoingToScheduledSql; + private readonly string _writeDirectlyToQueueTableSql; + private readonly string _writeDirectlyToTheScheduledTable; + private readonly string _deleteFromIncomingAndScheduleSql; + + // Strictly for testing + public SqlServerQueueSender(SqlServerQueue queue) : this(queue, queue.Parent.Settings.ConnectionString, null) + { + Destination = queue.Uri; + } + + public SqlServerQueueSender(SqlServerQueue queue, string connectionString, string? databaseName) { _queue = queue; + _connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString)); + + Destination = SqlServerQueue.ToUri(queue.Name, databaseName); + + _moveFromOutgoingToQueueSql = $@" +INSERT into {queue.QueueTable.Identifier} ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.KeepUntil}) +SELECT {DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.DeliverBy} +FROM + {queue.Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} +WHERE {DatabaseConstants.Id} = @id; +DELETE FROM {queue.Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} WHERE {DatabaseConstants.Id} = @id; +"; + + _moveFromOutgoingToScheduledSql = $@" +INSERT into {queue.ScheduledTable.Identifier} ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.ExecutionTime}, {DatabaseConstants.KeepUntil}) +SELECT {DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, @time, {DatabaseConstants.DeliverBy} +FROM + {queue.Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} +WHERE {DatabaseConstants.Id} = @id; +DELETE FROM {queue.Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} WHERE {DatabaseConstants.Id} = @id; +"; + + _writeDirectlyToQueueTableSql = + $@"insert into {queue.QueueTable.Identifier} ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.KeepUntil}) values (@id, @body, @type, @expires)"; + + _writeDirectlyToTheScheduledTable = $@" +merge {queue.ScheduledTable.Identifier} as target +using (values (@id, @body, @type, @expires, @time)) as source ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.KeepUntil}, {DatabaseConstants.ExecutionTime}) +on target.id = @id +WHEN MATCHED THEN UPDATE set target.body = @body, target.{DatabaseConstants.ExecutionTime} = @time +WHEN NOT MATCHED THEN INSERT ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.KeepUntil}, {DatabaseConstants.ExecutionTime}) values (source.{DatabaseConstants.Id}, source.{DatabaseConstants.Body}, source.{DatabaseConstants.MessageType}, source.{DatabaseConstants.KeepUntil}, source.{DatabaseConstants.ExecutionTime}); +"; + + _deleteFromIncomingAndScheduleSql = + $"delete from {queue.Parent.MessageStorageSchemaName}.{DatabaseConstants.IncomingTable} where id = @id;" + + _writeDirectlyToTheScheduledTable; } public bool SupportsNativeScheduledSend => true; - public Uri Destination => _queue.Uri; + public Uri Destination { get; } + public async Task PingAsync() { try @@ -27,22 +81,152 @@ public async Task PingAsync() } } + public async Task ScheduleRetryAsync(Envelope envelope, CancellationToken cancellationToken) + { + await using var conn = new SqlConnection(_connectionString); + await conn.OpenAsync(cancellationToken); + try + { + await conn.CreateCommand(_deleteFromIncomingAndScheduleSql) + .With("id", envelope.Id) + .With("body", EnvelopeSerializer.Serialize(envelope)) + .With("type", envelope.MessageType) + .With("expires", envelope.DeliverBy) + .With("time", envelope.ScheduledTime) + .ExecuteNonQueryAsync(cancellationToken); + } + finally + { + await conn.CloseAsync(); + } + } + public async ValueTask SendAsync(Envelope envelope) { if (_queue.Mode == EndpointMode.Durable && envelope.WasPersistedInOutbox) { if (envelope.IsScheduledForLater(DateTimeOffset.UtcNow)) { - await _queue.MoveFromOutgoingToScheduledAsync(envelope, CancellationToken.None); + await MoveFromOutgoingToScheduledAsync(envelope, CancellationToken.None); } else { - await _queue.MoveFromOutgoingToQueueAsync(envelope, CancellationToken.None); + await MoveFromOutgoingToQueueAsync(envelope, CancellationToken.None); } } else { - await _queue.SendAsync(envelope, CancellationToken.None); + await SendAsync(envelope, CancellationToken.None); + } + } + + public async Task MoveFromOutgoingToQueueAsync(Envelope envelope, CancellationToken cancellationToken) + { + await using var conn = new SqlConnection(_connectionString); + await conn.OpenAsync(cancellationToken); + + try + { + var count = await conn.CreateCommand(_moveFromOutgoingToQueueSql) + .With("id", envelope.Id) + .ExecuteNonQueryAsync(cancellationToken); + + if (count == 0) throw new InvalidOperationException("No matching outgoing envelope"); + } + catch (SqlException e) + { + // Making this idempotent, but optimistically + if (e.Message.ContainsIgnoreCase("Violation of PRIMARY KEY constraint")) return; + throw; + } + finally + { + await conn.CloseAsync(); + } + } + + public async Task MoveFromOutgoingToScheduledAsync(Envelope envelope, CancellationToken cancellationToken) + { + if (!envelope.ScheduledTime.HasValue) + throw new InvalidOperationException("This envelope has no scheduled time"); + + await using var conn = new SqlConnection(_connectionString); + await conn.OpenAsync(cancellationToken); + + try + { + var count = await conn.CreateCommand(_moveFromOutgoingToScheduledSql) + .With("id", envelope.Id) + .With("time", envelope.ScheduledTime!.Value) + .ExecuteNonQueryAsync(cancellationToken); + + if (count == 0) throw new InvalidOperationException($"No matching outgoing envelope for {envelope}"); } + catch (SqlException e) + { + if (e.Message.ContainsIgnoreCase("Violation of PRIMARY KEY constraint")) + { + await conn.CreateCommand( + $"delete from {_queue.Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} where id = @id") + .With("id", envelope.Id) + .ExecuteNonQueryAsync(cancellationToken); + + return; + } + + throw; + } + finally + { + await conn.CloseAsync(); + } + } + + public async Task SendAsync(Envelope envelope, CancellationToken cancellationToken) + { + await using var conn = new SqlConnection(_connectionString); + await conn.OpenAsync(cancellationToken); + + try + { + if (envelope.IsScheduledForLater(DateTimeOffset.UtcNow)) + { + await scheduleMessageAsync(envelope, cancellationToken, conn); + } + else + { + try + { + await conn.CreateCommand(_writeDirectlyToQueueTableSql) + .With("id", envelope.Id) + .With("body", EnvelopeSerializer.Serialize(envelope)) + .With("type", envelope.MessageType) + .With("expires", envelope.DeliverBy) + .ExecuteNonQueryAsync(cancellationToken); + } + catch (SqlException e) + { + // Making this idempotent, but optimistically + if (e.Message.ContainsIgnoreCase("Violation of PRIMARY KEY constraint")) return; + throw; + } + } + } + finally + { + await conn.CloseAsync(); + } + } + + private async Task scheduleMessageAsync(Envelope envelope, CancellationToken cancellationToken, + SqlConnection conn) + { + await conn.CreateCommand(_writeDirectlyToTheScheduledTable) + .With("id", envelope.Id) + .With("body", EnvelopeSerializer.Serialize(envelope)) + .With("type", envelope.MessageType) + .With("expires", envelope.DeliverBy) + .With("time", envelope.ScheduledTime) + .ExecuteNonQueryAsync(cancellationToken); } -} \ No newline at end of file +} diff --git a/src/Persistence/Wolverine.SqlServer/Transport/SqlServerTransport.cs b/src/Persistence/Wolverine.SqlServer/Transport/SqlServerTransport.cs index b692b9925..0020427f0 100644 --- a/src/Persistence/Wolverine.SqlServer/Transport/SqlServerTransport.cs +++ b/src/Persistence/Wolverine.SqlServer/Transport/SqlServerTransport.cs @@ -8,6 +8,7 @@ using Wolverine.Runtime; using Wolverine.SqlServer.Persistence; using Wolverine.Transports; +using MultiTenantedMessageStore = Wolverine.Persistence.Durability.MultiTenantedMessageStore; namespace Wolverine.SqlServer.Transport; @@ -17,9 +18,10 @@ public class SqlServerTransport : BrokerTransport public SqlServerTransport(DatabaseSettings settings) : this(settings, settings.SchemaName) { - } - public SqlServerTransport(DatabaseSettings settings, string? transportSchemaName) : base(ProtocolName, "Sql Server Transport", [ProtocolName]) + + public SqlServerTransport(DatabaseSettings settings, string? transportSchemaName) : base(ProtocolName, + "Sql Server Transport", [ProtocolName]) { Queues = new LightweightCache(name => new SqlServerQueue(name, this)); Settings = settings; @@ -28,21 +30,22 @@ public SqlServerTransport(DatabaseSettings settings, string? transportSchemaName TransportSchemaName = settings.SchemaName; MessageStorageSchemaName = settings.SchemaName; } + if (transportSchemaName.IsNotEmpty()) { TransportSchemaName = transportSchemaName; } } - + public override Uri ResourceUri => new Uri("sqlserver-transport://"); public LightweightCache Queues { get; } - /// + /// /// Schema name for the queue and scheduled message tables /// public string TransportSchemaName { get; private set; } = "dbo"; - + /// /// Schema name for the message storage tables /// @@ -73,12 +76,22 @@ public override async ValueTask ConnectAsync(IWolverineRuntime runtime) { AutoProvision = AutoProvision || runtime.Options.AutoBuildMessageStorageOnStartup != AutoCreate.None; - var storage = await runtime.TryFindMainMessageStore(); - - Storage = storage ?? throw new InvalidOperationException( - "The Sql Server Transport can only be used if the message persistence is also Sql Server backed"); + if (runtime.Storage is SqlServerMessageStore store) + { + Storage = store; + } + else if (runtime.Storage is MultiTenantedMessageStore mt && mt.Main is SqlServerMessageStore s) + { + Storage = s; + Databases = mt; + } + else + { + throw new InvalidOperationException( + "The Sql Server Transport can only be used if the message persistence is also Sql Server backed"); + } - Settings = storage.Settings; + Settings = Storage.Settings; // This is de facto a little environment test await using var conn = new SqlConnection(Settings.ConnectionString); @@ -90,12 +103,13 @@ public override async ValueTask ConnectAsync(IWolverineRuntime runtime) internal SqlServerMessageStore Storage { get; set; } + internal MultiTenantedMessageStore? Databases { get; set; } + public override IEnumerable DiagnosticColumns() { yield return new PropertyColumn("Name"); yield return new PropertyColumn("Count", Justify.Right); yield return new PropertyColumn("Scheduled", Justify.Right); - } public async Task SystemTimeAsync() @@ -105,4 +119,4 @@ public async Task SystemTimeAsync() return (DateTimeOffset)await conn.CreateCommand("select SYSDATETIMEOFFSET()").ExecuteScalarAsync(); } -} \ No newline at end of file +} diff --git a/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs b/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs index 81b5f2238..cb0d1218b 100644 --- a/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs +++ b/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs @@ -114,6 +114,7 @@ public async Task ReceiverIs(Action configure) Receiver = await Host.CreateDefaultBuilder() .UseWolverine(opts => { + opts.Durability.Mode = DurabilityMode.Solo; configure(opts); configureReceiver(opts); }).StartAsync(); diff --git a/src/Wolverine/TestingExtensions.cs b/src/Wolverine/TestingExtensions.cs index 990c4dc05..5910267bc 100644 --- a/src/Wolverine/TestingExtensions.cs +++ b/src/Wolverine/TestingExtensions.cs @@ -353,7 +353,7 @@ private async Task writePersistedActualsAsync() public bool HasReached() { Func filter = AgentScheme.IsEmpty() - ? x => !x.Scheme.StartsWith("wolverine") + ? x => !x.Scheme.StartsWith("wolverine") && !x.Scheme.EqualsIgnoreCase("simple") : x => x.Scheme.EqualsIgnoreCase(AgentScheme); foreach (var pair in AgentCountByHost)