Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/Http/WolverineWebApiFSharp/WolverineWebApiFSharp.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<LangVersion>8.0</LangVersion>
<OutputType>Exe</OutputType>
<!--
Pin FSharp.Core to the centrally-managed version (9.0.303) rather than the
SDK-bundled one. Without this, when built with the .NET 10 SDK (which bundles
FSharp.Core 10.x), WolverineWebApiFSharp.dll ends up referencing
FSharp.Core 10.0.0.0. Consumers in the test harness resolve FSharp.Core 9.0.0.0,
and JasperFx's runtime codegen then fails with CS1705 ("assembly uses FSharp.Core
with a version higher than the referenced assembly"), which bubbles up as a 500
on end-to-end tests that hit F# endpoints.
-->
<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>
</PropertyGroup>

<ItemGroup>
Expand All @@ -19,7 +29,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Update="FSharp.Core" VersionOverride="9.0.303" />
<PackageReference Include="FSharp.Core" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
using System.Diagnostics;
using IntegrationTests;
using JasperFx;
using JasperFx.Core;
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Npgsql;
using Shouldly;
using Wolverine;
using Wolverine.Logging;
using Wolverine.Persistence.Durability;
using Wolverine.Persistence.Durability.DeadLetterManagement;
using Wolverine.Postgresql;
using Wolverine.RabbitMQ;
using Wolverine.Tracking;
using Xunit.Abstractions;

namespace PostgresqlTests.Bugs;

/// <summary>
/// GH-1942: When a non-durable endpoint (BufferedInMemory or Inline mode) persists a
/// failed message to the database-backed DLQ, marking the dead-letter row as replayable
/// causes the row to be moved back to wolverine_incoming. The durability agent picks
/// it up and dispatches it to the listener; the handler runs (and may even succeed),
/// but the inbox row is never marked Handled in the local-queue path because
/// BufferedReceiver.CompleteAsync is a no-op and BufferedLocalQueue.EnqueueDirectlyAsync
/// does not wrap the channel callback the way ListeningAgent.EnqueueDirectlyAsync does
/// for transport-backed endpoints. The result: the row sits in wolverine_incoming forever
/// and gets reprocessed every time ownership is reset (e.g. on every host restart).
///
/// These tests cover the two flavors of non-durable endpoint:
/// 1. A local queue with .BufferedInMemory() — currently broken (this PR fixes)
/// 2. A RabbitMQ listener with .ProcessInline() — works (covered by GH-1594 fix in
/// ListeningAgent.EnqueueDirectlyAsync)
/// </summary>
public class Bug_1942_replay_dlq_to_buffered_or_inline : PostgresqlContext, IAsyncLifetime
{
private readonly ITestOutputHelper _output;

public Bug_1942_replay_dlq_to_buffered_or_inline(ITestOutputHelper output)
{
_output = output;
}

public Task InitializeAsync()
{
Bug1942Handler.Reset();
return Task.CompletedTask;
}

public Task DisposeAsync() => Task.CompletedTask;

[Fact]
public async Task buffered_local_queue_replay_does_not_loop()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "bug1942_buffered");
opts.Durability.Mode = DurabilityMode.Solo;

// The local queue handling Bug1942Message uses the buffered (non-durable) receiver,
// but the host still has a database message store, so failures land in the DB DLQ.
opts.LocalQueueFor<Bug1942Message>().BufferedInMemory();

opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
}).StartAsync();

await runReplayScenarioAsync(host, "bug1942_buffered");
}

[Fact]
public async Task inline_rabbitmq_listener_replay_does_not_loop()
{
var queueName = $"bug1942-inline-{Guid.NewGuid()}";

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "bug1942_inline");
opts.Durability.Mode = DurabilityMode.Solo;

// Disable Rabbit's native DLQ so failures land in the database-backed DLQ.
opts.UseRabbitMq()
.DisableDeadLetterQueueing()
.AutoProvision()
.AutoPurgeOnStartup();

opts.PublishMessage<Bug1942Message>().ToRabbitQueue(queueName);
opts.ListenToRabbitQueue(queueName).ProcessInline();

opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
}).StartAsync();

await runReplayScenarioAsync(host, "bug1942_inline");
}

