From 14b117419b7c42031f4a4fe80925c3bf3d6ffcc6 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 9 Feb 2026 07:53:14 -0600 Subject: [PATCH 1/2] Add regression tests for multi-tenant durability agents. Closes GH-2085 Verify that scheduled messages and durability agents work correctly for non-master tenant databases in multi-tenant PostgreSQL setups. Co-Authored-By: Claude Opus 4.6 --- .../multi_tenant_durability_agents.cs | 209 ++++++++++++++++++ 1 file changed, 209 insertions(+) create mode 100644 src/Persistence/PostgresqlTests/MultiTenancy/multi_tenant_durability_agents.cs diff --git a/src/Persistence/PostgresqlTests/MultiTenancy/multi_tenant_durability_agents.cs b/src/Persistence/PostgresqlTests/MultiTenancy/multi_tenant_durability_agents.cs new file mode 100644 index 000000000..ed0a8b2f9 --- /dev/null +++ b/src/Persistence/PostgresqlTests/MultiTenancy/multi_tenant_durability_agents.cs @@ -0,0 +1,209 @@ +using System.Collections.Concurrent; +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Resources; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Npgsql; +using Shouldly; +using Weasel.Postgresql; +using Weasel.Postgresql.Migrations; +using Wolverine; +using Wolverine.Persistence; +using Wolverine.Postgresql; +using Wolverine.Runtime; +using Xunit.Abstractions; + +namespace PostgresqlTests.MultiTenancy; + +public class multi_tenant_durability_agents : PostgresqlContext, IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private IHost theHost; + private string tenant1ConnectionString; + private string tenant2ConnectionString; + private TenantMessageTracker theTracker = new(); + + public multi_tenant_durability_agents(ITestOutputHelper output) + { + _output = output; + } + + public async Task InitializeAsync() + { + await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString); + await conn.OpenAsync(); + + tenant1ConnectionString = await CreateDatabaseIfNotExists(conn, "db1"); + tenant2ConnectionString = await CreateDatabaseIfNotExists(conn, "db2"); + + await conn.CloseAsync(); + + theHost = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + opts.Durability.ScheduledJobPollingTime = 1.Seconds(); + opts.Durability.ScheduledJobFirstExecution = 500.Milliseconds(); + + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "mt_durability") + .RegisterStaticTenants(tenants => + { + tenants.Register("red", tenant1ConnectionString); + tenants.Register("blue", tenant2ConnectionString); + }); + + opts.Services.AddResourceSetupOnStartup(); + opts.Services.AddSingleton(theTracker); + + opts.Policies.UseDurableLocalQueues(); + + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await theHost.StopAsync(); + theHost.Dispose(); + } + + [Fact] + public async Task all_tenant_durability_agents_are_started() + { + // Should have 3 durability agents: main + red + blue + var result = await theHost.WaitUntilAssignmentsChangeTo(w => + { + w.AgentScheme = PersistenceConstants.AgentScheme; + w.ExpectRunningAgents(theHost, 3); + }, 30.Seconds()); + + result.ShouldBeTrue("Expected 3 durability agents (main + 2 tenants) to be running"); + + var agents = theHost.RunningAgents() + .Where(u => u.Scheme == PersistenceConstants.AgentScheme) + .ToArray(); + + _output.WriteLine("Running durability agents:"); + foreach (var agent in agents) + { + _output.WriteLine($" {agent}"); + } + + agents.Length.ShouldBe(3); + } + + [Fact] + public async Task scheduled_message_for_tenant_is_processed() + { + // Wait for durability agents to start + await theHost.WaitUntilAssignmentsChangeTo(w => + { + w.AgentScheme = PersistenceConstants.AgentScheme; + w.ExpectRunningAgents(theHost, 3); + }, 30.Seconds()); + + var messageId = Guid.NewGuid(); + + // Schedule a message for the "red" tenant with a short delay + var bus = theHost.Services.GetRequiredService(); + await bus.ScheduleAsync(new TenantScheduledMessage(messageId), 2.Seconds(), + new DeliveryOptions { TenantId = "red" }); + + _output.WriteLine($"Scheduled message {messageId} for tenant 'red'"); + + // Poll until handled + var handled = await Poll(30.Seconds(), () => + theTracker.Received.Any(r => r.Id == messageId)); + + handled.ShouldBeTrue($"Scheduled message {messageId} for tenant 'red' was not handled within timeout"); + + var received = theTracker.Received.First(r => r.Id == messageId); + received.TenantId.ShouldBe("red"); + + _output.WriteLine($"Message {messageId} handled with tenant '{received.TenantId}'"); + } + + [Fact] + public async Task scheduled_messages_for_all_tenants_are_processed() + { + // Wait for durability agents to start + await theHost.WaitUntilAssignmentsChangeTo(w => + { + w.AgentScheme = PersistenceConstants.AgentScheme; + w.ExpectRunningAgents(theHost, 3); + }, 30.Seconds()); + + var redId = Guid.NewGuid(); + var blueId = Guid.NewGuid(); + var mainId = Guid.NewGuid(); + + var bus = theHost.Services.GetRequiredService(); + + // Schedule messages for each tenant and the main database + await bus.ScheduleAsync(new TenantScheduledMessage(redId), 2.Seconds(), + new DeliveryOptions { TenantId = "red" }); + await bus.ScheduleAsync(new TenantScheduledMessage(blueId), 2.Seconds(), + new DeliveryOptions { TenantId = "blue" }); + await bus.ScheduleAsync(new TenantScheduledMessage(mainId), 2.Seconds()); + + _output.WriteLine($"Scheduled 3 messages: red={redId}, blue={blueId}, main={mainId}"); + + // Wait for all 3 to arrive + var allHandled = await Poll(30.Seconds(), () => + theTracker.Received.Any(r => r.Id == redId) && + theTracker.Received.Any(r => r.Id == blueId) && + theTracker.Received.Any(r => r.Id == mainId)); + + allHandled.ShouldBeTrue("Not all scheduled messages were handled within timeout"); + + theTracker.Received.First(r => r.Id == redId).TenantId.ShouldBe("red"); + theTracker.Received.First(r => r.Id == blueId).TenantId.ShouldBe("blue"); + + // Main database message should have null or empty tenant id + var mainReceived = theTracker.Received.First(r => r.Id == mainId); + _output.WriteLine($"Main message tenant: '{mainReceived.TenantId}'"); + } + + private static async Task Poll(TimeSpan timeout, Func condition) + { + var cts = new CancellationTokenSource(timeout); + while (!cts.IsCancellationRequested) + { + if (condition()) return true; + await Task.Delay(250.Milliseconds()); + } + + return condition(); + } + + private async Task CreateDatabaseIfNotExists(NpgsqlConnection conn, string databaseName) + { + var builder = new NpgsqlConnectionStringBuilder(Servers.PostgresConnectionString); + + var exists = await conn.DatabaseExists(databaseName); + if (!exists) + { + await new DatabaseSpecification().BuildDatabase(conn, databaseName); + } + + builder.Database = databaseName; + return builder.ConnectionString; + } +} + +public record TenantScheduledMessage(Guid Id); + +public class TenantMessageTracker +{ + public ConcurrentBag<(Guid Id, string? TenantId)> Received { get; } = new(); +} + +public class TenantScheduledMessageHandler +{ + public static void Handle(TenantScheduledMessage message, Envelope envelope, TenantMessageTracker tracker) + { + tracker.Received.Add((message.Id, envelope.TenantId)); + } +} From 129bc2153a143f13cc70b2c1e88ebe34b11542d7 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 9 Feb 2026 08:28:36 -0600 Subject: [PATCH 2/2] cleaning up after Claude --- .../MultiTenancy/multi_tenant_durability_agents.cs | 4 ++-- src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Persistence/PostgresqlTests/MultiTenancy/multi_tenant_durability_agents.cs b/src/Persistence/PostgresqlTests/MultiTenancy/multi_tenant_durability_agents.cs index ed0a8b2f9..6fb3cef0b 100644 --- a/src/Persistence/PostgresqlTests/MultiTenancy/multi_tenant_durability_agents.cs +++ b/src/Persistence/PostgresqlTests/MultiTenancy/multi_tenant_durability_agents.cs @@ -107,7 +107,7 @@ await theHost.WaitUntilAssignmentsChangeTo(w => var messageId = Guid.NewGuid(); // Schedule a message for the "red" tenant with a short delay - var bus = theHost.Services.GetRequiredService(); + var bus = theHost.MessageBus(); await bus.ScheduleAsync(new TenantScheduledMessage(messageId), 2.Seconds(), new DeliveryOptions { TenantId = "red" }); @@ -139,7 +139,7 @@ await theHost.WaitUntilAssignmentsChangeTo(w => var blueId = Guid.NewGuid(); var mainId = Guid.NewGuid(); - var bus = theHost.Services.GetRequiredService(); + var bus = theHost.MessageBus(); // Schedule messages for each tenant and the main database await bus.ScheduleAsync(new TenantScheduledMessage(redId), 2.Seconds(), diff --git a/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj b/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj index 4524b73b0..03bfdcda9 100644 --- a/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj +++ b/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj @@ -12,7 +12,7 @@ - +