From 153d42b9cdb62d285010c4b29b068bed46793633 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 31 Mar 2026 10:57:16 -0500 Subject: [PATCH] Fix durable inbox to mark envelopes handled atomically with ancillary stores When a handler targets an ancillary MartenStore, the incoming envelope was persisted in the main store's inbox but the handler's transaction committed against the ancillary store. The envelope status update was either skipped or targeted the wrong schema, leaving envelopes stuck as Incoming. Changes: - IChain.AncillaryStoreType property tracks which ancillary store a handler chain targets (set by MartenStoreAttribute.Modify) - MessageStoreCollection memoizes message-type-to-ancillary-store mapping at startup for fast lookup - DurableReceiver sets envelope.Store before StoreIncomingAsync so DelegatingMessageInbox routes to the correct store - FlushOutgoingMessagesOnCommit resolves the correct incoming table name for same-database ancillary stores instead of skipping the mark-as-handled update Fixes #2382 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Bugs/Bug_2382_ancillary_store_inbox.cs | 211 ++++++++++++++++++ .../FlushOutgoingMessagesOnCommit.cs | 35 ++- .../Wolverine.Marten/MartenStoreAttribute.cs | 1 + src/Wolverine/Configuration/Chain.cs | 3 + src/Wolverine/Configuration/IChain.cs | 7 + .../Persistence/MessageStoreCollection.cs | 26 +++ .../Runtime/WolverineRuntime.HostService.cs | 13 ++ .../Runtime/WorkerQueues/DurableReceiver.cs | 27 +++ 8 files changed, 316 insertions(+), 7 deletions(-) create mode 100644 src/Persistence/MartenTests/Bugs/Bug_2382_ancillary_store_inbox.cs diff --git a/src/Persistence/MartenTests/Bugs/Bug_2382_ancillary_store_inbox.cs b/src/Persistence/MartenTests/Bugs/Bug_2382_ancillary_store_inbox.cs new file mode 100644 index 000000000..c63b85351 --- /dev/null +++ b/src/Persistence/MartenTests/Bugs/Bug_2382_ancillary_store_inbox.cs @@ -0,0 +1,211 @@ +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Resources; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Attributes; +using Wolverine.Marten; +using Wolverine.Persistence; +using Wolverine.Persistence.Durability; +using Wolverine.Runtime; +using Wolverine.Tracking; +using Wolverine.Util; +using Xunit; + +namespace MartenTests.Bugs; + +#region Test Infrastructure + +public interface IAncillaryStore2382 : IDocumentStore; + +// Message handled by ancillary store +public record AncillaryMessage2382(Guid Id); + +// Message handled by main store +public record MainStoreMessage2382(Guid Id); + +// Handler targeting the ancillary store +[MartenStore(typeof(IAncillaryStore2382))] +public static class AncillaryMessage2382Handler +{ + [Transactional] + public static void Handle(AncillaryMessage2382 message, IDocumentSession session) + { + session.Store(new AncillaryDoc2382 { Id = message.Id }); + } +} + +// Handler using the main store +public static class MainStoreMessage2382Handler +{ + [Transactional] + public static void Handle(MainStoreMessage2382 message, IDocumentSession session) + { + session.Store(new MainDoc2382 { Id = message.Id }); + } +} + +public class AncillaryDoc2382 +{ + public Guid Id { get; set; } +} + +public class MainDoc2382 +{ + public Guid Id { get; set; } +} + +#endregion + +/// +/// Tests that when a handler targets an ancillary store, incoming envelopes +/// are persisted in the ancillary store's inbox (not the main store), +/// ensuring transactional atomicity between the handler's side effects and +/// the envelope status update. +/// +/// This matters when the ancillary store targets a different database. +/// +public class Bug_2382_ancillary_store_inbox : IAsyncLifetime +{ + private IHost _host = null!; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + + // Main Marten store + opts.Services.AddMarten(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "bug2382_main"; + m.DisableNpgsqlLogging = true; + }).IntegrateWithWolverine(x => x.MessageStorageSchemaName = "bug2382_main"); + + // Ancillary Marten store — same PostgreSQL server but different schema + // (simulates a separate database for inbox isolation) + opts.Services.AddMartenStore(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "bug2382_ancillary"; + m.DisableNpgsqlLogging = true; + }).IntegrateWithWolverine(x => x.SchemaName = "bug2382_ancillary"); + + // Use durable local queues to exercise the DurableReceiver code path + opts.LocalQueue("ancillary").UseDurableInbox(); + opts.LocalQueue("main").UseDurableInbox(); + + opts.PublishMessage().ToLocalQueue("ancillary"); + opts.PublishMessage().ToLocalQueue("main"); + + opts.Policies.AutoApplyTransactions(); + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + await _host.ResetResourceState(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task ancillary_store_handler_should_persist_envelope_in_ancillary_inbox() + { + var message = new AncillaryMessage2382(Guid.NewGuid()); + + await _host + .TrackActivity() + .SendMessageAndWaitAsync(message); + + await Task.Delay(500); + + var runtime = _host.Services.GetRequiredService(); + + // The main store should NOT have any lingering incoming envelopes for this message type + var mainIncoming = await runtime.Storage.Admin.AllIncomingAsync(); + mainIncoming + .Where(e => e.MessageType == typeof(AncillaryMessage2382).ToMessageTypeName() + && e.Status == EnvelopeStatus.Incoming) + .ShouldBeEmpty("Ancillary message envelope should not be stuck as Incoming in main store"); + } + + [Fact] + public async Task main_store_handler_should_persist_envelope_in_main_inbox() + { + var message = new MainStoreMessage2382(Guid.NewGuid()); + + await _host + .TrackActivity() + .SendMessageAndWaitAsync(message); + + await Task.Delay(500); + + var runtime = _host.Services.GetRequiredService(); + + // Main store messages should be handled normally — no lingering Incoming + var mainIncoming = await runtime.Storage.Admin.AllIncomingAsync(); + mainIncoming + .Where(e => e.MessageType == typeof(MainStoreMessage2382).ToMessageTypeName() + && e.Status == EnvelopeStatus.Incoming) + .ShouldBeEmpty("Main store message envelope should not be stuck as Incoming"); + } + + [Fact] + public async Task mixed_messages_both_handlers_succeed() + { + // Send both an ancillary and main store message — both should be handled successfully + var ancillaryMessage = new AncillaryMessage2382(Guid.NewGuid()); + var mainMessage = new MainStoreMessage2382(Guid.NewGuid()); + + await _host + .TrackActivity() + .SendMessageAndWaitAsync(ancillaryMessage); + + await _host + .TrackActivity() + .SendMessageAndWaitAsync(mainMessage); + + await Task.Delay(500); + + // Verify ancillary store has the document + var ancillaryStore = _host.Services.GetRequiredService(); + await using var ancillarySession = ancillaryStore.LightweightSession(); + var ancillaryDoc = await ancillarySession.LoadAsync(ancillaryMessage.Id); + ancillaryDoc.ShouldNotBeNull(); + + // Verify main store has the document + var mainStore = _host.Services.GetRequiredService(); + await using var mainSession = mainStore.LightweightSession(); + var mainDoc = await mainSession.LoadAsync(mainMessage.Id); + mainDoc.ShouldNotBeNull(); + + // Neither should have lingering incoming envelopes + var runtime = _host.Services.GetRequiredService(); + var incoming = await runtime.Storage.Admin.AllIncomingAsync(); + incoming.Where(e => e.Status == EnvelopeStatus.Incoming).ShouldBeEmpty(); + } + + [Fact] + public async Task handler_side_effects_committed_to_ancillary_store() + { + var message = new AncillaryMessage2382(Guid.NewGuid()); + + await _host + .TrackActivity() + .SendMessageAndWaitAsync(message); + + // Verify the document was stored in the ancillary store + var ancillaryStore = _host.Services.GetRequiredService(); + await using var session = ancillaryStore.LightweightSession(); + var doc = await session.LoadAsync(message.Id); + doc.ShouldNotBeNull(); + } +} diff --git a/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs b/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs index 54360e364..8754b90a0 100644 --- a/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs +++ b/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs @@ -32,17 +32,38 @@ public override Task BeforeSaveChangesAsync(IDocumentSession session, Cancellati { if (_context.Envelope.WasPersistedInInbox) { - // GH-2155: When using ancillary stores (e.g., [MartenStore]), the incoming - // envelope was persisted in the main store by DurableReceiver. We should only - // mark it as handled within this Marten session if the session's store is - // the same store. Otherwise, let DurableReceiver handle it via the main store. - if (_context.Envelope.Store == null && _messageStore.Role == MessageStoreRole.Ancillary) + // Determine which incoming table to update. The envelope may have been + // persisted in the ancillary store (if on a different database) or the + // main store (default). We need to update the correct table. + var incomingTableName = _messageStore.IncomingFullName; + + if (_messageStore.Role == MessageStoreRole.Ancillary) { - return Task.CompletedTask; + if (_context.Envelope.Store is PostgresqlMessageStore envelopeStore) + { + // Envelope was routed to a specific store (possibly this one) + incomingTableName = envelopeStore.IncomingFullName; + } + else if (_context.Envelope.Store == null) + { + // GH-2382: Envelope was persisted in the main store. If we're on the + // same database, we can still update it within this transaction. + if (_context.Runtime.Storage is PostgresqlMessageStore mainStore + && mainStore.Uri == _messageStore.Uri) + { + incomingTableName = mainStore.IncomingFullName; + } + else + { + // Different database — can't update cross-database in one transaction. + // Let DurableReceiver handle it via the main store. + return Task.CompletedTask; + } + } } var keepUntil = DateTimeOffset.UtcNow.Add(_context.Runtime.Options.Durability.KeepAfterMessageHandling); - session.QueueSqlCommand($"update {_messageStore.IncomingFullName} set {DatabaseConstants.Status} = '{EnvelopeStatus.Handled}', {DatabaseConstants.KeepUntil} = ? where id = ?", keepUntil, _context.Envelope.Id); + session.QueueSqlCommand($"update {incomingTableName} set {DatabaseConstants.Status} = '{EnvelopeStatus.Handled}', {DatabaseConstants.KeepUntil} = ? where id = ?", keepUntil, _context.Envelope.Id); _context.Envelope.Status = EnvelopeStatus.Handled; } diff --git a/src/Persistence/Wolverine.Marten/MartenStoreAttribute.cs b/src/Persistence/Wolverine.Marten/MartenStoreAttribute.cs index 2dd5b7d38..ffd5765f2 100644 --- a/src/Persistence/Wolverine.Marten/MartenStoreAttribute.cs +++ b/src/Persistence/Wolverine.Marten/MartenStoreAttribute.cs @@ -19,6 +19,7 @@ public MartenStoreAttribute(Type storeType) public override void Modify(IChain chain, GenerationRules rules, IServiceContainer container) { + chain.AncillaryStoreType = StoreType; chain.Middleware.Insert(0, new AncillaryOutboxFactoryFrame(StoreType)); } } \ No newline at end of file diff --git a/src/Wolverine/Configuration/Chain.cs b/src/Wolverine/Configuration/Chain.cs index 6cab70e4f..2f3660005 100644 --- a/src/Wolverine/Configuration/Chain.cs +++ b/src/Wolverine/Configuration/Chain.cs @@ -43,6 +43,9 @@ public abstract class Chain : IChain public abstract string Description { get; } public List AuditedMembers { get; } = []; + /// + public Type? AncillaryStoreType { get; set; } + public abstract bool TryInferMessageIdentity(out PropertyInfo? property); /// diff --git a/src/Wolverine/Configuration/IChain.cs b/src/Wolverine/Configuration/IChain.cs index ce9070b3c..8cdfa2fe6 100644 --- a/src/Wolverine/Configuration/IChain.cs +++ b/src/Wolverine/Configuration/IChain.cs @@ -63,6 +63,13 @@ public interface IChain List AuditedMembers { get; } Dictionary Tags { get; } + /// + /// When set, indicates that this handler chain targets an ancillary message store + /// identified by this marker type (e.g., IAncillaryStore). This is used to route + /// incoming durable inbox envelopes to the correct store for transactional atomicity. + /// + Type? AncillaryStoreType { get; set; } + /// /// Strategy for dealing with any return values from the handler methods /// diff --git a/src/Wolverine/Persistence/MessageStoreCollection.cs b/src/Wolverine/Persistence/MessageStoreCollection.cs index 9a2ae0877..eefd3f70d 100644 --- a/src/Wolverine/Persistence/MessageStoreCollection.cs +++ b/src/Wolverine/Persistence/MessageStoreCollection.cs @@ -458,6 +458,32 @@ public bool HasAncillaryStoreFor(Type applicationType) { return _ancillaryStores.Contains(applicationType); } + + private ImHashMap _messageTypeToAncillaryStore = ImHashMap.Empty; + + /// + /// Register a mapping from a message type name to an ancillary store. + /// Used so that DurableReceiver can persist incoming envelopes in the + /// correct ancillary store when the handler targets a different database. + /// + internal void MapMessageTypeToAncillaryStore(string messageTypeName, Type ancillaryMarkerType) + { + if (_ancillaryStores.TryFind(ancillaryMarkerType, out var store)) + { + _messageTypeToAncillaryStore = _messageTypeToAncillaryStore.AddOrUpdate(messageTypeName, store); + } + } + + /// + /// Try to find the ancillary store that should be used to persist an incoming + /// envelope based on the handler's [MartenStore] attribute. Returns null if + /// the message type's handler uses the main store. + /// + public IMessageStore? TryFindAncillaryStoreForMessageType(string? messageTypeName) + { + if (messageTypeName == null) return null; + return _messageTypeToAncillaryStore.TryFind(messageTypeName, out var store) ? store : null; + } } public class InvalidWolverineStorageConfigurationException : Exception diff --git a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs index b53e564bd..ac1eb1187 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs @@ -11,6 +11,7 @@ using Wolverine.Runtime.WorkerQueues; using Wolverine.Transports; using Wolverine.Transports.Local; +using Wolverine.Util; namespace Wolverine.Runtime; @@ -308,6 +309,18 @@ private async Task startMessagingTransportsAsync() } } + // Build message-type-to-ancillary-store mapping for durable inbox routing. + // When a handler targets an ancillary store on a different database, incoming + // envelopes should be persisted in that store for transactional atomicity. + if (Stores != null && Stores.HasAnyAncillaryStores()) + { + foreach (var chain in Handlers.Chains.Where(c => c.AncillaryStoreType != null)) + { + var messageTypeName = chain.MessageType.ToMessageTypeName(); + Stores.MapMessageTypeToAncillaryStore(messageTypeName, chain.AncillaryStoreType!); + } + } + // No local queues if running in Serverless if (Options.Durability.Mode == DurabilityMode.Serverless) { diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index 83875f203..7ac1777bd 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -144,6 +144,30 @@ public DurableReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPip public bool ShouldPersistBeforeProcessing { get; set; } + /// + /// If the handler for this message type targets an ancillary store on a + /// different database, set envelope.Store so that the DelegatingMessageInbox + /// persists it in the correct store for transactional atomicity. + /// + private void assignAncillaryStoreIfNeeded(Envelope envelope) + { + if (_runtime.Stores == null) return; + var store = _runtime.Stores.TryFindAncillaryStoreForMessageType(envelope.MessageType); + if (store != null) + { + envelope.Store = store; + } + } + + private void assignAncillaryStoreIfNeeded(IReadOnlyList envelopes) + { + if (_runtime.Stores == null) return; + foreach (var envelope in envelopes) + { + assignAncillaryStoreIfNeeded(envelope); + } + } + public async ValueTask DisposeAsync() { await _receiver.WaitForCompletionAsync(); @@ -382,6 +406,7 @@ private async Task receiveOneAsync(Envelope envelope) await executeWithRetriesAsync(async () => { envelope.OwnerId = TransportConstants.AnyNode; + assignAncillaryStoreIfNeeded(envelope); try { await _inbox.StoreIncomingAsync(envelope); @@ -448,6 +473,7 @@ await executeWithRetriesAsync(async () => } envelope.OwnerId = _settings.AssignedNodeNumber; + assignAncillaryStoreIfNeeded(envelope); await _inbox.StoreIncomingAsync(envelope); envelope.WasPersistedInInbox = true; } @@ -529,6 +555,7 @@ public async ValueTask ProcessReceivedMessagesAsync(DateTimeOffset now, IListene { try { + assignAncillaryStoreIfNeeded(envelopes); await _inbox.StoreIncomingAsync(envelopes); foreach (var envelope in envelopes) {