Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@
<PackageVersion Include="Grpc.StatusProto" Version="2.76.0" />
<PackageVersion Include="Grpc.Tools" Version="2.76.0" />
<PackageVersion Include="HtmlTags" Version="9.0.0" />
<PackageVersion Include="JasperFx" Version="1.28.2" />
<PackageVersion Include="JasperFx.Events" Version="1.31.1" />
<PackageVersion Include="JasperFx" Version="1.29.0" />
<PackageVersion Include="JasperFx.Events" Version="1.33.1" />
<PackageVersion Include="JasperFx.RuntimeCompiler" Version="4.5.0" />
<PackageVersion Include="JasperFx.SourceGeneration" Version="1.1.0" />
<PackageVersion Include="Lamar.Microsoft.DependencyInjection" Version="15.0.1" />
<PackageVersion Include="Marten" Version="8.32.0" />
<PackageVersion Include="Marten" Version="8.35.0" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="6.1.3" />
<PackageVersion Include="Polecat" Version="2.1.0" />
<PackageVersion Include="Microsoft.Azure.Cosmos" Version="3.46.1" />
<PackageVersion Include="Marten.AspNetCore" Version="8.32.0" />
<PackageVersion Include="Marten.AspNetCore" Version="8.35.0" />
<PackageVersion Include="MemoryPack" Version="1.21.3" />
<PackageVersion Include="MessagePack" Version="3.1.3" />
<PackageVersion Include="Meziantou.Extensions.Logging.Xunit" Version="1.0.15" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
using IntegrationTests;
using JasperFx;
using JasperFx.Core;
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Polecat;
using Shouldly;
using Wolverine;
using Wolverine.Attributes;
using Wolverine.Polecat;
using Wolverine.Tracking;

namespace PolecatTests.Bugs;

/// <summary>
/// Reproducer + regression coverage for GH-2668. Before the fix,
/// <see cref="Wolverine.Polecat.Publishing.OutboxedSessionFactory.buildSessionOptions"/>
/// added a <see cref="Wolverine.Polecat.FlushOutgoingMessagesOnCommit"/> listener with
/// <c>null!</c> for the <c>SqlServerMessageStore</c>, with a comment claiming the store
/// would be set after transaction creation. No such setter ever existed (the listener's
/// field is <c>readonly</c>), so the listener carried <c>null</c> for its lifetime and
/// the first time <c>BeforeSaveChangesAsync</c> read <c>_messageStore.Role</c> it
/// <c>NullReferenceException</c>'d — failing every Polecat-backed Wolverine handler that
/// calls <c>IDocumentSession.SaveChangesAsync</c>.
///
/// The NRE only fires for envelopes that traverse the durable inbox (where
/// <c>Envelope.WasPersistedInInbox = true</c>); that's why the existing PolecatTests
/// suite, which uses <c>InvokeMessageAndWaitAsync</c> against non-durable defaults,
/// doesn't catch it. This test wires <c>UseDurableLocalQueues</c> + sends via
/// <c>SendMessageAndWaitAsync</c> so the local queue persists the envelope to
/// wolverine_incoming_envelopes before the handler runs, which is what triggers the
/// listener's interesting branch.
///
/// Assertion: the document the handler stores actually lands in the document store. With
/// the bug present the handler throws inside <c>SaveChangesAsync</c> and the
/// transaction rolls back; with the fix the document is loadable after handling
/// completes.
/// </summary>
public class Bug_2668_outboxed_session_listener_null_message_store : IAsyncLifetime
{
private IHost _host = null!;
private IDocumentStore _store = null!;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Discovery.IncludeType<Bug2668Handler>();

opts.Services.AddPolecat(m =>
{
m.ConnectionString = Servers.SqlServerConnectionString;
m.DatabaseSchemaName = "bug2668";
// Polecat 2.0 defaults UseNativeJsonType=true (SQL Server 2025).
// Repo docker-compose pins 2022-latest for Apple Silicon support;
// the polecat workflow overrides to 2025-latest in CI. Stay on
// string body so the test runs on either image.
m.UseNativeJsonType = false;
}).IntegrateWithWolverine(integration =>
{
// Keep Wolverine's tables in their own schema so a stale ResetState
// pass doesn't fight with a different test's wolverine_* tables.
integration.MessageStorageSchemaName = "bug2668_wol";
});

// Promote the local queue handling Bug2668Command to a durable
// receiver so the inbox writes the envelope before the handler runs.
// That's what flips Envelope.WasPersistedInInbox to true and exercises
// the FlushOutgoingMessagesOnCommit branch the bug lives in.
opts.Policies.UseDurableLocalQueues();
opts.Policies.AutoApplyTransactions();

opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
}).StartAsync();

