diff --git a/src/Testing/CoreTests/Runtime/WorkerQueues/inline_receiver_drain_and_latch.cs b/src/Testing/CoreTests/Runtime/WorkerQueues/inline_receiver_drain_and_latch.cs new file mode 100644 index 000000000..74f0d3733 --- /dev/null +++ b/src/Testing/CoreTests/Runtime/WorkerQueues/inline_receiver_drain_and_latch.cs @@ -0,0 +1,269 @@ +using NSubstitute; +using Wolverine.ComplianceTests; +using Wolverine.Runtime; +using Wolverine.Runtime.WorkerQueues; +using Wolverine.Transports; +using Wolverine.Transports.Stub; +using Xunit; + +namespace CoreTests.Runtime.WorkerQueues; + +public class inline_receiver_drain_when_idle +{ + private readonly IHandlerPipeline thePipeline = Substitute.For(); + private readonly MockWolverineRuntime theRuntime = new(); + private readonly InlineReceiver theReceiver; + + public inline_receiver_drain_when_idle() + { + var stubEndpoint = new StubEndpoint("one", new StubTransport()); + theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline); + } + + [Fact] + public async Task drain_completes_immediately_when_no_messages_in_flight() + { + // DrainAsync should return immediately when nothing is in-flight + var drainTask = theReceiver.DrainAsync(); + Assert.True(drainTask.IsCompleted); + await drainTask; + } + + [Fact] + public void queue_count_is_zero_when_idle() + { + Assert.Equal(0, theReceiver.QueueCount); + } +} + +public class inline_receiver_drain_waits_for_in_flight +{ + private readonly IListener theListener = Substitute.For(); + private readonly IHandlerPipeline thePipeline = Substitute.For(); + private readonly MockWolverineRuntime theRuntime = new(); + private readonly InlineReceiver theReceiver; + + public inline_receiver_drain_waits_for_in_flight() + { + var stubEndpoint = new StubEndpoint("one", new StubTransport()); + theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline); + theListener.Address.Returns(new Uri("stub://one")); + } + + [Fact] + public async Task drain_waits_for_in_flight_message_to_complete_when_latched() + { + var messageBlocking = new TaskCompletionSource(); + + // Make pipeline.InvokeAsync block until we release it + thePipeline.InvokeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(async _ => await messageBlocking.Task); + + var envelope = ObjectMother.Envelope(); + + // Start receiving on a background task — it will block in InvokeAsync + var receiveTask = Task.Run(() => theReceiver.ReceivedAsync(theListener, envelope).AsTask()); + + // Give the receive task time to enter the pipeline + await Task.Delay(50); + + Assert.Equal(1, theReceiver.QueueCount); + + // Simulate shutdown: Latch() is called first, then DrainAsync() + theReceiver.Latch(); + var drainTask = theReceiver.DrainAsync().AsTask(); + await Task.Delay(50); + + Assert.False(drainTask.IsCompleted, "DrainAsync should not complete while a message is in-flight"); + + // Release the message + messageBlocking.SetResult(); + await receiveTask; + + // Drain should now complete + await drainTask.WaitAsync(TimeSpan.FromSeconds(5)); + + Assert.Equal(0, theReceiver.QueueCount); + } + + [Fact] + public async Task drain_returns_immediately_without_prior_latch_to_avoid_deadlock() + { + var messageBlocking = new TaskCompletionSource(); + + // Make pipeline.InvokeAsync block until we release it + thePipeline.InvokeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(async _ => await messageBlocking.Task); + + var envelope = ObjectMother.Envelope(); + + // Start receiving on a background task — it will block in InvokeAsync + var receiveTask = Task.Run(() => theReceiver.ReceivedAsync(theListener, envelope).AsTask()); + + // Give the receive task time to enter the pipeline + await Task.Delay(50); + + Assert.Equal(1, theReceiver.QueueCount); + + // Drain WITHOUT prior Latch() — simulates pause from within handler pipeline + // (e.g., PauseListenerContinuation). Must return immediately to avoid deadlock. + var drainTask = theReceiver.DrainAsync(); + Assert.True(drainTask.IsCompleted, "DrainAsync should return immediately without prior Latch() to avoid deadlock"); + + // Clean up + messageBlocking.SetResult(); + await receiveTask; + } +} + +public class inline_receiver_latch_defers_messages +{ + private readonly IListener theListener = Substitute.For(); + private readonly IHandlerPipeline thePipeline = Substitute.For(); + private readonly MockWolverineRuntime theRuntime = new(); + private readonly InlineReceiver theReceiver; + + public inline_receiver_latch_defers_messages() + { + var stubEndpoint = new StubEndpoint("one", new StubTransport()); + theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline); + theListener.Address.Returns(new Uri("stub://one")); + } + + [Fact] + public async Task latched_receiver_defers_new_messages() + { + theReceiver.Latch(); + + var envelope = ObjectMother.Envelope(); + await theReceiver.ReceivedAsync(theListener, envelope); + + // Should have deferred the message back to the listener + await theListener.Received(1).DeferAsync(envelope); + + // Pipeline should NOT have been invoked + await thePipeline.DidNotReceive() + .InvokeAsync(Arg.Any(), Arg.Any(), Arg.Any()); + } + + [Fact] + public async Task latched_receiver_defers_batch_messages() + { + theReceiver.Latch(); + + var envelope1 = ObjectMother.Envelope(); + var envelope2 = ObjectMother.Envelope(); + await theReceiver.ReceivedAsync(theListener, new[] { envelope1, envelope2 }); + + // Both messages should have been deferred + await theListener.Received(1).DeferAsync(envelope1); + await theListener.Received(1).DeferAsync(envelope2); + } + + [Fact] + public async Task queue_count_stays_zero_for_latched_messages() + { + theReceiver.Latch(); + + var envelope = ObjectMother.Envelope(); + await theReceiver.ReceivedAsync(theListener, envelope); + + Assert.Equal(0, theReceiver.QueueCount); + } +} + +public class inline_receiver_drain_respects_timeout +{ + private readonly IListener theListener = Substitute.For(); + private readonly IHandlerPipeline thePipeline = Substitute.For(); + private readonly MockWolverineRuntime theRuntime = new(); + private readonly InlineReceiver theReceiver; + + public inline_receiver_drain_respects_timeout() + { + // Set a very short drain timeout for this test + theRuntime.DurabilitySettings.DrainTimeout = TimeSpan.FromMilliseconds(200); + + var stubEndpoint = new StubEndpoint("one", new StubTransport()); + theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline); + theListener.Address.Returns(new Uri("stub://one")); + } + + [Fact] + public async Task drain_times_out_when_message_blocks_forever() + { + // Make pipeline block forever + thePipeline.InvokeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(async _ => await Task.Delay(Timeout.Infinite)); + + var envelope = ObjectMother.Envelope(); + + // Start a receive that will block + _ = Task.Run(() => theReceiver.ReceivedAsync(theListener, envelope).AsTask()); + await Task.Delay(50); + + // Simulate shutdown: Latch() first, then DrainAsync should time out + theReceiver.Latch(); + await Assert.ThrowsAsync(() => theReceiver.DrainAsync().AsTask()); + } +} + +public class inline_receiver_batch_drain_waits_for_all_messages +{ + private readonly IListener theListener = Substitute.For(); + private readonly IHandlerPipeline thePipeline = Substitute.For(); + private readonly MockWolverineRuntime theRuntime = new(); + private readonly InlineReceiver theReceiver; + + public inline_receiver_batch_drain_waits_for_all_messages() + { + var stubEndpoint = new StubEndpoint("one", new StubTransport()); + theReceiver = new InlineReceiver(stubEndpoint, theRuntime, thePipeline); + theListener.Address.Returns(new Uri("stub://one")); + } + + [Fact] + public async Task drain_does_not_signal_until_all_batch_messages_are_handled() + { + var firstMessageBlocking = new TaskCompletionSource(); + + // First InvokeAsync call blocks; subsequent calls won't be reached because + // after we latch, remaining messages will be deferred instead of invoked. + thePipeline.InvokeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(async _ => await firstMessageBlocking.Task); + + var envelope1 = ObjectMother.Envelope(); + var envelope2 = ObjectMother.Envelope(); + var envelope3 = ObjectMother.Envelope(); + + // Start batch receive on a background task — it will block on the first message + var receiveTask = Task.Run(() => theReceiver.ReceivedAsync(theListener, new[] { envelope1, envelope2, envelope3 }).AsTask()); + + // Give the receive task time to enter the pipeline for envelope1 + await Task.Delay(50); + + Assert.Equal(3, theReceiver.QueueCount); + + // Simulate shutdown: Latch() first, then DrainAsync while the first message is still in-flight. + theReceiver.Latch(); + var drainTask = theReceiver.DrainAsync().AsTask(); + await Task.Delay(50); + + Assert.False(drainTask.IsCompleted, "DrainAsync must not complete while batch messages are still in-flight"); + + // Release the first message — the remaining two should be deferred (latched) + firstMessageBlocking.SetResult(); + + // Wait for the full batch receive to complete + await receiveTask.WaitAsync(TimeSpan.FromSeconds(5)); + + // Drain should now complete since all messages are processed/deferred + await drainTask.WaitAsync(TimeSpan.FromSeconds(5)); + + Assert.Equal(0, theReceiver.QueueCount); + + // Remaining messages should have been deferred + await theListener.Received(1).DeferAsync(envelope2); + await theListener.Received(1).DeferAsync(envelope3); + } +} diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsListener.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsListener.cs index e690df2f2..1dd689b82 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsListener.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsListener.cs @@ -18,6 +18,8 @@ internal class SqsListener : IListener, ISupportDeadLetterQueue private readonly Task _task; private readonly AmazonSqsTransport _transport; private readonly ISqsEnvelopeMapper _mapper; + private readonly TimeSpan _drainTimeout; + private readonly ILogger _logger; public SqsListener(IWolverineRuntime runtime, AmazonSqsQueue queue, AmazonSqsTransport transport, IReceiver receiver) @@ -29,7 +31,10 @@ public SqsListener(IWolverineRuntime runtime, AmazonSqsQueue queue, AmazonSqsTra _mapper = queue.BuildMapper(runtime); + _drainTimeout = runtime.DurabilitySettings.DrainTimeout; + var logger = runtime.LoggerFactory.CreateLogger(); + _logger = logger; _queue = queue; _transport = transport; _receiver = receiver; @@ -157,19 +162,34 @@ public async ValueTask DeferAsync(Envelope envelope) public ValueTask DisposeAsync() { + if (!_cancellation.IsCancellationRequested) + { + _cancellation.Cancel(); + } + _requeueBlock.Dispose(); - _cancellation.Cancel(); _deadLetterBlock?.Dispose(); - _task.SafeDispose(); return ValueTask.CompletedTask; } public Uri Address => _queue.Uri; - public ValueTask StopAsync() + public async ValueTask StopAsync() { - return DisposeAsync(); + _cancellation.Cancel(); + + try + { + await _task.WaitAsync(_drainTimeout); + } + catch (Exception e) + { + if (e is not TaskCanceledException) + { + _logger.LogDebug(e, "Error waiting for SQS polling task to complete during shutdown for {Uri}", _queue.Uri); + } + } } public async Task TryRequeueAsync(Envelope envelope) @@ -203,4 +223,4 @@ public Task CompleteAsync(Message sqsMessage) { return _transport.Client!.DeleteMessageAsync(_queue.QueueUrl, sqsMessage.ReceiptHandle); } -} \ No newline at end of file +} diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs index 092b9d7e7..34d9d99e3 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs @@ -107,13 +107,13 @@ public async ValueTask StopAsync() await channel.BasicCancelAsync(consumerTag, true, default); } } - - consumer.Dispose(); - _consumer = null; } public override async ValueTask DisposeAsync() { + _consumer?.Dispose(); + _consumer = null; + await base.DisposeAsync(); if (_sender.IsValueCreated && _sender.Value is IAsyncDisposable ad) @@ -249,4 +249,4 @@ public async Task CompleteAsync(ulong deliveryTag) { await Channel!.BasicAckAsync(deliveryTag, true, _cancellation); } -} \ No newline at end of file +} diff --git a/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs index 8c10fdf27..97792c697 100644 --- a/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs @@ -1,4 +1,5 @@ using System.Diagnostics; +using System.Threading; using Microsoft.Extensions.Logging; using Wolverine.Configuration; using Wolverine.Logging; @@ -13,6 +14,10 @@ internal class InlineReceiver : IReceiver private readonly IHandlerPipeline _pipeline; private readonly DurabilitySettings _settings; + private int _inFlightCount; + private readonly TaskCompletionSource _drainComplete = new(TaskCreationOptions.RunContinuationsAsynchronously); + private volatile bool _latched; + public InlineReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPipeline pipeline) { _endpoint = endpoint; @@ -23,25 +28,91 @@ public InlineReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPipe public IHandlerPipeline Pipeline => _pipeline; - public int QueueCount => 0; + public int QueueCount => Volatile.Read(ref _inFlightCount); public void Dispose() { // Nothing } + public void Latch() + { + _latched = true; + } + public ValueTask DrainAsync() { - return ValueTask.CompletedTask; + // If _latched was already true, this drain was triggered during shutdown + // (after OnApplicationStopping called Latch()). Safe to wait for in-flight items. + // If _latched was false, this drain may have been triggered from within the handler + // pipeline (e.g., rate limiting pause via PauseListenerContinuation). Waiting for + // in-flight items to complete would deadlock because the current message's + // execute function is still on the call stack. + var waitForCompletion = _latched; + _latched = true; + + if (!waitForCompletion) + { + return ValueTask.CompletedTask; + } + + if (Volatile.Read(ref _inFlightCount) == 0) + { + _drainComplete.TrySetResult(); + } + + return new ValueTask(_drainComplete.Task.WaitAsync(_settings.DrainTimeout)); } public async ValueTask ReceivedAsync(IListener listener, Envelope[] messages) { - foreach (var envelope in messages) await ReceivedAsync(listener, envelope); + if (messages.Length == 0) return; + + Interlocked.Add(ref _inFlightCount, messages.Length); + + foreach (var envelope in messages) + { + try + { + await ProcessMessageAsync(listener, envelope); + } + finally + { + DecrementInFlightCount(); + } + } } public async ValueTask ReceivedAsync(IListener listener, Envelope envelope) { + Interlocked.Increment(ref _inFlightCount); + + try + { + await ProcessMessageAsync(listener, envelope); + } + finally + { + DecrementInFlightCount(); + } + } + + private async ValueTask ProcessMessageAsync(IListener listener, Envelope envelope) + { + if (_latched) + { + try + { + await listener.DeferAsync(envelope); + } + catch (Exception e) + { + _logger.LogError(e, "Error deferring envelope {EnvelopeId} after latch", envelope.Id); + } + + return; + } + using var activity = _endpoint.TelemetryEnabled ? WolverineTracing.StartReceiving(envelope) : null; try @@ -73,4 +144,12 @@ public async ValueTask ReceivedAsync(IListener listener, Envelope envelope) activity?.Stop(); } } -} \ No newline at end of file + + private void DecrementInFlightCount() + { + if (Interlocked.Decrement(ref _inFlightCount) == 0 && _latched) + { + _drainComplete.TrySetResult(); + } + } +} diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index af5a97716..86dd91edf 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -145,16 +145,19 @@ public async Task EnqueueDirectlyAsync(IEnumerable envelopes) /// public void LatchReceiver() { - if (_receiver is DurableReceiver dr) + var actual = _receiver is ReceiverWithRules rwr ? rwr.Inner : _receiver; + if (actual is DurableReceiver dr) { dr.Latch(); } - // BufferedReceiver latches via its _latched field in DrainAsync, - // but we need an immediate latch here too - else if (_receiver is BufferedReceiver br) + else if (actual is BufferedReceiver br) { br.Latch(); } + else if (actual is InlineReceiver ir) + { + ir.Latch(); + } } public async ValueTask StopAndDrainAsync()