diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt index b6a437a6d2b..110606c472d 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt @@ -113,6 +113,18 @@ namespace Akka.Persistence public override int GetHashCode() { } public override string ToString() { } } + public sealed class CheckJournalHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalRequest, Akka.Persistence.IPersistenceMessage + { + public CheckJournalHealth(System.Threading.CancellationToken cancellationToken) { } + public System.Threading.CancellationToken CancellationToken { get; } + public override string ToString() { } + } + public sealed class CheckSnapshotStoreHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest + { + public CheckSnapshotStoreHealth(System.Threading.CancellationToken cancellationToken) { } + public System.Threading.CancellationToken CancellationToken { get; } + public override string ToString() { } + } public sealed class DeleteMessagesFailure : Akka.Actor.INoSerializationVerificationNeeded, System.IEquatable { public DeleteMessagesFailure(System.Exception cause, long toSequenceNr) { } @@ -318,6 +330,12 @@ namespace Akka.Persistence { Akka.Persistence.IStashOverflowStrategy Create(Akka.Configuration.Config config); } + public sealed class JournalHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalResponse, Akka.Persistence.IPersistenceMessage + { + public JournalHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { } + public 
Akka.Persistence.PersistenceHealthCheckResult Result { get; } + public override string ToString() { } + } public sealed class LoadSnapshot : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest, System.IEquatable { public LoadSnapshot(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, long toSequenceNr) { } @@ -377,12 +395,37 @@ namespace Akka.Persistence public Akka.Persistence.IStashOverflowStrategy DefaultInternalStashOverflowStrategy { get; } public Akka.Persistence.PersistenceSettings Settings { get; } public Akka.Persistence.Journal.EventAdapters AdaptersFor(string journalPluginId) { } + public System.Threading.Tasks.Task CheckJournalHealthAsync(string journalPluginId, System.Threading.CancellationToken cancellationToken = null) { } + public System.Threading.Tasks.Task CheckSnapshotStoreHealthAsync(string snapshotStorePluginId, System.Threading.CancellationToken cancellationToken = null) { } [Akka.Annotations.InternalStableApiAttribute()] public Akka.Actor.IActorRef JournalFor(string journalPluginId) { } public string PersistenceId(Akka.Actor.IActorRef actor) { } [Akka.Annotations.InternalStableApiAttribute()] public Akka.Actor.IActorRef SnapshotStoreFor(string snapshotPluginId) { } } + [System.Runtime.CompilerServices.IsReadOnlyAttribute()] + [System.Runtime.CompilerServices.NullableAttribute(0)] + public struct PersistenceHealthCheckResult : System.IEquatable + { + public PersistenceHealthCheckResult(Akka.Persistence.PersistenceHealthStatus Status, string Description = null, System.Exception Exception = null, [System.Runtime.CompilerServices.NullableAttribute(new byte[] { + 2, + 0, + 0})] System.Collections.Generic.IReadOnlyDictionary Data = null) { } + [System.Runtime.CompilerServices.NullableAttribute(new byte[] { + 2, + 0, + 0})] + public System.Collections.Generic.IReadOnlyDictionary Data { get; set; } + public string Description 
{ get; set; } + public System.Exception Exception { get; set; } + public Akka.Persistence.PersistenceHealthStatus Status { get; set; } + } + public enum PersistenceHealthStatus + { + Healthy = 0, + Degraded = 1, + Unhealthy = 2, + } public sealed class PersistenceSettings : Akka.Actor.Settings { public PersistenceSettings(Akka.Actor.ActorSystem system, Akka.Configuration.Config config) { } @@ -627,6 +670,12 @@ namespace Akka.Persistence public override int GetHashCode() { } public override string ToString() { } } + public sealed class SnapshotStoreHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotResponse + { + public SnapshotStoreHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { } + public Akka.Persistence.PersistenceHealthCheckResult Result { get; } + public override string ToString() { } + } public sealed class StashingHandlerInvocation : Akka.Persistence.IPendingHandlerInvocation { public StashingHandlerInvocation(object evt, System.Action handler) { } @@ -857,6 +906,7 @@ namespace Akka.Persistence.Journal { protected readonly bool CanPublish; protected AsyncWriteJournal() { } + public virtual System.Threading.Tasks.Task CheckHealthAsync(System.Threading.CancellationToken cancellationToken = null) { } protected abstract System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken); public abstract System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken); protected virtual bool Receive(object message) { } @@ -1215,6 +1265,7 @@ namespace Akka.Persistence.Snapshot public abstract class SnapshotStore : Akka.Actor.ActorBase { protected SnapshotStore() { } + public virtual System.Threading.Tasks.Task CheckHealthAsync(System.Threading.CancellationToken 
cancellationToken = null) { } protected abstract System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken); protected abstract System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken); protected abstract System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken); diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt index 5b8a811b2bb..f33ef77425a 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt @@ -113,6 +113,18 @@ namespace Akka.Persistence public override int GetHashCode() { } public override string ToString() { } } + public sealed class CheckJournalHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalRequest, Akka.Persistence.IPersistenceMessage + { + public CheckJournalHealth(System.Threading.CancellationToken cancellationToken) { } + public System.Threading.CancellationToken CancellationToken { get; } + public override string ToString() { } + } + public sealed class CheckSnapshotStoreHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest + { + public CheckSnapshotStoreHealth(System.Threading.CancellationToken cancellationToken) { } + public System.Threading.CancellationToken CancellationToken { get; } + public override string ToString() { } + } public sealed class DeleteMessagesFailure : Akka.Actor.INoSerializationVerificationNeeded, System.IEquatable { 
public DeleteMessagesFailure(System.Exception cause, long toSequenceNr) { } @@ -318,6 +330,12 @@ namespace Akka.Persistence { Akka.Persistence.IStashOverflowStrategy Create(Akka.Configuration.Config config); } + public sealed class JournalHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalResponse, Akka.Persistence.IPersistenceMessage + { + public JournalHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { } + public Akka.Persistence.PersistenceHealthCheckResult Result { get; } + public override string ToString() { } + } public sealed class LoadSnapshot : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest, System.IEquatable { public LoadSnapshot(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, long toSequenceNr) { } @@ -377,12 +395,36 @@ namespace Akka.Persistence public Akka.Persistence.IStashOverflowStrategy DefaultInternalStashOverflowStrategy { get; } public Akka.Persistence.PersistenceSettings Settings { get; } public Akka.Persistence.Journal.EventAdapters AdaptersFor(string journalPluginId) { } + public System.Threading.Tasks.Task CheckJournalHealthAsync(string journalPluginId, System.Threading.CancellationToken cancellationToken = null) { } + public System.Threading.Tasks.Task CheckSnapshotStoreHealthAsync(string snapshotStorePluginId, System.Threading.CancellationToken cancellationToken = null) { } [Akka.Annotations.InternalStableApiAttribute()] public Akka.Actor.IActorRef JournalFor(string journalPluginId) { } public string PersistenceId(Akka.Actor.IActorRef actor) { } [Akka.Annotations.InternalStableApiAttribute()] public Akka.Actor.IActorRef SnapshotStoreFor(string snapshotPluginId) { } } + [System.Runtime.CompilerServices.NullableAttribute(0)] + public struct PersistenceHealthCheckResult : System.IEquatable + { + public 
PersistenceHealthCheckResult(Akka.Persistence.PersistenceHealthStatus Status, string Description = null, System.Exception Exception = null, [System.Runtime.CompilerServices.NullableAttribute(new byte[] { + 2, + 0, + 0})] System.Collections.Generic.IReadOnlyDictionary Data = null) { } + [System.Runtime.CompilerServices.NullableAttribute(new byte[] { + 2, + 0, + 0})] + public System.Collections.Generic.IReadOnlyDictionary Data { get; set; } + public string Description { get; set; } + public System.Exception Exception { get; set; } + public Akka.Persistence.PersistenceHealthStatus Status { get; set; } + } + public enum PersistenceHealthStatus + { + Healthy = 0, + Degraded = 1, + Unhealthy = 2, + } public sealed class PersistenceSettings : Akka.Actor.Settings { public PersistenceSettings(Akka.Actor.ActorSystem system, Akka.Configuration.Config config) { } @@ -627,6 +669,12 @@ namespace Akka.Persistence public override int GetHashCode() { } public override string ToString() { } } + public sealed class SnapshotStoreHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotResponse + { + public SnapshotStoreHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { } + public Akka.Persistence.PersistenceHealthCheckResult Result { get; } + public override string ToString() { } + } public sealed class StashingHandlerInvocation : Akka.Persistence.IPendingHandlerInvocation { public StashingHandlerInvocation(object evt, System.Action handler) { } @@ -857,6 +905,7 @@ namespace Akka.Persistence.Journal { protected readonly bool CanPublish; protected AsyncWriteJournal() { } + public virtual System.Threading.Tasks.Task CheckHealthAsync(System.Threading.CancellationToken cancellationToken = null) { } protected abstract System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken 
cancellationToken); public abstract System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken); protected virtual bool Receive(object message) { } @@ -1213,6 +1262,7 @@ namespace Akka.Persistence.Snapshot public abstract class SnapshotStore : Akka.Actor.ActorBase { protected SnapshotStore() { } + public virtual System.Threading.Tasks.Task CheckHealthAsync(System.Threading.CancellationToken cancellationToken = null) { } protected abstract System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken); protected abstract System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken); protected abstract System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken); diff --git a/src/core/Akka.Persistence.Tests/JournalHealthCheckSpec.cs b/src/core/Akka.Persistence.Tests/JournalHealthCheckSpec.cs new file mode 100644 index 00000000000..eb025970658 --- /dev/null +++ b/src/core/Akka.Persistence.Tests/JournalHealthCheckSpec.cs @@ -0,0 +1,132 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. 
+// Copyright (C) 2013-2025 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Threading; +using System.Threading.Tasks; +using Akka.Configuration; +using Akka.Persistence.Journal; +using Akka.TestKit; +using Akka.TestKit.Configs; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Tests; + +public class JournalHealthCheckSpec : PersistenceSpec +{ + private static Config HealthCheckConfig() + { + const string extraConfig = """ + + akka.persistence.journal.failing-open { + class = "Akka.Persistence.Tests.FailingJournal, Akka.Persistence.Tests" + circuit-breaker { + max-failures = 1 + call-timeout = 1s + reset-timeout = 10s + } + } + akka.persistence.journal.failing-half-open { + class = "Akka.Persistence.Tests.FailingJournal, Akka.Persistence.Tests" + circuit-breaker { + max-failures = 1 + call-timeout = 1s + reset-timeout = 1s + } + } + # Disable message serialization for circuit breaker tests to avoid serialization issues + akka.actor.serialize-messages = off + + """; + return TestConfigs.TestSchedulerConfig + .WithFallback(Configuration("PersistenceHealthCheckSpec", extraConfig: extraConfig)); + } + + public JournalHealthCheckSpec(ITestOutputHelper output) : base(HealthCheckConfig(), output) + { + } + + [Theory] + [InlineData(null)] // default plugin + [InlineData("akka.persistence.journal.inmem")] + public async Task JournalHealthCheck_should_default_to_Healthy(string? 
pluginId) + { + using var cts = new CancellationTokenSource(RemainingOrDefault); + var pluginHealth = await Extension.CheckJournalHealthAsync(pluginId, cts.Token); + + Assert.Equal(PersistenceHealthStatus.Healthy, pluginHealth.Status); + Assert.NotNull(pluginHealth.Description); + } + + [Fact] + public async Task JournalHealthCheck_should_return_Degraded_when_CircuitBreaker_is_Open() + { + // Resolve the journal actor for the plugin id backed by FailingJournal (see HealthCheckConfig) + var journal = Extension.JournalFor("akka.persistence.journal.failing-open"); + + // A single failed write is enough to open the breaker, since max-failures = 1 + var writeMsg = new WriteMessages(new[] { new AtomicWrite(new Persistent("test", 1, "test-pid")) }.ToImmutableList(), + TestActor, 1); + journal.Tell(writeMsg, TestActor); + + // Advance the virtual TestScheduler clock by 2s: past call-timeout (1s) but well short of reset-timeout (10s) + var testScheduler = (TestScheduler)Sys.Scheduler; + testScheduler.Advance(TimeSpan.FromSeconds(2)); + + using var cts = new CancellationTokenSource(RemainingOrDefault); + var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-open", cts.Token); + + Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status); + Assert.Contains("Circuit breaker is open", pluginHealth.Description); + } + + [Fact] + public async Task JournalHealthCheck_should_return_Degraded_when_CircuitBreaker_is_HalfOpen() + { + // Resolve the journal actor for the plugin id backed by FailingJournal (see HealthCheckConfig) + var journal = Extension.JournalFor("akka.persistence.journal.failing-half-open"); + + // A single failed write is enough to open the breaker, since max-failures = 1 + var writeMsg = new WriteMessages(new[] { new AtomicWrite(new Persistent("test", 1, "test-pid")) }.ToImmutableList(), + TestActor, 1); + journal.Tell(writeMsg, TestActor); + + var testScheduler = (TestScheduler)Sys.Scheduler; + + // Advance time past call-timeout to let the write fail and circuit breaker open + testScheduler.Advance(TimeSpan.FromSeconds(1)); + + // Wall-clock pause (not scheduler time) to let the asynchronous write failure be processed; NOTE(review): fixed sleeps can be flaky on slow CI + await Task.Delay(100); + + // Advance time past reset-timeout to 
transition to half-open + testScheduler.Advance(TimeSpan.FromSeconds(1)); + + // Wall-clock pause to let the half-open transition complete; NOTE(review): same fixed-sleep caveat as the pause above + await Task.Delay(100); + + using var cts = new CancellationTokenSource(RemainingOrDefault); + var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-half-open", cts.Token); + + Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status); + Assert.Contains("Circuit breaker is half-open", pluginHealth.Description); + } +} + +/// <summary> +/// Test journal whose WriteMessagesAsync always throws, so every write counts as a circuit-breaker failure. +/// </summary> +public class FailingJournal : MemoryJournal +{ + protected override Task> WriteMessagesAsync(IEnumerable messages, CancellationToken cancellationToken) + { + throw new InvalidOperationException("Simulated journal write failure"); + } +} \ No newline at end of file diff --git a/src/core/Akka.Persistence.Tests/SnapshotStoreHealthCheckSpec.cs b/src/core/Akka.Persistence.Tests/SnapshotStoreHealthCheckSpec.cs new file mode 100644 index 00000000000..51685948614 --- /dev/null +++ b/src/core/Akka.Persistence.Tests/SnapshotStoreHealthCheckSpec.cs @@ -0,0 +1,128 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. 
+// Copyright (C) 2013-2025 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Threading; +using System.Threading.Tasks; +using Akka.Configuration; +using Akka.Persistence.Snapshot; +using Akka.TestKit; +using Akka.TestKit.Configs; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Tests; + +public class SnapshotStoreHealthCheckSpec : PersistenceSpec +{ + private static Config HealthCheckConfig() + { + const string extraConfig = """ + + akka.persistence.snapshot-store.failing-open { + class = "Akka.Persistence.Tests.FailingSnapshotStore, Akka.Persistence.Tests" + circuit-breaker { + max-failures = 1 + call-timeout = 1s + reset-timeout = 10s + } + } + akka.persistence.snapshot-store.failing-half-open { + class = "Akka.Persistence.Tests.FailingSnapshotStore, Akka.Persistence.Tests" + circuit-breaker { + max-failures = 1 + call-timeout = 1s + reset-timeout = 1s + } + } + # Disable message serialization for circuit breaker tests to avoid serialization issues + akka.actor.serialize-messages = off + + """; + return TestConfigs.TestSchedulerConfig + .WithFallback(Configuration("SnapshotStoreHealthCheckSpec", extraConfig: extraConfig)); + } + + public SnapshotStoreHealthCheckSpec(ITestOutputHelper output) : base(HealthCheckConfig(), output) + { + } + + [Theory] + [InlineData(null)] // default plugin + [InlineData("akka.persistence.snapshot-store.inmem")] + public async Task SnapshotStoreHealthCheck_should_default_to_Healthy(string? 
pluginId) + { + using var cts = new CancellationTokenSource(RemainingOrDefault); + var pluginHealth = await Extension.CheckSnapshotStoreHealthAsync(pluginId, cts.Token); + + Assert.Equal(PersistenceHealthStatus.Healthy, pluginHealth.Status); + Assert.NotNull(pluginHealth.Description); + } + + [Fact] + public async Task SnapshotStoreHealthCheck_should_return_Degraded_when_CircuitBreaker_is_Open() + { + // Resolve the snapshot store actor for the plugin id backed by FailingSnapshotStore (see HealthCheckConfig) + var snapshotStore = Extension.SnapshotStoreFor("akka.persistence.snapshot-store.failing-open"); + + // A single failed save is enough to open the breaker, since max-failures = 1 + var saveMsg = new SaveSnapshot(new SnapshotMetadata("test-pid", 1, DateTime.UtcNow), "test-snapshot"); + snapshotStore.Tell(saveMsg, TestActor); + + // Advance the virtual TestScheduler clock by 2s: past call-timeout (1s) but well short of reset-timeout (10s) + var testScheduler = (TestScheduler)Sys.Scheduler; + testScheduler.Advance(TimeSpan.FromSeconds(2)); + + using var cts = new CancellationTokenSource(RemainingOrDefault); + var pluginHealth = await Extension.CheckSnapshotStoreHealthAsync("akka.persistence.snapshot-store.failing-open", cts.Token); + + Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status); + Assert.Contains("Circuit breaker is open", pluginHealth.Description); + } + + [Fact] + public async Task SnapshotStoreHealthCheck_should_return_Degraded_when_CircuitBreaker_is_HalfOpen() + { + // Resolve the snapshot store actor for the plugin id backed by FailingSnapshotStore (see HealthCheckConfig) + var snapshotStore = Extension.SnapshotStoreFor("akka.persistence.snapshot-store.failing-half-open"); + + // A single failed save is enough to open the breaker, since max-failures = 1 + var saveMsg = new SaveSnapshot(new SnapshotMetadata("test-pid", 1, DateTime.UtcNow), "test-snapshot"); + snapshotStore.Tell(saveMsg, TestActor); + + var testScheduler = (TestScheduler)Sys.Scheduler; + + // Advance time past call-timeout to let the save fail and circuit breaker open + testScheduler.Advance(TimeSpan.FromSeconds(1)); + + // Wall-clock pause (not scheduler time) to let the asynchronous save failure be processed; NOTE(review): fixed sleeps can be flaky on slow CI + await Task.Delay(100); + + // Advance 
time past reset-timeout to transition to half-open + testScheduler.Advance(TimeSpan.FromSeconds(1)); + + // Wall-clock pause to let the half-open transition complete; NOTE(review): same fixed-sleep caveat as the pause above + await Task.Delay(100); + + using var cts = new CancellationTokenSource(RemainingOrDefault); + var pluginHealth = await Extension.CheckSnapshotStoreHealthAsync("akka.persistence.snapshot-store.failing-half-open", cts.Token); + + Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status); + Assert.Contains("Circuit breaker is half-open", pluginHealth.Description); + } +} + +/// <summary> +/// Test snapshot store whose SaveAsync always throws, so every save counts as a circuit-breaker failure. +/// </summary> +public class FailingSnapshotStore : LocalSnapshotStore +{ + protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot, CancellationToken cancellationToken) + { + throw new InvalidOperationException("Simulated snapshot store save failure"); + } +} \ No newline at end of file diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs index 779ccdff36e..4939d2cd342 100644 --- a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs +++ b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs @@ -9,7 +9,6 @@ using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; -using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Akka.Actor; @@ -25,6 +24,7 @@ public abstract class AsyncWriteJournal : WriteJournalBase, IAsyncRecovery { protected readonly bool CanPublish; private readonly CircuitBreaker _breaker; + private readonly ReplayFilterMode _replayFilterMode; private readonly bool _isReplayFilterEnabled; private readonly int _replayFilterWindowSize; @@ -32,6 +32,8 @@ public abstract class AsyncWriteJournal : WriteJournalBase, IAsyncRecovery private readonly bool _replayDebugEnabled; private readonly IActorRef _resequencer; + private readonly IReadOnlyDictionary _defaultHealthCheckTags; + private long 
_resequencerCounter = 1L; /// @@ -84,6 +86,26 @@ protected AsyncWriteJournal() _replayDebugEnabled = config.GetBoolean("replay-filter.debug", false); _resequencer = Context.ActorOf(Props.Create(() => new Resequencer()), "resequencer"); + _defaultHealthCheckTags = new Dictionary + { + { "journal", Self.Path.Name } + }; + } + + /// <summary> + /// Health check for the journal, derived from the current state of its circuit breaker. + /// </summary> + /// <param name="cancellationToken">Cancellation token for the health check invocation; not consulted by this default implementation.</param> + /// <returns>A <see cref="PersistenceHealthCheckResult"/> with a health status and optional error message.</returns> + public virtual Task CheckHealthAsync(CancellationToken cancellationToken = default) + { + if (_breaker.IsHalfOpen) + return Task.FromResult(new PersistenceHealthCheckResult(PersistenceHealthStatus.Degraded, + "Circuit breaker is half-open, some operations may be failing intermittently", _breaker.LastCaughtException, _defaultHealthCheckTags)); + if (_breaker.IsOpen) + return Task.FromResult(new PersistenceHealthCheckResult(PersistenceHealthStatus.Degraded, + "Circuit breaker is open, operations are being rejected until the reset timeout elapses", _breaker.LastCaughtException, _defaultHealthCheckTags)); + return Task.FromResult(new PersistenceHealthCheckResult(PersistenceHealthStatus.Healthy, Description:"Ok", Data: _defaultHealthCheckTags)); } /// @@ -162,7 +184,7 @@ protected AsyncWriteJournal() /// /// This call is protected with a circuit-breaker. /// - /// TBD + /// The set of messages to write. /// used to signal cancelled snapshot operation protected abstract Task> WriteMessagesAsync(IEnumerable messages, CancellationToken cancellationToken); @@ -170,8 +192,8 @@ protected AsyncWriteJournal() /// Asynchronously deletes all persistent messages up to inclusive /// bound. /// - /// TBD - /// TBD + /// The id of the entity. + /// The inclusive upper-bound of sequence numbers to delete. 
/// used to signal cancelled snapshot operation protected abstract Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken); @@ -179,8 +201,8 @@ protected AsyncWriteJournal() /// Plugin API: Allows plugin implementers to use f.PipeTo(Self) /// and handle additional messages for implementing advanced features /// - /// TBD - /// TBD + /// The message to receive + /// true if the message was handled, false otherwise. protected virtual bool ReceivePluginInternal(object message) { return false; @@ -205,6 +227,18 @@ protected bool ReceiveWriteJournal(object message) case DeleteMessagesTo deleteMessagesTo: HandleDeleteMessagesTo(deleteMessagesTo); return true; + case CheckJournalHealth checkHealth: + var sender = Sender; + CheckHealthAsync(checkHealth.CancellationToken) + // PipeTo implementation no longer requires a closure, but better safe than sorry + .PipeTo(sender, + success: result => new JournalHealthCheckResponse(result), + failure: ex => new JournalHealthCheckResponse( + new PersistenceHealthCheckResult(PersistenceHealthStatus.Unhealthy, + "Encountered an exception while performing health check", + ex, + _defaultHealthCheckTags))); + return true; default: return false; } @@ -256,16 +290,6 @@ private void HandleReplayMessages(ReplayMessages message) async Task ExecuteHighestSequenceNr() { - void CompleteHighSeqNo(long highSeqNo) - { - replyTo.Tell(new RecoverySuccess(highSeqNo)); - - if (CanPublish) - { - eventStream.Publish(message); - } - } - try { var highSequenceNr = await _breaker.WithCircuitBreaker((message, readHighestSequenceNrFrom, awj: this), (state, ct) => @@ -306,6 +330,18 @@ await ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr { replyTo.Tell(new ReplayMessagesFailure(TryUnwrapException(ex))); } + + return; + + void CompleteHighSeqNo(long highSeqNo) + { + replyTo.Tell(new RecoverySuccess(highSeqNo)); + + if (CanPublish) + { + eventStream.Publish(message); + } + } } // instead 
of ContinueWith @@ -315,10 +351,12 @@ await ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr } /// - /// TBD + /// INTERNAL API. + /// + /// used to flatten aggregate exceptions. /// - /// TBD - /// TBD + /// The input exception. + /// A possibly flattened exception. protected static Exception TryUnwrapException(Exception e) { if (e is not AggregateException aggregateException) return e; @@ -371,7 +409,7 @@ private async Task ExecuteBatch(WriteMessages message, int atomicWriteCount, IAc } } - private void ProcessResults(IImmutableList results, int atomicWriteCount, WriteMessages writeMessage, IActorRef resequencer, + private static void ProcessResults(IImmutableList results, int atomicWriteCount, WriteMessages writeMessage, IActorRef resequencer, long resequencerCounter, IActorRef writeJournal) { // there should be no circumstances under which `writeResult` can be `null` @@ -385,11 +423,11 @@ private void ProcessResults(IImmutableList results, int atomicWriteCo : new WriteMessageRejected(x, exception, writeMessage.ActorInstanceId), results, resequencerCounter, writeMessage, resequencer, writeJournal); } - private void Resequence(Func mapper, + private static void Resequence(Func mapper, IImmutableList results, long resequencerCounter, WriteMessages msg, IActorRef resequencer, IActorRef writeJournal) { var i = 0; - var enumerator = results?.GetEnumerator(); + using var enumerator = results?.GetEnumerator(); foreach (var resequencable in msg.Messages) { if (resequencable is AtomicWrite aw) diff --git a/src/core/Akka.Persistence/JournalProtocol.cs b/src/core/Akka.Persistence/JournalProtocol.cs index d97bfb3f208..66c98e69199 100644 --- a/src/core/Akka.Persistence/JournalProtocol.cs +++ b/src/core/Akka.Persistence/JournalProtocol.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Generic; +using System.Threading; using Akka.Actor; using Akka.Event; @@ -180,6 +181,42 @@ public override int GetHashCode() public override string ToString() 
=> $"DeleteMessagesTo"; } + /// <summary> + /// Invokes a health check on the journal plugin. + /// </summary> + public sealed class CheckJournalHealth : IJournalRequest + { + public CheckJournalHealth(CancellationToken cancellationToken) + { + CancellationToken = cancellationToken; + } + + public CancellationToken CancellationToken { get; } + + public override string ToString() + { + return "CheckJournalHealth"; + } + } + + /// <summary> + /// Health check response from the journal. + /// </summary> + public sealed class JournalHealthCheckResponse : IJournalResponse + { + public JournalHealthCheckResponse(PersistenceHealthCheckResult result) + { + Result = result; + } + + public PersistenceHealthCheckResult Result { get; } + + public override string ToString() + { + return $"JournalHealthCheckResponse<{Result}>"; + } + } + /// /// Request to write messages. /// diff --git a/src/core/Akka.Persistence/Persistence.cs b/src/core/Akka.Persistence/Persistence.cs index d6d8f052719..6a6b4fd613b 100644 --- a/src/core/Akka.Persistence/Persistence.cs +++ b/src/core/Akka.Persistence/Persistence.cs @@ -9,6 +9,7 @@ using System.Collections.Concurrent; using System.Linq; using System.Threading; +using System.Threading.Tasks; using Akka.Actor; using Akka.Annotations; using Akka.Configuration; @@ -252,6 +253,40 @@ public IActorRef JournalFor(string journalPluginId) return PluginHolderFor(configPath, JournalFallbackConfigPath).Ref; } + /// <summary> + /// Shortcut for invoking journal health checks. The request is cancelled when <paramref name="cancellationToken"/> is cancelled or once Settings.AskTimeout elapses. + /// </summary> + /// <param name="journalPluginId">The HOCON id of the Akka.Persistence plugin.</param> + /// <param name="cancellationToken">An optional cancellation token.</param> + /// <returns>A <see cref="PersistenceHealthCheckResult"/> with health status and possibly a descriptive message.</returns> 
+ public async Task CheckJournalHealthAsync(string journalPluginId, + CancellationToken cancellationToken = default) + { + using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + timeoutCts.CancelAfter(Settings.AskTimeout); + + var pluginRef = JournalFor(journalPluginId); + var r = await pluginRef.Ask(new CheckJournalHealth(timeoutCts.Token), timeoutCts.Token); + return r.Result; + } + + /// <summary> + /// Shortcut for invoking snapshot store health checks. The request is cancelled when <paramref name="cancellationToken"/> is cancelled or once Settings.AskTimeout elapses. + /// </summary> + /// <param name="snapshotStorePluginId">The HOCON id of the Akka.Persistence plugin.</param> + /// <param name="cancellationToken">An optional cancellation token.</param> + /// <returns>A <see cref="PersistenceHealthCheckResult"/> with health status and possibly a descriptive message.</returns> + public async Task CheckSnapshotStoreHealthAsync(string snapshotStorePluginId, + CancellationToken cancellationToken = default) + { + using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + timeoutCts.CancelAfter(Settings.AskTimeout); + + var pluginRef = SnapshotStoreFor(snapshotStorePluginId); + var r = await pluginRef.Ask(new CheckSnapshotStoreHealth(timeoutCts.Token), timeoutCts.Token); + return r.Result; + } + /// /// Returns a snapshot store plugin actor identified by . /// When empty, looks in `akka.persistence.snapshot-store.plugin` to find configuration entry path. diff --git a/src/core/Akka.Persistence/PersistenceHealthStatus.cs b/src/core/Akka.Persistence/PersistenceHealthStatus.cs new file mode 100644 index 00000000000..587b3d5e4bd --- /dev/null +++ b/src/core/Akka.Persistence/PersistenceHealthStatus.cs @@ -0,0 +1,40 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; + +namespace Akka.Persistence; + +/// <summary> +/// Used by SnapshotStore and Journal to indicate the health status of the underlying storage. 
+/// </summary> +public enum PersistenceHealthStatus +{ + /// <summary> + /// Akka.Persistence is working as expected. + /// </summary> + Healthy = 0, + + /// <summary> + /// Akka.Persistence is experiencing some issues that should be recoverable, e.g. the plugin's circuit breaker is open or half-open. + /// </summary> + Degraded = 1, + + /// <summary> + /// Akka.Persistence has experienced a fatal error, e.g. the health check itself failed with an exception. + /// </summary> + Unhealthy = 2, +} + +/// <summary> +/// Results from a health check: a status plus an optional description, exception, and additional data. +/// </summary> +public readonly record struct PersistenceHealthCheckResult(PersistenceHealthStatus Status, + string? Description = null, + Exception? Exception = null, + IReadOnlyDictionary? Data = null); \ No newline at end of file diff --git a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs index cde611e7700..f78081fdcd9 100644 --- a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs @@ -6,6 +6,7 @@ //----------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -20,10 +21,11 @@ namespace Akka.Persistence.Snapshot /// public abstract class SnapshotStore : ActorBase { - private readonly TaskContinuationOptions _continuationOptions = TaskContinuationOptions.ExecuteSynchronously; + private const TaskContinuationOptions ContinuationOptions = TaskContinuationOptions.ExecuteSynchronously; private readonly bool _publish; private readonly CircuitBreaker _breaker; private readonly ILoggingAdapter _log; + private readonly IReadOnlyDictionary _defaultHealthCheckTags; /// /// Initializes a new instance of the class. 
@@ -46,8 +48,28 @@ protected SnapshotStore() config.GetInt("circuit-breaker.max-failures", 10), config.GetTimeSpan("circuit-breaker.call-timeout", TimeSpan.FromSeconds(10)), config.GetTimeSpan("circuit-breaker.reset-timeout", TimeSpan.FromSeconds(30))); - + _log = Context.GetLogger(); + _defaultHealthCheckTags = new Dictionary + { + { "snapshot-store", Self.Path.Name } + }; + } + + /// <summary> + /// Health check for the snapshot store. The default implementation only inspects the circuit breaker: half-open or open reports Degraded together with the breaker's last caught exception, otherwise Healthy. + /// </summary> + /// <param name="cancellationToken">Cancellation token for the health check invocation (not used by the default implementation).</param> + /// <returns>A <see cref="PersistenceHealthCheckResult"/> with a health status and optional error message.</returns> + public virtual Task CheckHealthAsync(CancellationToken cancellationToken = default) + { + if(_breaker.IsHalfOpen) + return Task.FromResult(new PersistenceHealthCheckResult(PersistenceHealthStatus.Degraded, + "Circuit breaker is half-open, some operations may be failing intermittently.", _breaker.LastCaughtException, _defaultHealthCheckTags)); + if(_breaker.IsOpen) + return Task.FromResult(new PersistenceHealthCheckResult(PersistenceHealthStatus.Degraded, + "Circuit breaker is open, operations are being rejected until the breaker resets.", _breaker.LastCaughtException, _defaultHealthCheckTags)); + return Task.FromResult(new PersistenceHealthCheckResult(PersistenceHealthStatus.Healthy, "OK.", Data: _defaultHealthCheckTags)); } /// @@ -74,7 +96,7 @@ private bool ReceiveSnapshotStore(object message) : new LoadSnapshotFailed(t.IsFaulted ? TryUnwrapException(t.Exception) : new OperationCanceledException("LoadAsync canceled, possibly due to timing out.")), - _continuationOptions) + ContinuationOptions) .PipeTo(senderPersistentActor); break; @@ -92,7 +114,7 @@ private bool ReceiveSnapshotStore(object message) t.IsFaulted ?
TryUnwrapException(t.Exception) : new OperationCanceledException("SaveAsync canceled, possibly due to timing out.", TryUnwrapException(t.Exception))), - _continuationOptions) + ContinuationOptions) .PipeTo(self, senderPersistentActor); break; @@ -138,13 +160,13 @@ private bool ReceiveSnapshotStore(object message) t.IsFaulted ? TryUnwrapException(t.Exception) : new OperationCanceledException("DeleteAsync canceled, possibly due to timing out.")), - _continuationOptions) + ContinuationOptions) .PipeTo(self, senderPersistentActor) .ContinueWith(_ => { if (_publish) eventStream.Publish(message); - }, _continuationOptions); + }, ContinuationOptions); break; } @@ -180,13 +202,13 @@ private bool ReceiveSnapshotStore(object message) t.IsFaulted ? TryUnwrapException(t.Exception) : new OperationCanceledException("DeleteAsync canceled, possibly due to timing out.")), - _continuationOptions) + ContinuationOptions) .PipeTo(self, senderPersistentActor) .ContinueWith(_ => { if (_publish) eventStream.Publish(message); - }, _continuationOptions); + }, ContinuationOptions); break; } @@ -212,6 +234,17 @@ private bool ReceiveSnapshotStore(object message) } break; + case CheckSnapshotStoreHealth checkHealth: + var sender = Sender; + CheckHealthAsync(checkHealth.CancellationToken) + // PipeTo implementation no longer requires a closure, but better safe than sorry + .PipeTo(sender, + success: result => new SnapshotStoreHealthCheckResponse(result), + failure: ex => new SnapshotStoreHealthCheckResponse( + new PersistenceHealthCheckResult(PersistenceHealthStatus.Unhealthy, + "Encountered exception while performing health check", + ex, _defaultHealthCheckTags))); + break; default: return false; diff --git a/src/core/Akka.Persistence/SnapshotProtocol.cs b/src/core/Akka.Persistence/SnapshotProtocol.cs index 915d09887f5..3ff845b6b4d 100644 --- a/src/core/Akka.Persistence/SnapshotProtocol.cs +++ b/src/core/Akka.Persistence/SnapshotProtocol.cs @@ -7,6 +7,7 @@ using System; using 
System.Collections.Generic; +using System.Threading; using Newtonsoft.Json; namespace Akka.Persistence @@ -26,6 +27,39 @@ public interface ISnapshotRequest : ISnapshotMessage { } /// public interface ISnapshotResponse : ISnapshotMessage { } + public sealed class CheckSnapshotStoreHealth : ISnapshotRequest // Request asking the snapshot store to run its health check. + { + public CheckSnapshotStoreHealth(CancellationToken cancellationToken) + { + CancellationToken = cancellationToken; + } + + public CancellationToken CancellationToken { get; } // Token the snapshot store passes to CheckHealthAsync. + + public override string ToString() + { + return "CheckSnapshotStoreHealth"; + } + } + + /// <summary> + /// Health check response from the SnapshotStore; wraps the PersistenceHealthCheckResult produced by the check. + /// </summary> + public sealed class SnapshotStoreHealthCheckResponse : ISnapshotResponse + { + public SnapshotStoreHealthCheckResponse(PersistenceHealthCheckResult result) + { + Result = result; + } + + public PersistenceHealthCheckResult Result { get; } + + public override string ToString() + { + return $"SnapshotStoreHealthCheckResponse<{Result}>"; + } + } + /// /// Metadata for all persisted snapshot records. /// diff --git a/src/core/Akka.TestKit/TestScheduler.cs b/src/core/Akka.TestKit/TestScheduler.cs index d19f6d29be7..4959223a231 100644 --- a/src/core/Akka.TestKit/TestScheduler.cs +++ b/src/core/Akka.TestKit/TestScheduler.cs @@ -82,7 +82,7 @@ public void AdvanceTo(DateTimeOffset when) Advance(when.Subtract(_now)); } - private void InternalSchedule(TimeSpan? initialDelay, TimeSpan delay, ICanTell receiver, object message, Action action, + private void InternalSchedule(TimeSpan? initialDelay, TimeSpan delay, ICanTell receiver, object message, Action action, IActorRef sender, ICancelable cancelable, int deliveryCount = 0) { var scheduledTime = _now.Add(initialDelay ?? delay).UtcTicks;