Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
using IntegrationTests;
using JasperFx.Core;
using JasperFx.Resources;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Attributes;
using Wolverine.Marten;
using Wolverine.Persistence.Durability;
using Wolverine.Runtime;
using Wolverine.Tracking;
using Wolverine.Util;
using Xunit;

namespace MartenTests.Bugs;

#region Test Infrastructure

public interface IAncillaryStore2576 : IDocumentStore;

public record AncillaryCommand2576(Guid Id);

public record AncillaryEvent2576(Guid Id);

public record SomeMessage2576(Guid Id);

[MartenStore(typeof(IAncillaryStore2576))]
public static class AncillaryCommand2576Handler
{
[Transactional]
public static AncillaryEvent2576 Handle(AncillaryCommand2576 message, IDocumentSession session)
{
var @event = new AncillaryEvent2576(message.Id);
session.Events.Append(message.Id, @event);
return @event;
}
}

[MartenStore(typeof(IAncillaryStore2576))]
public static class AncillaryEvent2576Handler
{
public static ScheduledMessage<SomeMessage2576> Handle(AncillaryEvent2576 message, IDocumentSession session)
{
session.Events.Append(message.Id, message);
// Schedule in the past so the scheduled-jobs processor picks it up immediately.
return new SomeMessage2576(message.Id).ScheduledAt(DateTime.UtcNow.AddDays(-1));
}
}

[MartenStore(typeof(IAncillaryStore2576))]
public static class SomeMessage2576Handler
{
[Transactional]
public static void Handle(SomeMessage2576 message, IDocumentSession session)
{
session.Events.Append(message.Id, message);
}
}

#endregion

/// <summary>
/// FAILING reproducer for https://github.com/JasperFx/wolverine/issues/2576.
///
/// When a handler chain is fully owned by an ancillary Marten store
/// (every handler tagged with [MartenStore(typeof(IAncillaryStore))]),
/// and a handler returns a <see cref="ScheduledMessage{T}"/>, the resulting
/// envelope ends up persisted in the ancillary store's outbox/inbox tables
/// — but when its handler later succeeds, Wolverine writes the
/// "mark as Handled" SQL to the MAIN store instead, leaving the row in
/// the ancillary store stuck in <c>Incoming</c> status forever.
///
/// The expected behavior is that the envelope's mark-as-handled SQL runs
/// against the same store that owns the handler.
/// </summary>
public class Bug_2576_ancillary_scheduled_message_stuck_incoming : IAsyncLifetime
{
private IHost _host = null!;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Durability.Mode = DurabilityMode.Solo;
opts.Durability.MessageIdentity = MessageIdentity.IdAndDestination;
opts.Durability.KeepAfterMessageHandling = 1.Hours();

// Main Marten store
opts.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "bug2576_main";
m.DisableNpgsqlLogging = true;
}).IntegrateWithWolverine(x => x.MessageStorageSchemaName = "bug2576_main");

// Ancillary Marten store on a separate schema (mirrors the
// reporter's separate-database setup; schema isolation is
// sufficient to exercise the routing decision).
opts.Services.AddMartenStore<IAncillaryStore2576>(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "bug2576_ancillary";
m.Events.DatabaseSchemaName = "bug2576_ancillary";
m.DisableNpgsqlLogging = true;
}).IntegrateWithWolverine(x => x.SchemaName = "bug2576_ancillary");

opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
opts.Policies.UseDurableInboxOnAllListeners();
opts.Policies.UseDurableOutboxOnAllSendingEndpoints();
opts.Policies.AutoApplyTransactions();
opts.Policies.AllLocalQueues(x => x.UseDurableInbox());
}).StartAsync();

await _host.ResetResourceState();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

