diff --git a/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs b/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs index 16d7236a2..54360e364 100644 --- a/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs +++ b/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs @@ -1,6 +1,7 @@ using Marten; using Marten.Services; using Wolverine.Marten.Persistence.Operations; +using Wolverine.Persistence.Durability; using Wolverine.Postgresql; using Wolverine.RDBMS; using Wolverine.Runtime; @@ -25,27 +26,36 @@ public override Task BeforeSaveChangesAsync(IDocumentSession session, Cancellati { return Task.CompletedTask; } - + // Mark as handled! if (_context.Envelope.Destination != null) { if (_context.Envelope.WasPersistedInInbox) { + // GH-2155: When using ancillary stores (e.g., [MartenStore]), the incoming + // envelope was persisted in the main store by DurableReceiver. We should only + // mark it as handled within this Marten session if the session's store is + // the same store. Otherwise, let DurableReceiver handle it via the main store. + if (_context.Envelope.Store == null && _messageStore.Role == MessageStoreRole.Ancillary) + { + return Task.CompletedTask; + } + var keepUntil = DateTimeOffset.UtcNow.Add(_context.Runtime.Options.Durability.KeepAfterMessageHandling); session.QueueSqlCommand($"update {_messageStore.IncomingFullName} set {DatabaseConstants.Status} = '{EnvelopeStatus.Handled}', {DatabaseConstants.KeepUntil} = ? where id = ?", keepUntil, _context.Envelope.Id); _context.Envelope.Status = EnvelopeStatus.Handled; } - - // This was buggy in real usage. + + // This was buggy in real usage. // else // { // var envelope = Envelope.ForPersistedHandled(_context.Envelope, DateTimeOffset.UtcNow, _context.Runtime.Options.Durability); // session.QueueOperation(new StoreIncomingEnvelope(_messageStore.IncomingFullName, envelope)); // } - - + + } - + return Task.CompletedTask; } diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_2155_ancillary_store_inbox_persistence.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_2155_ancillary_store_inbox_persistence.cs new file mode 100644 index 000000000..83092bd98 --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_2155_ancillary_store_inbox_persistence.cs @@ -0,0 +1,109 @@ +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Resources; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Attributes; +using Wolverine.Marten; +using Wolverine.Persistence.Durability; +using Wolverine.Runtime; +using Wolverine.Util; +using Wolverine.Tracking; +using Xunit; + +namespace Wolverine.RabbitMQ.Tests.Bugs; + +public interface IAncillaryStore2155 : IDocumentStore; + +public record AncillaryMessage2155(Guid Id); + +[MartenStore(typeof(IAncillaryStore2155))] +public static class AncillaryMessage2155Handler +{ + [Transactional] + public static void Handle(AncillaryMessage2155 message, IDocumentSession session) + { + // Just touch the session so the transactional middleware commits + session.Store(new AncillaryDoc2155 { Id = message.Id }); + } +} + +public class AncillaryDoc2155 +{ + public Guid Id { get; set; } +} + +public class Bug_2155_ancillary_store_inbox_persistence : IAsyncLifetime +{ + private IHost _host; + private string _queueName; + + public async Task InitializeAsync() + { + _queueName = RabbitTesting.NextQueueName(); + + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + + // Main Marten store + opts.Services.AddMarten(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "bug2155_main"; + m.DisableNpgsqlLogging = true; + }).IntegrateWithWolverine(x => x.MessageStorageSchemaName = "bug2155_main"); + + // Ancillary Marten store on same database but different schema + opts.Services.AddMartenStore(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "bug2155_ancillary"; + m.DisableNpgsqlLogging = true; + }).IntegrateWithWolverine(x => x.SchemaName = "bug2155_ancillary"); + + opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup(); + + opts.PublishMessage().ToRabbitQueue(_queueName).UseDurableOutbox(); + opts.ListenToRabbitQueue(_queueName).UseDurableInbox(); + + opts.Policies.AutoApplyTransactions(); + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + await _host.ResetResourceState(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task incoming_envelope_should_be_marked_as_handled_in_main_store() + { + var message = new AncillaryMessage2155(Guid.NewGuid()); + + await _host + .TrackActivity() + .IncludeExternalTransports() + .SendMessageAndWaitAsync(message); + + // Give a moment for the async mark-as-handled to complete + await Task.Delay(500); + + // The main store should have the envelope marked as Handled (not stuck as Incoming) + var runtime = _host.GetRuntime(); + var incoming = await runtime.Storage.Admin.AllIncomingAsync(); + var stuck = incoming.Where(x => + x.MessageType == typeof(AncillaryMessage2155).ToMessageTypeName() + && x.Status == EnvelopeStatus.Incoming).ToList(); + + stuck.ShouldBeEmpty("Incoming envelopes should not be stuck in 'Incoming' status in the main store"); + } +}