Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docs/guide/messaging/listeners.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,30 @@ without having to use any kind of message persistence.
To improve throughput, you can direct Wolverine to use a number of parallel listeners, but the default is
just 1 per listening endpoint.

### Processing Inline While Draining

By default, when Wolverine begins draining an inline listener during graceful shutdown, any messages still queued
in the receiver are immediately deferred back to the transport broker. If you'd prefer that already-ingested messages
continue processing to completion before the receiver shuts down, you can enable the `ProcessInlineWhileDraining` option:

```cs
opts.ListenToRabbitQueue("inline")
.ProcessInline()

// Allow messages already received by the listener to finish
// processing during graceful shutdown instead of being deferred
// back to the broker immediately.
.ProcessInlineWhileDraining();
```

With this flag enabled:

* Messages that have already been received by the listener will continue to be processed through the handler pipeline
while the drain is in progress.
* Once the drain completes, any new messages that arrive will be deferred as usual.

This is useful when deferring partially-processed batches could lead to latency outliers.

## Buffered Endpoints

::: tip
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using NSubstitute;
using Wolverine.ComplianceTests;
using Wolverine.Configuration;
using Wolverine.Runtime;
using Wolverine.Runtime.WorkerQueues;
using Wolverine.Transports;
Expand Down Expand Up @@ -267,3 +268,139 @@ public async Task drain_does_not_signal_until_all_batch_messages_are_handled()
await theListener.Received(1).DeferAsync(envelope3);
}
}

public class inline_receiver_process_inline_while_draining_processes_batch
{
private readonly IListener theListener = Substitute.For<IListener>();
private readonly IHandlerPipeline thePipeline = Substitute.For<IHandlerPipeline>();
private readonly MockWolverineRuntime theRuntime = new();
private readonly InlineReceiver theReceiver;

public inline_receiver_process_inline_while_draining_processes_batch()
{
var stubEndpoint = new StubEndpoint("one", new StubTransport());
stubEndpoint.ProcessInlineWhileDraining = true;
theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline);
theListener.Address.Returns(new Uri("stub://one"));
}

[Fact]
public async Task batch_messages_are_processed_not_deferred_while_draining()
{
var firstMessageBlocking = new TaskCompletionSource();

thePipeline.InvokeAsync(Arg.Any<Envelope>(), Arg.Any<IChannelCallback>(), Arg.Any<System.Diagnostics.Activity>())
.Returns(async _ => await firstMessageBlocking.Task);

var envelope1 = ObjectMother.Envelope();
var envelope2 = ObjectMother.Envelope();
var envelope3 = ObjectMother.Envelope();

// Start batch receive — it will block on the first message
var receiveTask = Task.Run(() => theReceiver.ReceivedAsync(theListener, new[] { envelope1, envelope2, envelope3 }).AsTask());

// Give the receive task time to enter the pipeline for envelope1
await Task.Delay(50);

Assert.Equal(3, theReceiver.QueueCount);

// Simulate shutdown: Latch() first, then DrainAsync while the first message is still in-flight
theReceiver.Latch();
var drainTask = theReceiver.DrainAsync().AsTask();
await Task.Delay(50);

Assert.False(drainTask.IsCompleted, "DrainAsync must not complete while batch messages are still in-flight");

// Release the first message — with ProcessInlineWhileDraining, remaining messages should be processed, not deferred
firstMessageBlocking.SetResult();

await receiveTask.WaitAsync(TimeSpan.FromSeconds(5));
await drainTask.WaitAsync(TimeSpan.FromSeconds(5));

Assert.Equal(0, theReceiver.QueueCount);

// Remaining messages should have been processed through the pipeline, NOT deferred
await theListener.DidNotReceive().DeferAsync(envelope2);
await theListener.DidNotReceive().DeferAsync(envelope3);

// All three messages should have been invoked through the pipeline
await thePipeline.Received(3).InvokeAsync(Arg.Any<Envelope>(), Arg.Any<IChannelCallback>(), Arg.Any<System.Diagnostics.Activity>());
}
}

public class inline_receiver_process_inline_while_draining_defers_after_drain_completes
{
private readonly IListener theListener = Substitute.For<IListener>();
private readonly IHandlerPipeline thePipeline = Substitute.For<IHandlerPipeline>();
private readonly MockWolverineRuntime theRuntime = new();
private readonly InlineReceiver theReceiver;

public inline_receiver_process_inline_while_draining_defers_after_drain_completes()
{
var stubEndpoint = new StubEndpoint("one", new StubTransport());
stubEndpoint.ProcessInlineWhileDraining = true;
theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline);
theListener.Address.Returns(new Uri("stub://one"));
}

