Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 211 additions & 0 deletions src/Persistence/MartenTests/Bugs/Bug_2382_ancillary_store_inbox.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
using IntegrationTests;
using JasperFx.Core;
using JasperFx.Resources;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Attributes;
using Wolverine.Marten;
using Wolverine.Persistence;
using Wolverine.Persistence.Durability;
using Wolverine.Runtime;
using Wolverine.Tracking;
using Wolverine.Util;
using Xunit;

namespace MartenTests.Bugs;

#region Test Infrastructure

public interface IAncillaryStore2382 : IDocumentStore;

// Message handled by ancillary store
public record AncillaryMessage2382(Guid Id);

// Message handled by main store
public record MainStoreMessage2382(Guid Id);

// Handler targeting the ancillary store
[MartenStore(typeof(IAncillaryStore2382))]
public static class AncillaryMessage2382Handler
{
[Transactional]
public static void Handle(AncillaryMessage2382 message, IDocumentSession session)
{
session.Store(new AncillaryDoc2382 { Id = message.Id });
}
}

// Handler using the main store
public static class MainStoreMessage2382Handler
{
[Transactional]
public static void Handle(MainStoreMessage2382 message, IDocumentSession session)
{
session.Store(new MainDoc2382 { Id = message.Id });
}
}

public class AncillaryDoc2382
{
public Guid Id { get; set; }
}

public class MainDoc2382
{
public Guid Id { get; set; }
}

#endregion

/// <summary>
/// Tests that when a handler targets an ancillary store, incoming envelopes
/// are persisted in the ancillary store's inbox (not the main store),
/// ensuring transactional atomicity between the handler's side effects and
/// the envelope status update.
///
/// This matters when the ancillary store targets a different database.
/// </summary>
public class Bug_2382_ancillary_store_inbox : IAsyncLifetime
{
private IHost _host = null!;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Durability.Mode = DurabilityMode.Solo;

// Main Marten store
opts.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "bug2382_main";
m.DisableNpgsqlLogging = true;
}).IntegrateWithWolverine(x => x.MessageStorageSchemaName = "bug2382_main");

// Ancillary Marten store — same PostgreSQL server but different schema
// (simulates a separate database for inbox isolation)
opts.Services.AddMartenStore<IAncillaryStore2382>(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "bug2382_ancillary";
m.DisableNpgsqlLogging = true;
}).IntegrateWithWolverine(x => x.SchemaName = "bug2382_ancillary");

// Use durable local queues to exercise the DurableReceiver code path
opts.LocalQueue("ancillary").UseDurableInbox();
opts.LocalQueue("main").UseDurableInbox();

opts.PublishMessage<AncillaryMessage2382>().ToLocalQueue("ancillary");
opts.PublishMessage<MainStoreMessage2382>().ToLocalQueue("main");

opts.Policies.AutoApplyTransactions();
opts.Services.AddResourceSetupOnStartup();
}).StartAsync();

await _host.ResetResourceState();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

[Fact]
public async Task ancillary_store_handler_should_persist_envelope_in_ancillary_inbox()
{
var message = new AncillaryMessage2382(Guid.NewGuid());

await _host
.TrackActivity()
.SendMessageAndWaitAsync(message);

await Task.Delay(500);

var runtime = _host.Services.GetRequiredService<IWolverineRuntime>();

// The main store should NOT have any lingering incoming envelopes for this message type
var mainIncoming = await runtime.Storage.Admin.AllIncomingAsync();
mainIncoming
.Where(e => e.MessageType == typeof(AncillaryMessage2382).ToMessageTypeName()
&& e.Status == EnvelopeStatus.Incoming)
.ShouldBeEmpty("Ancillary message envelope should not be stuck as Incoming in main store");
}

[Fact]
public async Task main_store_handler_should_persist_envelope_in_main_inbox()
{
var message = new MainStoreMessage2382(Guid.NewGuid());

await _host
.TrackActivity()
.SendMessageAndWaitAsync(message);

await Task.Delay(500);

var runtime = _host.Services.GetRequiredService<IWolverineRuntime>();

// Main store messages should be handled normally — no lingering Incoming
var mainIncoming = await runtime.Storage.Admin.AllIncomingAsync();
mainIncoming
.Where(e => e.MessageType == typeof(MainStoreMessage2382).ToMessageTypeName()
&& e.Status == EnvelopeStatus.Incoming)
.ShouldBeEmpty("Main store message envelope should not be stuck as Incoming");
}

