diff --git a/Directory.Packages.props b/Directory.Packages.props index dea907a74..856526271 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -31,16 +31,16 @@ - - + + - + - + diff --git a/src/Persistence/PolecatTests/Bugs/Bug_2668_outboxed_session_listener_null_message_store.cs b/src/Persistence/PolecatTests/Bugs/Bug_2668_outboxed_session_listener_null_message_store.cs new file mode 100644 index 000000000..70dd76ee6 --- /dev/null +++ b/src/Persistence/PolecatTests/Bugs/Bug_2668_outboxed_session_listener_null_message_store.cs @@ -0,0 +1,133 @@ +using IntegrationTests; +using JasperFx; +using JasperFx.Core; +using JasperFx.Resources; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Polecat; +using Shouldly; +using Wolverine; +using Wolverine.Attributes; +using Wolverine.Polecat; +using Wolverine.Tracking; + +namespace PolecatTests.Bugs; + +/// +/// Reproducer + regression coverage for GH-2668. Before the fix, +/// +/// added a listener with +/// null! for the SqlServerMessageStore, with a comment claiming the store +/// would be set after transaction creation. No such setter ever existed (the listener's +/// field is readonly), so the listener carried null for its lifetime and +/// the first time BeforeSaveChangesAsync read _messageStore.Role it +/// NullReferenceException'd — failing every Polecat-backed Wolverine handler that +/// calls IDocumentSession.SaveChangesAsync. +/// +/// The NRE only fires for envelopes that traverse the durable inbox (where +/// Envelope.WasPersistedInInbox = true); that's why the existing PolecatTests +/// suite, which uses InvokeMessageAndWaitAsync against non-durable defaults, +/// doesn't catch it. This test wires UseDurableLocalQueues + sends via +/// SendMessageAndWaitAsync so the local queue persists the envelope to +/// wolverine_incoming_envelopes before the handler runs, which is what triggers the +/// listener's interesting branch. +/// +/// Assertion: the document the handler stores actually lands in the document store. With +/// the bug present the handler throws inside SaveChangesAsync and the +/// transaction rolls back; with the fix the document is loadable after handling +/// completes. +/// +public class Bug_2668_outboxed_session_listener_null_message_store : IAsyncLifetime +{ + private IHost _host = null!; + private IDocumentStore _store = null!; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Discovery.IncludeType(); + + opts.Services.AddPolecat(m => + { + m.ConnectionString = Servers.SqlServerConnectionString; + m.DatabaseSchemaName = "bug2668"; + // Polecat 2.0 defaults UseNativeJsonType=true (SQL Server 2025). + // Repo docker-compose pins 2022-latest for Apple Silicon support; + // the polecat workflow overrides to 2025-latest in CI. Stay on + // string body so the test runs on either image. + m.UseNativeJsonType = false; + }).IntegrateWithWolverine(integration => + { + // Keep Wolverine's tables in their own schema so a stale ResetState + // pass doesn't fight with a different test's wolverine_* tables. + integration.MessageStorageSchemaName = "bug2668_wol"; + }); + + // Promote the local queue handling Bug2668Command to a durable + // receiver so the inbox writes the envelope before the handler runs. + // That's what flips Envelope.WasPersistedInInbox to true and exercises + // the FlushOutgoingMessagesOnCommit branch the bug lives in. + opts.Policies.UseDurableLocalQueues(); + opts.Policies.AutoApplyTransactions(); + + opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState); + }).StartAsync(); + + _store = _host.Services.GetRequiredService(); + await ((DocumentStore)_store).Database.ApplyAllConfiguredChangesToDatabaseAsync(); + + await using var session = _store.LightweightSession(); + session.DeleteWhere(x => true); + await session.SaveChangesAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task handler_against_polecat_session_does_not_NRE_in_BeforeSaveChangesAsync() + { + var id = Guid.NewGuid(); + + // DoNotAssertOnExceptionsDetected: with the bug present the handler + // would NRE inside SaveChanges and TrackActivity would otherwise rethrow + // before we got to the document-state assertion. The assertion below is + // the user-visible symptom (no document persisted). + await _host.TrackActivity() + .DoNotAssertOnExceptionsDetected() + .Timeout(30.Seconds()) + .SendMessageAndWaitAsync(new Bug2668Command(id, "Joe Mixon")); + + await using var session = _store.LightweightSession(); + var doc = await session.LoadAsync(id); + doc.ShouldNotBeNull( + "Handler did not persist the document — most likely because BeforeSaveChangesAsync threw NullReferenceException on the null SqlServerMessageStore (GH-2668). Confirm OutboxedSessionFactory.buildSessionOptions passes a real store to FlushOutgoingMessagesOnCommit."); + doc.Name.ShouldBe("Joe Mixon"); + } +} + +public record Bug2668Command(Guid Id, string Name); + +public class Bug2668Doc +{ + public Guid Id { get; set; } + public string Name { get; set; } = null!; +} + +public class Bug2668Handler +{ + // Takes IDocumentSession so the handler is wired through OutboxedSessionFactory's + // OpenSession path — the only path that adds the FlushOutgoingMessagesOnCommit + // listener to SessionOptions.Listeners. Calling session.Store + the implicit + // SaveChangesAsync inserted by Wolverine's transactional middleware is what + // triggers BeforeSaveChangesAsync. + public void Handle(Bug2668Command command, IDocumentSession session) + { + session.Store(new Bug2668Doc { Id = command.Id, Name = command.Name }); + } +} diff --git a/src/Persistence/Wolverine.Polecat/Publishing/OutboxedSessionFactory.cs b/src/Persistence/Wolverine.Polecat/Publishing/OutboxedSessionFactory.cs index 6203adb3a..2e21c8d4f 100644 --- a/src/Persistence/Wolverine.Polecat/Publishing/OutboxedSessionFactory.cs +++ b/src/Persistence/Wolverine.Polecat/Publishing/OutboxedSessionFactory.cs @@ -3,6 +3,8 @@ using Polecat; using Wolverine.Persistence.Durability; using Wolverine.Runtime; +using Wolverine.SqlServer.Persistence; +using MultiTenantedMessageStore = Wolverine.Persistence.Durability.MultiTenantedMessageStore; namespace Wolverine.Polecat.Publishing; @@ -88,11 +90,46 @@ private SessionOptions buildSessionOptions(MessageContext context) options.Listeners.Add(new PublishIncomingEventsBeforeCommit(context)); } - options.Listeners.Add(new FlushOutgoingMessagesOnCommit(context, null!)); // store set after transaction creation + // The FlushOutgoingMessagesOnCommit listener needs the SQL Server + // message store so it can mark the incoming envelope as Handled in + // the same transaction as the document changes. The factory's + // MessageStore property carries this from runtime.Storage at ctor + // time — earlier code passed `null!` here with a comment claiming a + // post-construction setter would fill it in, but no such setter + // exists on the listener (the field is readonly), and the result + // was a NullReferenceException the first time the listener tried + // to read messageStore.Role. See GH-2668. + options.Listeners.Add(new FlushOutgoingMessagesOnCommit( + context, + resolveSqlServerMessageStore())); return options; } + /// + /// Resolve the SQL-Server-backed message store from the factory's + /// . Mirrors the resolution in + /// 's constructor — for a + /// multi-tenanted runtime runtime.Storage is a + /// wrapper around the + /// SQL-Server-backed root, so a direct cast (the original GH-2668 fix) + /// would InvalidCastException in that mode. Throws a clear error + /// rather than NRE'ing in a Polecat session callback if the runtime + /// isn't SQL-Server-backed at all. + /// + private SqlServerMessageStore resolveSqlServerMessageStore() + { + return MessageStore switch + { + SqlServerMessageStore store => store, + MultiTenantedMessageStore { Main: SqlServerMessageStore mainStore } => mainStore, + _ => throw new InvalidOperationException( + "Wolverine.Polecat requires a SQL Server-backed message store. " + + $"The configured store was {MessageStore?.GetType().FullName ?? "null"}. " + + "Call PersistMessagesWithSqlServer(...) on WolverineOptions to wire one up.") + }; + } + private void configureSession(MessageContext context, IDocumentSession session) { context.OverrideStorage(MessageStore); diff --git a/src/Wolverine/Configuration/Capabilities/ServiceCapabilities.cs b/src/Wolverine/Configuration/Capabilities/ServiceCapabilities.cs index 81f360656..28bd05362 100644 --- a/src/Wolverine/Configuration/Capabilities/ServiceCapabilities.cs +++ b/src/Wolverine/Configuration/Capabilities/ServiceCapabilities.cs @@ -48,6 +48,15 @@ public ServiceCapabilities(WolverineOptions options) public List EventStores { get; set; } = []; + /// + /// Diagnostic snapshots of every IDocumentStore registered in the + /// service container — Marten and Polecat alike. Populated by walking + /// services through DI; mirrors + /// the collection so monitoring tools + /// (CritterWatch) can render document-side configuration the same way. + /// + public List DocumentStores { get; set; } = []; + public List Messages { get; set; } = []; /// @@ -96,6 +105,8 @@ public static async Task ReadFrom(IWolverineRuntime runtime await readEventStores(runtime, token, capabilities); + await readDocumentStores(runtime, token, capabilities); + readMessageTypes(runtime, capabilities); readEndpoints(runtime, capabilities); @@ -265,6 +276,38 @@ private static async Task readEventStores(IWolverineRuntime runtime, Cancellatio capabilities.EventStores.AddRange(storeList.OrderBy(x => x.SubjectUri.ToString())); } + /// + /// Mirror of for the document side. Walks + /// every registered in DI (Marten + /// stores satisfy this via IDocumentStore; Polecat stores too), and + /// asks each one for a snapshot. Stores + /// that return null (transient-init failure) are silently skipped — same + /// permissive policy as the event-store path. + /// + private static async Task readDocumentStores(IWolverineRuntime runtime, CancellationToken token, + ServiceCapabilities capabilities) + { + var stores = runtime.Services.GetServices(); + var seen = new HashSet(); + var storeList = new List(); + foreach (var store in stores) + { + // Marten stores typically also register as IEventStore on the same + // instance — once Wolverine boots both interfaces resolve to the + // same concrete object. Dedupe by Subject URI so we don't double- + // count when a store wears both hats. + if (!seen.Add(store.Subject)) continue; + + var usage = await store.TryCreateUsage(token); + if (usage != null) + { + storeList.Add(usage); + } + } + + capabilities.DocumentStores.AddRange(storeList.OrderBy(x => x.SubjectUri.ToString())); + } + private static async Task readMessageStores(IWolverineRuntime runtime, ServiceCapabilities capabilities) { var collection = runtime.Stores;