diff --git a/Directory.Packages.props b/Directory.Packages.props
index f265d5323..094e4ca98 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -21,7 +21,7 @@
-
+
diff --git a/src/Testing/CoreTests/Runtime/WorkerQueues/inline_receiver_drain_and_latch.cs b/src/Testing/CoreTests/Runtime/WorkerQueues/inline_receiver_drain_and_latch.cs
new file mode 100644
index 000000000..74f0d3733
--- /dev/null
+++ b/src/Testing/CoreTests/Runtime/WorkerQueues/inline_receiver_drain_and_latch.cs
@@ -0,0 +1,269 @@
+using NSubstitute;
+using Wolverine.ComplianceTests;
+using Wolverine.Runtime;
+using Wolverine.Runtime.WorkerQueues;
+using Wolverine.Transports;
+using Wolverine.Transports.Stub;
+using Xunit;
+
+namespace CoreTests.Runtime.WorkerQueues;
+
+public class inline_receiver_drain_when_idle
+{
+ private readonly IHandlerPipeline thePipeline = Substitute.For();
+ private readonly MockWolverineRuntime theRuntime = new();
+ private readonly InlineReceiver theReceiver;
+
+ public inline_receiver_drain_when_idle()
+ {
+ var stubEndpoint = new StubEndpoint("one", new StubTransport());
+ theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline);
+ }
+
+ [Fact]
+ public async Task drain_completes_immediately_when_no_messages_in_flight()
+ {
+ // DrainAsync should return immediately when nothing is in-flight
+ var drainTask = theReceiver.DrainAsync();
+ Assert.True(drainTask.IsCompleted);
+ await drainTask;
+ }
+
+ [Fact]
+ public void queue_count_is_zero_when_idle()
+ {
+ Assert.Equal(0, theReceiver.QueueCount);
+ }
+}
+
+public class inline_receiver_drain_waits_for_in_flight
+{
+ private readonly IListener theListener = Substitute.For();
+ private readonly IHandlerPipeline thePipeline = Substitute.For();
+ private readonly MockWolverineRuntime theRuntime = new();
+ private readonly InlineReceiver theReceiver;
+
+ public inline_receiver_drain_waits_for_in_flight()
+ {
+ var stubEndpoint = new StubEndpoint("one", new StubTransport());
+ theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline);
+ theListener.Address.Returns(new Uri("stub://one"));
+ }
+
+ [Fact]
+ public async Task drain_waits_for_in_flight_message_to_complete_when_latched()
+ {
+ var messageBlocking = new TaskCompletionSource();
+
+ // Make pipeline.InvokeAsync block until we release it
+ thePipeline.InvokeAsync(Arg.Any(), Arg.Any(), Arg.Any())
+ .Returns(async _ => await messageBlocking.Task);
+
+ var envelope = ObjectMother.Envelope();
+
+ // Start receiving on a background task — it will block in InvokeAsync
+ var receiveTask = Task.Run(() => theReceiver.ReceivedAsync(theListener, envelope).AsTask());
+
+ // Give the receive task time to enter the pipeline
+ await Task.Delay(50);
+
+ Assert.Equal(1, theReceiver.QueueCount);
+
+ // Simulate shutdown: Latch() is called first, then DrainAsync()
+ theReceiver.Latch();
+ var drainTask = theReceiver.DrainAsync().AsTask();
+ await Task.Delay(50);
+
+ Assert.False(drainTask.IsCompleted, "DrainAsync should not complete while a message is in-flight");
+
+ // Release the message
+ messageBlocking.SetResult();
+ await receiveTask;
+
+ // Drain should now complete
+ await drainTask.WaitAsync(TimeSpan.FromSeconds(5));
+
+ Assert.Equal(0, theReceiver.QueueCount);
+ }
+
+ [Fact]
+ public async Task drain_returns_immediately_without_prior_latch_to_avoid_deadlock()
+ {
+ var messageBlocking = new TaskCompletionSource();
+
+ // Make pipeline.InvokeAsync block until we release it
+ thePipeline.InvokeAsync(Arg.Any(), Arg.Any(), Arg.Any())
+ .Returns(async _ => await messageBlocking.Task);
+
+ var envelope = ObjectMother.Envelope();
+
+ // Start receiving on a background task — it will block in InvokeAsync
+ var receiveTask = Task.Run(() => theReceiver.ReceivedAsync(theListener, envelope).AsTask());
+
+ // Give the receive task time to enter the pipeline
+ await Task.Delay(50);
+
+ Assert.Equal(1, theReceiver.QueueCount);
+
+ // Drain WITHOUT prior Latch() — simulates pause from within handler pipeline
+ // (e.g., PauseListenerContinuation). Must return immediately to avoid deadlock.
+ var drainTask = theReceiver.DrainAsync();
+ Assert.True(drainTask.IsCompleted, "DrainAsync should return immediately without prior Latch() to avoid deadlock");
+
+ // Clean up
+ messageBlocking.SetResult();
+ await receiveTask;
+ }
+}
+
+public class inline_receiver_latch_defers_messages
+{
+ private readonly IListener theListener = Substitute.For();
+ private readonly IHandlerPipeline thePipeline = Substitute.For();
+ private readonly MockWolverineRuntime theRuntime = new();
+ private readonly InlineReceiver theReceiver;
+
+ public inline_receiver_latch_defers_messages()
+ {
+ var stubEndpoint = new StubEndpoint("one", new StubTransport());
+ theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline);
+ theListener.Address.Returns(new Uri("stub://one"));
+ }
+
+ [Fact]
+ public async Task latched_receiver_defers_new_messages()
+ {
+ theReceiver.Latch();
+
+ var envelope = ObjectMother.Envelope();
+ await theReceiver.ReceivedAsync(theListener, envelope);
+
+ // Should have deferred the message back to the listener
+ await theListener.Received(1).DeferAsync(envelope);
+
+ // Pipeline should NOT have been invoked
+ await thePipeline.DidNotReceive()
+ .InvokeAsync(Arg.Any(), Arg.Any(), Arg.Any());
+ }
+
+ [Fact]
+ public async Task latched_receiver_defers_batch_messages()
+ {
+ theReceiver.Latch();
+
+ var envelope1 = ObjectMother.Envelope();
+ var envelope2 = ObjectMother.Envelope();
+ await theReceiver.ReceivedAsync(theListener, new[] { envelope1, envelope2 });
+
+ // Both messages should have been deferred
+ await theListener.Received(1).DeferAsync(envelope1);
+ await theListener.Received(1).DeferAsync(envelope2);
+ }
+
+ [Fact]
+ public async Task queue_count_stays_zero_for_latched_messages()
+ {
+ theReceiver.Latch();
+
+ var envelope = ObjectMother.Envelope();
+ await theReceiver.ReceivedAsync(theListener, envelope);
+
+ Assert.Equal(0, theReceiver.QueueCount);
+ }
+}
+
+public class inline_receiver_drain_respects_timeout
+{
+ private readonly IListener theListener = Substitute.For();
+ private readonly IHandlerPipeline thePipeline = Substitute.For();
+ private readonly MockWolverineRuntime theRuntime = new();
+ private readonly InlineReceiver theReceiver;
+
+ public inline_receiver_drain_respects_timeout()
+ {
+ // Set a very short drain timeout for this test
+ theRuntime.DurabilitySettings.DrainTimeout = TimeSpan.FromMilliseconds(200);
+
+ var stubEndpoint = new StubEndpoint("one", new StubTransport());
+ theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline);
+ theListener.Address.Returns(new Uri("stub://one"));
+ }
+
+ [Fact]
+ public async Task drain_times_out_when_message_blocks_forever()
+ {
+ // Make pipeline block forever
+ thePipeline.InvokeAsync(Arg.Any(), Arg.Any(), Arg.Any())
+ .Returns(async _ => await Task.Delay(Timeout.Infinite));
+
+ var envelope = ObjectMother.Envelope();
+
+ // Start a receive that will block
+ _ = Task.Run(() => theReceiver.ReceivedAsync(theListener, envelope).AsTask());
+ await Task.Delay(50);
+
+ // Simulate shutdown: Latch() first, then DrainAsync should time out
+ theReceiver.Latch();
+ await Assert.ThrowsAsync(() => theReceiver.DrainAsync().AsTask());
+ }
+}
+
+public class inline_receiver_batch_drain_waits_for_all_messages
+{
+ private readonly IListener theListener = Substitute.For();
+ private readonly IHandlerPipeline thePipeline = Substitute.For();
+ private readonly MockWolverineRuntime theRuntime = new();
+ private readonly InlineReceiver theReceiver;
+
+ public inline_receiver_batch_drain_waits_for_all_messages()
+ {
+ var stubEndpoint = new StubEndpoint("one", new StubTransport());
+ theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline);
+ theListener.Address.Returns(new Uri("stub://one"));
+ }
+
+ [Fact]
+ public async Task drain_does_not_signal_until_all_batch_messages_are_handled()
+ {
+ var firstMessageBlocking = new TaskCompletionSource();
+
+ // First InvokeAsync call blocks; subsequent calls won't be reached because
+ // after we latch, remaining messages will be deferred instead of invoked.
+ thePipeline.InvokeAsync(Arg.Any(), Arg.Any(), Arg.Any())
+ .Returns(async _ => await firstMessageBlocking.Task);
+
+ var envelope1 = ObjectMother.Envelope();
+ var envelope2 = ObjectMother.Envelope();
+ var envelope3 = ObjectMother.Envelope();
+
+ // Start batch receive on a background task — it will block on the first message
+ var receiveTask = Task.Run(() => theReceiver.ReceivedAsync(theListener, new[] { envelope1, envelope2, envelope3 }).AsTask());
+
+ // Give the receive task time to enter the pipeline for envelope1
+ await Task.Delay(50);
+
+ Assert.Equal(3, theReceiver.QueueCount);
+
+ // Simulate shutdown: Latch() first, then DrainAsync while the first message is still in-flight.
+ theReceiver.Latch();
+ var drainTask = theReceiver.DrainAsync().AsTask();
+ await Task.Delay(50);
+
+ Assert.False(drainTask.IsCompleted, "DrainAsync must not complete while batch messages are still in-flight");
+
+ // Release the first message — the remaining two should be deferred (latched)
+ firstMessageBlocking.SetResult();
+
+ // Wait for the full batch receive to complete
+ await receiveTask.WaitAsync(TimeSpan.FromSeconds(5));
+
+ // Drain should now complete since all messages are processed/deferred
+ await drainTask.WaitAsync(TimeSpan.FromSeconds(5));
+
+ Assert.Equal(0, theReceiver.QueueCount);
+
+ // Remaining messages should have been deferred
+ await theListener.Received(1).DeferAsync(envelope2);
+ await theListener.Received(1).DeferAsync(envelope3);
+ }
+}
diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsListener.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsListener.cs
index e690df2f2..1dd689b82 100644
--- a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsListener.cs
+++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsListener.cs
@@ -18,6 +18,8 @@ internal class SqsListener : IListener, ISupportDeadLetterQueue
private readonly Task _task;
private readonly AmazonSqsTransport _transport;
private readonly ISqsEnvelopeMapper _mapper;
+ private readonly TimeSpan _drainTimeout;
+ private readonly ILogger _logger;
public SqsListener(IWolverineRuntime runtime, AmazonSqsQueue queue, AmazonSqsTransport transport,
IReceiver receiver)
@@ -29,7 +31,10 @@ public SqsListener(IWolverineRuntime runtime, AmazonSqsQueue queue, AmazonSqsTra
_mapper = queue.BuildMapper(runtime);
+ _drainTimeout = runtime.DurabilitySettings.DrainTimeout;
+
var logger = runtime.LoggerFactory.CreateLogger();
+ _logger = logger;
_queue = queue;
_transport = transport;
_receiver = receiver;
@@ -157,19 +162,34 @@ public async ValueTask DeferAsync(Envelope envelope)
public ValueTask DisposeAsync()
{
+ if (!_cancellation.IsCancellationRequested)
+ {
+ _cancellation.Cancel();
+ }
+
_requeueBlock.Dispose();
- _cancellation.Cancel();
_deadLetterBlock?.Dispose();
-
_task.SafeDispose();
return ValueTask.CompletedTask;
}
public Uri Address => _queue.Uri;
- public ValueTask StopAsync()
+ public async ValueTask StopAsync()
{
- return DisposeAsync();
+ _cancellation.Cancel();
+
+ try
+ {
+ await _task.WaitAsync(_drainTimeout);
+ }
+ catch (Exception e)
+ {
+ if (e is not TaskCanceledException)
+ {
+ _logger.LogDebug(e, "Error waiting for SQS polling task to complete during shutdown for {Uri}", _queue.Uri);
+ }
+ }
}
public async Task TryRequeueAsync(Envelope envelope)
@@ -203,4 +223,4 @@ public Task CompleteAsync(Message sqsMessage)
{
return _transport.Client!.DeleteMessageAsync(_queue.QueueUrl, sqsMessage.ReceiptHandle);
}
-}
\ No newline at end of file
+}
diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs
index 5834f0f4f..3309e371f 100644
--- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs
+++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs
@@ -144,6 +144,7 @@ private static async Task declareVirtualHost(string vhname)
}
}
+[Trait("Category", "Flaky")]
public class multi_tenancy_through_virtual_hosts : IClassFixture
{
private readonly MultiTenantedRabbitFixture _fixture;
diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs
index 051c5743b..6c5647404 100644
--- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs
+++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs
@@ -107,13 +107,13 @@ public async ValueTask StopAsync()
await channel.BasicCancelAsync(consumerTag, true, default);
}
}
-
- consumer.Dispose();
- _consumer = null;
}
public override async ValueTask DisposeAsync()
{
+ _consumer?.Dispose();
+ _consumer = null;
+
await base.DisposeAsync();
if (_sender.IsValueCreated && _sender.Value is IAsyncDisposable ad)
@@ -249,4 +249,4 @@ public async Task CompleteAsync(ulong deliveryTag)
{
await Channel!.BasicAckAsync(deliveryTag, true, _cancellation);
}
-}
\ No newline at end of file
+}
diff --git a/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs
index 8c10fdf27..97792c697 100644
--- a/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs
+++ b/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs
@@ -1,4 +1,5 @@
using System.Diagnostics;
+using System.Threading;
using Microsoft.Extensions.Logging;
using Wolverine.Configuration;
using Wolverine.Logging;
@@ -13,6 +14,10 @@ internal class InlineReceiver : IReceiver
private readonly IHandlerPipeline _pipeline;
private readonly DurabilitySettings _settings;
+ private int _inFlightCount;
+ private readonly TaskCompletionSource _drainComplete = new(TaskCreationOptions.RunContinuationsAsynchronously);
+ private volatile bool _latched;
+
public InlineReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPipeline pipeline)
{
_endpoint = endpoint;
@@ -23,25 +28,91 @@ public InlineReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPipe
public IHandlerPipeline Pipeline => _pipeline;
- public int QueueCount => 0;
+ public int QueueCount => Volatile.Read(ref _inFlightCount);
public void Dispose()
{
// Nothing
}
+ public void Latch()
+ {
+ _latched = true;
+ }
+
public ValueTask DrainAsync()
{
- return ValueTask.CompletedTask;
+ // If _latched was already true, this drain was triggered during shutdown
+ // (after OnApplicationStopping called Latch()). Safe to wait for in-flight items.
+ // If _latched was false, this drain may have been triggered from within the handler
+ // pipeline (e.g., rate limiting pause via PauseListenerContinuation). Waiting for
+ // in-flight items to complete would deadlock because the current message's
+ // execute function is still on the call stack.
+ var waitForCompletion = _latched;
+ _latched = true;
+
+ if (!waitForCompletion)
+ {
+ return ValueTask.CompletedTask;
+ }
+
+ if (Volatile.Read(ref _inFlightCount) == 0)
+ {
+ _drainComplete.TrySetResult();
+ }
+
+ return new ValueTask(_drainComplete.Task.WaitAsync(_settings.DrainTimeout));
}
public async ValueTask ReceivedAsync(IListener listener, Envelope[] messages)
{
- foreach (var envelope in messages) await ReceivedAsync(listener, envelope);
+ if (messages.Length == 0) return;
+
+ Interlocked.Add(ref _inFlightCount, messages.Length);
+
+ foreach (var envelope in messages)
+ {
+ try
+ {
+ await ProcessMessageAsync(listener, envelope);
+ }
+ finally
+ {
+ DecrementInFlightCount();
+ }
+ }
}
public async ValueTask ReceivedAsync(IListener listener, Envelope envelope)
{
+ Interlocked.Increment(ref _inFlightCount);
+
+ try
+ {
+ await ProcessMessageAsync(listener, envelope);
+ }
+ finally
+ {
+ DecrementInFlightCount();
+ }
+ }
+
+ private async ValueTask ProcessMessageAsync(IListener listener, Envelope envelope)
+ {
+ if (_latched)
+ {
+ try
+ {
+ await listener.DeferAsync(envelope);
+ }
+ catch (Exception e)
+ {
+ _logger.LogError(e, "Error deferring envelope {EnvelopeId} after latch", envelope.Id);
+ }
+
+ return;
+ }
+
using var activity = _endpoint.TelemetryEnabled ? WolverineTracing.StartReceiving(envelope) : null;
try
@@ -73,4 +144,12 @@ public async ValueTask ReceivedAsync(IListener listener, Envelope envelope)
activity?.Stop();
}
}
-}
\ No newline at end of file
+
+ private void DecrementInFlightCount()
+ {
+ if (Interlocked.Decrement(ref _inFlightCount) == 0 && _latched)
+ {
+ _drainComplete.TrySetResult();
+ }
+ }
+}
diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs
index 296a17e0a..9231d6912 100644
--- a/src/Wolverine/Transports/ListeningAgent.cs
+++ b/src/Wolverine/Transports/ListeningAgent.cs
@@ -145,16 +145,19 @@ public async Task EnqueueDirectlyAsync(IEnumerable envelopes)
///
public void LatchReceiver()
{
- if (_receiver is DurableReceiver dr)
+ var actual = _receiver is ReceiverWithRules rwr ? rwr.Inner : _receiver;
+ if (actual is DurableReceiver dr)
{
dr.Latch();
}
- // BufferedReceiver latches via its _latched field in DrainAsync,
- // but we need an immediate latch here too
- else if (_receiver is BufferedReceiver br)
+ else if (actual is BufferedReceiver br)
{
br.Latch();
}
+ else if (actual is InlineReceiver ir)
+ {
+ ir.Latch();
+ }
}
public async ValueTask StopAndDrainAsync()