[Fact]
public async Task mixed_messages_both_handlers_succeed()
{
// Send both an ancillary and main store message — both should be handled successfully
var ancillaryMessage = new AncillaryMessage2382(Guid.NewGuid());
var mainMessage = new MainStoreMessage2382(Guid.NewGuid());

await _host
.TrackActivity()
.SendMessageAndWaitAsync(ancillaryMessage);

await _host
.TrackActivity()
.SendMessageAndWaitAsync(mainMessage);

await Task.Delay(500);

// Verify ancillary store has the document
var ancillaryStore = _host.Services.GetRequiredService<IAncillaryStore2382>();
await using var ancillarySession = ancillaryStore.LightweightSession();
var ancillaryDoc = await ancillarySession.LoadAsync<AncillaryDoc2382>(ancillaryMessage.Id);
ancillaryDoc.ShouldNotBeNull();

// Verify main store has the document
var mainStore = _host.Services.GetRequiredService<IDocumentStore>();
await using var mainSession = mainStore.LightweightSession();
var mainDoc = await mainSession.LoadAsync<MainDoc2382>(mainMessage.Id);
mainDoc.ShouldNotBeNull();

// Neither should have lingering incoming envelopes
var runtime = _host.Services.GetRequiredService<IWolverineRuntime>();
var incoming = await runtime.Storage.Admin.AllIncomingAsync();
incoming.Where(e => e.Status == EnvelopeStatus.Incoming).ShouldBeEmpty();
}

