diff --git a/src/Wolverine/Configuration/EndpointCollection.cs b/src/Wolverine/Configuration/EndpointCollection.cs index f3dd05803..c61965bac 100644 --- a/src/Wolverine/Configuration/EndpointCollection.cs +++ b/src/Wolverine/Configuration/EndpointCollection.cs @@ -384,8 +384,9 @@ private ISendingAgent buildSendingAgent(Uri uri, Action? configureNewE /// /// Immediately latch all receivers to stop picking up new messages from their internal queues. - /// This is called as early as possible during shutdown (via IHostApplicationLifetime.ApplicationStopping) - /// so that messages already queued internally are not processed after the shutdown signal. + /// During normal shutdown this is no longer called globally; instead each ListeningAgent + /// latches its own receiver after stopping the listener (see StopAndDrainAsync). + /// Kept to not break public api compatability. /// public void LatchAllReceivers() { @@ -402,8 +403,7 @@ public void LatchAllReceivers() public async Task DrainAsync() { - // Drain the listeners - foreach (var listener in ActiveListeners().ToArray()) + await Task.WhenAll(ActiveListeners().ToArray().Select(async listener => { try { @@ -413,9 +413,9 @@ public async Task DrainAsync() { _runtime.Logger.LogError(e, "Failed to 'drain' outstanding messages in listener {Uri}", listener.Uri); } - } + })); - foreach (var queue in _localSenders.Enumerate().Select(x => x.Value).OfType()) + await Task.WhenAll(_localSenders.Enumerate().Select(x => x.Value).OfType().Select(async queue => { try { @@ -425,7 +425,7 @@ public async Task DrainAsync() { _runtime.Logger.LogError(e, "Failed to 'drain' outstanding messages in local sender {Queue}", queue); } - } + })); } internal void StoreSendingAgent(ISendingAgent agent) @@ -457,4 +457,4 @@ internal async Task RemoveSendingAgentAsync(Uri destination) await ad.DisposeAsync(); } } -} \ No newline at end of file +} diff --git a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs index 7f6948b62..b53e564bd 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs @@ -126,8 +126,7 @@ public async Task StartAsync(CancellationToken cancellationToken) internal void OnApplicationStopping() { - Logger.LogInformation("Application stopping signal received, latching all message receivers"); - _endpoints.LatchAllReceivers(); + Logger.LogInformation("Application stopping signal received"); } private bool _hasMigratedStorage; @@ -219,10 +218,9 @@ public async Task StopAsync(CancellationToken cancellationToken) if (StopMode == StopMode.Normal) { - // Step 1: Drain endpoints first — stop listeners from accepting new messages - // and wait for in-flight handlers to complete before releasing ownership. - // Receivers were already latched via IHostApplicationLifetime.ApplicationStopping - // to prevent new messages from being picked up, so this just waits for completion. + // Step 1: Drain endpoints — each listener is stopped, its receiver latched, + // then in-flight handlers are drained. Receivers are not latched up front, + // since messages might be unnecessarily deferred before listeners are stopped. await _endpoints.DrainAsync(); if (_accumulator.IsValueCreated) @@ -440,4 +438,4 @@ public enum StopMode /// Honestly, don't use this except in Wolverine testing... /// Quick -} \ No newline at end of file +} diff --git a/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs index 584260ad2..27dc6ac51 100644 --- a/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs @@ -125,7 +125,7 @@ public void Latch() public async ValueTask DrainAsync() { // If _latched was already true, this drain was triggered during shutdown - // (after OnApplicationStopping called Latch()). Safe to wait for in-flight items. + // (after StopAndDrainAsync called Latch()). Safe to wait for in-flight items. // If _latched was false, this drain may have been triggered from within the handler // pipeline (e.g., rate limiting pause via PauseListenerContinuation). Waiting for // the receiving block to complete would deadlock because the current message's diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index a088df9c8..a89dad5a9 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -284,7 +284,7 @@ public async ValueTask ReceivedAsync(IListener listener, Envelope envelope) public async ValueTask DrainAsync() { // If _latched was already true, this drain was triggered during shutdown - // (after OnApplicationStopping called Latch()). Safe to wait for in-flight items. + // (after StopAndDrainAsync called Latch()). Safe to wait for in-flight items. // If _latched was false, this drain may have been triggered from within the handler // pipeline (e.g., rate limiting pause via PauseListenerContinuation). Waiting for // the receiver block to complete would deadlock because the current message's diff --git a/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs index c8b41c663..a2381458a 100644 --- a/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs @@ -43,7 +43,7 @@ public void Latch() public ValueTask DrainAsync() { // If _latched was already true, this drain was triggered during shutdown - // (after OnApplicationStopping called Latch()). Safe to wait for in-flight items. + // (after StopAndDrainAsync called LatchReceiver()). Safe to wait for in-flight items. // If _latched was false, this drain may have been triggered from within the handler // pipeline (e.g., rate limiting pause via PauseListenerContinuation). Waiting for // in-flight items to complete would deadlock because the current message's diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index 17498a44f..5d2b35736 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -182,15 +182,18 @@ public async ValueTask StopAndDrainAsync() return; } - Listener = null; - _receiver = null; - try { using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.StoppingListener); activity?.SetTag(WolverineTracing.EndpointAddress, Uri); await listener.StopAsync(); + + LatchReceiver(); + + Listener = null; + _receiver = null; + if (receiver != null) { await receiver.DrainAsync(); @@ -518,4 +521,4 @@ public ValueTask StopAsync() { return new ValueTask(); } -} \ No newline at end of file +} diff --git a/src/Wolverine/Transports/ParallelListener.cs b/src/Wolverine/Transports/ParallelListener.cs index 7c6ec292b..ba92a7706 100644 --- a/src/Wolverine/Transports/ParallelListener.cs +++ b/src/Wolverine/Transports/ParallelListener.cs @@ -1,3 +1,4 @@ +using System.Collections.Concurrent; using JasperFx.Core; using Wolverine.Runtime; @@ -32,7 +33,13 @@ public ValueTask DeferAsync(Envelope envelope) => public async ValueTask StopAsync() { - foreach (var listener in _listeners) await listener.StopAsync(); + var exceptions = new ConcurrentBag(); + await Task.WhenAll(_listeners.Select(async l => + { + try { await l.StopAsync(); } + catch (Exception e) { exceptions.Add(e); } + })); + if (!exceptions.IsEmpty) throw new AggregateException(exceptions); } public ValueTask DisposeAsync() =>