diff --git a/src/Http/WolverineWebApiFSharp/WolverineWebApiFSharp.fsproj b/src/Http/WolverineWebApiFSharp/WolverineWebApiFSharp.fsproj
index dd9d43c07..d23580116 100644
--- a/src/Http/WolverineWebApiFSharp/WolverineWebApiFSharp.fsproj
+++ b/src/Http/WolverineWebApiFSharp/WolverineWebApiFSharp.fsproj
@@ -5,6 +5,16 @@
 true 8.0 Exe + + true
@@ -19,7 +29,7 @@
 - +
diff --git a/src/Persistence/PostgresqlTests/Bugs/Bug_1942_replay_dlq_to_buffered_or_inline.cs b/src/Persistence/PostgresqlTests/Bugs/Bug_1942_replay_dlq_to_buffered_or_inline.cs
new file mode 100644
index 000000000..761a90aae
--- /dev/null
+++ b/src/Persistence/PostgresqlTests/Bugs/Bug_1942_replay_dlq_to_buffered_or_inline.cs
@@ -0,0 +1,213 @@
+using System.Diagnostics;
+using IntegrationTests;
+using JasperFx;
+using JasperFx.Core;
+using JasperFx.Resources;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Npgsql;
+using Shouldly;
+using Wolverine;
+using Wolverine.Logging;
+using Wolverine.Persistence.Durability;
+using Wolverine.Persistence.Durability.DeadLetterManagement;
+using Wolverine.Postgresql;
+using Wolverine.RabbitMQ;
+using Wolverine.Tracking;
+using Xunit.Abstractions;
+
+namespace PostgresqlTests.Bugs;
+
+/// <summary>
+/// GH-1942: When a non-durable endpoint (BufferedInMemory or Inline mode) persists a
+/// failed message to the database-backed DLQ, marking the dead-letter row as replayable
+/// causes the row to be moved back to wolverine_incoming. The durability agent picks
+/// it up and dispatches it to the listener; the handler runs (and may even succeed),
+/// but the inbox row is never marked Handled in the local-queue path because
+/// BufferedReceiver.CompleteAsync is a no-op and BufferedLocalQueue.EnqueueDirectlyAsync
+/// does not wrap the channel callback the way ListeningAgent.EnqueueDirectlyAsync does
+/// for transport-backed endpoints. The result: the row sits in wolverine_incoming forever
+/// and gets reprocessed every time ownership is reset (e.g. on every host restart).
+///
+/// These tests cover the two flavors of non-durable endpoint:
+///   1. A local queue with .BufferedInMemory() — currently broken (this PR fixes)
+///   2. A RabbitMQ listener with .ProcessInline() — works (covered by GH-1594 fix in
+///      ListeningAgent.EnqueueDirectlyAsync)
+/// </summary>
+public class Bug_1942_replay_dlq_to_buffered_or_inline : PostgresqlContext, IAsyncLifetime
+{
+    private readonly ITestOutputHelper _output;
+
+    public Bug_1942_replay_dlq_to_buffered_or_inline(ITestOutputHelper output)
+    {
+        _output = output;
+    }
+
+    public Task InitializeAsync()
+    {
+        Bug1942Handler.Reset();
+        return Task.CompletedTask;
+    }
+
+    public Task DisposeAsync() => Task.CompletedTask;
+
+    [Fact]
+    public async Task buffered_local_queue_replay_does_not_loop()
+    {
+        using var host = await Host.CreateDefaultBuilder()
+            .UseWolverine(opts =>
+            {
+                opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "bug1942_buffered");
+                opts.Durability.Mode = DurabilityMode.Solo;
+
+                // The local queue handling Bug1942Message uses the buffered (non-durable) receiver,
+                // but the host still has a database message store, so failures land in the DB DLQ.
+                opts.LocalQueueFor<Bug1942Message>().BufferedInMemory();
+
+                opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
+            }).StartAsync();
+
+        await runReplayScenarioAsync(host, "bug1942_buffered");
+    }
+
+    [Fact]
+    public async Task inline_rabbitmq_listener_replay_does_not_loop()
+    {
+        var queueName = $"bug1942-inline-{Guid.NewGuid()}";
+
+        using var host = await Host.CreateDefaultBuilder()
+            .UseWolverine(opts =>
+            {
+                opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "bug1942_inline");
+                opts.Durability.Mode = DurabilityMode.Solo;
+
+                // Disable Rabbit's native DLQ so failures land in the database-backed DLQ.
+                opts.UseRabbitMq()
+                    .DisableDeadLetterQueueing()
+                    .AutoProvision()
+                    .AutoPurgeOnStartup();
+
+                opts.PublishMessage<Bug1942Message>().ToRabbitQueue(queueName);
+                opts.ListenToRabbitQueue(queueName).ProcessInline();
+
+                opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
+            }).StartAsync();
+
+        await runReplayScenarioAsync(host, "bug1942_inline");
+    }
+
+    private async Task runReplayScenarioAsync(IHost host, string schema)
+    {
+        // 1) Send a message that will fail on first attempt.
+        Bug1942Handler.FailNext = true;
+
+        await host
+            .TrackActivity()
+            .DoNotAssertOnExceptionsDetected()
+            .Timeout(30.Seconds())
+            .IncludeExternalTransports()
+            .SendMessageAndWaitAsync(new Bug1942Message(Guid.NewGuid()));
+
+        var store = host.Services.GetRequiredService<IMessageStore>();
+
+        // 2) Wait for the dead-letter row to materialize in the database.
+        var deadLetterId = await waitForDeadLetterAsync(store);
+        deadLetterId.ShouldNotBeNull("Failed message should land in the database DLQ");
+
+        // 3) Allow the handler to succeed on the next attempt, then mark the row replayable.
+        // The durability agent should move the row back to incoming and re-dispatch it.
+        Bug1942Handler.FailNext = false;
+
+        var tracked = await host
+            .TrackActivity()
+            .Timeout(30.Seconds())
+            .DoNotAssertOnExceptionsDetected()
+            .IncludeExternalTransports()
+            .WaitForMessageToBeReceivedAt<Bug1942Message>(host)
+            .ExecuteAndWaitAsync((IMessageContext _) =>
+                store.DeadLetters.MarkDeadLetterEnvelopesAsReplayableAsync(new[] { deadLetterId.Value }));
+
+        tracked.MessageSucceeded.SingleMessage<Bug1942Message>()
+            .ShouldNotBeNull("Replayed message should be processed successfully on retry");
+
+        // 4) The bug: the row is left in wolverine_incoming after a successful replay because
+        // BufferedReceiver/InlineReceiver never call MarkIncomingEnvelopeAsHandledAsync when
+        // dispatched via the local-queue path. We poll briefly to give any async cleanup
+        // a chance to run, then assert the inbox is drained.
+        var sw = Stopwatch.StartNew();
+        PersistedCounts counts;
+        do
+        {
+            counts = await store.Admin.FetchCountsAsync();
+            _output.WriteLine($"[POLL] DLQ={counts.DeadLetter}, Incoming={counts.Incoming}, Handled={counts.Handled}");
+            if (counts.Incoming == 0) break;
+            await Task.Delay(250);
+        } while (sw.Elapsed < TimeSpan.FromSeconds(5));
+
+        if (counts.Incoming != 0)
+        {
+            _output.WriteLine($"[STUCK] Inbox rows after replay (handler invocations: {Bug1942Handler.InvocationCount}):");
+            await dumpIncomingRowsAsync(schema);
+        }
+
+        counts.DeadLetter.ShouldBe(0);
+        counts.Incoming.ShouldBe(0);
+    }
+
+    private static async Task<Guid?> waitForDeadLetterAsync(IMessageStore store)
+    {
+        var sw = Stopwatch.StartNew();
+        while (sw.Elapsed < TimeSpan.FromSeconds(15))
+        {
+            var page = await store.DeadLetters.QueryAsync(new DeadLetterEnvelopeQuery { PageSize = 10 },
+                CancellationToken.None);
+            if (page.Envelopes.Any())
+            {
+                return page.Envelopes.First().Id;
+            }
+
+            await Task.Delay(100);
+        }
+
+        return null;
+    }
+
+    private async Task dumpIncomingRowsAsync(string schema)
+    {
+        await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
+        await conn.OpenAsync();
+        await using var cmd = conn.CreateCommand();
+        cmd.CommandText =
+            $"select id, status, owner_id, message_type, received_at, attempts from {schema}.wolverine_incoming_envelopes";
+        await using var reader = await cmd.ExecuteReaderAsync();
+        while (await reader.ReadAsync())
+        {
+            _output.WriteLine(
+                $" [INCOMING] id={reader.GetGuid(0)} status={reader.GetString(1)} owner_id={reader.GetInt32(2)} message_type={reader.GetString(3)} received_at={reader.GetString(4)} attempts={reader.GetInt32(5)}");
+        }
+    }
+}
+
+public record Bug1942Message(Guid Id);
+
+public static class Bug1942Handler
+{
+    public static bool FailNext;
+    public static int InvocationCount;
+
+    public static void Reset()
+    {
+        FailNext = false;
+        InvocationCount = 0;
+    }
+
+    public static void Handle(Bug1942Message _)
+    {
+        Interlocked.Increment(ref InvocationCount);
+        if (FailNext)
+        {
+            FailNext = false;
+            throw new InvalidOperationException("Bug 1942 forced failure");
+        }
+    }
+}
diff --git a/src/Persistence/PostgresqlTests/PostgresqlTests.csproj b/src/Persistence/PostgresqlTests/PostgresqlTests.csproj
index b4b236336..f5817e3b1 100644
--- a/src/Persistence/PostgresqlTests/PostgresqlTests.csproj
+++ b/src/Persistence/PostgresqlTests/PostgresqlTests.csproj
@@ -19,6 +19,7 @@
+
diff --git a/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs
index 63d8898df..9b0c6e003 100644
--- a/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs
+++ b/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs
@@ -7,6 +7,7 @@
 using Wolverine.Runtime.Partitioning;
 using Wolverine.Runtime.Scheduled;
 using Wolverine.Transports;
