diff --git a/src/Transports/RabbitMQ/ChaosTesting/ChaosSpecifications.cs b/src/Transports/RabbitMQ/ChaosTesting/ChaosSpecifications.cs
index d12c4f785..b2e3098f7 100644
--- a/src/Transports/RabbitMQ/ChaosTesting/ChaosSpecifications.cs
+++ b/src/Transports/RabbitMQ/ChaosTesting/ChaosSpecifications.cs
@@ -143,4 +143,30 @@ public Task RabbitMqDurableListener_Marten_ReceiverGoesUpAndDown() =>
     [Fact]
     public Task RabbitMqFiveDurableListeners_Marten_ReceiverGoesUpAndDown() =>
         execute<ReceiverGoesUpAndDown>(RabbitMqFiveDurableListeners);
+
+    // Graceful shutdown tests
+    [Fact]
+    public Task RabbitMqDurableListener_Marten_GracefulShutdown() =>
+        execute<GracefulShutdown>(RabbitMqDurableListener);
+
+    [Fact]
+    public Task RabbitMqFiveDurableListeners_Marten_GracefulShutdown() =>
+        execute<GracefulShutdown>(RabbitMqFiveDurableListeners);
+
+    [Fact]
+    public Task RabbitMqBufferedListener_Marten_GracefulShutdown() =>
+        execute<GracefulShutdown>(RabbitMqBufferedListener);
+
+    [Fact]
+    public Task RabbitMqOneInlineListener_Marten_GracefulShutdown() =>
+        execute<GracefulShutdown>(RabbitMqOneInlineListener);
+
+    // Rolling restart tests
+    [Fact]
+    public Task RabbitMqDurableListener_Marten_RollingRestart() =>
+        execute<RollingRestart>(RabbitMqDurableListener);
+
+    [Fact]
+    public Task RabbitMqFiveDurableListeners_Marten_RollingRestart() =>
+        execute<RollingRestart>(RabbitMqFiveDurableListeners);
 }
\ No newline at end of file
diff --git a/src/Transports/RabbitMQ/ChaosTesting/Scripts/GracefulShutdown.cs b/src/Transports/RabbitMQ/ChaosTesting/Scripts/GracefulShutdown.cs
new file mode 100644
index 000000000..08a12dee4
--- /dev/null
+++ b/src/Transports/RabbitMQ/ChaosTesting/Scripts/GracefulShutdown.cs
@@ -0,0 +1,66 @@
+using JasperFx.Core;
+
+namespace ChaosTesting.Scripts;
+
+/// <summary>
+/// Tests that in-flight messages complete during graceful shutdown and no messages
+/// are left orphaned. Sends messages, then stops the receiver while messages are
+/// still being processed, starts a new receiver, and verifies all messages complete.
+/// </summary>
+public class GracefulShutdown : ChaosScript
+{
+    public GracefulShutdown()
+    {
+        TimeOut = 2.Minutes();
+    }
+
+    public override async Task Drive(ChaosDriver driver)
+    {
+        await driver.StartReceiver("one");
+        await driver.StartSender("one");
+
+        // Send a batch of messages
+        await driver.SendMessages("one", 200);
+
+        // Give some time for processing to start but not complete
+        await Task.Delay(500.Milliseconds());
+
+        // Gracefully stop the receiver while messages are in-flight
+        await driver.StopReceiver("one");
+
+        // Start a new receiver to pick up any remaining messages
+        await driver.StartReceiver("two");
+    }
+}
+
+/// <summary>
+/// Tests that multiple rapid shutdown/restart cycles don't lose messages.
+/// Simulates rolling deployment behavior where receivers are stopped and new
+/// ones started in quick succession.
+/// </summary>
+public class RollingRestart : ChaosScript
+{
+    public RollingRestart()
+    {
+        TimeOut = 3.Minutes();
+    }
+
+    public override async Task Drive(ChaosDriver driver)
+    {
+        await driver.StartReceiver("one");
+        await driver.StartSender("one");
+
+        // Send initial batch
+        await driver.SendMessages("one", 300);
+
+        // Simulate a rolling restart: stop old, start new, repeat
+        await Task.Delay(1.Seconds());
+        await driver.StopReceiver("one");
+
+        await driver.StartReceiver("two");
+        await Task.Delay(1.Seconds());
+        await driver.StopReceiver("two");
+
+        await driver.StartReceiver("three");
+    }
+}
diff --git a/src/Wolverine/Configuration/EndpointCollection.cs b/src/Wolverine/Configuration/EndpointCollection.cs
index 09fddb7fd..f3dd05803 100644
--- a/src/Wolverine/Configuration/EndpointCollection.cs
+++ b/src/Wolverine/Configuration/EndpointCollection.cs
@@ -382,6 +382,24 @@ private ISendingAgent buildSendingAgent(Uri uri, Action<Endpoint>? configureNewEndpoint)
         return endpoint.StartSending(_runtime, transport.ReplyEndpoint()?.Uri);
     }
 