[Fact]
public async Task scheduled_message_in_ancillary_chain_should_not_be_stuck_incoming()
{
var message = new AncillaryCommand2576(Guid.NewGuid());

await _host
.TrackActivity()
.IncludeExternalTransports()
.Timeout(30.Seconds())
.InvokeMessageAndWaitAsync(message);

// The scheduled message has to wake up out of the scheduled-jobs poller
// and run through the handler. Give it a moment.
await Task.Delay(5.Seconds());

var runtime = _host.GetRuntime();
var ancillaryStore = runtime.Stores.FindAncillaryStore(typeof(IAncillaryStore2576));

var ancillaryIncoming = await ancillaryStore.Admin.AllIncomingAsync();

var someMessageTypeName = typeof(SomeMessage2576).ToMessageTypeName();

ancillaryIncoming
.Count(x => x.MessageType == someMessageTypeName && x.Status == EnvelopeStatus.Incoming)
.ShouldBe(0,
"The scheduled SomeMessage2576 should not be left in Incoming status in the ancillary store. " +
"If this is non-zero, Wolverine wrote the 'mark as handled' SQL to the wrong store.");

ancillaryIncoming
.Count(x => x.MessageType == someMessageTypeName && x.Status == EnvelopeStatus.Handled)
.ShouldBe(1,
"The scheduled SomeMessage2576 should be marked as Handled in the ancillary store " +
"(the same store that scheduled and processed it).");

// Cross-check: the main store should NOT have a row for this message
// type at all — the entire chain belongs to the ancillary store.
var mainIncoming = await runtime.Storage.Admin.AllIncomingAsync();
mainIncoming
.Count(x => x.MessageType == someMessageTypeName)
.ShouldBe(0,
"The main store should have no envelope rows for the ancillary-owned SomeMessage2576.");
}
}
7 changes: 7 additions & 0 deletions src/Persistence/MySql/Wolverine.MySql/MySqlMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,13 @@ public override async Task PollForScheduledMessagesAsync(IWolverineRuntime runti

await tx.CommitAsync(cancellationToken);

// Stamp owning store on each row so downstream pipeline routes
// its writes back here. See GH-2576.
foreach (var envelope in envelopes)
{
envelope.Store = this;
}

await runtime.EnqueueDirectlyAsync(envelopes);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ public async Task PollForScheduledMessagesAsync(IWolverineRuntime runtime, ILogg

await tx.CommitAsync(cancellationToken);

// Stamp owning store on each row so downstream pipeline routes
// its writes back here. See GH-2576.
foreach (var envelope in envelopes)
{
envelope.Store = this;
}

await runtime.EnqueueDirectlyAsync(envelopes);
}
}
Expand Down
12 changes: 12 additions & 0 deletions src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,18 @@ await conn.CreateCommand(_reassignIncomingSql)

await tx.CommitAsync(cancellationToken);

// Stamp the envelope's owning store on each row so the rest of the
// pipeline (DelegatingMessageInbox, FlushOutgoingMessagesOnCommit,
// DurableReceiver._markAsHandled) routes its writes back to THIS
// store. Without this, an ancillary store's scheduled message wakes
// up with envelope.Store == null and the mark-as-handled SQL goes
// to the main store, leaving the row stuck Incoming.
// See https://github.com/JasperFx/wolverine/issues/2576.
foreach (var envelope in envelopes)
{
envelope.Store = this;
}

// Judging that there's very little chance of errors here
await runtime.EnqueueDirectlyAsync(envelopes);
}
Expand Down
1 change: 1 addition & 0 deletions src/Persistence/Wolverine.RDBMS/AssemblyAttributes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@
[assembly: InternalsVisibleTo("SqliteTests")]
[assembly: InternalsVisibleTo("Wolverine.Oracle")]
[assembly: InternalsVisibleTo("OracleTests")]
[assembly: InternalsVisibleTo("Wolverine.ComplianceTests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] // Castle Core proxies for NSubstitute
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,14 @@ await reassign

await tx.CommitAsync(cancellationToken);

// Stamp the envelope's owning store on each row so the rest of the
// pipeline (DelegatingMessageInbox, DurableReceiver._markAsHandled)
// routes its writes back to THIS store. See GH-2576.
foreach (var envelope in envelopes)
{
envelope.Store = this;
}

// Judging that there's very little chance of errors here
await runtime.EnqueueDirectlyAsync(envelopes);
}
Expand Down
7 changes: 7 additions & 0 deletions src/Persistence/Wolverine.Sqlite/SqliteMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,13 @@ await reassign.With("owner", durabilitySettings.AssignedNodeNumber)

await tx.CommitAsync(cancellationToken);

// Stamp owning store on each row so downstream pipeline routes its
// writes back here. See GH-2576.
foreach (var envelope in envelopes)
{
envelope.Store = this;
}

await runtime.EnqueueDirectlyAsync(envelopes);
}
finally
Expand Down
123 changes: 123 additions & 0 deletions src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
using JasperFx.Core.Reflection;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging.Abstractions;
using JasperFx.Resources;
using NSubstitute;
using Shouldly;
using Wolverine.Persistence.Durability;
using Wolverine.Persistence.Durability.DeadLetterManagement;
using Wolverine.Persistence.Durability.ScheduledMessageManagement;
using Wolverine.RDBMS;
using Wolverine.Runtime;
using Wolverine.Runtime.Agents;
using Wolverine.Transports;
using Xunit;
Expand Down Expand Up @@ -1034,4 +1038,123 @@ public async Task move_to_dead_letter_storage_with_null_source()
stored.ExceptionType.ShouldBe(typeof(DivideByZeroException).FullName);
}