_store = _host.Services.GetRequiredService<IDocumentStore>();
await ((DocumentStore)_store).Database.ApplyAllConfiguredChangesToDatabaseAsync();

await using var session = _store.LightweightSession();
session.DeleteWhere<Bug2668Doc>(x => true);
await session.SaveChangesAsync();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

[Fact]
public async Task handler_against_polecat_session_does_not_NRE_in_BeforeSaveChangesAsync()
{
var id = Guid.NewGuid();

// DoNotAssertOnExceptionsDetected: with the bug present the handler
// would NRE inside SaveChanges and TrackActivity would otherwise rethrow
// before we got to the document-state assertion. The assertion below is
// the user-visible symptom (no document persisted).
await _host.TrackActivity()
.DoNotAssertOnExceptionsDetected()
.Timeout(30.Seconds())
.SendMessageAndWaitAsync(new Bug2668Command(id, "Joe Mixon"));

await using var session = _store.LightweightSession();
var doc = await session.LoadAsync<Bug2668Doc>(id);
doc.ShouldNotBeNull(
"Handler did not persist the document — most likely because BeforeSaveChangesAsync threw NullReferenceException on the null SqlServerMessageStore (GH-2668). Confirm OutboxedSessionFactory.buildSessionOptions passes a real store to FlushOutgoingMessagesOnCommit.");
doc.Name.ShouldBe("Joe Mixon");
}
}

public record Bug2668Command(Guid Id, string Name);

public class Bug2668Doc
{
public Guid Id { get; set; }
public string Name { get; set; } = null!;
}

public class Bug2668Handler
{
// Takes IDocumentSession so the handler is wired through OutboxedSessionFactory's
// OpenSession path — the only path that adds the FlushOutgoingMessagesOnCommit
// listener to SessionOptions.Listeners. Calling session.Store + the implicit
// SaveChangesAsync inserted by Wolverine's transactional middleware is what
// triggers BeforeSaveChangesAsync.
public void Handle(Bug2668Command command, IDocumentSession session)
{
session.Store(new Bug2668Doc { Id = command.Id, Name = command.Name });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using Polecat;
using Wolverine.Persistence.Durability;
using Wolverine.Runtime;
using Wolverine.SqlServer.Persistence;
using MultiTenantedMessageStore = Wolverine.Persistence.Durability.MultiTenantedMessageStore;

namespace Wolverine.Polecat.Publishing;

Expand Down Expand Up @@ -88,11 +90,46 @@ private SessionOptions buildSessionOptions(MessageContext context)
options.Listeners.Add(new PublishIncomingEventsBeforeCommit(context));
}

options.Listeners.Add(new FlushOutgoingMessagesOnCommit(context, null!)); // store set after transaction creation
// The FlushOutgoingMessagesOnCommit listener needs the SQL Server
// message store so it can mark the incoming envelope as Handled in
// the same transaction as the document changes. The factory's
// MessageStore property carries this from runtime.Storage at ctor
// time — earlier code passed `null!` here with a comment claiming a
// post-construction setter would fill it in, but no such setter
// exists on the listener (the field is readonly), and the result
// was a NullReferenceException the first time the listener tried
// to read messageStore.Role. See GH-2668.
options.Listeners.Add(new FlushOutgoingMessagesOnCommit(
context,
resolveSqlServerMessageStore()));

return options;
}

