Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/Transports/RabbitMQ/ChaosTesting/ChaosSpecifications.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,30 @@ public Task RabbitMqDurableListener_Marten_ReceiverGoesUpAndDown() =>
/// <summary>
/// Executes the ReceiverGoesUpAndDown script against the RabbitMqFiveDurableListeners
/// configuration, using MartenStorageStrategy as the storage strategy.
/// </summary>
[Fact]
public Task RabbitMqFiveDurableListeners_Marten_ReceiverGoesUpAndDown()
{
    return execute<MartenStorageStrategy, ReceiverGoesUpAndDown>(RabbitMqFiveDurableListeners);
}

// Graceful shutdown tests
/// <summary>
/// Executes the GracefulShutdown script against the RabbitMqDurableListener
/// configuration, using MartenStorageStrategy as the storage strategy.
/// </summary>
[Fact]
public Task RabbitMqDurableListener_Marten_GracefulShutdown()
{
    return execute<MartenStorageStrategy, GracefulShutdown>(RabbitMqDurableListener);
}

/// <summary>
/// Executes the GracefulShutdown script against the RabbitMqFiveDurableListeners
/// configuration, using MartenStorageStrategy as the storage strategy.
/// </summary>
[Fact]
public Task RabbitMqFiveDurableListeners_Marten_GracefulShutdown()
{
    return execute<MartenStorageStrategy, GracefulShutdown>(RabbitMqFiveDurableListeners);
}

/// <summary>
/// Executes the GracefulShutdown script against the RabbitMqBufferedListener
/// configuration, using MartenStorageStrategy as the storage strategy.
/// </summary>
[Fact]
public Task RabbitMqBufferedListener_Marten_GracefulShutdown()
{
    return execute<MartenStorageStrategy, GracefulShutdown>(RabbitMqBufferedListener);
}

/// <summary>
/// Executes the GracefulShutdown script against the RabbitMqOneInlineListener
/// configuration, using MartenStorageStrategy as the storage strategy.
/// </summary>
[Fact]
public Task RabbitMqOneInlineListener_Marten_GracefulShutdown()
{
    return execute<MartenStorageStrategy, GracefulShutdown>(RabbitMqOneInlineListener);
}

// Rolling restart tests
/// <summary>
/// Executes the RollingRestart script against the RabbitMqDurableListener
/// configuration, using MartenStorageStrategy as the storage strategy.
/// </summary>
[Fact]
public Task RabbitMqDurableListener_Marten_RollingRestart()
{
    return execute<MartenStorageStrategy, RollingRestart>(RabbitMqDurableListener);
}

/// <summary>
/// Executes the RollingRestart script against the RabbitMqFiveDurableListeners
/// configuration, using MartenStorageStrategy as the storage strategy.
/// </summary>
[Fact]
public Task RabbitMqFiveDurableListeners_Marten_RollingRestart()
{
    return execute<MartenStorageStrategy, RollingRestart>(RabbitMqFiveDurableListeners);
}
}
66 changes: 66 additions & 0 deletions src/Transports/RabbitMQ/ChaosTesting/Scripts/GracefulShutdown.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using JasperFx.Core;

namespace ChaosTesting.Scripts;

/// <summary>
/// Chaos script that stops a receiver shortly after a batch of messages has been sent
/// and then brings up a replacement receiver. The intent is to verify that in-flight
/// messages finish during a graceful shutdown and that nothing is left orphaned.
/// </summary>
public class GracefulShutdown : ChaosScript
{
    // Names handed to the driver; the receiver and the sender share the name "one".
    private const string InitialReceiver = "one";
    private const string ReplacementReceiver = "two";
    private const string Sender = "one";

    // Number of messages pushed through the sender before the shutdown.
    private const int BatchSize = 200;

    /// <summary>
    /// Sets the script time-out to two minutes.
    /// </summary>
    public GracefulShutdown() => TimeOut = 2.Minutes();