[Fact]
public async Task handler_side_effects_committed_to_ancillary_store()
{
var message = new AncillaryMessage2382(Guid.NewGuid());

await _host
.TrackActivity()
.SendMessageAndWaitAsync(message);

// Verify the document was stored in the ancillary store
var ancillaryStore = _host.Services.GetRequiredService<IAncillaryStore2382>();
await using var session = ancillaryStore.LightweightSession();
var doc = await session.LoadAsync<AncillaryDoc2382>(message.Id);
doc.ShouldNotBeNull();
}
}
35 changes: 28 additions & 7 deletions src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,38 @@ public override Task BeforeSaveChangesAsync(IDocumentSession session, Cancellati
{
if (_context.Envelope.WasPersistedInInbox)
{
// GH-2155: When using ancillary stores (e.g., [MartenStore]), the incoming
// envelope was persisted in the main store by DurableReceiver. We should only
// mark it as handled within this Marten session if the session's store is
// the same store. Otherwise, let DurableReceiver handle it via the main store.
if (_context.Envelope.Store == null && _messageStore.Role == MessageStoreRole.Ancillary)
// Determine which incoming table to update. The envelope may have been
// persisted in the ancillary store (if on a different database) or the
// main store (default). We need to update the correct table.
var incomingTableName = _messageStore.IncomingFullName;

if (_messageStore.Role == MessageStoreRole.Ancillary)
{
return Task.CompletedTask;
if (_context.Envelope.Store is PostgresqlMessageStore envelopeStore)
{
// Envelope was routed to a specific store (possibly this one)
incomingTableName = envelopeStore.IncomingFullName;
}
else if (_context.Envelope.Store == null)
{
// GH-2382: Envelope was persisted in the main store. If we're on the
// same database, we can still update it within this transaction.
if (_context.Runtime.Storage is PostgresqlMessageStore mainStore
&& mainStore.Uri == _messageStore.Uri)
{
incomingTableName = mainStore.IncomingFullName;
}
else
{
// Different database — can't update cross-database in one transaction.
// Let DurableReceiver handle it via the main store.
return Task.CompletedTask;
}
}
}

var keepUntil = DateTimeOffset.UtcNow.Add(_context.Runtime.Options.Durability.KeepAfterMessageHandling);
session.QueueSqlCommand($"update {_messageStore.IncomingFullName} set {DatabaseConstants.Status} = '{EnvelopeStatus.Handled}', {DatabaseConstants.KeepUntil} = ? where id = ?", keepUntil, _context.Envelope.Id);
session.QueueSqlCommand($"update {incomingTableName} set {DatabaseConstants.Status} = '{EnvelopeStatus.Handled}', {DatabaseConstants.KeepUntil} = ? where id = ?", keepUntil, _context.Envelope.Id);
_context.Envelope.Status = EnvelopeStatus.Handled;
}

Expand Down
1 change: 1 addition & 0 deletions src/Persistence/Wolverine.Marten/MartenStoreAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public MartenStoreAttribute(Type storeType)

public override void Modify(IChain chain, GenerationRules rules, IServiceContainer container)
{
chain.AncillaryStoreType = StoreType;
chain.Middleware.Insert(0, new AncillaryOutboxFactoryFrame(StoreType));
}
}
3 changes: 3 additions & 0 deletions src/Wolverine/Configuration/Chain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public abstract class Chain<TChain, TModifyAttribute> : IChain
public abstract string Description { get; }
public List<AuditedMember> AuditedMembers { get; } = [];

/// <inheritdoc />
public Type? AncillaryStoreType { get; set; }

public abstract bool TryInferMessageIdentity(out PropertyInfo? property);

/// <summary>
Expand Down
7 changes: 7 additions & 0 deletions src/Wolverine/Configuration/IChain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public interface IChain
List<AuditedMember> AuditedMembers { get; }
Dictionary<string, object> Tags { get; }

/// <summary>
/// When set, indicates that this handler chain targets an ancillary message store
/// identified by this marker type (e.g., IAncillaryStore). This is used to route
/// incoming durable inbox envelopes to the correct store for transactional atomicity.
/// </summary>
Type? AncillaryStoreType { get; set; }

/// <summary>
/// Strategy for dealing with any return values from the handler methods
/// </summary>
Expand Down
26 changes: 26 additions & 0 deletions src/Wolverine/Persistence/MessageStoreCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,32 @@ public bool HasAncillaryStoreFor(Type applicationType)
{
return _ancillaryStores.Contains(applicationType);
}

private ImHashMap<string, IMessageStore> _messageTypeToAncillaryStore = ImHashMap<string, IMessageStore>.Empty;

/// <summary>
/// Register a mapping from a message type name to an ancillary store.
/// Used so that DurableReceiver can persist incoming envelopes in the
/// correct ancillary store when the handler targets a different database.
/// </summary>
internal void MapMessageTypeToAncillaryStore(string messageTypeName, Type ancillaryMarkerType)
{
if (_ancillaryStores.TryFind(ancillaryMarkerType, out var store))
{
_messageTypeToAncillaryStore = _messageTypeToAncillaryStore.AddOrUpdate(messageTypeName, store);
}
}

/// <summary>
/// Try to find the ancillary store that should be used to persist an incoming
/// envelope based on the handler's [MartenStore] attribute. Returns null if
/// the message type's handler uses the main store.
/// </summary>
public IMessageStore? TryFindAncillaryStoreForMessageType(string? messageTypeName)
{
if (messageTypeName == null) return null;
return _messageTypeToAncillaryStore.TryFind(messageTypeName, out var store) ? store : null;
}
}

public class InvalidWolverineStorageConfigurationException : Exception
Expand Down
13 changes: 13 additions & 0 deletions src/Wolverine/Runtime/WolverineRuntime.HostService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Wolverine.Runtime.WorkerQueues;
using Wolverine.Transports;
using Wolverine.Transports.Local;
using Wolverine.Util;

namespace Wolverine.Runtime;

Expand Down Expand Up @@ -308,6 +309,18 @@ private async Task startMessagingTransportsAsync()
}
}

// Build message-type-to-ancillary-store mapping for durable inbox routing.
// When a handler targets an ancillary store on a different database, incoming
// envelopes should be persisted in that store for transactional atomicity.
if (Stores != null && Stores.HasAnyAncillaryStores())
{
foreach (var chain in Handlers.Chains.Where(c => c.AncillaryStoreType != null))
{
var messageTypeName = chain.MessageType.ToMessageTypeName();
Stores.MapMessageTypeToAncillaryStore(messageTypeName, chain.AncillaryStoreType!);
}
}

// No local queues if running in Serverless
if (Options.Durability.Mode == DurabilityMode.Serverless)
{
Expand Down
Loading
Loading