From b4120beaad1b832f903cdd58385076ba4b8432fa Mon Sep 17 00:00:00 2001 From: Lyall Guiney Date: Mon, 30 Mar 2026 21:24:18 +0100 Subject: [PATCH] Fix DurableReceiver not completing listener after early dead-letter When an envelope arrived with a missing MessageType or failed metadata unwrapping, DurableReceiver moved it to the dead letter queue but never called CompleteAsync on the listener. For transports with manual offset management (e.g. Kafka with auto-commit disabled), this meant the offset was never advanced and the message was redelivered indefinitely on reconnect or pod restart. Fix by calling _completeBlock.PostAsync after _moveToErrors.PostAsync in both early-exit paths within ShouldPersistBeforeProcessing. --- ...eiver_fails_to_unwrap_envelope_metadata.cs | 56 +++++++++++++++++++ ...ives_envelope_with_missing_message_type.cs | 48 ++++++++++++++++ .../Runtime/WorkerQueues/DurableReceiver.cs | 2 + 3 files changed, 106 insertions(+) create mode 100644 src/Testing/CoreTests/Runtime/WorkerQueues/when_durable_receiver_fails_to_unwrap_envelope_metadata.cs create mode 100644 src/Testing/CoreTests/Runtime/WorkerQueues/when_durable_receiver_receives_envelope_with_missing_message_type.cs diff --git a/src/Testing/CoreTests/Runtime/WorkerQueues/when_durable_receiver_fails_to_unwrap_envelope_metadata.cs b/src/Testing/CoreTests/Runtime/WorkerQueues/when_durable_receiver_fails_to_unwrap_envelope_metadata.cs new file mode 100644 index 000000000..69c4577d7 --- /dev/null +++ b/src/Testing/CoreTests/Runtime/WorkerQueues/when_durable_receiver_fails_to_unwrap_envelope_metadata.cs @@ -0,0 +1,56 @@ +using NSubstitute; +using Wolverine.ComplianceTests; +using Wolverine.Runtime; +using Wolverine.Runtime.Serialization; +using Wolverine.Runtime.WorkerQueues; +using Wolverine.Transports; +using Wolverine.Transports.Stub; +using Xunit; + +namespace CoreTests.Runtime.WorkerQueues; + +public class when_durable_receiver_fails_to_unwrap_envelope_metadata : IAsyncLifetime +{ + private readonly Envelope theEnvelope = ObjectMother.Envelope(); + private readonly IListener theListener = Substitute.For(); + private readonly IHandlerPipeline thePipeline = Substitute.For(); + private readonly DurableReceiver theReceiver; + private readonly MockWolverineRuntime theRuntime; + + public when_durable_receiver_fails_to_unwrap_envelope_metadata() + { + theRuntime = new MockWolverineRuntime(); + + var stubEndpoint = new StubEndpoint("one", new StubTransport()); + theReceiver = new DurableReceiver(stubEndpoint, theRuntime, thePipeline); + + // Use a serializer that implements IUnwrapsMetadataMessageSerializer so + // UnwrapEnvelopeIfNecessary will call Unwrap — then make it throw + var serializer = Substitute.For(); + serializer.When(s => s.Unwrap(Arg.Any())) + .Throw(new Exception("Failed to unwrap metadata")); + + theEnvelope.MessageType = null; // triggers the unwrap path + theEnvelope.Serializer = serializer; + } + + public async Task InitializeAsync() + { + await theReceiver.ReceivedAsync(theListener, theEnvelope); + await theReceiver.DrainAsync(); + } + + public Task DisposeAsync() => Task.CompletedTask; + + [Fact] + public async Task the_listener_was_completed_so_the_transport_does_not_redeliver_the_message() + { + await theListener.Received().CompleteAsync(theEnvelope); + } + + [Fact] + public async Task the_envelope_was_not_processed() + { + await thePipeline.DidNotReceive().InvokeAsync(theEnvelope, theReceiver); + } +} diff --git a/src/Testing/CoreTests/Runtime/WorkerQueues/when_durable_receiver_receives_envelope_with_missing_message_type.cs b/src/Testing/CoreTests/Runtime/WorkerQueues/when_durable_receiver_receives_envelope_with_missing_message_type.cs new file mode 100644 index 000000000..1dec4ebcd --- /dev/null +++ b/src/Testing/CoreTests/Runtime/WorkerQueues/when_durable_receiver_receives_envelope_with_missing_message_type.cs @@ -0,0 +1,48 @@ +using NSubstitute; +using Wolverine.ComplianceTests; +using Wolverine.Runtime; +using Wolverine.Runtime.WorkerQueues; +using Wolverine.Transports; +using Wolverine.Transports.Stub; +using Xunit; + +namespace CoreTests.Runtime.WorkerQueues; + +public class when_durable_receiver_receives_envelope_with_missing_message_type : IAsyncLifetime +{ + private readonly Envelope theEnvelope = ObjectMother.Envelope(); + private readonly IListener theListener = Substitute.For(); + private readonly IHandlerPipeline thePipeline = Substitute.For(); + private readonly DurableReceiver theReceiver; + private readonly MockWolverineRuntime theRuntime; + + public when_durable_receiver_receives_envelope_with_missing_message_type() + { + theRuntime = new MockWolverineRuntime(); + + var stubEndpoint = new StubEndpoint("one", new StubTransport()); + theReceiver = new DurableReceiver(stubEndpoint, theRuntime, thePipeline); + + theEnvelope.MessageType = null; + } + + public async Task InitializeAsync() + { + await theReceiver.ReceivedAsync(theListener, theEnvelope); + await theReceiver.DrainAsync(); + } + + public Task DisposeAsync() => Task.CompletedTask; + + [Fact] + public async Task the_listener_was_completed_so_the_transport_does_not_redeliver_the_message() + { + await theListener.Received().CompleteAsync(theEnvelope); + } + + [Fact] + public async Task the_envelope_was_not_processed() + { + await thePipeline.DidNotReceive().InvokeAsync(theEnvelope, theReceiver); + } +} diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index a088df9c8..b81a50f13 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -429,6 +429,7 @@ await executeWithRetriesAsync(async () => envelope.MessageType ??= $"unknown/{e.GetType().Name}"; envelope.Failure = e; await _moveToErrors.PostAsync(envelope); + await _completeBlock.PostAsync(envelope); return; } @@ -442,6 +443,7 @@ await executeWithRetriesAsync(async () => { _logger.LogInformation("Empty or missing message type name for Envelope {Id} received at durable {Destination}. Moving to dead letter queue", envelope.Id, envelope.Destination); await _moveToErrors.PostAsync(envelope); + await _completeBlock.PostAsync(envelope); return; }