diff --git a/src/Testing/CoreTests/Runtime/WorkerQueues/durable_receiver_release_incoming_during_shutdown.cs b/src/Testing/CoreTests/Runtime/WorkerQueues/durable_receiver_release_incoming_during_shutdown.cs
new file mode 100644
index 000000000..5837daeff
--- /dev/null
+++ b/src/Testing/CoreTests/Runtime/WorkerQueues/durable_receiver_release_incoming_during_shutdown.cs
@@ -0,0 +1,81 @@
+using System.Diagnostics;
+using System.Net.Sockets;
+using NSubstitute;
+using Shouldly;
+using Wolverine.Runtime;
+using Wolverine.Runtime.WorkerQueues;
+using Wolverine.Transports.Stub;
+using Xunit;
+
+namespace CoreTests.Runtime.WorkerQueues;
+
+/// <summary>
+/// Regression coverage for GH-2671: <see cref="DurableReceiver.DrainAsync"/> must
+/// terminate within a reasonable time even when the underlying message store is
+/// throwing on every attempt to release inbox ownership. The pre-fix
+/// executeWithRetriesAsync was an unbounded while (true) that
+/// (a) ignored the cancellation token, (b) logged every failure at Error level,
+/// and (c) never gave up — so during a shutdown sequence, where the Npgsql
+/// DbDataSource has already been disposed, the loop would spin forever
+/// against a dead socket and emit a flood of SocketException log noise.
+/// </summary>
+public class durable_receiver_release_incoming_during_shutdown : IAsyncLifetime
+{
+    private readonly IHandlerPipeline _pipeline = Substitute.For<IHandlerPipeline>();
+    private readonly MockWolverineRuntime _runtime;
+    private readonly DurableReceiver _receiver;
+
+    public durable_receiver_release_incoming_during_shutdown()
+    {
+        _runtime = new MockWolverineRuntime();
+
+        // Simulate the user's reported failure mode: every call to ReleaseIncomingAsync
+        // throws SocketException because the Npgsql connector can't reach the server
+        // anymore. The exact exception type matches the Npgsql shutdown path the user
+        // reported in the issue; the implementation only needs *some* exception to
+        // demonstrate the retry-loop bug.
+        _runtime.Storage.Inbox
+            .When(x => x.ReleaseIncomingAsync(Arg.Any<int>(), Arg.Any<Uri>()))
+            .Do(_ => throw new SocketException(10054 /* WSAECONNRESET */));
+
+        var endpoint = new StubEndpoint("test://2671", new StubTransport());
+        _receiver = new DurableReceiver(endpoint, _runtime, _pipeline);
+    }
+
+    public Task InitializeAsync() => Task.CompletedTask;
+    public Task DisposeAsync() => Task.CompletedTask;
+
+    [Fact]
+    public async Task drain_terminates_within_seconds_when_inbox_release_throws_repeatedly()
+    {
+        // Shutdown not signalled — the bounded retry path applies. With
+        // MaxReleaseRetries = 5 and linear backoff (100ms, 200ms, 300ms, 400ms),
+        // the upper bound is around 1s of sleep + per-attempt work. Anything
+        // under five seconds proves we are no longer in an unbounded loop.
+        var sw = Stopwatch.StartNew();
+        await _receiver.DrainAsync();
+        sw.Stop();
+
+        sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(5),
+            "DrainAsync looped beyond the bounded retry budget — the GH-2671 unbounded " +
+            "while-true regression has returned.");
+    }
+
+    [Fact]
+    public async Task drain_exits_immediately_when_cancellation_is_signalled()
+    {
+        // Cancel the durability cancellation token before draining — this is the
+        // shape of the user-reported scenario where the host is shutting down and
+        // the connection pool has already gone away. We expect DrainAsync to bail
+        // out on the very first failure rather than burn the full retry budget.
+        _runtime.DurabilitySettings.Cancel();
+
+        var sw = Stopwatch.StartNew();
+        await _receiver.DrainAsync();
+        sw.Stop();
+
+        sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(1),
+            "Shutdown-aware exit didn't fire — cancellation signal must short-circuit " +
+            "the retry loop on the first failure.");
+    }
+}
diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
index 3a5225ad7..064385980 100644
--- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
+++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
@@ -537,10 +537,26 @@ private async Task handleDuplicateIncomingEnvelope(Envelope envelope, DuplicateI
         }
     }
 
+    /// <summary>
+    /// Bounded retry helper used for best-effort persistence operations like
+    /// <c>ReleaseIncomingAsync</c> on drain. Three properties
+    /// matter for shutdown correctness (see GH-2671):
+    /// <list type="bullet">
+    /// <item>The loop is finite — capped at <see cref="MaxReleaseRetries"/> attempts —
+    /// so a permanently unreachable database can't hang shutdown.</item>
+    /// <item>The loop honours <see cref="DurabilitySettings.Cancellation"/>: when the
+    /// host is stopping we exit immediately on the first failure rather than
+    /// hammering an already-disposed connection pool.</item>
+    /// <item>Log severity is demoted to Debug when cancellation has been signalled.
+    /// During teardown, transient socket / connection failures from the data
+    /// source are expected and don't warrant Error-level noise.</item>
+    /// </list>
+    /// </summary>
+    internal const int MaxReleaseRetries = 5;
+
     private async Task executeWithRetriesAsync(Func<Task> action)
     {
-        var i = 0;
-        while (true)
+        for (var attempt = 1; ; attempt++)
         {
             try
             {
@@ -549,9 +565,41 @@ private async Task executeWithRetriesAsync(Func<Task> action)
             }
             catch (Exception e)
             {
-                _logger.LogError(e, "Unexpected failure");
-                i++;
-                await Task.Delay(i * 100).ConfigureAwait(false);
+                // Shutdown-aware exit: when the cancellation token has been signalled
+                // we treat any failure as terminal and demote the log level. Retrying
+                // here is futile (the DataSource is being torn down) and the inbox
+                // ownership we failed to release will be reclaimed by the durability
+                // agent on the next live node.
+                if (_settings.Cancellation.IsCancellationRequested)
+                {
+                    _logger.LogDebug(e,
+                        "Database operation failed during shutdown at {Uri}; exiting retry loop",
+                        Uri);
+                    return;
+                }
+
+                if (attempt >= MaxReleaseRetries)
+                {
+                    _logger.LogError(e,
+                        "Database operation at {Uri} failed after {Attempts} attempts; giving up",
+                        Uri, attempt);
+                    return;
+                }
+
+                _logger.LogError(e,
+                    "Unexpected failure at {Uri} (attempt {Attempt}/{Max})",
+                    Uri, attempt, MaxReleaseRetries);
+
+                try
+                {
+                    await Task.Delay(attempt * 100, _settings.Cancellation).ConfigureAwait(false);
+                }
+                catch (OperationCanceledException)
+                {
+                    // Cancellation fired while we were backing off — exit cleanly
+                    // instead of throwing out of a best-effort cleanup path.
+                    return;
+                }
             }
         }
     }