diff --git a/src/Persistence/MartenTests/Bugs/Bug_2382_ancillary_store_inbox.cs b/src/Persistence/MartenTests/Bugs/Bug_2382_ancillary_store_inbox.cs new file mode 100644 index 000000000..c63b85351 --- /dev/null +++ b/src/Persistence/MartenTests/Bugs/Bug_2382_ancillary_store_inbox.cs @@ -0,0 +1,211 @@ +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Resources; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Attributes; +using Wolverine.Marten; +using Wolverine.Persistence; +using Wolverine.Persistence.Durability; +using Wolverine.Runtime; +using Wolverine.Tracking; +using Wolverine.Util; +using Xunit; + +namespace MartenTests.Bugs; + +#region Test Infrastructure + +public interface IAncillaryStore2382 : IDocumentStore; + +// Message handled by ancillary store +public record AncillaryMessage2382(Guid Id); + +// Message handled by main store +public record MainStoreMessage2382(Guid Id); + +// Handler targeting the ancillary store +[MartenStore(typeof(IAncillaryStore2382))] +public static class AncillaryMessage2382Handler +{ + [Transactional] + public static void Handle(AncillaryMessage2382 message, IDocumentSession session) + { + session.Store(new AncillaryDoc2382 { Id = message.Id }); + } +} + +// Handler using the main store +public static class MainStoreMessage2382Handler +{ + [Transactional] + public static void Handle(MainStoreMessage2382 message, IDocumentSession session) + { + session.Store(new MainDoc2382 { Id = message.Id }); + } +} + +public class AncillaryDoc2382 +{ + public Guid Id { get; set; } +} + +public class MainDoc2382 +{ + public Guid Id { get; set; } +} + +#endregion + +/// +/// Tests that when a handler targets an ancillary store, incoming envelopes +/// are persisted in the ancillary store's inbox (not the main store), +/// ensuring transactional atomicity between the handler's side effects and +/// the envelope status update. +/// +/// This matters when the ancillary store targets a different database. +/// +public class Bug_2382_ancillary_store_inbox : IAsyncLifetime +{ + private IHost _host = null!; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + + // Main Marten store + opts.Services.AddMarten(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "bug2382_main"; + m.DisableNpgsqlLogging = true; + }).IntegrateWithWolverine(x => x.MessageStorageSchemaName = "bug2382_main"); + + // Ancillary Marten store — same PostgreSQL server but different schema + // (simulates a separate database for inbox isolation) + opts.Services.AddMartenStore(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "bug2382_ancillary"; + m.DisableNpgsqlLogging = true; + }).IntegrateWithWolverine(x => x.SchemaName = "bug2382_ancillary"); + + // Use durable local queues to exercise the DurableReceiver code path + opts.LocalQueue("ancillary").UseDurableInbox(); + opts.LocalQueue("main").UseDurableInbox(); + + opts.PublishMessage().ToLocalQueue("ancillary"); + opts.PublishMessage().ToLocalQueue("main"); + + opts.Policies.AutoApplyTransactions(); + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + await _host.ResetResourceState(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task ancillary_store_handler_should_persist_envelope_in_ancillary_inbox() + { + var message = new AncillaryMessage2382(Guid.NewGuid()); + + await _host + .TrackActivity() + .SendMessageAndWaitAsync(message); + + await Task.Delay(500); + + var runtime = _host.Services.GetRequiredService(); + + // The main store should NOT have any lingering incoming envelopes for this message type + var mainIncoming = await runtime.Storage.Admin.AllIncomingAsync(); + mainIncoming + .Where(e => e.MessageType == typeof(AncillaryMessage2382).ToMessageTypeName() + && e.Status == EnvelopeStatus.Incoming) + .ShouldBeEmpty("Ancillary message envelope should not be stuck as Incoming in main store"); + } + + [Fact] + public async Task main_store_handler_should_persist_envelope_in_main_inbox() + { + var message = new MainStoreMessage2382(Guid.NewGuid()); + + await _host + .TrackActivity() + .SendMessageAndWaitAsync(message); + + await Task.Delay(500); + + var runtime = _host.Services.GetRequiredService(); + + // Main store messages should be handled normally — no lingering Incoming + var mainIncoming = await runtime.Storage.Admin.AllIncomingAsync(); + mainIncoming + .Where(e => e.MessageType == typeof(MainStoreMessage2382).ToMessageTypeName() + && e.Status == EnvelopeStatus.Incoming) + .ShouldBeEmpty("Main store message envelope should not be stuck as Incoming"); + } + + [Fact] + public async Task mixed_messages_both_handlers_succeed() + { + // Send both an ancillary and main store message — both should be handled successfully + var ancillaryMessage = new AncillaryMessage2382(Guid.NewGuid()); + var mainMessage = new MainStoreMessage2382(Guid.NewGuid()); + + await _host + .TrackActivity() + .SendMessageAndWaitAsync(ancillaryMessage); + + await _host + .TrackActivity() + .SendMessageAndWaitAsync(mainMessage); + + await Task.Delay(500); + + // Verify ancillary store has the document + var ancillaryStore = _host.Services.GetRequiredService(); + await using var ancillarySession = ancillaryStore.LightweightSession(); + var ancillaryDoc = await ancillarySession.LoadAsync(ancillaryMessage.Id); + ancillaryDoc.ShouldNotBeNull(); + + // Verify main store has the document + var mainStore = _host.Services.GetRequiredService(); + await using var mainSession = mainStore.LightweightSession(); + var mainDoc = await mainSession.LoadAsync(mainMessage.Id); + mainDoc.ShouldNotBeNull(); + + // Neither should have lingering incoming envelopes + var runtime = _host.Services.GetRequiredService(); + var incoming = await runtime.Storage.Admin.AllIncomingAsync(); + incoming.Where(e => e.Status == EnvelopeStatus.Incoming).ShouldBeEmpty(); + } + + [Fact] + public async Task handler_side_effects_committed_to_ancillary_store() + { + var message = new AncillaryMessage2382(Guid.NewGuid()); + + await _host + .TrackActivity() + .SendMessageAndWaitAsync(message); + + // Verify the document was stored in the ancillary store + var ancillaryStore = _host.Services.GetRequiredService(); + await using var session = ancillaryStore.LightweightSession(); + var doc = await session.LoadAsync(message.Id); + doc.ShouldNotBeNull(); + } +} diff --git a/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs b/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs index 54360e364..8754b90a0 100644 --- a/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs +++ b/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs @@ -32,17 +32,38 @@ public override Task BeforeSaveChangesAsync(IDocumentSession session, Cancellati { if (_context.Envelope.WasPersistedInInbox) { - // GH-2155: When using ancillary stores (e.g., [MartenStore]), the incoming - // envelope was persisted in the main store by DurableReceiver. We should only - // mark it as handled within this Marten session if the session's store is - // the same store. Otherwise, let DurableReceiver handle it via the main store. - if (_context.Envelope.Store == null && _messageStore.Role == MessageStoreRole.Ancillary) + // Determine which incoming table to update. The envelope may have been + // persisted in the ancillary store (if on a different database) or the + // main store (default). We need to update the correct table. + var incomingTableName = _messageStore.IncomingFullName; + + if (_messageStore.Role == MessageStoreRole.Ancillary) { - return Task.CompletedTask; + if (_context.Envelope.Store is PostgresqlMessageStore envelopeStore) + { + // Envelope was routed to a specific store (possibly this one) + incomingTableName = envelopeStore.IncomingFullName; + } + else if (_context.Envelope.Store == null) + { + // GH-2382: Envelope was persisted in the main store. If we're on the + // same database, we can still update it within this transaction. + if (_context.Runtime.Storage is PostgresqlMessageStore mainStore + && mainStore.Uri == _messageStore.Uri) + { + incomingTableName = mainStore.IncomingFullName; + } + else + { + // Different database — can't update cross-database in one transaction. + // Let DurableReceiver handle it via the main store. + return Task.CompletedTask; + } + } } var keepUntil = DateTimeOffset.UtcNow.Add(_context.Runtime.Options.Durability.KeepAfterMessageHandling); - session.QueueSqlCommand($"update {_messageStore.IncomingFullName} set {DatabaseConstants.Status} = '{EnvelopeStatus.Handled}', {DatabaseConstants.KeepUntil} = ? where id = ?", keepUntil, _context.Envelope.Id); + session.QueueSqlCommand($"update {incomingTableName} set {DatabaseConstants.Status} = '{EnvelopeStatus.Handled}', {DatabaseConstants.KeepUntil} = ? where id = ?", keepUntil, _context.Envelope.Id); _context.Envelope.Status = EnvelopeStatus.Handled; } diff --git a/src/Persistence/Wolverine.Marten/MartenStoreAttribute.cs b/src/Persistence/Wolverine.Marten/MartenStoreAttribute.cs index 2dd5b7d38..ffd5765f2 100644 --- a/src/Persistence/Wolverine.Marten/MartenStoreAttribute.cs +++ b/src/Persistence/Wolverine.Marten/MartenStoreAttribute.cs @@ -19,6 +19,7 @@ public MartenStoreAttribute(Type storeType) public override void Modify(IChain chain, GenerationRules rules, IServiceContainer container) { + chain.AncillaryStoreType = StoreType; chain.Middleware.Insert(0, new AncillaryOutboxFactoryFrame(StoreType)); } } \ No newline at end of file diff --git a/src/Wolverine/Configuration/Chain.cs b/src/Wolverine/Configuration/Chain.cs index 6cab70e4f..2f3660005 100644 --- a/src/Wolverine/Configuration/Chain.cs +++ b/src/Wolverine/Configuration/Chain.cs @@ -43,6 +43,9 @@ public abstract class Chain : IChain public abstract string Description { get; } public List AuditedMembers { get; } = []; + /// + public Type? AncillaryStoreType { get; set; } + public abstract bool TryInferMessageIdentity(out PropertyInfo? property); /// diff --git a/src/Wolverine/Configuration/IChain.cs b/src/Wolverine/Configuration/IChain.cs index ce9070b3c..8cdfa2fe6 100644 --- a/src/Wolverine/Configuration/IChain.cs +++ b/src/Wolverine/Configuration/IChain.cs @@ -63,6 +63,13 @@ public interface IChain List AuditedMembers { get; } Dictionary Tags { get; } + /// + /// When set, indicates that this handler chain targets an ancillary message store + /// identified by this marker type (e.g., IAncillaryStore). This is used to route + /// incoming durable inbox envelopes to the correct store for transactional atomicity. + /// + Type? AncillaryStoreType { get; set; } + /// /// Strategy for dealing with any return values from the handler methods /// diff --git a/src/Wolverine/Persistence/MessageStoreCollection.cs b/src/Wolverine/Persistence/MessageStoreCollection.cs index 9a2ae0877..eefd3f70d 100644 --- a/src/Wolverine/Persistence/MessageStoreCollection.cs +++ b/src/Wolverine/Persistence/MessageStoreCollection.cs @@ -458,6 +458,32 @@ public bool HasAncillaryStoreFor(Type applicationType) { return _ancillaryStores.Contains(applicationType); } + + private ImHashMap _messageTypeToAncillaryStore = ImHashMap.Empty; + + /// + /// Register a mapping from a message type name to an ancillary store. + /// Used so that DurableReceiver can persist incoming envelopes in the + /// correct ancillary store when the handler targets a different database. + /// + internal void MapMessageTypeToAncillaryStore(string messageTypeName, Type ancillaryMarkerType) + { + if (_ancillaryStores.TryFind(ancillaryMarkerType, out var store)) + { + _messageTypeToAncillaryStore = _messageTypeToAncillaryStore.AddOrUpdate(messageTypeName, store); + } + } + + /// + /// Try to find the ancillary store that should be used to persist an incoming + /// envelope based on the handler's [MartenStore] attribute. Returns null if + /// the message type's handler uses the main store. + /// + public IMessageStore? TryFindAncillaryStoreForMessageType(string? messageTypeName) + { + if (messageTypeName == null) return null; + return _messageTypeToAncillaryStore.TryFind(messageTypeName, out var store) ? store : null; + } } public class InvalidWolverineStorageConfigurationException : Exception diff --git a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs index b53e564bd..ac1eb1187 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs @@ -11,6 +11,7 @@ using Wolverine.Runtime.WorkerQueues; using Wolverine.Transports; using Wolverine.Transports.Local; +using Wolverine.Util; namespace Wolverine.Runtime; @@ -308,6 +309,18 @@ private async Task startMessagingTransportsAsync() } } + // Build message-type-to-ancillary-store mapping for durable inbox routing. + // When a handler targets an ancillary store on a different database, incoming + // envelopes should be persisted in that store for transactional atomicity. + if (Stores != null && Stores.HasAnyAncillaryStores()) + { + foreach (var chain in Handlers.Chains.Where(c => c.AncillaryStoreType != null)) + { + var messageTypeName = chain.MessageType.ToMessageTypeName(); + Stores.MapMessageTypeToAncillaryStore(messageTypeName, chain.AncillaryStoreType!); + } + } + // No local queues if running in Serverless if (Options.Durability.Mode == DurabilityMode.Serverless) { diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index 83875f203..7ac1777bd 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -144,6 +144,30 @@ public DurableReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPip public bool ShouldPersistBeforeProcessing { get; set; } + /// + /// If the handler for this message type targets an ancillary store on a + /// different database, set envelope.Store so that the DelegatingMessageInbox + /// persists it in the correct store for transactional atomicity. + /// + private void assignAncillaryStoreIfNeeded(Envelope envelope) + { + if (_runtime.Stores == null) return; + var store = _runtime.Stores.TryFindAncillaryStoreForMessageType(envelope.MessageType); + if (store != null) + { + envelope.Store = store; + } + } + + private void assignAncillaryStoreIfNeeded(IReadOnlyList envelopes) + { + if (_runtime.Stores == null) return; + foreach (var envelope in envelopes) + { + assignAncillaryStoreIfNeeded(envelope); + } + } + public async ValueTask DisposeAsync() { await _receiver.WaitForCompletionAsync(); @@ -382,6 +406,7 @@ private async Task receiveOneAsync(Envelope envelope) await executeWithRetriesAsync(async () => { envelope.OwnerId = TransportConstants.AnyNode; + assignAncillaryStoreIfNeeded(envelope); try { await _inbox.StoreIncomingAsync(envelope); @@ -448,6 +473,7 @@ await executeWithRetriesAsync(async () => } envelope.OwnerId = _settings.AssignedNodeNumber; + assignAncillaryStoreIfNeeded(envelope); await _inbox.StoreIncomingAsync(envelope); envelope.WasPersistedInInbox = true; } @@ -529,6 +555,7 @@ public async ValueTask ProcessReceivedMessagesAsync(DateTimeOffset now, IListene { try { + assignAncillaryStoreIfNeeded(envelopes); await _inbox.StoreIncomingAsync(envelopes); foreach (var envelope in envelopes) {