[Fact]
public async Task messages_are_deferred_after_drain_has_completed()
{
// Latch and drain with nothing in flight — drain completes immediately
theReceiver.Latch();
await theReceiver.DrainAsync();

var envelope = ObjectMother.Envelope();
await theReceiver.ReceivedAsync(theListener, envelope);

// After drain completed, messages should be deferred even with the flag on
await theListener.Received(1).DeferAsync(envelope);

await thePipeline.DidNotReceive()
.InvokeAsync(Arg.Any<Envelope>(), Arg.Any<IChannelCallback>(), Arg.Any<System.Diagnostics.Activity>());
}
}

public class inline_receiver_process_inline_while_draining_non_wait_drain
{
private readonly IListener theListener = Substitute.For<IListener>();
private readonly IHandlerPipeline thePipeline = Substitute.For<IHandlerPipeline>();
private readonly MockWolverineRuntime theRuntime = new();
private readonly InlineReceiver theReceiver;

public inline_receiver_process_inline_while_draining_non_wait_drain()
{
var stubEndpoint = new StubEndpoint("one", new StubTransport());
stubEndpoint.ProcessInlineWhileDraining = true;
theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline);
theListener.Address.Returns(new Uri("stub://one"));
}

[Fact]
public async Task messages_are_processed_during_non_wait_drain()
{
var firstMessageBlocking = new TaskCompletionSource();

thePipeline.InvokeAsync(Arg.Any<Envelope>(), Arg.Any<IChannelCallback>(), Arg.Any<System.Diagnostics.Activity>())
.Returns(async _ => await firstMessageBlocking.Task);

var envelope1 = ObjectMother.Envelope();
var envelope2 = ObjectMother.Envelope();

// Start batch receive — it will block on the first message
var receiveTask = Task.Run(() => theReceiver.ReceivedAsync(theListener, new[] { envelope1, envelope2 }).AsTask());
await Task.Delay(50);

// DrainAsync without prior Latch() — returns immediately (non-wait path)
var drainTask = theReceiver.DrainAsync();
Assert.True(drainTask.IsCompleted, "DrainAsync should return immediately without prior Latch()");

// Release the first message — envelope2 should still be processed
firstMessageBlocking.SetResult();
await receiveTask.WaitAsync(TimeSpan.FromSeconds(5));

// Both messages should have been processed, not deferred
await theListener.DidNotReceive().DeferAsync(envelope2);
await thePipeline.Received(2).InvokeAsync(Arg.Any<Envelope>(), Arg.Any<IChannelCallback>(), Arg.Any<System.Diagnostics.Activity>());
}
}
8 changes: 8 additions & 0 deletions src/Wolverine/Configuration/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,14 @@ protected Endpoint(Uri uri, EndpointRole role)
/// </summary>
public bool TelemetryEnabled { get; set; } = true;

/// <summary>
/// When using <see cref="EndpointMode.Inline"/>, setting this to <c>true</c> will allow
/// already-ingested messages to continue processing while the receiver is draining, only
/// deferring messages after the drain has fully completed. When <c>false</c> (the default),
/// messages are deferred as soon as the drain begins.
/// </summary>
public bool ProcessInlineWhileDraining { get; set; }

/// <summary>
/// Is the endpoint controlled and configured by the application or Wolverine itself?
/// </summary>
Expand Down
8 changes: 8 additions & 0 deletions src/Wolverine/Configuration/IListenerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ public interface IListenerConfiguration<T> : IEndpointConfiguration<T>
/// <returns></returns>
T UseForReplies();

/// <summary>
/// When using inline processing, allow already-ingested messages to continue processing
/// while the receiver is draining. Messages will only be deferred after the drain has
/// fully completed rather than as soon as it begins.
/// </summary>
/// <returns></returns>
T ProcessInlineWhileDraining();

/// <summary>
/// Direct Wolverine to use the specified handler type for its messages on
/// only this listening endpoint. This is helpful to create "sticky" handlers for the
Expand Down
6 changes: 6 additions & 0 deletions src/Wolverine/Configuration/ListenerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,12 @@ public TSelf UseForReplies()
return this.As<TSelf>();
}

public TSelf ProcessInlineWhileDraining()
{
add(e => e.ProcessInlineWhileDraining = true);
return this.As<TSelf>();
}

public TSelf Named(string name)
{
add(e => e.EndpointName = name);
Expand Down
2 changes: 1 addition & 1 deletion src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public async ValueTask ReceivedAsync(IListener listener, Envelope envelope)

private async ValueTask ProcessMessageAsync(IListener listener, Envelope envelope)
{
if (_latched)
if (_latched && (!_endpoint.ProcessInlineWhileDraining || _drainComplete.Task.IsCompleted))
{
try
{
Expand Down
Loading