    /// <summary>
    /// Starts a receiver and a sender, sends one batch, pauses briefly, stops the
    /// receiver and finally starts a second receiver.
    /// </summary>
    /// <param name="driver">Driver used to start/stop receivers and senders and to send messages.</param>
    public override async Task Drive(ChaosDriver driver)
    {
        await driver.StartReceiver(InitialReceiver);
        await driver.StartSender(Sender);

        await driver.SendMessages(Sender, BatchSize);

        // Short pause so that processing has begun, but is not expected to have finished,
        // by the time the receiver is stopped.
        var headStart = 500.Milliseconds();
        await Task.Delay(headStart);

        // Stop the first receiver while messages are still in flight ...
        await driver.StopReceiver(InitialReceiver);

        // ... and let a fresh receiver pick up whatever remains.
        await driver.StartReceiver(ReplacementReceiver);
    }
}

/// <summary>
/// Chaos script that cycles through several receivers in quick succession after a batch
/// of messages has been sent. The intent is to verify that rapid stop/start cycles, as
/// seen in a rolling deployment, do not lose messages.
/// </summary>
public class RollingRestart : ChaosScript
{
    /// <summary>
    /// Sets the script time-out to three minutes.
    /// </summary>
    public RollingRestart() => TimeOut = 3.Minutes();

    /// <summary>
    /// Starts receiver "one" and sender "one", sends 300 messages, then twice in a row
    /// waits one second, stops the current receiver and starts its successor.
    /// </summary>
    /// <param name="driver">Driver used to start/stop receivers and senders and to send messages.</param>
    public override async Task Drive(ChaosDriver driver)
    {
        await driver.StartReceiver("one");
        await driver.StartSender("one");

        // Initial batch
        await driver.SendMessages("one", 300);

        // Each hand-over retires the running receiver and brings up the next one.
        var handOvers = new[]
        {
            (Retiring: "one", Successor: "two"),
            (Retiring: "two", Successor: "three")
        };

        foreach (var (retiring, successor) in handOvers)
        {
            await Task.Delay(1.Seconds());
            await driver.StopReceiver(retiring);
            await driver.StartReceiver(successor);
        }
    }
}
18 changes: 18 additions & 0 deletions src/Wolverine/Configuration/EndpointCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,24 @@ private ISendingAgent buildSendingAgent(Uri uri, Action<Endpoint>? configureNewE
return endpoint.StartSending(_runtime, transport.ReplyEndpoint()?.Uri);
}

/// <summary>
/// Latches the receiver of every registered listener and of every local sender that is a
/// <c>DurableLocalQueue</c>. Intended to run as early as possible during shutdown (via
/// IHostApplicationLifetime.ApplicationStopping) so that messages already sitting in
/// internal queues are not picked up after the shutdown signal.
/// </summary>
public void LatchAllReceivers()
{
    foreach (var listeningAgent in _listeners.Values)
    {
        listeningAgent.LatchReceiver();
    }

    // Only durable local queues are latched here; other local senders are skipped.
    foreach (var entry in _localSenders.Enumerate())
    {
        if (entry.Value is DurableLocalQueue durableQueue)
        {
            durableQueue.LatchReceiver();
        }
    }
}

