diff --git a/src/JasperFx.Events/Daemon/GroupedProjectionExecution.cs b/src/JasperFx.Events/Daemon/GroupedProjectionExecution.cs index c406a05..dfb8658 100644 --- a/src/JasperFx.Events/Daemon/GroupedProjectionExecution.cs +++ b/src/JasperFx.Events/Daemon/GroupedProjectionExecution.cs @@ -128,6 +128,11 @@ private async Task groupEventRangeAsync(EventRange range, Cancellati return range; } + catch when (_cancellation.IsCancellationRequested) + { + // Shard is being torn down — don't promote the cancellation to a critical failure. + return null!; + } catch (Exception e) { activity?.AddException(e); @@ -183,6 +188,17 @@ private async Task processRangeAsync(EventRange range, CancellationToken _) range.Agent.Metrics.UpdateProcessed(range.Size); } + catch (OperationCanceledException) when (_cancellation.IsCancellationRequested) + { + // Daemon-internal cancellation (StopAllAsync / HardStopAsync / DisposeAsync fired the + // shard's CTS). Don't surface as a critical shard failure — matches the guard already + // used in applyBatchOperationsToDatabaseAsync and SubscriptionExecutionBase.executeRange. + } + catch when (_cancellation.IsCancellationRequested) + { + // Wrapped/aggregated cancellation (e.g. Npgsql 57014 surfaced through a non-OCE) that + // is really a side effect of the shard being torn down. Same rationale as above. + } catch (Exception e) { activity?.AddException(e); diff --git a/src/JasperFx.Events/Daemon/JasperFxAsyncDaemon.cs b/src/JasperFx.Events/Daemon/JasperFxAsyncDaemon.cs index ddaf6a3..441894e 100644 --- a/src/JasperFx.Events/Daemon/JasperFxAsyncDaemon.cs +++ b/src/JasperFx.Events/Daemon/JasperFxAsyncDaemon.cs @@ -710,7 +710,11 @@ public async Task CatchUpAsync(CancellationToken cancellation) var agent = buildAgentForShard(asyncShard); await agent.CatchUpAsync(HighWaterMark(), state, cancellation); - var exceptions = recorder.States.Select(x => x.Exception).Where(x => x != null).ToArray(); + var exceptions = recorder.States + .Select(x => x.Exception) + .Where(x => x != null) + .Where(x => cancellation.IsCancellationRequested || !isCancellationNoise(x!)) + .ToArray(); if (exceptions.Length != 0) { throw new AggregateException(exceptions!); @@ -724,6 +728,18 @@ public async Task CatchUpAsync(TimeSpan timeout, CancellationToken cancellation) cts.CancelAfter(timeout); await CatchUpAsync(cts.Token); } + + private static bool isCancellationNoise(Exception exception) + { + if (exception is OperationCanceledException) return true; + if (exception is AggregateException aggregate) + { + return aggregate.InnerExceptions.Count > 0 + && aggregate.InnerExceptions.All(isCancellationNoise); + } + + return false; + } } internal class Recorder : IObserver