diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.cs b/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.cs index 98baf628b..8ad177d92 100644 --- a/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.cs +++ b/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.cs @@ -2,8 +2,11 @@ using JasperFx.Core; using JasperFx.Core.Reflection; using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Logging; +using Wolverine.Logging; using Wolverine.Persistence; +using Wolverine.Persistence.Durability; using Wolverine.Runtime; using Wolverine.Runtime.Agents; using Wolverine.Runtime.WorkerQueues; @@ -26,6 +29,7 @@ public partial class CosmosDbDurabilityAgent : IAgent private readonly CancellationTokenSource _cancellation = new(); private readonly CancellationTokenSource _combined; private PersistenceMetrics? _metrics; + private readonly DurabilityHealthSignals _health; public CosmosDbDurabilityAgent(Container container, IWolverineRuntime runtime, CosmosDbMessageStore parent) @@ -41,6 +45,7 @@ public CosmosDbDurabilityAgent(Container container, IWolverineRuntime runtime, _logger = runtime.LoggerFactory.CreateLogger(); _combined = CancellationTokenSource.CreateLinkedTokenSource(runtime.Cancellation, _cancellation.Token); + _health = new DurabilityHealthSignals(_settings); } public Task StartAsync(CancellationToken cancellationToken) @@ -69,16 +74,26 @@ internal void StartTimers() { var lastExpiredTime = DateTimeOffset.UtcNow; - await tryRecoverIncomingMessages(); - await tryRecoverOutgoingMessagesAsync(); - - if (_settings.DeadLetterQueueExpirationEnabled) + try { - var now = DateTimeOffset.UtcNow; - if (now > lastExpiredTime.AddHours(1)) + await tryRecoverIncomingMessages(); + await tryRecoverOutgoingMessagesAsync(); + + if (_settings.DeadLetterQueueExpirationEnabled) { - await tryDeleteExpiredDeadLetters(); + var now = DateTimeOffset.UtcNow; + if (now > lastExpiredTime.AddHours(1)) + { + await tryDeleteExpiredDeadLetters(); + } } + + _health.RecordPollSuccess(); + } + catch (Exception e) when (!_combined.IsCancellationRequested) + { + _health.RecordPollFailure(e); + _logger.LogError(e, "Recovery loop tick failed"); } await timer.WaitForNextTickAsync(_combined.Token); @@ -92,12 +107,41 @@ internal void StartTimers() while (!_combined.IsCancellationRequested) { - await runScheduledJobs(); + try + { + await runScheduledJobs(); + _health.RecordPollSuccess(); + } + catch (Exception e) when (!_combined.IsCancellationRequested) + { + _health.RecordPollFailure(e); + _logger.LogError(e, "Scheduled-job loop tick failed"); + } + await timer.WaitForNextTickAsync(_combined.Token); } }, _combined.Token); } + public async Task CheckHealthAsync(HealthCheckContext context, + CancellationToken cancellationToken = default) + { + PersistedCounts? 
counts = null; + if (Status == AgentStatus.Running) + { + try + { + counts = await _parent.Admin.FetchCountsAsync(); + } + catch (Exception e) + { + _health.RecordPollFailure(e); + } + } + + return _health.Evaluate(Status, Uri, counts, DateTimeOffset.UtcNow); + } + private async Task tryDeleteExpiredDeadLetters() { var now = DateTimeOffset.UtcNow; diff --git a/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs b/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs index ab59253b4..e96320ce9 100644 --- a/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs +++ b/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs @@ -5,6 +5,7 @@ using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Logging; using Weasel.Core; +using Wolverine.Logging; using Wolverine.Persistence; using Wolverine.Persistence.Durability; using Wolverine.RDBMS.Durability; @@ -29,8 +30,7 @@ internal class DurabilityAgent : IAgent private Timer? _recoveryTimer; private Timer? _scheduledJobTimer; - private int _successCount; - private int _exceptionCount; + private readonly DurabilityHealthSignals _health; private DateTime _lastHealthCheck = DateTime.UtcNow; public DurabilityAgent(IWolverineRuntime runtime, IMessageDatabase database) @@ -45,6 +45,8 @@ public DurabilityAgent(IWolverineRuntime runtime, IMessageDatabase database) _logger = runtime.LoggerFactory.CreateLogger(); + _health = new DurabilityHealthSignals(_settings); + _runningBlock = new Block(async batch => { if (runtime.Cancellation.IsCancellationRequested) @@ -55,11 +57,11 @@ public DurabilityAgent(IWolverineRuntime runtime, IMessageDatabase database) try { await executor.InvokeAsync(batch, new MessageBus(runtime)); - Interlocked.Increment(ref _successCount); + _health.RecordPollSuccess(); } catch (Exception e) { - Interlocked.Increment(ref _exceptionCount); + _health.RecordPollFailure(e); _logger.LogError(e, "Error trying to run durability agent commands"); } }); @@ -232,28 +234,27 @@ public void StartScheduledJobPolling() _settings, _settings.ScheduledJobFirstExecution, _settings.ScheduledJobPollingTime); } - public Task CheckHealthAsync(HealthCheckContext context, + public async Task CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default) { - if (Status != AgentStatus.Running) - { - return Task.FromResult(HealthCheckResult.Unhealthy($"Agent {Uri} is {Status}")); - } - - var exceptions = Interlocked.Exchange(ref _exceptionCount, 0); - var successes = Interlocked.Exchange(ref _successCount, 0); _lastHealthCheck = DateTime.UtcNow; - if (exceptions > 0 && successes == 0) - { - return Task.FromResult(HealthCheckResult.Unhealthy("All database operations failed")); - } - - if (exceptions > 0) + // Skip the count fetch when the agent isn't running — the status check below will + // short-circuit anyway, and a stopped agent shouldn't spin up a fresh DB query just + // to be told the same thing. + PersistedCounts? 
counts = null; + if (Status == AgentStatus.Running) { - return Task.FromResult(HealthCheckResult.Degraded("Some database operations failed")); + try + { + counts = await _database.FetchCountsAsync(); + } + catch (Exception e) + { + _health.RecordPollFailure(e); + } } - return Task.FromResult(HealthCheckResult.Healthy()); + return _health.Evaluate(Status, Uri, counts, DateTimeOffset.UtcNow); } } \ No newline at end of file diff --git a/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.cs b/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.cs index 164b43fb7..be1fc2c11 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.cs @@ -1,10 +1,13 @@ using JasperFx; using JasperFx.Core; using JasperFx.Core.Reflection; +using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Logging; using Raven.Client.Documents; using Raven.Client.Documents.Operations; +using Wolverine.Logging; using Wolverine.Persistence; +using Wolverine.Persistence.Durability; using Wolverine.Runtime; using Wolverine.Runtime.Agents; using Wolverine.Runtime.Handlers; @@ -28,7 +31,8 @@ public partial class RavenDbDurabilityAgent : IAgent private readonly CancellationTokenSource _cancellation = new(); private readonly CancellationTokenSource _combined; private PersistenceMetrics _metrics = null!; - + private readonly DurabilityHealthSignals _health; + public RavenDbDurabilityAgent(IDocumentStore store, IWolverineRuntime runtime, RavenDbMessageStore parent) { _store = store; @@ -40,8 +44,9 @@ public RavenDbDurabilityAgent(IDocumentStore store, IWolverineRuntime runtime, R Uri = new Uri($"{PersistenceConstants.AgentScheme}://ravendb/durability"); _logger = runtime.LoggerFactory.CreateLogger(); - + _combined = CancellationTokenSource.CreateLinkedTokenSource(runtime.Cancellation, _cancellation.Token); + _health = new DurabilityHealthSignals(_settings); } public Task StartAsync(CancellationToken cancellationToken) @@ -70,18 +75,28 @@ internal void StartTimers() while (!_combined.IsCancellationRequested) { var lastExpiredTime = DateTimeOffset.UtcNow; - - await tryRecoverIncomingMessages(); - await tryRecoverOutgoingMessagesAsync(); - if (_settings.DeadLetterQueueExpirationEnabled) + try { - // Crudely just doing this every hour - var now = DateTimeOffset.UtcNow; - if (now > lastExpiredTime.AddHours(1)) + await tryRecoverIncomingMessages(); + await tryRecoverOutgoingMessagesAsync(); + + if (_settings.DeadLetterQueueExpirationEnabled) { - await tryDeleteExpiredDeadLetters(); + // Crudely just doing this every hour + var now = DateTimeOffset.UtcNow; + if (now > lastExpiredTime.AddHours(1)) + { + await tryDeleteExpiredDeadLetters(); + } } + + _health.RecordPollSuccess(); + } + catch (Exception e) when (!_combined.IsCancellationRequested) + { + _health.RecordPollFailure(e); + _logger.LogError(e, "Recovery loop tick failed"); } await timer.WaitForNextTickAsync(_combined.Token); @@ -92,16 +107,45 @@ internal void StartTimers() { await Task.Delay(recoveryStart, _combined.Token); using var timer = new PeriodicTimer(_settings.ScheduledJobPollingTime); - + while (!_combined.IsCancellationRequested) { - await runScheduledJobs(); + try + { + await runScheduledJobs(); + _health.RecordPollSuccess(); + } + catch (Exception e) when (!_combined.IsCancellationRequested) + { + _health.RecordPollFailure(e); + _logger.LogError(e, "Scheduled-job loop tick 
failed"); + } + await timer.WaitForNextTickAsync(_combined.Token); } }, _combined.Token); } + public async Task CheckHealthAsync(HealthCheckContext context, + CancellationToken cancellationToken = default) + { + PersistedCounts? counts = null; + if (Status == AgentStatus.Running) + { + try + { + counts = await _parent.Admin.FetchCountsAsync(); + } + catch (Exception e) + { + _health.RecordPollFailure(e); + } + } + + return _health.Evaluate(Status, Uri, counts, DateTimeOffset.UtcNow); + } + private async Task tryDeleteExpiredDeadLetters() { var now = DateTimeOffset.UtcNow; diff --git a/src/Testing/CoreTests/Persistence/durability_health_signals_tests.cs b/src/Testing/CoreTests/Persistence/durability_health_signals_tests.cs new file mode 100644 index 000000000..5533dc199 --- /dev/null +++ b/src/Testing/CoreTests/Persistence/durability_health_signals_tests.cs @@ -0,0 +1,220 @@ +using JasperFx; +using Microsoft.Extensions.Diagnostics.HealthChecks; +using Shouldly; +using Wolverine; +using Wolverine.Logging; +using Wolverine.Persistence.Durability; +using Xunit; + +namespace CoreTests.Persistence; + +// Unit tests for the per-agent health-signal aggregator added in #2646. +// The class is shared by the RDBMS, RavenDb, and CosmosDb durability agents — each +// owns one instance and feeds it RecordPollSuccess/RecordPollFailure from its +// background loop. CheckHealthAsync calls Evaluate(...) with a fresh PersistedCounts +// snapshot to fold in dead-letter-growth + stuck-poller signals on top of the +// reachability signal. +public class durability_health_signals_tests +{ + private static readonly Uri AgentUri = new("wolverinedb://test/durability"); + + private static DurabilitySettings Settings(int unhealthyAfter = 3, int stuckAfter = 3, int dlqGrowthThreshold = 100) + { + return new DurabilitySettings + { + HealthConsecutiveFailureUnhealthyThreshold = unhealthyAfter, + HealthStuckPollCycleThreshold = stuckAfter, + HealthDeadLetterGrowthPerMinuteThreshold = dlqGrowthThreshold + }; + } + + [Fact] + public void unhealthy_when_status_is_not_running() + { + var signals = new DurabilityHealthSignals(Settings()); + + var result = signals.Evaluate(AgentStatus.Stopped, AgentUri, counts: null, DateTimeOffset.UtcNow); + + result.Status.ShouldBe(HealthStatus.Unhealthy); + result.Description.ShouldContain("Stopped"); + } + + [Fact] + public void healthy_when_running_and_no_failures_and_counts_unchanged() + { + var signals = new DurabilityHealthSignals(Settings()); + var t0 = DateTimeOffset.UtcNow; + var counts = new PersistedCounts(); + + // First evaluation establishes baseline; second is the real check + signals.Evaluate(AgentStatus.Running, AgentUri, counts, t0).Status.ShouldBe(HealthStatus.Healthy); + signals.Evaluate(AgentStatus.Running, AgentUri, counts, t0.AddMinutes(1)).Status.ShouldBe(HealthStatus.Healthy); + } + + [Fact] + public void degraded_after_a_single_failure_then_healthy_after_success() + { + var signals = new DurabilityHealthSignals(Settings(unhealthyAfter: 3)); + signals.RecordPollFailure(new InvalidOperationException("simulated DB timeout")); + + var degraded = signals.Evaluate(AgentStatus.Running, AgentUri, counts: null, DateTimeOffset.UtcNow); + degraded.Status.ShouldBe(HealthStatus.Degraded); + degraded.Description.ShouldContain("simulated DB timeout"); + + signals.RecordPollSuccess(); + + var healthy = signals.Evaluate(AgentStatus.Running, AgentUri, counts: null, DateTimeOffset.UtcNow); + healthy.Status.ShouldBe(HealthStatus.Healthy); + } + + [Fact] + public void 
unhealthy_after_consecutive_failure_threshold() + { + var signals = new DurabilityHealthSignals(Settings(unhealthyAfter: 3)); + + signals.RecordPollFailure(new Exception("boom")); + signals.RecordPollFailure(new Exception("boom")); + signals.RecordPollFailure(new Exception("third strike")); + + var result = signals.Evaluate(AgentStatus.Running, AgentUri, counts: null, DateTimeOffset.UtcNow); + + result.Status.ShouldBe(HealthStatus.Unhealthy); + result.Description.ShouldContain("3 consecutive cycles"); + result.Description.ShouldContain("third strike"); + } + + [Fact] + public void degraded_when_dead_letter_queue_grows_above_threshold() + { + var signals = new DurabilityHealthSignals(Settings(dlqGrowthThreshold: 100)); + var t0 = DateTimeOffset.UtcNow; + + // Baseline: 0 dead letters + signals.Evaluate(AgentStatus.Running, AgentUri, new PersistedCounts(), t0); + + // 1 minute later: +200 dead letters → 200/min, above the 100/min threshold + var grown = new PersistedCounts { DeadLetter = 200 }; + var result = signals.Evaluate(AgentStatus.Running, AgentUri, grown, t0.AddMinutes(1)); + + result.Status.ShouldBe(HealthStatus.Degraded); + result.Description.ShouldContain("Dead-letter queue grew by 200"); + } + + [Fact] + public void healthy_when_dead_letter_queue_growth_below_threshold() + { + var signals = new DurabilityHealthSignals(Settings(dlqGrowthThreshold: 100)); + var t0 = DateTimeOffset.UtcNow; + + signals.Evaluate(AgentStatus.Running, AgentUri, new PersistedCounts(), t0); + + var grown = new PersistedCounts { DeadLetter = 5 }; + var result = signals.Evaluate(AgentStatus.Running, AgentUri, grown, t0.AddMinutes(1)); + + result.Status.ShouldBe(HealthStatus.Healthy); + } + + [Fact] + public void degraded_when_recovery_pending_does_not_drain_for_threshold_cycles() + { + var signals = new DurabilityHealthSignals(Settings(stuckAfter: 3)); + var t0 = DateTimeOffset.UtcNow; + var stuck = new PersistedCounts { Incoming = 50, Outgoing = 25 }; + + // Baseline establishes pendingPrev = 75 + signals.Evaluate(AgentStatus.Running, AgentUri, stuck, t0); + + // Three more evals with same-or-higher pending + signals.Evaluate(AgentStatus.Running, AgentUri, stuck, t0.AddSeconds(10)).Status.ShouldBe(HealthStatus.Healthy); + signals.Evaluate(AgentStatus.Running, AgentUri, stuck, t0.AddSeconds(20)).Status.ShouldBe(HealthStatus.Healthy); + + var result = signals.Evaluate(AgentStatus.Running, AgentUri, stuck, t0.AddSeconds(30)); + result.Status.ShouldBe(HealthStatus.Degraded); + result.Description.ShouldContain("Recovery batch may be stuck"); + result.Description.ShouldContain("75 pending"); + } + + [Fact] + public void recovery_stuck_counter_resets_when_pending_decreases() + { + var signals = new DurabilityHealthSignals(Settings(stuckAfter: 3)); + var t0 = DateTimeOffset.UtcNow; + + var heavy = new PersistedCounts { Incoming = 100 }; + signals.Evaluate(AgentStatus.Running, AgentUri, heavy, t0); + signals.Evaluate(AgentStatus.Running, AgentUri, heavy, t0.AddSeconds(10)); + signals.Evaluate(AgentStatus.Running, AgentUri, heavy, t0.AddSeconds(20)); + + // Pending drains; counter should reset + var lighter = new PersistedCounts { Incoming = 10 }; + var result = signals.Evaluate(AgentStatus.Running, AgentUri, lighter, t0.AddSeconds(30)); + result.Status.ShouldBe(HealthStatus.Healthy); + + // Have to climb back up the threshold from zero + signals.Evaluate(AgentStatus.Running, AgentUri, lighter, t0.AddSeconds(40)).Status.ShouldBe(HealthStatus.Healthy); + signals.Evaluate(AgentStatus.Running, AgentUri, lighter, 
t0.AddSeconds(50)).Status.ShouldBe(HealthStatus.Healthy); + } + + [Fact] + public void degraded_when_scheduled_count_does_not_drain() + { + var signals = new DurabilityHealthSignals(Settings(stuckAfter: 3)); + var t0 = DateTimeOffset.UtcNow; + var pending = new PersistedCounts { Scheduled = 42 }; + + signals.Evaluate(AgentStatus.Running, AgentUri, pending, t0); + signals.Evaluate(AgentStatus.Running, AgentUri, pending, t0.AddSeconds(10)); + signals.Evaluate(AgentStatus.Running, AgentUri, pending, t0.AddSeconds(20)); + var result = signals.Evaluate(AgentStatus.Running, AgentUri, pending, t0.AddSeconds(30)); + + result.Status.ShouldBe(HealthStatus.Degraded); + result.Description.ShouldContain("Scheduled-job poller may be stuck"); + result.Description.ShouldContain("42 scheduled"); + } + + [Fact] + public void status_takes_precedence_over_count_signals() + { + // Even with a clean recent history, a non-Running status flips Unhealthy. + var signals = new DurabilityHealthSignals(Settings()); + signals.RecordPollSuccess(); + + var result = signals.Evaluate(AgentStatus.Stopped, AgentUri, new PersistedCounts(), DateTimeOffset.UtcNow); + + result.Status.ShouldBe(HealthStatus.Unhealthy); + } + + [Fact] + public void multiple_degraded_signals_aggregate_into_one_description() + { + var signals = new DurabilityHealthSignals(Settings(stuckAfter: 2, dlqGrowthThreshold: 10)); + var t0 = DateTimeOffset.UtcNow; + + signals.Evaluate(AgentStatus.Running, AgentUri, new PersistedCounts { Incoming = 50 }, t0); + signals.Evaluate(AgentStatus.Running, AgentUri, + new PersistedCounts { Incoming = 50, DeadLetter = 100 }, t0.AddMinutes(1)); + + // One more tick at the threshold; DLQ stays put (no further growth) so only the stuck signal fires now. + signals.RecordPollFailure(new Exception("transient blip")); + var result = signals.Evaluate(AgentStatus.Running, AgentUri, + new PersistedCounts { Incoming = 50, DeadLetter = 100 }, t0.AddMinutes(2)); + + result.Status.ShouldBe(HealthStatus.Degraded); + result.Description.ShouldContain("Last persistence poll failed"); + result.Description.ShouldContain("Recovery batch may be stuck"); + } + + [Fact] + public void exposes_consecutive_failure_count_for_diagnostics() + { + var signals = new DurabilityHealthSignals(Settings()); + signals.ConsecutiveFailureCount.ShouldBe(0); + + signals.RecordPollFailure(new Exception("a")); + signals.RecordPollFailure(new Exception("b")); + signals.ConsecutiveFailureCount.ShouldBe(2); + + signals.RecordPollSuccess(); + signals.ConsecutiveFailureCount.ShouldBe(0); + } +} diff --git a/src/Wolverine/DurabilitySettings.cs b/src/Wolverine/DurabilitySettings.cs index 9df459154..621a48d10 100644 --- a/src/Wolverine/DurabilitySettings.cs +++ b/src/Wolverine/DurabilitySettings.cs @@ -222,6 +222,27 @@ internal set /// public TimeSpan NodeEventRecordExpirationTime { get; set; } = 5.Days(); + /// + /// Health-check threshold for dead-letter-queue growth. When the persisted DLQ count grows + /// faster than this many envelopes per minute between two consecutive health-check evaluations, + /// the durability agent reports Degraded. Default is 100/min. See #2646. + /// + public int HealthDeadLetterGrowthPerMinuteThreshold { get; set; } = 100; + + /// + /// Health-check threshold for stuck recovery / scheduled-job pollers. When the persisted + /// inbox+outbox count (or the scheduled count) is non-zero and has not decreased over this + /// many consecutive evaluations, the durability agent reports Degraded. Default is 3. See #2646. 
+    /// </summary>
+    public int HealthStuckPollCycleThreshold { get; set; } = 3;
+
+    /// <summary>
+    /// Health-check threshold for consecutive persistence-layer failures. After this many
+    /// consecutive failed poll cycles, the durability agent reports Unhealthy (a single failure
+    /// reports Degraded). Default is 3. See #2646.
+    /// </summary>
+    public int HealthConsecutiveFailureUnhealthyThreshold { get; set; } = 3;
+
     /// <summary>
     /// How long a sending agent can be idle before it is considered stale
     /// and eligible for cleanup. Default is 5 minutes.
diff --git a/src/Wolverine/Persistence/Durability/DurabilityHealthSignals.cs b/src/Wolverine/Persistence/Durability/DurabilityHealthSignals.cs
new file mode 100644
index 000000000..24cbdf2c2
--- /dev/null
+++ b/src/Wolverine/Persistence/Durability/DurabilityHealthSignals.cs
@@ -0,0 +1,209 @@
+using JasperFx;
+using Microsoft.Extensions.Diagnostics.HealthChecks;
+using Wolverine.Logging;
+
+namespace Wolverine.Persistence.Durability;
+
+/// <summary>
+/// Shared, mutable state + evaluator that lets a durability-style agent surface richer
+/// per-store health signals than the default `Status == Running ? Healthy : Unhealthy`.
+/// One instance per agent. The agent's poll loop calls <see cref="RecordPollSuccess"/>
+/// and <see cref="RecordPollFailure"/>; the agent's CheckHealthAsync implementation
+/// calls <see cref="Evaluate"/>, which folds in fresh persisted-counts deltas.
+///
+/// Three signals are layered on top of <see cref="AgentStatus"/>:
+///
+/// 1. Persistence reachability — driven by <see cref="RecordPollSuccess"/> /
+/// <see cref="RecordPollFailure"/>. One failed cycle ⇒ Degraded;
+/// <see cref="DurabilitySettings.HealthConsecutiveFailureUnhealthyThreshold"/> or more consecutive failures ⇒ Unhealthy. The most recent failure's message is
+/// surfaced as the description so operators see e.g. "Persistence unreachable: connection timeout".
+///
+/// 2. Dead-letter growth — at each <see cref="Evaluate"/>, the previous
+/// <see cref="PersistedCounts.DeadLetter"/> snapshot is compared against the current
+/// one. If the rate exceeds <see cref="DurabilitySettings.HealthDeadLetterGrowthPerMinuteThreshold"/>,
+/// Degraded.
+///
+/// 3. Stuck recovery / scheduled-job pollers — if persisted inbox+outbox or
+/// scheduled counts stay non-zero and never decrease across
+/// <see cref="DurabilitySettings.HealthStuckPollCycleThreshold"/> evaluations, Degraded.
+///
+/// Status precedence: Status != Running always returns Unhealthy first. Then
+/// consecutive-failure Unhealthy. Otherwise the worst of the remaining signals
+/// (Unhealthy > Degraded > Healthy) is returned.
+/// </summary>
+public sealed class DurabilityHealthSignals
+{
+    private readonly object _lock = new();
+    private readonly DurabilitySettings _settings;
+
+    private int _consecutiveFailures;
+    private string? _lastFailureMessage;
+
+    private PersistedCounts? _previousCounts;
+    private DateTimeOffset _previousCountsAt;
+
+    private int _stuckRecoveryCycles;
+    private int _stuckScheduledCycles;
+
+    public DurabilityHealthSignals(DurabilitySettings settings)
+    {
+        _settings = settings;
+    }
+
+    /// <summary>Reset the consecutive-failure counter after a clean poll cycle.</summary>
+    public void RecordPollSuccess()
+    {
+        lock (_lock)
+        {
+            _consecutiveFailures = 0;
+            _lastFailureMessage = null;
+        }
+    }
+
+    /// <summary>Bump the consecutive-failure counter and remember the most recent error message.</summary>
+    public void RecordPollFailure(Exception exception)
+    {
+        if (exception is null) throw new ArgumentNullException(nameof(exception));
+
+        lock (_lock)
+        {
+            _consecutiveFailures++;
+            _lastFailureMessage = exception.Message;
+        }
+    }
+
+    /// <summary>
+    /// Test-only — surface the current consecutive-failure count.
+    /// </summary>
+    internal int ConsecutiveFailureCount
+    {
+        get { lock (_lock) return _consecutiveFailures; }
+    }
+
+    /// <summary>
+    /// Compute a HealthCheckResult that folds the agent's <paramref name="status"/> together
+    /// with the recorded poll-failure history and a fresh fetch of persisted counts. Pass
+    /// null for <paramref name="counts"/> to skip the count-based signals (e.g. when
+    /// the caller could not fetch counts because the store is down — the consecutive-failure
+    /// signal will already capture that).
+    /// </summary>
+    public HealthCheckResult Evaluate(AgentStatus status, Uri agentUri, PersistedCounts? counts, DateTimeOffset now)
+    {
+        if (status != AgentStatus.Running)
+        {
+            return HealthCheckResult.Unhealthy($"Agent {agentUri} is {status}");
+        }
+
+        int consecutiveFailures;
+        string? lastFailureMessage;
+        lock (_lock)
+        {
+            consecutiveFailures = _consecutiveFailures;
+            lastFailureMessage = _lastFailureMessage;
+        }
+
+        // Reachability — fail fast on consecutive failures before anything else.
+        if (consecutiveFailures >= _settings.HealthConsecutiveFailureUnhealthyThreshold)
+        {
+            return HealthCheckResult.Unhealthy(
+                $"Persistence unreachable for {consecutiveFailures} consecutive cycles" +
+                (lastFailureMessage is null ? "" : $": {lastFailureMessage}"));
+        }
+
+        var degraded = new List<string>(capacity: 4);
+
+        if (consecutiveFailures > 0)
+        {
+            degraded.Add("Last persistence poll failed" +
+                         (lastFailureMessage is null ? "" : $": {lastFailureMessage}"));
+        }
+
+        if (counts is not null)
+        {
+            // First evaluation: just snapshot and skip every count-based signal. Without a
+            // previous baseline there's nothing meaningful to compare against; the next
+            // evaluation will be the real one.
+            if (_previousCounts is null)
+            {
+                _previousCounts = Snapshot(counts);
+                _previousCountsAt = now;
+            }
+            else
+            {
+                EvaluateDeadLetterGrowth(counts, now, degraded);
+                EvaluateStuckPollers(counts, degraded);
+                _previousCounts = Snapshot(counts);
+                _previousCountsAt = now;
+            }
+        }
+
+        return degraded.Count == 0
+            ? HealthCheckResult.Healthy()
+            : HealthCheckResult.Degraded(string.Join("; ", degraded));
+    }
+
+    private void EvaluateDeadLetterGrowth(PersistedCounts counts, DateTimeOffset now, List<string> degraded)
+    {
+        var deltaCount = counts.DeadLetter - _previousCounts!.DeadLetter;
+        var elapsed = now - _previousCountsAt;
+        if (elapsed > TimeSpan.Zero && deltaCount > 0)
+        {
+            var perMinute = deltaCount / Math.Max(elapsed.TotalMinutes, 1.0 / 60);
+            if (perMinute >= _settings.HealthDeadLetterGrowthPerMinuteThreshold)
+            {
+                degraded.Add(
+                    $"Dead-letter queue grew by {deltaCount} ({perMinute:F0}/min, threshold " +
+                    $"{_settings.HealthDeadLetterGrowthPerMinuteThreshold}/min)");
+            }
+        }
+    }
+
+    private void EvaluateStuckPollers(PersistedCounts counts, List<string> degraded)
+    {
+        var threshold = _settings.HealthStuckPollCycleThreshold;
+
+        var pendingNow = counts.Incoming + counts.Outgoing;
+        var pendingPrev = _previousCounts!.Incoming + _previousCounts.Outgoing;
+        if (pendingNow > 0 && pendingNow >= pendingPrev)
+        {
+            _stuckRecoveryCycles++;
+            if (_stuckRecoveryCycles >= threshold)
+            {
+                degraded.Add(
+                    $"Recovery batch may be stuck — {pendingNow} pending envelopes (inbox+outbox) " +
+                    $"have not drained over {_stuckRecoveryCycles} consecutive checks");
+            }
+        }
+        else
+        {
+            _stuckRecoveryCycles = 0;
+        }
+
+        if (counts.Scheduled > 0 && counts.Scheduled >= _previousCounts.Scheduled)
+        {
+            _stuckScheduledCycles++;
+            if (_stuckScheduledCycles >= threshold)
+            {
+                degraded.Add(
+                    $"Scheduled-job poller may be stuck — {counts.Scheduled} scheduled envelopes " +
+                    $"have not drained over {_stuckScheduledCycles} consecutive checks");
+            }
+        }
+        else
+        {
+            _stuckScheduledCycles = 0;
+        }
+    }
+
+    private static PersistedCounts Snapshot(PersistedCounts source)
+    {
+        return new PersistedCounts
+        {
+            Incoming = source.Incoming,
+            Outgoing = source.Outgoing,
+
Scheduled = source.Scheduled, + DeadLetter = source.DeadLetter, + Handled = source.Handled + }; + } +}
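
For context, a minimal sketch of how an application could dial in the three new thresholds. The Health* property names are exactly the DurabilitySettings additions in this diff; the Host.CreateDefaultBuilder/UseWolverine bootstrapping around them is ordinary Wolverine setup included only for illustration, and the specific numbers are arbitrary.

using Microsoft.Extensions.Hosting;
using Wolverine;

// Illustrative only: the host wiring is standard Wolverine bootstrapping, and the
// threshold values below are made up; the shipped defaults are 3 / 3 / 100.
using var host = Host.CreateDefaultBuilder(args)
    .UseWolverine(opts =>
    {
        // Unhealthy only after five consecutive failed persistence poll cycles (default 3)
        opts.Durability.HealthConsecutiveFailureUnhealthyThreshold = 5;

        // Consider the recovery / scheduled-job pollers stuck after four non-draining checks (default 3)
        opts.Durability.HealthStuckPollCycleThreshold = 4;

        // Degraded once the dead-letter queue grows faster than 50 envelopes per minute (default 100)
        opts.Durability.HealthDeadLetterGrowthPerMinuteThreshold = 50;
    })
    .Build();

await host.RunAsync();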