public async Task DrainAsync()
{
// Drain the listeners
Expand Down
6 changes: 6 additions & 0 deletions src/Wolverine/DurabilitySettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ internal set
/// </summary>
public TimeSpan? NodeAssignmentHealthCheckTraceSamplingPeriod { get; set; }

/// <summary>
/// Upper bound on how long graceful shutdown waits for in-flight message handlers
/// to finish before moving on with the rest of the shutdown sequence.
/// Defaults to 30 seconds.
/// </summary>
public TimeSpan DrainTimeout { get; set; } = TimeSpan.FromSeconds(30);

/// <summary>
/// Get or set the logical Wolverine service name. By default, this is
/// derived from the name of a custom WolverineOptions
Expand Down
48 changes: 38 additions & 10 deletions src/Wolverine/Runtime/WolverineRuntime.HostService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using JasperFx.CodeGeneration;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Wolverine.Configuration;
using Wolverine.Persistence.Durability;
Expand Down Expand Up @@ -102,6 +103,19 @@ public async Task StartAsync(CancellationToken cancellationToken)

await Observer.RuntimeIsFullyStarted();
_hasStarted = true;

// Subscribe to the host shutdown signal so we can immediately latch all receivers
// the moment SIGTERM/ApplicationStopping fires, rather than waiting until our
// IHostedService.StopAsync is called (which may be delayed by other hosted services)
try
{
var lifetime = _container.Services.GetService(typeof(IHostApplicationLifetime)) as IHostApplicationLifetime;
lifetime?.ApplicationStopping.Register(OnApplicationStopping);
}
catch (Exception e)
{
Logger.LogDebug(e, "Could not subscribe to IHostApplicationLifetime.ApplicationStopping");
}
}
catch (Exception? e)
{
Expand All @@ -110,6 +124,12 @@ public async Task StartAsync(CancellationToken cancellationToken)
}
}

/// <summary>
/// Callback registered against the host's ApplicationStopping token: writes an
/// informational log entry and then latches every receiver through the endpoint collection.
/// </summary>
internal void OnApplicationStopping()
{
    const string message = "Application stopping signal received, latching all message receivers";

    Logger.LogInformation(message);
    _endpoints.LatchAllReceivers();
}

private bool _hasMigratedStorage;

private async Task tryMigrateStorage()
Expand Down Expand Up @@ -196,7 +216,21 @@ public async Task StopAsync(CancellationToken cancellationToken)
DisableHealthChecks();

_idleAgentCleanupLoop?.SafeDispose();


if (StopMode == StopMode.Normal)
{
// Step 1: Drain endpoints first — stop listeners from accepting new messages
// and wait for in-flight handlers to complete before releasing ownership.
// Receivers were already latched via IHostApplicationLifetime.ApplicationStopping
// to prevent new messages from being picked up, so this just waits for completion.
await _endpoints.DrainAsync();

if (_accumulator.IsValueCreated)
{
await _accumulator.Value.DrainAsync();
}
}

if (_stores.IsValueCreated && StopMode == StopMode.Normal)
{
try
Expand All @@ -210,7 +244,8 @@ public async Task StopAsync(CancellationToken cancellationToken)

try
{
// New to 3.0, try to release any ownership on the way out. Do this *after* the drain
// Release any ownership on the way out. Do this *after* draining endpoints
// so in-flight messages complete before their ownership is released.
await _stores.Value.ReleaseAllOwnershipAsync(DurabilitySettings.AssignedNodeNumber);
}
catch (ObjectDisposedException)
Expand All @@ -221,15 +256,8 @@ public async Task StopAsync(CancellationToken cancellationToken)

if (StopMode == StopMode.Normal)
{
// This MUST be called before draining the endpoints
// Step 2: Now tear down the agents — safe once the endpoints have drained and ownership has been released
await teardownAgentsAsync();

await _endpoints.DrainAsync();

if (_accumulator.IsValueCreated)
{
await _accumulator.Value.DrainAsync();
}
}

DurabilitySettings.Cancel();
Expand Down
23 changes: 20 additions & 3 deletions src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,27 +108,44 @@
}
}

public IHandlerPipeline? Pipeline { get; }

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

public Uri Uri { get; }

public int QueueCount => (int)_receivingBlock.Count;

/// <summary>
/// Sets the latched flag right away so that no new messages are processed.
/// Unlike <c>DrainAsync</c>, nothing is completed or drained here.
/// </summary>
public void Latch() => _latched = true;