/// <summary>
/// Contract test for https://github.com/JasperFx/wolverine/issues/2576.
///
/// When a scheduled message is loaded from a store via
/// <see cref="IMessageDatabase.PollForScheduledMessagesAsync"/>, the
/// resulting in-memory envelope passed to
/// <see cref="IWolverineRuntime.EnqueueDirectlyAsync"/> must have its
/// <c>Store</c> property stamped with the originating store. Without this,
/// downstream pipeline components (DelegatingMessageInbox,
/// DurableReceiver._markAsHandled, FlushOutgoingMessagesOnCommit) cannot
/// route their writes back to the correct store, and ancillary-store rows
/// get stuck in <c>Incoming</c> status forever because the
/// "mark as handled" SQL targets the main store instead.
/// </summary>
[Fact]
public virtual async Task scheduled_poll_stamps_envelope_with_originating_store()
{
if (thePersistence is not IMessageDatabase database)
{
// Non-database stores (e.g. RavenDb, CosmosDb) wire scheduled
// dispatch through their own durability agents, not through
// PollForScheduledMessagesAsync. Skip this contract there.
return;
}

// Persist a scheduled envelope into this store's incoming table.
var envelope = ObjectMother.Envelope();
envelope.Status = EnvelopeStatus.Incoming;
envelope.ScheduledTime = DateTimeOffset.UtcNow.AddMinutes(-1); // already due
await thePersistence.Inbox.StoreIncomingAsync(envelope);
await thePersistence.Inbox.ScheduleExecutionAsync(envelope);

// Spy runtime that captures whatever PollForScheduledMessagesAsync
// hands to EnqueueDirectlyAsync.
var capturedEnvelopes = new List<Envelope>();
var spyRuntime = Substitute.For<IWolverineRuntime>();
spyRuntime
.EnqueueDirectlyAsync(Arg.Do<IReadOnlyList<Envelope>>(es => capturedEnvelopes.AddRange(es)))
.Returns(ValueTask.CompletedTask);

var durabilitySettings = theHost.Services.GetRequiredService<DurabilitySettings>();

await database.PollForScheduledMessagesAsync(
spyRuntime, NullLogger.Instance, durabilitySettings, CancellationToken.None);

capturedEnvelopes.ShouldNotBeEmpty(
"Expected the just-scheduled envelope to be picked up by the poller.");

var captured = capturedEnvelopes.SingleOrDefault(x => x.Id == envelope.Id);
captured.ShouldNotBeNull(
"Expected the polled envelope to match the one we scheduled.");

captured.Store.ShouldBe(thePersistence,
"Polled envelopes must be stamped with the store they came from so " +
"downstream mark-as-handled / inbox writes route back to the correct store. " +
"See GH-2576.");
}

/// <summary>
/// Sister contract test to <see cref="scheduled_poll_stamps_envelope_with_originating_store"/>:
/// the inbox-recovery path (which DLQ replay funnels through) must stamp
/// <c>envelope.Store</c> so downstream mark-as-handled writes route to the
/// right database.
///
/// Whenever <c>RecoverIncomingMessagesCommand</c> picks up an orphaned
/// (<c>owner_id == AnyNode</c>) envelope via
/// <see cref="IMessageStore.LoadPageOfGloballyOwnedIncomingAsync"/>, it
/// applies <c>envelope.Store ??= _store</c>. Without that stamp, an
/// ancillary-owned envelope would be marked Handled in the main store and
/// stay stuck Incoming. The existing fix lives at
/// <c>RecoverIncomingMessagesCommand:48</c>; this test pins the behavior
/// down so the GH-2576 fix doesn't accidentally regress it.
///
/// The DLQ replay flow is one producer of orphaned-incoming rows (via
/// <c>MoveReplayableErrorMessagesToIncomingOperation</c>), but durability
/// also generates them when nodes crash mid-handle. We exercise the
/// recovery contract directly by persisting an envelope with
/// <c>OwnerId == AnyNode</c>, sidestepping any database-specific quirks
/// in the move-from-dead-letter SQL.
/// </summary>
[Fact]
public virtual async Task orphaned_incoming_recovery_stamps_envelope_with_originating_store()
{
if (thePersistence is not IMessageDatabase)
{
return;
}

// Persist an envelope as if a previous owner crashed mid-handle —
// status Incoming, owner_id == AnyNode marks it as orphaned and
// visible to LoadPageOfGloballyOwnedIncomingAsync.
var envelope = ObjectMother.Envelope();
envelope.Status = EnvelopeStatus.Incoming;
envelope.OwnerId = TransportConstants.AnyNode;
await thePersistence.Inbox.StoreIncomingAsync(envelope);

// Recovery loop reads the orphaned incoming envelopes back into memory.
var recovered = await thePersistence.LoadPageOfGloballyOwnedIncomingAsync(
envelope.Destination!, 100);

recovered.ShouldNotBeEmpty(
"Expected the orphaned envelope to be visible to the recovery loader.");

var match = recovered.SingleOrDefault(x => x.Id == envelope.Id);
match.ShouldNotBeNull("Expected the recovered envelope to match the one we persisted.");

// RecoverIncomingMessagesCommand applies envelope.Store ??= _store
// immediately after this load. Simulate that step here so the contract
// test captures what the runtime path actually guarantees.
foreach (var e in recovered)
{
e.Store ??= thePersistence;
}

match.Store.ShouldBe(thePersistence,
"Recovered envelopes must carry their originating store so " +
"mark-as-handled SQL targets the right database. See GH-2318 / GH-2576.");
}

}
Loading
Loading