Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/Wolverine/Configuration/EndpointCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,9 @@ private ISendingAgent buildSendingAgent(Uri uri, Action<Endpoint>? configureNewE

/// <summary>
/// Immediately latch all receivers to stop picking up new messages from their internal queues.
/// This is called as early as possible during shutdown (via IHostApplicationLifetime.ApplicationStopping)
/// so that messages already queued internally are not processed after the shutdown signal.
/// During normal shutdown this is no longer called globally; instead each ListeningAgent
/// latches its own receiver after stopping the listener (see StopAndDrainAsync).
/// Kept to not break public api compatability.
/// </summary>
public void LatchAllReceivers()
{
Expand All @@ -402,8 +403,7 @@ public void LatchAllReceivers()

public async Task DrainAsync()
{
// Drain the listeners
foreach (var listener in ActiveListeners().ToArray())
await Task.WhenAll(ActiveListeners().ToArray().Select(async listener =>
{
try
{
Expand All @@ -413,9 +413,9 @@ public async Task DrainAsync()
{
_runtime.Logger.LogError(e, "Failed to 'drain' outstanding messages in listener {Uri}", listener.Uri);
}
}
}));

foreach (var queue in _localSenders.Enumerate().Select(x => x.Value).OfType<ILocalQueue>())
await Task.WhenAll(_localSenders.Enumerate().Select(x => x.Value).OfType<ILocalQueue>().Select(async queue =>
{
try
{
Expand All @@ -425,7 +425,7 @@ public async Task DrainAsync()
{
_runtime.Logger.LogError(e, "Failed to 'drain' outstanding messages in local sender {Queue}", queue);
}
}
}));
}

internal void StoreSendingAgent(ISendingAgent agent)
Expand Down Expand Up @@ -457,4 +457,4 @@ internal async Task RemoveSendingAgentAsync(Uri destination)
await ad.DisposeAsync();
}
}
}
}
12 changes: 5 additions & 7 deletions src/Wolverine/Runtime/WolverineRuntime.HostService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ public async Task StartAsync(CancellationToken cancellationToken)

internal void OnApplicationStopping()
{
Logger.LogInformation("Application stopping signal received, latching all message receivers");
_endpoints.LatchAllReceivers();
Logger.LogInformation("Application stopping signal received");
}

private bool _hasMigratedStorage;
Expand Down Expand Up @@ -219,10 +218,9 @@ public async Task StopAsync(CancellationToken cancellationToken)

if (StopMode == StopMode.Normal)
{
// Step 1: Drain endpoints first — stop listeners from accepting new messages
// and wait for in-flight handlers to complete before releasing ownership.
// Receivers were already latched via IHostApplicationLifetime.ApplicationStopping
// to prevent new messages from being picked up, so this just waits for completion.
// Step 1: Drain endpoints — each listener is stopped, its receiver latched,
// then in-flight handlers are drained. Receivers are not latched up front,
// since messages might be unnecessarily deferred before listeners are stopped.
await _endpoints.DrainAsync();

if (_accumulator.IsValueCreated)
Expand Down Expand Up @@ -440,4 +438,4 @@ public enum StopMode
/// Honestly, don't use this except in Wolverine testing...
/// </summary>
Quick
}
}
2 changes: 1 addition & 1 deletion src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void Latch()
public async ValueTask DrainAsync()
{
// If _latched was already true, this drain was triggered during shutdown
// (after OnApplicationStopping called Latch()). Safe to wait for in-flight items.
// (after StopAndDrainAsync called Latch()). Safe to wait for in-flight items.
// If _latched was false, this drain may have been triggered from within the handler
// pipeline (e.g., rate limiting pause via PauseListenerContinuation). Waiting for
// the receiving block to complete would deadlock because the current message's
Expand Down
2 changes: 1 addition & 1 deletion src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ public async ValueTask ReceivedAsync(IListener listener, Envelope envelope)
public async ValueTask DrainAsync()
{
// If _latched was already true, this drain was triggered during shutdown
// (after OnApplicationStopping called Latch()). Safe to wait for in-flight items.
// (after StopAndDrainAsync called Latch()). Safe to wait for in-flight items.
// If _latched was false, this drain may have been triggered from within the handler
// pipeline (e.g., rate limiting pause via PauseListenerContinuation). Waiting for
// the receiver block to complete would deadlock because the current message's
Expand Down
2 changes: 1 addition & 1 deletion src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void Latch()
public ValueTask DrainAsync()
{
// If _latched was already true, this drain was triggered during shutdown
// (after OnApplicationStopping called Latch()). Safe to wait for in-flight items.
// (after StopAndDrainAsync called LatchReceiver()). Safe to wait for in-flight items.
// If _latched was false, this drain may have been triggered from within the handler
// pipeline (e.g., rate limiting pause via PauseListenerContinuation). Waiting for
// in-flight items to complete would deadlock because the current message's
Expand Down
11 changes: 7 additions & 4 deletions src/Wolverine/Transports/ListeningAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,18 @@ public async ValueTask StopAndDrainAsync()
return;
}

Listener = null;
_receiver = null;

try
{
using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.StoppingListener);
activity?.SetTag(WolverineTracing.EndpointAddress, Uri);

await listener.StopAsync();

LatchReceiver();

Listener = null;
_receiver = null;

if (receiver != null)
{
await receiver.DrainAsync();
Expand Down Expand Up @@ -518,4 +521,4 @@ public ValueTask StopAsync()
{
return new ValueTask();
}
}
}
9 changes: 8 additions & 1 deletion src/Wolverine/Transports/ParallelListener.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Collections.Concurrent;
using JasperFx.Core;
using Wolverine.Runtime;

Expand Down Expand Up @@ -32,7 +33,13 @@ public ValueTask DeferAsync(Envelope envelope) =>

public async ValueTask StopAsync()
{
foreach (var listener in _listeners) await listener.StopAsync();
var exceptions = new ConcurrentBag<Exception>();
await Task.WhenAll(_listeners.Select(async l =>
{
try { await l.StopAsync(); }
catch (Exception e) { exceptions.Add(e); }
}));
if (!exceptions.IsEmpty) throw new AggregateException(exceptions);
}

public ValueTask DisposeAsync() =>
Expand Down
Loading