+    /// <summary>
+    /// Immediately latch all receivers to stop picking up new messages from their internal queues.
+    /// This is called as early as possible during shutdown (via IHostApplicationLifetime.ApplicationStopping)
+    /// so that messages already queued internally are not processed after the shutdown signal.
+    /// </summary>
+    public void LatchAllReceivers()
+    {
+        foreach (var listener in _listeners.Values)
+        {
+            listener.LatchReceiver();
+        }
+
+        foreach (var queue in _localSenders.Enumerate().Select(x => x.Value).OfType<DurableLocalQueue>())
+        {
+            queue.LatchReceiver();
+        }
+    }
+
     public async Task DrainAsync()
     {
         // Drain the listeners
diff --git a/src/Wolverine/DurabilitySettings.cs b/src/Wolverine/DurabilitySettings.cs
index 4d382f1fb..d3c047dcf 100644
--- a/src/Wolverine/DurabilitySettings.cs
+++ b/src/Wolverine/DurabilitySettings.cs
@@ -247,6 +247,12 @@ internal set
     /// </summary>
     public TimeSpan? NodeAssignmentHealthCheckTraceSamplingPeriod { get; set; }
 
+    /// <summary>
+    /// Maximum time to wait for in-flight message handlers to complete during graceful
+    /// shutdown before proceeding with the shutdown sequence. Default is 30 seconds.
+    /// </summary>
+    public TimeSpan DrainTimeout { get; set; } = 30.Seconds();
+
     /// <summary>
    /// Get or set the logical Wolverine service name. By default, this is
     /// derived from the name of a custom WolverineOptions
diff --git a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs
index 3e5917bec..a24e1c0bb 100644
--- a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs
+++ b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs
@@ -2,6 +2,7 @@
 using JasperFx.CodeGeneration;
 using JasperFx.Core;
 using JasperFx.Core.Reflection;
+using Microsoft.Extensions.Hosting;
 using Microsoft.Extensions.Logging;
 using Wolverine.Configuration;
 using Wolverine.Persistence.Durability;
@@ -102,6 +103,19 @@ public async Task StartAsync(CancellationToken cancellationToken)
             await Observer.RuntimeIsFullyStarted();
 
             _hasStarted = true;
+
+            // Subscribe to the host shutdown signal so we can immediately latch all receivers
+            // the moment SIGTERM/ApplicationStopping fires, rather than waiting until our
+            // IHostedService.StopAsync is called (which may be delayed by other hosted services)
+            try
+            {
+                var lifetime = _container.Services.GetService(typeof(IHostApplicationLifetime)) as IHostApplicationLifetime;
+                lifetime?.ApplicationStopping.Register(OnApplicationStopping);
+            }
+            catch (Exception e)
+            {
+                Logger.LogDebug(e, "Could not subscribe to IHostApplicationLifetime.ApplicationStopping");
+            }
         }
         catch (Exception? e)
         {
@@ -110,6 +124,12 @@ public async Task StartAsync(CancellationToken cancellationToken)
         }
     }
 
+    internal void OnApplicationStopping()
+    {
+        Logger.LogInformation("Application stopping signal received, latching all message receivers");
+        _endpoints.LatchAllReceivers();
+    }
+
     private bool _hasMigratedStorage;
 
     private async Task tryMigrateStorage()
@@ -196,7 +216,21 @@ public async Task StopAsync(CancellationToken cancellationToken)
         DisableHealthChecks();
 
         _idleAgentCleanupLoop?.SafeDispose();
