@@ -0,0 +1,81 @@
using System.Diagnostics;
using System.Net.Sockets;
using NSubstitute;
using Shouldly;
using Wolverine.Runtime;
using Wolverine.Runtime.WorkerQueues;
using Wolverine.Transports.Stub;
using Xunit;

namespace CoreTests.Runtime.WorkerQueues;

/// <summary>
/// Regression coverage for GH-2671: <see cref="DurableReceiver.DrainAsync"/> must
/// terminate within a reasonable time even when the underlying message store is
/// throwing on every attempt to release inbox ownership. The pre-fix
/// <c>executeWithRetriesAsync</c> was an unbounded <c>while (true)</c> that
/// (a) ignored the cancellation token, (b) logged every failure at Error level,
/// and (c) never gave up — so during a shutdown sequence, where the Npgsql
/// <c>DbDataSource</c> has already been disposed, the loop would spin forever
/// against a dead socket and emit a flood of <c>SocketException</c> log noise.
/// </summary>
public class durable_receiver_release_incoming_during_shutdown : IAsyncLifetime
{
    private readonly IHandlerPipeline _pipeline = Substitute.For<IHandlerPipeline>();
    private readonly MockWolverineRuntime _runtime;
    private readonly DurableReceiver _receiver;

    public durable_receiver_release_incoming_during_shutdown()
    {
        _runtime = new MockWolverineRuntime();

        // Simulate the user's reported failure mode: every call to ReleaseIncomingAsync
        // throws SocketException because the Npgsql connector can't reach the server
        // anymore. The exact exception type matches the Npgsql shutdown path the user
        // reported in the issue; the implementation only needs *some* exception to
        // demonstrate the retry-loop bug.
        _runtime.Storage.Inbox
            .When(x => x.ReleaseIncomingAsync(Arg.Any<int>(), Arg.Any<Uri>()))
            .Do(_ => throw new SocketException(10054 /* WSAECONNRESET */));

        var endpoint = new StubEndpoint("test://2671", new StubTransport());
        _receiver = new DurableReceiver(endpoint, _runtime, _pipeline);
    }

    public Task InitializeAsync() => Task.CompletedTask;
    public Task DisposeAsync() => Task.CompletedTask;

    [Fact]
    public async Task drain_terminates_within_seconds_when_inbox_release_throws_repeatedly()
    {
        // Shutdown not signalled — the bounded retry path applies. With
        // MaxReleaseRetries = 5 and linear backoff (100ms, 200ms, 300ms, 400ms),
        // the upper bound is around 1s of sleep + per-attempt work. Anything
        // under five seconds proves we are no longer in an unbounded loop.
        var sw = Stopwatch.StartNew();
        await _receiver.DrainAsync();
        sw.Stop();

        sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(5),
            "DrainAsync looped beyond the bounded retry budget — the GH-2671 unbounded " +
            "while-true regression has returned.");
    }

    [Fact]
    public async Task drain_exits_immediately_when_cancellation_is_signalled()
    {
        // Cancel the durability cancellation token before draining — this is the
        // shape of the user-reported scenario where the host is shutting down and
        // the connection pool has already gone away. We expect DrainAsync to bail
        // out on the very first failure rather than burn the full retry budget.
        _runtime.DurabilitySettings.Cancel();

        var sw = Stopwatch.StartNew();
        await _receiver.DrainAsync();
        sw.Stop();

        sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(1),
            "Shutdown-aware exit didn't fire — cancellation signal must short-circuit " +
            "the retry loop on the first failure.");
    }
}
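
A quick sanity check on the five-second budget asserted in drain_terminates_within_seconds_when_inbox_release_throws_repeatedly: the fix below sleeps attempt * 100 ms between failed attempts and gives up once attempt reaches MaxReleaseRetries (5), so the cumulative sleep per failing operation is roughly one second. The snippet below is a minimal sketch of that bound and is not part of the PR; it assumes only the backoff shape visible in the diff that follows.

using System;

// Sketch only (not part of the PR): worst-case sleep budget of the bounded retry
// loop, assuming a delay of attempt * 100 ms and giving up once attempt == 5.
const int maxReleaseRetries = 5;

var totalBackoffMs = 0;
for (var attempt = 1; attempt < maxReleaseRetries; attempt++)
{
    // A delay only happens when another attempt is still coming, so the failing
    // attempts 1 through 4 sleep 100, 200, 300 and 400 ms respectively.
    totalBackoffMs += attempt * 100;
}

Console.WriteLine($"Worst-case sleep per failing release: {totalBackoffMs} ms"); // prints 1000 ms
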
58 changes: 53 additions & 5 deletions src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
@@ -537,10 +537,26 @@ private async Task handleDuplicateIncomingEnvelope(Envelope envelope, DuplicateI
        }
    }

    /// <summary>
    /// Bounded retry helper used for best-effort persistence operations like
    /// <see cref="IMessageInbox.ReleaseIncomingAsync"/> on drain. Three properties
    /// matter for shutdown correctness (see GH-2671):
    /// <list type="bullet">
    /// <item>The loop is finite — capped at <see cref="MaxReleaseRetries"/> attempts —
    /// so a permanently unreachable database can't hang shutdown.</item>
    /// <item>The loop honours <see cref="DurabilitySettings.Cancellation"/>: when the
    /// host is stopping we exit immediately on the first failure rather than
    /// hammering an already-disposed connection pool.</item>
    /// <item>Log severity is demoted to Debug when cancellation has been signalled.
    /// During teardown, transient socket / connection failures from the data
    /// source are expected and don't warrant Error-level noise.</item>
    /// </list>
    /// </summary>
    internal const int MaxReleaseRetries = 5;

    private async Task executeWithRetriesAsync(Func<Task> action)
    {
        var i = 0;
        while (true)
        for (var attempt = 1; ; attempt++)
        {
            try
            {
@@ -549,9 +565,41 @@ private async Task executeWithRetriesAsync(Func<Task> action)
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Unexpected failure");
                i++;
                await Task.Delay(i * 100).ConfigureAwait(false);
                // Shutdown-aware exit: when the cancellation token has been signalled
                // we treat any failure as terminal and demote the log level. Retrying
                // here is futile (the DataSource is being torn down) and the inbox
                // ownership we failed to release will be reclaimed by the durability
                // agent on the next live node.
                if (_settings.Cancellation.IsCancellationRequested)
                {
                    _logger.LogDebug(e,
                        "Database operation failed during shutdown at {Uri}; exiting retry loop",
                        Uri);
                    return;
                }

                if (attempt >= MaxReleaseRetries)
                {
                    _logger.LogError(e,
                        "Database operation at {Uri} failed after {Attempts} attempts; giving up",
                        Uri, attempt);
                    return;
                }

                _logger.LogError(e,
                    "Unexpected failure at {Uri} (attempt {Attempt}/{Max})",
                    Uri, attempt, MaxReleaseRetries);

                try
                {
                    await Task.Delay(attempt * 100, _settings.Cancellation).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    // Cancellation fired while we were backing off — exit cleanly
                    // instead of throwing out of a best-effort cleanup path.
                    return;
                }
            }
        }
    }
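
A design note on the backoff above: passing _settings.Cancellation into Task.Delay is what lets a shutdown signal that arrives mid-sleep end the loop immediately instead of waiting out the delay. Below is a minimal, self-contained sketch of that behaviour using a plain CancellationTokenSource; the names and timings are illustrative and not Wolverine APIs.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Sketch only (not part of the PR): a token cancelled mid-delay aborts the sleep
// right away, which is why the fix passes the durability token into Task.Delay.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(50));

var sw = Stopwatch.StartNew();
try
{
    await Task.Delay(TimeSpan.FromSeconds(30), cts.Token);
}
catch (OperationCanceledException)
{
    // Expected: we bail out roughly 50 ms in rather than sleeping the full 30 seconds,
    // mirroring the catch block above that returns instead of rethrowing.
}
sw.Stop();

Console.WriteLine($"Backoff abandoned after {sw.ElapsedMilliseconds} ms");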