diff --git a/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs index f1b4d9b20..d62143bad 100644 --- a/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs @@ -124,19 +124,27 @@ public void Latch() public async ValueTask DrainAsync() { + // If _latched was already true, this drain was triggered during shutdown + // (after OnApplicationStopping called Latch()). Safe to wait for in-flight items. + // If _latched was false, this drain may have been triggered from within the handler + // pipeline (e.g., rate limiting pause via PauseListenerContinuation). Waiting for + // the receiving block to complete would deadlock because the current message's + // execute function is still on the call stack. + var waitForCompletion = _latched; _latched = true; _receivingBlock.Complete(); - // Wait for in-flight handler executions to complete, bounded by a timeout - // to prevent hanging during shutdown - try - { - var completion = _receivingBlock.WaitForCompletionAsync(); - await Task.WhenAny(completion, Task.Delay(_settings.DrainTimeout)); - } - catch (Exception e) + if (waitForCompletion) { - _logger.LogDebug(e, "Error waiting for in-flight message processing to complete at {Uri}", Uri); + try + { + var completion = _receivingBlock.WaitForCompletionAsync(); + await Task.WhenAny(completion, Task.Delay(_settings.DrainTimeout)); + } + catch (Exception e) + { + _logger.LogDebug(e, "Error waiting for in-flight message processing to complete at {Uri}", Uri); + } } await _completeBlock.DrainAsync(); diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index a380d709b..a60110062 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -283,19 +283,27 @@ public async ValueTask ReceivedAsync(IListener listener, Envelope envelope) public async ValueTask DrainAsync() { + // If _latched was already true, this drain was triggered during shutdown + // (after OnApplicationStopping called Latch()). Safe to wait for in-flight items. + // If _latched was false, this drain may have been triggered from within the handler + // pipeline (e.g., rate limiting pause via PauseListenerContinuation). Waiting for + // the receiver block to complete would deadlock because the current message's + // execute function is still on the call stack. + var waitForCompletion = _latched; _latched = true; _receiver.Complete(); - // Wait for in-flight handler executions to complete, bounded by a timeout - // to prevent hanging during shutdown - try - { - var completion = _receiver.WaitForCompletionAsync(); - await Task.WhenAny(completion, Task.Delay(_settings.DrainTimeout)); - } - catch (Exception e) + if (waitForCompletion) { - _logger.LogDebug(e, "Error waiting for in-flight message processing to complete at {Uri}", Uri); + try + { + var completion = _receiver.WaitForCompletionAsync(); + await Task.WhenAny(completion, Task.Delay(_settings.DrainTimeout)); + } + catch (Exception e) + { + _logger.LogDebug(e, "Error waiting for in-flight message processing to complete at {Uri}", Uri); + } } await _incrementAttempts.DrainAsync();