+using Wolverine.Transports.Local;
 using Wolverine.Transports.Sending;
 
 namespace Wolverine.Runtime.WorkerQueues;
@@ -82,6 +83,16 @@ internal async Task executeAsync(Envelope envelope, CancellationToken _)
 
     ValueTask IChannelCallback.CompleteAsync(Envelope envelope)
     {
+        // When the durability agent recovers a persisted envelope and dispatches it to a
+        // non-durable local queue (DLQ replay per GH-1942, or scheduled-message firing),
+        // BufferedLocalQueue.EnqueueDirectlyAsync attaches a LocalQueueRecoveryListener so
+        // that successful pipeline completion marks the inbox row Handled. Without this,
+        // the row sits in wolverine_incoming forever.
+        if (envelope.Listener is LocalQueueRecoveryListener recovery)
+        {
+            return recovery.CompleteAsync(envelope);
+        }
+
         return ValueTask.CompletedTask;
     }
 
diff --git a/src/Wolverine/Transports/Local/BufferedLocalQueue.cs b/src/Wolverine/Transports/Local/BufferedLocalQueue.cs
index eb3cdc8b7..0e3062c58 100644
--- a/src/Wolverine/Transports/Local/BufferedLocalQueue.cs
+++ b/src/Wolverine/Transports/Local/BufferedLocalQueue.cs
@@ -9,10 +9,12 @@ namespace Wolverine.Transports.Local;
 internal class BufferedLocalQueue : BufferedReceiver, ISendingAgent, IListenerCircuit
 {
     private readonly IMessageTracker _messageTracker;
+    private readonly IWolverineRuntime _runtime;
 
     public BufferedLocalQueue(Endpoint endpoint, IWolverineRuntime runtime) : base(endpoint, runtime, new HandlerPipeline((WolverineRuntime)runtime, (IExecutorFactory)runtime, endpoint))
     {
         _messageTracker = runtime.MessageTracking;
+        _runtime = runtime;
         Destination = endpoint.Uri;
         Endpoint = endpoint;
     }
@@ -33,8 +35,20 @@ ValueTask IListenerCircuit.StartAsync()
 
     Task IListenerCircuit.EnqueueDirectlyAsync(IEnumerable<Envelope> envelopes)
     {
+        // Recovery path: when the durability agent moves persisted incoming envelopes back
+        // to this non-durable local queue (either DLQ replay per GH-1942 or scheduled
+        // message firing), attach a LocalQueueRecoveryListener so that the inbox row gets
+        // marked Handled *after* the pipeline successfully completes. Without this, the
+        // default BufferedReceiver.CompleteAsync is a no-op and the row sits in
+        // wolverine_incoming forever.
+        //
+        // Note: we deliberately do NOT go through IReceiver.ReceivedAsync here — that
+        // path fires _completeBlock eagerly at receipt time, which would mark scheduled
+        // messages Handled before their handler has a chance to run.
+        var listener = new LocalQueueRecoveryListener(Destination, _runtime);
         foreach (var envelope in envelopes)
         {
+            envelope.Listener = listener;
             EnqueueDirectly(envelope);
         }
 
diff --git a/src/Wolverine/Transports/Local/LocalQueueRecoveryListener.cs b/src/Wolverine/Transports/Local/LocalQueueRecoveryListener.cs
new file mode 100644
index 000000000..15aa7204d
--- /dev/null
+++ b/src/Wolverine/Transports/Local/LocalQueueRecoveryListener.cs
@@ -0,0 +1,51 @@
+using Microsoft.Extensions.Logging;
+using Wolverine.Runtime;
+
+namespace Wolverine.Transports.Local;
+
+/// <summary>
+/// Pseudo-listener used when the durability agent recovers persisted incoming envelopes
+/// and dispatches them to a non-durable local queue (BufferedLocalQueue). The local queue
+/// path goes through <see cref="Wolverine.Runtime.WorkerQueues.BufferedReceiver"/>, which
+/// uses envelope.Listener as the channel callback for completion. By wrapping the
+/// recovered envelope with this listener, we ensure that successful handling marks the
+/// inbox row as <c>Handled</c> in the database — without this, the
+/// row stays in wolverine_incoming forever and gets reprocessed on every host
+/// restart. See https://github.com/JasperFx/wolverine/issues/1942.
+/// </summary>
+internal sealed class LocalQueueRecoveryListener : IListener
+{
+    private readonly IWolverineRuntime _runtime;
+
+    public LocalQueueRecoveryListener(Uri address, IWolverineRuntime runtime)
+    {
+        Address = address;
+        _runtime = runtime;
+    }
+
+    public Uri Address { get; }
+
+    public IHandlerPipeline? Pipeline => null;
+
+    public async ValueTask CompleteAsync(Envelope envelope)
+    {
+        try
+        {
+            await _runtime.Storage.Inbox.MarkIncomingEnvelopeAsHandledAsync(envelope);
+        }
+        catch (Exception e)
+        {
+            _runtime.Logger.LogError(e,
+                "Error trying to mark recovered envelope {Id} as handled in the transactional inbox",
+                envelope.Id);
+        }
+    }
+
+    // If the handler defers, leave the inbox row alone — it will be picked up by the
+    // durability agent again on the next ownership reset.
+    public ValueTask DeferAsync(Envelope envelope) => ValueTask.CompletedTask;
+
+    public ValueTask DisposeAsync() => ValueTask.CompletedTask;
+
+    public ValueTask StopAsync() => ValueTask.CompletedTask;
+}