private async Task runReplayScenarioAsync(IHost host, string schema)
{
// 1) Send a message that will fail on first attempt.
Bug1942Handler.FailNext = true;

await host
.TrackActivity()
.DoNotAssertOnExceptionsDetected()
.Timeout(30.Seconds())
.IncludeExternalTransports()
.SendMessageAndWaitAsync(new Bug1942Message(Guid.NewGuid()));

var store = host.Services.GetRequiredService<IMessageStore>();

// 2) Wait for the dead-letter row to materialize in the database.
var deadLetterId = await waitForDeadLetterAsync(store);
deadLetterId.ShouldNotBeNull("Failed message should land in the database DLQ");

// 3) Allow the handler to succeed on the next attempt, then mark the row replayable.
// The durability agent should move the row back to incoming and re-dispatch it.
Bug1942Handler.FailNext = false;

var tracked = await host
.TrackActivity()
.Timeout(30.Seconds())
.DoNotAssertOnExceptionsDetected()
.IncludeExternalTransports()
.WaitForMessageToBeReceivedAt<Bug1942Message>(host)
.ExecuteAndWaitAsync((IMessageContext _) =>
store.DeadLetters.MarkDeadLetterEnvelopesAsReplayableAsync(new[] { deadLetterId.Value }));

tracked.MessageSucceeded.SingleMessage<Bug1942Message>()
.ShouldNotBeNull("Replayed message should be processed successfully on retry");

// 4) The bug: the row is left in wolverine_incoming after a successful replay because
// BufferedReceiver/InlineReceiver never call MarkIncomingEnvelopeAsHandledAsync when
// dispatched via the local-queue path. We poll briefly to give any async cleanup
// a chance to run, then assert the inbox is drained.
var sw = Stopwatch.StartNew();
PersistedCounts counts;
do
{
counts = await store.Admin.FetchCountsAsync();
_output.WriteLine($"[POLL] DLQ={counts.DeadLetter}, Incoming={counts.Incoming}, Handled={counts.Handled}");
if (counts.Incoming == 0) break;
await Task.Delay(250);
} while (sw.Elapsed < TimeSpan.FromSeconds(5));

if (counts.Incoming != 0)
{
_output.WriteLine($"[STUCK] Inbox rows after replay (handler invocations: {Bug1942Handler.InvocationCount}):");
await dumpIncomingRowsAsync(schema);
}

counts.DeadLetter.ShouldBe(0);
counts.Incoming.ShouldBe(0);
}

private static async Task<Guid?> waitForDeadLetterAsync(IMessageStore store)
{
var sw = Stopwatch.StartNew();
while (sw.Elapsed < TimeSpan.FromSeconds(15))
{
var page = await store.DeadLetters.QueryAsync(new DeadLetterEnvelopeQuery { PageSize = 10 },
CancellationToken.None);
if (page.Envelopes.Any())
{
return page.Envelopes.First().Id;
}

await Task.Delay(100);
}

return null;
}

private async Task dumpIncomingRowsAsync(string schema)
{
await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText =
$"select id, status, owner_id, message_type, received_at, attempts from {schema}.wolverine_incoming_envelopes";
await using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
_output.WriteLine(
$" [INCOMING] id={reader.GetGuid(0)} status={reader.GetString(1)} owner_id={reader.GetInt32(2)} message_type={reader.GetString(3)} received_at={reader.GetString(4)} attempts={reader.GetInt32(5)}");
}
}
}

public record Bug1942Message(Guid Id);

public static class Bug1942Handler
{
public static bool FailNext;
public static int InvocationCount;

public static void Reset()
{
FailNext = false;
InvocationCount = 0;
}

public static void Handle(Bug1942Message _)
{
Interlocked.Increment(ref InvocationCount);
if (FailNext)
{
FailNext = false;
throw new InvalidOperationException("Bug 1942 forced failure");
}
}
}
1 change: 1 addition & 0 deletions src/Persistence/PostgresqlTests/PostgresqlTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<ProjectReference Include="..\..\Samples\OrderSagaSample\OrderSagaSample.csproj"/>
<ProjectReference Include="..\Wolverine.RDBMS\Wolverine.RDBMS.csproj"/>
<ProjectReference Include="..\Wolverine.Postgresql\Wolverine.Postgresql.csproj"/>
<ProjectReference Include="..\..\Transports\RabbitMQ\Wolverine.RabbitMQ\Wolverine.RabbitMQ.csproj" />
<ProjectReference Include="..\..\Testing\Wolverine.ComplianceTests\Wolverine.ComplianceTests.csproj" />

</ItemGroup>
Expand Down
11 changes: 11 additions & 0 deletions src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Wolverine.Runtime.Partitioning;
using Wolverine.Runtime.Scheduled;
using Wolverine.Transports;
using Wolverine.Transports.Local;
using Wolverine.Transports.Sending;