-
+
+        if (StopMode == StopMode.Normal)
+        {
+            // Step 1: Drain endpoints first — stop listeners from accepting new messages
+            // and wait for in-flight handlers to complete before releasing ownership.
+            // Receivers were already latched via IHostApplicationLifetime.ApplicationStopping
+            // to prevent new messages from being picked up, so this just waits for completion.
+            await _endpoints.DrainAsync();
+
+            if (_accumulator.IsValueCreated)
+            {
+                await _accumulator.Value.DrainAsync();
+            }
+        }
+
         if (_stores.IsValueCreated && StopMode == StopMode.Normal)
         {
             try
@@ -210,7 +244,8 @@ public async Task StopAsync(CancellationToken cancellationToken)
 
             try
             {
-                // New to 3.0, try to release any ownership on the way out. Do this *after* the drain
+                // Release any ownership on the way out. Do this *after* draining endpoints
+                // so in-flight messages complete before their ownership is released.
                 await _stores.Value.ReleaseAllOwnershipAsync(DurabilitySettings.AssignedNodeNumber);
             }
             catch (ObjectDisposedException)
@@ -221,15 +256,8 @@ public async Task StopAsync(CancellationToken cancellationToken)
 
         if (StopMode == StopMode.Normal)
         {
-            // This MUST be called before draining the endpoints
+            // Step 2: Now teardown agents — safe after endpoints drained and ownership released
             await teardownAgentsAsync();
-
-            await _endpoints.DrainAsync();
-
-            if (_accumulator.IsValueCreated)
-            {
-                await _accumulator.Value.DrainAsync();
-            }
         }
 
         DurabilitySettings.Cancel();
diff --git a/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs
index 3691fb40d..f1b4d9b20 100644
--- a/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs
+++ b/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs
@@ -114,11 +114,31 @@ async ValueTask IChannelCallback.DeferAsync(Envelope envelope)
 
     public int QueueCount => (int)_receivingBlock.Count;
 
+    /// <summary>
+    /// Immediately latch to stop processing new messages without draining.
+    /// </summary>
+    public void Latch()
+    {
+        _latched = true;
+    }
+
     public async ValueTask DrainAsync()
     {
         _latched = true;
         _receivingBlock.Complete();
 
+        // Wait for in-flight handler executions to complete, bounded by a timeout
+        // to prevent hanging during shutdown
+        try
+        {
+            var completion = _receivingBlock.WaitForCompletionAsync();
+            await Task.WhenAny(completion, Task.Delay(_settings.DrainTimeout));
+        }
+        catch (Exception e)
+        {
+            _logger.LogDebug(e, "Error waiting for in-flight message processing to complete at {Uri}", Uri);
+        }
+
         await _completeBlock.DrainAsync();
         await _deferBlock.DrainAsync();
 
@@ -126,9 +146,6 @@ public async ValueTask DrainAsync()
         {
             await _moveToErrors.DrainAsync();
         }
-
-        // It hangs, nothing to be done about this I think
-        //await _receivingBlock.Completion;
     }
 
     public void Enqueue(Envelope envelope)
diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
index 71ae86b20..a380d709b 100644
--- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
+++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
@@ -286,8 +286,17 @@ public async ValueTask DrainAsync()
         _latched = true;
         _receiver.Complete();
 
-        // Latching is the best you can do here, otherwise it can hang
-        //await _receiver.Completion;
+        // Wait for in-flight handler executions to complete, bounded by a timeout
+        // to prevent hanging during shutdown
+        try
+        {
+            var completion = _receiver.WaitForCompletionAsync();
+            await Task.WhenAny(completion, Task.Delay(_settings.DrainTimeout));
+        }
+        catch (Exception e)
+        {
+            _logger.LogDebug(e, "Error waiting for in-flight message processing to complete at {Uri}", Uri);
+        }
 
         await _incrementAttempts.DrainAsync();
         await _scheduleExecution.DrainAsync();
diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs
index ebba9a1a8..af5a97716 100644
--- a/src/Wolverine/Transports/ListeningAgent.cs
+++ b/src/Wolverine/Transports/ListeningAgent.cs
@@ -139,6 +139,24 @@ public async Task EnqueueDirectlyAsync(IEnumerable<Envelope> envelopes)
 
     public ListeningStatus Status { get; private set; } = ListeningStatus.Stopped;
 
+    /// <summary>
+    /// Immediately latch the receiver to stop processing new messages from its internal queue.
+    /// Does not stop the listener or drain — just prevents the receiver from executing any more messages.
+    /// </summary>
+    public void LatchReceiver()
+    {
+        if (_receiver is DurableReceiver dr)
+        {
+            dr.Latch();
+        }
+        // BufferedReceiver latches via its _latched field in DrainAsync,
+        // but we need an immediate latch here too
+        else if (_receiver is BufferedReceiver br)
+        {
+            br.Latch();
+        }
+    }
+
     public async ValueTask StopAndDrainAsync()
     {
         if (Status == ListeningStatus.Stopped || Status == ListeningStatus.GloballyLatched)
diff --git a/src/Wolverine/Transports/Local/DurableLocalQueue.cs b/src/Wolverine/Transports/Local/DurableLocalQueue.cs
index 0c33abca5..295a01204 100644
--- a/src/Wolverine/Transports/Local/DurableLocalQueue.cs
+++ b/src/Wolverine/Transports/Local/DurableLocalQueue.cs
@@ -63,6 +63,15 @@ public DurableLocalQueue(Endpoint endpoint, WolverineRuntime runtime)
 
     public CircuitBreaker? CircuitBreaker { get; }
 
+    /// <summary>
+    /// Immediately latch the receiver to stop processing new messages.
+    /// </summary>
+    public void LatchReceiver()
+    {
+        Latched = true;
+        _receiver?.Latch();
+    }
+
     int IListenerCircuit.QueueCount => _receiver?.QueueCount ?? 0;
 
     async Task IListenerCircuit.EnqueueDirectlyAsync(IEnumerable<Envelope> envelopes)