diff --git a/src/core/Akka.Persistence.Tests/JournalHealthCheckSpec.cs b/src/core/Akka.Persistence.Tests/JournalHealthCheckSpec.cs index 06a88b87024..95a60186f88 100644 --- a/src/core/Akka.Persistence.Tests/JournalHealthCheckSpec.cs +++ b/src/core/Akka.Persistence.Tests/JournalHealthCheckSpec.cs @@ -75,15 +75,20 @@ public async Task JournalHealthCheck_should_return_Degraded_when_CircuitBreaker_ TestActor, 1); journal.Tell(writeMsg, TestActor); + await ExpectMsgAsync(); + // Advance time to let the write fail and circuit breaker open var testScheduler = (TestScheduler)Sys.Scheduler; testScheduler.Advance(TimeSpan.FromSeconds(2)); - using var cts = new CancellationTokenSource(RemainingOrDefault); - var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-open", cts.Token); + await AwaitAssertAsync(async () => + { + using var cts = new CancellationTokenSource(RemainingOrDefault); + var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-open", cts.Token); - Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status); - Assert.Contains("Circuit breaker is open", pluginHealth.Description); + Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status); + Assert.Contains("Circuit breaker is open", pluginHealth.Description); + }, RemainingOrDefault); } [Fact] @@ -97,25 +102,21 @@ public async Task JournalHealthCheck_should_return_Degraded_when_CircuitBreaker_ TestActor, 1); journal.Tell(writeMsg, TestActor); - var testScheduler = (TestScheduler)Sys.Scheduler; - - // Advance time past call-timeout to let the write fail and circuit breaker open - testScheduler.Advance(TimeSpan.FromSeconds(1)); + await ExpectMsgAsync(); - // Give the async operations time to complete - await Task.Delay(100); + var testScheduler = (TestScheduler)Sys.Scheduler; // Advance time past reset-timeout to transition to half-open testScheduler.Advance(TimeSpan.FromSeconds(1)); - // Give the transition time to complete - await Task.Delay(100); - - using var cts = new CancellationTokenSource(RemainingOrDefault); - var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-half-open", cts.Token); + await AwaitAssertAsync(async () => + { + using var cts = new CancellationTokenSource(RemainingOrDefault); + var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-half-open", cts.Token); - Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status); - Assert.Contains("Circuit breaker is half-open", pluginHealth.Description); + Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status); + Assert.Contains("Circuit breaker is half-open", pluginHealth.Description); + }, RemainingOrDefault); } } @@ -128,4 +129,4 @@ protected override Task> WriteMessagesAsync(IEnumerabl { throw new InvalidOperationException("Simulated journal write failure"); } -} \ No newline at end of file +} diff --git a/src/core/Akka.Persistence.Tests/SnapshotStoreHealthCheckSpec.cs b/src/core/Akka.Persistence.Tests/SnapshotStoreHealthCheckSpec.cs index b8beb4f9633..0fa724ee62f 100644 --- a/src/core/Akka.Persistence.Tests/SnapshotStoreHealthCheckSpec.cs +++ b/src/core/Akka.Persistence.Tests/SnapshotStoreHealthCheckSpec.cs @@ -72,15 +72,20 @@ public async Task SnapshotStoreHealthCheck_should_return_Degraded_when_CircuitBr var saveMsg = new SaveSnapshot(new SnapshotMetadata("test-pid", 1, DateTime.UtcNow), "test-snapshot"); snapshotStore.Tell(saveMsg, TestActor); + await ExpectMsgAsync(); + // Advance time to let the save fail and circuit breaker open var testScheduler = (TestScheduler)Sys.Scheduler; testScheduler.Advance(TimeSpan.FromSeconds(2)); - using var cts = new CancellationTokenSource(RemainingOrDefault); - var pluginHealth = await Extension.CheckSnapshotStoreHealthAsync("akka.persistence.snapshot-store.failing-open", cts.Token); + await AwaitAssertAsync(async () => + { + using var cts = new CancellationTokenSource(RemainingOrDefault); + var pluginHealth = await Extension.CheckSnapshotStoreHealthAsync("akka.persistence.snapshot-store.failing-open", cts.Token); - Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status); - Assert.Contains("Circuit breaker is open", pluginHealth.Description); + Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status); + Assert.Contains("Circuit breaker is open", pluginHealth.Description); + }, RemainingOrDefault); } [Fact] @@ -93,25 +98,21 @@ public async Task SnapshotStoreHealthCheck_should_return_Degraded_when_CircuitBr var saveMsg = new SaveSnapshot(new SnapshotMetadata("test-pid", 1, DateTime.UtcNow), "test-snapshot"); snapshotStore.Tell(saveMsg, TestActor); - var testScheduler = (TestScheduler)Sys.Scheduler; - - // Advance time past call-timeout to let the save fail and circuit breaker open - testScheduler.Advance(TimeSpan.FromSeconds(1)); + await ExpectMsgAsync(); - // Give the async operations time to complete - await Task.Delay(100); + var testScheduler = (TestScheduler)Sys.Scheduler; // Advance time past reset-timeout to transition to half-open testScheduler.Advance(TimeSpan.FromSeconds(1)); - // Give the transition time to complete - await Task.Delay(100); - - using var cts = new CancellationTokenSource(RemainingOrDefault); - var pluginHealth = await Extension.CheckSnapshotStoreHealthAsync("akka.persistence.snapshot-store.failing-half-open", cts.Token); + await AwaitAssertAsync(async () => + { + using var cts = new CancellationTokenSource(RemainingOrDefault); + var pluginHealth = await Extension.CheckSnapshotStoreHealthAsync("akka.persistence.snapshot-store.failing-half-open", cts.Token); - Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status); - Assert.Contains("Circuit breaker is half-open", pluginHealth.Description); + Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status); + Assert.Contains("Circuit breaker is half-open", pluginHealth.Description); + }, RemainingOrDefault); } } @@ -124,4 +125,4 @@ protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot, Ca { throw new InvalidOperationException("Simulated snapshot store save failure"); } -} \ No newline at end of file +} diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs index 4939d2cd342..5db49bfd5f0 100644 --- a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs +++ b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs @@ -392,7 +392,15 @@ private async Task ExecuteBatch(WriteMessages message, int atomicWriteCount, IAc try { var writeResult = - await _breaker.WithCircuitBreaker((prepared, awj: this), (state, ct) => state.awj.WriteMessagesAsync(state.prepared, ct)).ConfigureAwait(false); + await _breaker.WithCircuitBreaker( + (prepared, awj: this), + async (state, ct) => + { + // Ensure WriteMessagesAsync is not called in AsyncWriteJournal + // actor context and so doesn't block message handling + await Task.Yield(); + return await state.awj.WriteMessagesAsync(state.prepared, ct); + }).ConfigureAwait(false); ProcessResults(writeResult, atomicWriteCount, message, _resequencer, resequencerCounter, self); } diff --git a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs index f78081fdcd9..bad499a8f47 100644 --- a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs @@ -107,8 +107,14 @@ private bool ReceiveSnapshotStore(object message) timestamp: saveSnapshot.Metadata.Timestamp == DateTime.MinValue ? DateTime.UtcNow : saveSnapshot.Metadata.Timestamp); - _breaker.WithCircuitBreaker(ct => SaveAsync(metadata, saveSnapshot.Snapshot, ct)) - .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) + _breaker.WithCircuitBreaker( + async (ct) => + { + // Ensure SaveAsync is not called in SnapshotStore + // actor context and so doesn't block message handling + await Task.Yield(); + await SaveAsync(metadata, saveSnapshot.Snapshot, ct); + }).ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) ? new SaveSnapshotSuccess(metadata) as ISnapshotResponse : new SaveSnapshotFailure(saveSnapshot.Metadata, t.IsFaulted