namespace Wolverine.Runtime.WorkerQueues;
Expand Down Expand Up @@ -82,6 +83,16 @@ internal async Task executeAsync(Envelope envelope, CancellationToken _)

ValueTask IChannelCallback.CompleteAsync(Envelope envelope)
{
// When the durability agent recovers a persisted envelope and dispatches it to a
// non-durable local queue (DLQ replay per GH-1942, or scheduled-message firing),
// BufferedLocalQueue.EnqueueDirectlyAsync attaches a LocalQueueRecoveryListener so
// that successful pipeline completion marks the inbox row Handled. Without this,
// the row sits in wolverine_incoming forever.
if (envelope.Listener is LocalQueueRecoveryListener recovery)
{
return recovery.CompleteAsync(envelope);
}

return ValueTask.CompletedTask;
}

Expand Down
14 changes: 14 additions & 0 deletions src/Wolverine/Transports/Local/BufferedLocalQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ namespace Wolverine.Transports.Local;
internal class BufferedLocalQueue : BufferedReceiver, ISendingAgent, IListenerCircuit
{
private readonly IMessageTracker _messageTracker;
private readonly IWolverineRuntime _runtime;

public BufferedLocalQueue(Endpoint endpoint, IWolverineRuntime runtime) : base(endpoint, runtime, new HandlerPipeline((WolverineRuntime)runtime, (IExecutorFactory)runtime, endpoint))
{
_messageTracker = runtime.MessageTracking;
_runtime = runtime;
Destination = endpoint.Uri;
Endpoint = endpoint;
}
Expand All @@ -33,8 +35,20 @@ ValueTask IListenerCircuit.StartAsync()

Task IListenerCircuit.EnqueueDirectlyAsync(IEnumerable<Envelope> envelopes)
{
// Recovery path: when the durability agent moves persisted incoming envelopes back
// to this non-durable local queue (either DLQ replay per GH-1942 or scheduled
// message firing), attach a LocalQueueRecoveryListener so that the inbox row gets
// marked Handled *after* the pipeline successfully completes. Without this, the
// default BufferedReceiver.CompleteAsync is a no-op and the row sits in
// wolverine_incoming forever.
//
// Note: we deliberately do NOT go through IReceiver.ReceivedAsync here — that
// path fires _completeBlock eagerly at receipt time, which would mark scheduled
// messages Handled before their handler has a chance to run.
var listener = new LocalQueueRecoveryListener(Destination, _runtime);
foreach (var envelope in envelopes)
{
envelope.Listener = listener;
EnqueueDirectly(envelope);
}

Expand Down
51 changes: 51 additions & 0 deletions src/Wolverine/Transports/Local/LocalQueueRecoveryListener.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Microsoft.Extensions.Logging;
using Wolverine.Runtime;

namespace Wolverine.Transports.Local;

/// <summary>
/// Pseudo-listener used when the durability agent recovers persisted incoming envelopes
/// and dispatches them to a non-durable local queue (BufferedLocalQueue). The local queue
/// path goes through <see cref="IReceiver.ReceivedAsync(IListener, Envelope[])"/>, which
/// uses <c>envelope.Listener</c> as the channel callback for completion. By wrapping the
/// recovered envelope with this listener, we ensure that successful handling marks the
/// inbox row as <see cref="EnvelopeStatus.Handled"/> in the database — without this, the
/// row stays in <c>wolverine_incoming</c> forever and gets reprocessed on every host
/// restart. See https://github.com/JasperFx/wolverine/issues/1942.
/// </summary>
internal sealed class LocalQueueRecoveryListener : IListener
{
private readonly IWolverineRuntime _runtime;

public LocalQueueRecoveryListener(Uri address, IWolverineRuntime runtime)
{
Address = address;
_runtime = runtime;
}

public Uri Address { get; }

public IHandlerPipeline? Pipeline => null;

public async ValueTask CompleteAsync(Envelope envelope)
{
try
{
await _runtime.Storage.Inbox.MarkIncomingEnvelopeAsHandledAsync(envelope);
}
catch (Exception e)
{
_runtime.Logger.LogError(e,
"Error trying to mark recovered envelope {Id} as handled in the transactional inbox",
envelope.Id);
}
}

// If the handler defers, leave the inbox row alone — it will be picked up by the
// durability agent again on the next ownership reset.
public ValueTask DeferAsync(Envelope envelope) => ValueTask.CompletedTask;

public ValueTask DisposeAsync() => ValueTask.CompletedTask;

public ValueTask StopAsync() => ValueTask.CompletedTask;
}
Loading