/// <summary>
/// Latches this receiver, completes the receiving block, waits up to
/// <c>DrainTimeout</c> for that block to finish, and then drains the completion,
/// deferral and (when present) move-to-errors blocks.
/// </summary>
/// <returns>A task that completes once the remaining blocks have been drained.</returns>
public async ValueTask DrainAsync()
{
    _latched = true;
    _receivingBlock.Complete();

    // Wait for in-flight handler executions to complete, bounded by a timeout
    // to prevent hanging during shutdown. An unbounded await on the receiving
    // block is deliberately avoided because it can hang.
    try
    {
        var completion = _receivingBlock.WaitForCompletionAsync();
        var finished = await Task.WhenAny(completion, Task.Delay(_settings.DrainTimeout));

        if (ReferenceEquals(finished, completion))
        {
            // Task.WhenAny never throws on behalf of the task that finished first, so
            // await it explicitly: a fault or cancellation then reaches the catch block
            // below instead of being silently dropped.
            await completion;
        }
        else
        {
            // The timeout won the race; shutdown proceeds, but make that visible.
            _logger.LogWarning(
                "Timed out after {DrainTimeout} waiting for in-flight message processing to complete at {Uri}",
                _settings.DrainTimeout, Uri);
        }
    }
    catch (Exception e)
    {
        _logger.LogDebug(e, "Error waiting for in-flight message processing to complete at {Uri}", Uri);
    }

    await _completeBlock.DrainAsync();
    await _deferBlock.DrainAsync();

    if (_moveToErrors != null)
    {
        await _moveToErrors.DrainAsync();
    }
}

public void Enqueue(Envelope envelope)
Expand Down
13 changes: 11 additions & 2 deletions src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@
await EnqueueAsync(envelope);
}

public IHandlerPipeline? Pipeline { get; }

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

public Uri Uri { get; set; }

Expand Down Expand Up @@ -286,8 +286,17 @@
_latched = true;
_receiver.Complete();

// Latching is the best you can do here, otherwise it can hang
//await _receiver.Completion;
// Wait for in-flight handler executions to complete, bounded by a timeout
// to prevent hanging during shutdown
try
{
var completion = _receiver.WaitForCompletionAsync();
await Task.WhenAny(completion, Task.Delay(_settings.DrainTimeout));
}
catch (Exception e)
{
_logger.LogDebug(e, "Error waiting for in-flight message processing to complete at {Uri}", Uri);
}

await _incrementAttempts.DrainAsync();
await _scheduleExecution.DrainAsync();
Expand Down
18 changes: 18 additions & 0 deletions src/Wolverine/Transports/ListeningAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,24 @@ public async Task EnqueueDirectlyAsync(IEnumerable<Envelope> envelopes)

public ListeningStatus Status { get; private set; } = ListeningStatus.Stopped;

/// <summary>
/// Latches the underlying receiver right away so it stops executing messages from its
/// internal queue. The listener itself is neither stopped nor drained. Receivers that are
/// neither a <c>DurableReceiver</c> nor a <c>BufferedReceiver</c> are left untouched.
/// </summary>
public void LatchReceiver()
{
    switch (_receiver)
    {
        case DurableReceiver durable:
            durable.Latch();
            break;

        // BufferedReceiver also sets its latched flag in DrainAsync,
        // but an immediate latch is needed here as well.
        case BufferedReceiver buffered:
            buffered.Latch();
            break;
    }
}

public async ValueTask StopAndDrainAsync()
{
if (Status == ListeningStatus.Stopped || Status == ListeningStatus.GloballyLatched)
Expand Down
9 changes: 9 additions & 0 deletions src/Wolverine/Transports/Local/DurableLocalQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ public DurableLocalQueue(Endpoint endpoint, WolverineRuntime runtime)

public CircuitBreaker? CircuitBreaker { get; }

/// <summary>
/// Marks this queue as latched and, when a receiver exists, latches it as well
/// so that no new messages are processed.
/// </summary>
public void LatchReceiver()
{
    Latched = true;

    // Read the field once; there may be no receiver at all.
    var receiver = _receiver;
    if (receiver != null)
    {
        receiver.Latch();
    }
}

int IListenerCircuit.QueueCount => _receiver?.QueueCount ?? 0;

async Task IListenerCircuit.EnqueueDirectlyAsync(IEnumerable<Envelope> envelopes)
Expand Down
Loading