diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_2361_outbox_stuck_with_tenanted_broker.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_2361_outbox_stuck_with_tenanted_broker.cs new file mode 100644 index 000000000..9149c988b --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_2361_outbox_stuck_with_tenanted_broker.cs @@ -0,0 +1,137 @@ +using System.Net; +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Resources; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Npgsql; +using Shouldly; +using Wolverine.Marten; +using Wolverine.Tracking; +using Wolverine.Transports.Sending; +using Xunit; +using Xunit.Abstractions; + +namespace Wolverine.RabbitMQ.Tests.Bugs; + +/// +/// Reproduces https://github.com/JasperFx/wolverine/issues/2361 +/// When using multi-tenant RabbitMQ with durable outbox, messages sent to +/// a non-default broker get stuck in the outbox and are re-delivered every +/// 5 seconds by the durability agent, causing duplicate envelope exceptions. +/// +/// Root cause: TenantedSender implemented ISenderRequiresCallback, which caused +/// SendingAgent to use sendWithCallbackHandlingAsync (assumes the inner sender +/// calls back on success). But RabbitMqSender does NOT implement ISenderRequiresCallback, +/// so MarkSuccessfulAsync was never called and the outbox entry was never deleted. +/// +public class Bug_2361_outbox_stuck_with_tenanted_broker +{ + private readonly ITestOutputHelper _output; + + public Bug_2361_outbox_stuck_with_tenanted_broker(ITestOutputHelper output) + { + _output = output; + } + + [Fact] + public async Task messages_sent_to_tenanted_broker_should_be_removed_from_outbox() + { + // Create a virtual host for the tenant + await declareVirtualHost("bug2361"); + + var queueName = $"bug2361_{Guid.NewGuid():N}"; + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.ServiceName = "Bug2361Sender"; + opts.Durability.Mode = DurabilityMode.Solo; + + opts.Services.AddMarten(Servers.PostgresConnectionString) + .IntegrateWithWolverine(x => x.MessageStorageSchemaName = "bug2361"); + + opts.Services.AddResourceSetupOnStartup(); + + // Set up multi-tenant RabbitMQ: default + "tenant1" on a different virtual host + opts.UseRabbitMq() + .AutoProvision() + .AutoPurgeOnStartup() + .DisableDeadLetterQueueing() + .AddTenant("tenant1", "bug2361") + .TenantIdBehavior(TenantedIdBehavior.FallbackToDefault); + + opts.Policies.DisableConventionalLocalRouting(); + + // Publish to a specific queue with durable outbox + opts.PublishAllMessages() + .ToRabbitQueue(queueName) + .UseDurableOutbox(); + + // Listen on the tenant's queue + opts.ListenToRabbitQueue(queueName); + }).StartAsync(); + + // Clean up any stale outbox data from previous runs + await host.ResetResourceState(); + + // Send a message targeted at the tenant + var session = await host + .TrackActivity() + .Timeout(30.Seconds()) + .IncludeExternalTransports() + .ExecuteAndWaitAsync((Func)(async bus => + { + await bus.PublishAsync(new Bug2361Message("Hello from tenant"), + new DeliveryOptions { TenantId = "tenant1" }); + })); + + // The message should have been received + session.Received.SingleMessage() + .ShouldNotBeNull(); + + // Wait for async outbox cleanup to complete + await Task.Delay(3.Seconds()); + + // Verify the outbox is empty - no stuck messages + await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString); + await conn.OpenAsync(); + + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT count(*) FROM bug2361.wolverine_outgoing_envelopes"; + var stuckCount = (long)(await cmd.ExecuteScalarAsync())!; + + _output.WriteLine($"Outbox messages remaining: {stuckCount}"); + stuckCount.ShouldBe(0, "Messages should not be stuck in the outbox after successful send to tenanted broker"); + } + + private static async Task declareVirtualHost(string vhname) + { + var credentials = new NetworkCredential("guest", "guest"); + using var handler = new HttpClientHandler { Credentials = credentials }; + using var client = new HttpClient(handler); + + var request = new HttpRequestMessage(HttpMethod.Put, $"http://localhost:15672/api/vhosts/{vhname}"); + await client.SendAsync(request); + + // Grant permissions + var permRequest = new HttpRequestMessage(HttpMethod.Put, + $"http://localhost:15672/api/permissions/{vhname}/guest"); + permRequest.Content = new StringContent( + """{"configure":".*","write":".*","read":".*"}""", + System.Text.Encoding.UTF8, + "application/json"); + await client.SendAsync(permRequest); + } +} + +public record Bug2361Message(string Text); + +public static class Bug2361MessageHandler +{ + public static void Handle(Bug2361Message message) + { + // Simple handler - just receives the message + } +} diff --git a/src/Wolverine/Transports/Sending/TenantedSender.cs b/src/Wolverine/Transports/Sending/TenantedSender.cs index c7c770401..869a15abf 100644 --- a/src/Wolverine/Transports/Sending/TenantedSender.cs +++ b/src/Wolverine/Transports/Sending/TenantedSender.cs @@ -46,7 +46,19 @@ public ValueTask SendAsync(Envelope envelope) } } -public class TenantedSender : ISender, ISenderRequiresCallback, IAsyncDisposable +/// +/// Routes messages to tenant-specific senders based on envelope TenantId. +/// +/// IMPORTANT: This class intentionally does NOT implement ISenderRequiresCallback. +/// When it did, SendingAgent chose the sendWithCallbackHandlingAsync path which +/// assumes the inner sender calls back on success. But transport senders like +/// RabbitMqSender are simple fire-and-forget (not ISenderRequiresCallback), so +/// MarkSuccessfulAsync was never called and outbox entries were never deleted. +/// Without ISenderRequiresCallback, SendingAgent uses sendWithExplicitHandlingAsync +/// which explicitly calls MarkSuccessfulAsync after a successful send. +/// See https://github.com/JasperFx/wolverine/issues/2361 +/// +public class TenantedSender : ISender, IDisposable, IAsyncDisposable { public TenantedIdBehavior TenantedIdBehavior { get; } private readonly ISender _defaultSender = null!; @@ -70,22 +82,6 @@ public void RegisterSender(string tenantId, ISender sender) _senders = _senders.AddOrUpdate(tenantId, sender); } - public void RegisterCallback(ISenderCallback senderCallback) - { - if (_defaultSender is ISenderRequiresCallback defaultCallback) - { - defaultCallback.RegisterCallback(senderCallback); - } - - foreach (var entry in _senders.Enumerate()) - { - if (entry.Value is ISenderRequiresCallback tenantCallback) - { - tenantCallback.RegisterCallback(senderCallback); - } - } - } - public bool SupportsNativeScheduledSend => _defaultSender.SupportsNativeScheduledSend; public Uri Destination { get; } public async Task PingAsync() @@ -128,10 +124,10 @@ private ISender senderForTenantId(string tenantId) case TenantedIdBehavior.FallbackToDefault: _senders = _senders.AddOrUpdate(tenantId, _defaultSender); return _defaultSender; - + case TenantedIdBehavior.IgnoreUnknownTenants: return new NullSender(Destination); - + case TenantedIdBehavior.TenantIdRequired: var invalid = new InvalidTenantSender(Destination, tenantId); _senders = _senders.AddOrUpdate(tenantId, invalid); @@ -143,16 +139,16 @@ private ISender senderForTenantId(string tenantId) public void Dispose() { - if (_defaultSender is ISenderRequiresCallback defaultDisposable) + if (_defaultSender is IDisposable defaultDisposable) { - defaultDisposable.Dispose(); + defaultDisposable.SafeDispose(); } foreach (var entry in _senders.Enumerate()) { - if (entry.Value is ISenderRequiresCallback tenantDisposable) + if (entry.Value is IDisposable tenantDisposable) { - tenantDisposable.Dispose(); + tenantDisposable.SafeDispose(); } } }