/// <summary>
/// Resolve the SQL-Server-backed message store from the factory's
/// <see cref="MessageStore"/>. Mirrors the resolution in
/// <see cref="PolecatEnvelopeTransaction"/>'s constructor — for a
/// multi-tenanted runtime <c>runtime.Storage</c> is a
/// <see cref="MultiTenantedMessageStore"/> wrapper around the
/// SQL-Server-backed root, so a direct cast (the original GH-2668 fix)
/// would <c>InvalidCastException</c> in that mode. Throws a clear error
/// rather than NRE'ing in a Polecat session callback if the runtime
/// isn't SQL-Server-backed at all.
/// </summary>
private SqlServerMessageStore resolveSqlServerMessageStore()
{
return MessageStore switch
{
SqlServerMessageStore store => store,
MultiTenantedMessageStore { Main: SqlServerMessageStore mainStore } => mainStore,
_ => throw new InvalidOperationException(
"Wolverine.Polecat requires a SQL Server-backed message store. " +
$"The configured store was {MessageStore?.GetType().FullName ?? "null"}. " +
"Call PersistMessagesWithSqlServer(...) on WolverineOptions to wire one up.")
};
}

private void configureSession(MessageContext context, IDocumentSession session)
{
context.OverrideStorage(MessageStore);
Expand Down
43 changes: 43 additions & 0 deletions src/Wolverine/Configuration/Capabilities/ServiceCapabilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ public ServiceCapabilities(WolverineOptions options)

public List<EventStoreUsage> EventStores { get; set; } = [];

/// <summary>
/// Diagnostic snapshots of every <c>IDocumentStore</c> registered in the
/// service container — Marten and Polecat alike. Populated by walking
/// <see cref="IDocumentStoreUsageSource"/> services through DI; mirrors
/// the <see cref="EventStores"/> collection so monitoring tools
/// (CritterWatch) can render document-side configuration the same way.
/// </summary>
public List<DocumentStoreUsage> DocumentStores { get; set; } = [];

public List<MessageDescriptor> Messages { get; set; } = [];

/// <summary>
Expand Down Expand Up @@ -96,6 +105,8 @@ public static async Task<ServiceCapabilities> ReadFrom(IWolverineRuntime runtime

await readEventStores(runtime, token, capabilities);

await readDocumentStores(runtime, token, capabilities);

readMessageTypes(runtime, capabilities);

readEndpoints(runtime, capabilities);
Expand Down Expand Up @@ -265,6 +276,38 @@ private static async Task readEventStores(IWolverineRuntime runtime, Cancellatio
capabilities.EventStores.AddRange(storeList.OrderBy(x => x.SubjectUri.ToString()));
}

/// <summary>
/// Mirror of <see cref="readEventStores"/> for the document side. Walks
/// every <see cref="IDocumentStoreUsageSource"/> registered in DI (Marten
/// stores satisfy this via <c>IDocumentStore</c>; Polecat stores too), and
/// asks each one for a <see cref="DocumentStoreUsage"/> snapshot. Stores
/// that return null (transient-init failure) are silently skipped — same
/// permissive policy as the event-store path.
/// </summary>
private static async Task readDocumentStores(IWolverineRuntime runtime, CancellationToken token,
ServiceCapabilities capabilities)
{
var stores = runtime.Services.GetServices<IDocumentStoreUsageSource>();
var seen = new HashSet<Uri>();
var storeList = new List<DocumentStoreUsage>();
foreach (var store in stores)
{
// Marten stores typically also register as IEventStore on the same
// instance — once Wolverine boots both interfaces resolve to the
// same concrete object. Dedupe by Subject URI so we don't double-
// count when a store wears both hats.
if (!seen.Add(store.Subject)) continue;

var usage = await store.TryCreateUsage(token);
if (usage != null)
{
storeList.Add(usage);
}
}

capabilities.DocumentStores.AddRange(storeList.OrderBy(x => x.SubjectUri.ToString()));
}

private static async Task readMessageStores(IWolverineRuntime runtime, ServiceCapabilities capabilities)
{
var collection = runtime.Stores;
Expand Down
Loading