diff --git a/docs/guide/messaging/listeners.md b/docs/guide/messaging/listeners.md index e3696bd8e..305069942 100644 --- a/docs/guide/messaging/listeners.md +++ b/docs/guide/messaging/listeners.md @@ -66,6 +66,30 @@ without having to use any kind of message persistence. To improve throughput, you can direct Wolverine to use a number of parallel listeners, but the default is just 1 per listening endpoint. +### Processing Inline While Draining + +By default, when Wolverine begins draining an inline listener during graceful shutdown, any messages still queued +in the receiver are immediately deferred back to the transport broker. If you'd prefer that already-ingested messages +continue processing to completion before the receiver shuts down, you can enable the `ProcessInlineWhileDraining` option: + +```cs +opts.ListenToRabbitQueue("inline") + .ProcessInline() + + // Allow messages already received by the listener to finish + // processing during graceful shutdown instead of being deferred + // back to the broker immediately. + .ProcessInlineWhileDraining(); +``` + +With this flag enabled: + +* Messages that have already been received by the listener will continue to be processed through the handler pipeline + while the drain is in progress. +* Once the drain completes, any new messages that arrive will be deferred as usual. + +This is useful when deferring partially-processed batches could lead to latency outliers. + ## Buffered Endpoints ::: tip diff --git a/src/Testing/CoreTests/Runtime/WorkerQueues/inline_receiver_drain_and_latch.cs b/src/Testing/CoreTests/Runtime/WorkerQueues/inline_receiver_drain_and_latch.cs index 74f0d3733..7df3d4eeb 100644 --- a/src/Testing/CoreTests/Runtime/WorkerQueues/inline_receiver_drain_and_latch.cs +++ b/src/Testing/CoreTests/Runtime/WorkerQueues/inline_receiver_drain_and_latch.cs @@ -1,5 +1,6 @@ using NSubstitute; using Wolverine.ComplianceTests; +using Wolverine.Configuration; using Wolverine.Runtime; using Wolverine.Runtime.WorkerQueues; using Wolverine.Transports; @@ -267,3 +268,139 @@ public async Task drain_does_not_signal_until_all_batch_messages_are_handled() await theListener.Received(1).DeferAsync(envelope3); } } + +public class inline_receiver_process_inline_while_draining_processes_batch +{ + private readonly IListener theListener = Substitute.For(); + private readonly IHandlerPipeline thePipeline = Substitute.For(); + private readonly MockWolverineRuntime theRuntime = new(); + private readonly InlineReceiver theReceiver; + + public inline_receiver_process_inline_while_draining_processes_batch() + { + var stubEndpoint = new StubEndpoint("one", new StubTransport()); + stubEndpoint.ProcessInlineWhileDraining = true; + theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline); + theListener.Address.Returns(new Uri("stub://one")); + } + + [Fact] + public async Task batch_messages_are_processed_not_deferred_while_draining() + { + var firstMessageBlocking = new TaskCompletionSource(); + + thePipeline.InvokeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(async _ => await firstMessageBlocking.Task); + + var envelope1 = ObjectMother.Envelope(); + var envelope2 = ObjectMother.Envelope(); + var envelope3 = ObjectMother.Envelope(); + + // Start batch receive — it will block on the first message + var receiveTask = Task.Run(() => theReceiver.ReceivedAsync(theListener, new[] { envelope1, envelope2, envelope3 }).AsTask()); + + // Give the receive task time to enter the pipeline for envelope1 + await Task.Delay(50); + + Assert.Equal(3, theReceiver.QueueCount); + + // Simulate shutdown: Latch() first, then DrainAsync while the first message is still in-flight + theReceiver.Latch(); + var drainTask = theReceiver.DrainAsync().AsTask(); + await Task.Delay(50); + + Assert.False(drainTask.IsCompleted, "DrainAsync must not complete while batch messages are still in-flight"); + + // Release the first message — with ProcessInlineWhileDraining, remaining messages should be processed, not deferred + firstMessageBlocking.SetResult(); + + await receiveTask.WaitAsync(TimeSpan.FromSeconds(5)); + await drainTask.WaitAsync(TimeSpan.FromSeconds(5)); + + Assert.Equal(0, theReceiver.QueueCount); + + // Remaining messages should have been processed through the pipeline, NOT deferred + await theListener.DidNotReceive().DeferAsync(envelope2); + await theListener.DidNotReceive().DeferAsync(envelope3); + + // All three messages should have been invoked through the pipeline + await thePipeline.Received(3).InvokeAsync(Arg.Any(), Arg.Any(), Arg.Any()); + } +} + +public class inline_receiver_process_inline_while_draining_defers_after_drain_completes +{ + private readonly IListener theListener = Substitute.For(); + private readonly IHandlerPipeline thePipeline = Substitute.For(); + private readonly MockWolverineRuntime theRuntime = new(); + private readonly InlineReceiver theReceiver; + + public inline_receiver_process_inline_while_draining_defers_after_drain_completes() + { + var stubEndpoint = new StubEndpoint("one", new StubTransport()); + stubEndpoint.ProcessInlineWhileDraining = true; + theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline); + theListener.Address.Returns(new Uri("stub://one")); + } + + [Fact] + public async Task messages_are_deferred_after_drain_has_completed() + { + // Latch and drain with nothing in flight — drain completes immediately + theReceiver.Latch(); + await theReceiver.DrainAsync(); + + var envelope = ObjectMother.Envelope(); + await theReceiver.ReceivedAsync(theListener, envelope); + + // After drain completed, messages should be deferred even with the flag on + await theListener.Received(1).DeferAsync(envelope); + + await thePipeline.DidNotReceive() + .InvokeAsync(Arg.Any(), Arg.Any(), Arg.Any()); + } +} + +public class inline_receiver_process_inline_while_draining_non_wait_drain +{ + private readonly IListener theListener = Substitute.For(); + private readonly IHandlerPipeline thePipeline = Substitute.For(); + private readonly MockWolverineRuntime theRuntime = new(); + private readonly InlineReceiver theReceiver; + + public inline_receiver_process_inline_while_draining_non_wait_drain() + { + var stubEndpoint = new StubEndpoint("one", new StubTransport()); + stubEndpoint.ProcessInlineWhileDraining = true; + theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline); + theListener.Address.Returns(new Uri("stub://one")); + } + + [Fact] + public async Task messages_are_processed_during_non_wait_drain() + { + var firstMessageBlocking = new TaskCompletionSource(); + + thePipeline.InvokeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(async _ => await firstMessageBlocking.Task); + + var envelope1 = ObjectMother.Envelope(); + var envelope2 = ObjectMother.Envelope(); + + // Start batch receive — it will block on the first message + var receiveTask = Task.Run(() => theReceiver.ReceivedAsync(theListener, new[] { envelope1, envelope2 }).AsTask()); + await Task.Delay(50); + + // DrainAsync without prior Latch() — returns immediately (non-wait path) + var drainTask = theReceiver.DrainAsync(); + Assert.True(drainTask.IsCompleted, "DrainAsync should return immediately without prior Latch()"); + + // Release the first message — envelope2 should still be processed + firstMessageBlocking.SetResult(); + await receiveTask.WaitAsync(TimeSpan.FromSeconds(5)); + + // Both messages should have been processed, not deferred + await theListener.DidNotReceive().DeferAsync(envelope2); + await thePipeline.Received(2).InvokeAsync(Arg.Any(), Arg.Any(), Arg.Any()); + } +} diff --git a/src/Wolverine/Configuration/Endpoint.cs b/src/Wolverine/Configuration/Endpoint.cs index ea34efd6e..99530a7a3 100644 --- a/src/Wolverine/Configuration/Endpoint.cs +++ b/src/Wolverine/Configuration/Endpoint.cs @@ -218,6 +218,14 @@ protected Endpoint(Uri uri, EndpointRole role) /// public bool TelemetryEnabled { get; set; } = true; + /// + /// When using , setting this to true will allow + /// already-ingested messages to continue processing while the receiver is draining, only + /// deferring messages after the drain has fully completed. When false (the default), + /// messages are deferred as soon as the drain begins. + /// + public bool ProcessInlineWhileDraining { get; set; } + /// /// Is the endpoint controlled and configured by the application or Wolverine itself? /// diff --git a/src/Wolverine/Configuration/IListenerConfiguration.cs b/src/Wolverine/Configuration/IListenerConfiguration.cs index 704c78f15..db7a18b13 100644 --- a/src/Wolverine/Configuration/IListenerConfiguration.cs +++ b/src/Wolverine/Configuration/IListenerConfiguration.cs @@ -119,6 +119,14 @@ public interface IListenerConfiguration : IEndpointConfiguration /// T UseForReplies(); + /// + /// When using inline processing, allow already-ingested messages to continue processing + /// while the receiver is draining. Messages will only be deferred after the drain has + /// fully completed rather than as soon as it begins. + /// + /// + T ProcessInlineWhileDraining(); + /// /// Direct Wolverine to use the specified handler type for its messages on /// only this listening endpoint. This is helpful to create "sticky" handlers for the diff --git a/src/Wolverine/Configuration/ListenerConfiguration.cs b/src/Wolverine/Configuration/ListenerConfiguration.cs index 5620f7d5c..dedb7e315 100644 --- a/src/Wolverine/Configuration/ListenerConfiguration.cs +++ b/src/Wolverine/Configuration/ListenerConfiguration.cs @@ -326,6 +326,12 @@ public TSelf UseForReplies() return this.As(); } + public TSelf ProcessInlineWhileDraining() + { + add(e => e.ProcessInlineWhileDraining = true); + return this.As(); + } + public TSelf Named(string name) { add(e => e.EndpointName = name); diff --git a/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs index 97792c697..c8b41c663 100644 --- a/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs @@ -99,7 +99,7 @@ public async ValueTask ReceivedAsync(IListener listener, Envelope envelope) private async ValueTask ProcessMessageAsync(IListener listener, Envelope envelope) { - if (_latched) + if (_latched && (!_endpoint.ProcessInlineWhileDraining || _drainComplete.Task.IsCompleted)) { try {