Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions src/core/Akka.Persistence.Tests/JournalHealthCheckSpec.cs
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to harden the timing assertions in these specs due to the new Task.Yield behavior on the journal / snapshot stores, but the health check implementations themselves look to be unaffected by this.

Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,20 @@ public async Task JournalHealthCheck_should_return_Degraded_when_CircuitBreaker_
TestActor, 1);
journal.Tell(writeMsg, TestActor);

await ExpectMsgAsync<WriteMessagesFailed>();

// Advance time to let the write fail and circuit breaker open
var testScheduler = (TestScheduler)Sys.Scheduler;
testScheduler.Advance(TimeSpan.FromSeconds(2));

using var cts = new CancellationTokenSource(RemainingOrDefault);
var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-open", cts.Token);
await AwaitAssertAsync(async () =>
{
using var cts = new CancellationTokenSource(RemainingOrDefault);
var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-open", cts.Token);

Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status);
Assert.Contains("Circuit breaker is open", pluginHealth.Description);
Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status);
Assert.Contains("Circuit breaker is open", pluginHealth.Description);
}, RemainingOrDefault);
}

[Fact]
Expand All @@ -97,25 +102,21 @@ public async Task JournalHealthCheck_should_return_Degraded_when_CircuitBreaker_
TestActor, 1);
journal.Tell(writeMsg, TestActor);

var testScheduler = (TestScheduler)Sys.Scheduler;

// Advance time past call-timeout to let the write fail and circuit breaker open
testScheduler.Advance(TimeSpan.FromSeconds(1));
await ExpectMsgAsync<WriteMessagesFailed>();

// Give the async operations time to complete
await Task.Delay(100);
var testScheduler = (TestScheduler)Sys.Scheduler;

// Advance time past reset-timeout to transition to half-open
testScheduler.Advance(TimeSpan.FromSeconds(1));

// Give the transition time to complete
await Task.Delay(100);

using var cts = new CancellationTokenSource(RemainingOrDefault);
var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-half-open", cts.Token);
await AwaitAssertAsync(async () =>
{
using var cts = new CancellationTokenSource(RemainingOrDefault);
var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-half-open", cts.Token);

Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status);
Assert.Contains("Circuit breaker is half-open", pluginHealth.Description);
Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status);
Assert.Contains("Circuit breaker is half-open", pluginHealth.Description);
}, RemainingOrDefault);
}
}

Expand All @@ -128,4 +129,4 @@ protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerabl
{
throw new InvalidOperationException("Simulated journal write failure");
}
}
}
37 changes: 19 additions & 18 deletions src/core/Akka.Persistence.Tests/SnapshotStoreHealthCheckSpec.cs
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same timing issues with the health checks here - replaced all of the fragile test scheduler stuff with AwaitAssertAsync

Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,20 @@ public async Task SnapshotStoreHealthCheck_should_return_Degraded_when_CircuitBr
var saveMsg = new SaveSnapshot(new SnapshotMetadata("test-pid", 1, DateTime.UtcNow), "test-snapshot");
snapshotStore.Tell(saveMsg, TestActor);

await ExpectMsgAsync<SaveSnapshotFailure>();

// Advance time to let the save fail and circuit breaker open
var testScheduler = (TestScheduler)Sys.Scheduler;
testScheduler.Advance(TimeSpan.FromSeconds(2));

using var cts = new CancellationTokenSource(RemainingOrDefault);
var pluginHealth = await Extension.CheckSnapshotStoreHealthAsync("akka.persistence.snapshot-store.failing-open", cts.Token);
await AwaitAssertAsync(async () =>
{
using var cts = new CancellationTokenSource(RemainingOrDefault);
var pluginHealth = await Extension.CheckSnapshotStoreHealthAsync("akka.persistence.snapshot-store.failing-open", cts.Token);

Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status);
Assert.Contains("Circuit breaker is open", pluginHealth.Description);
Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status);
Assert.Contains("Circuit breaker is open", pluginHealth.Description);
}, RemainingOrDefault);
}

[Fact]
Expand All @@ -93,25 +98,21 @@ public async Task SnapshotStoreHealthCheck_should_return_Degraded_when_CircuitBr
var saveMsg = new SaveSnapshot(new SnapshotMetadata("test-pid", 1, DateTime.UtcNow), "test-snapshot");
snapshotStore.Tell(saveMsg, TestActor);

var testScheduler = (TestScheduler)Sys.Scheduler;

// Advance time past call-timeout to let the save fail and circuit breaker open
testScheduler.Advance(TimeSpan.FromSeconds(1));
await ExpectMsgAsync<SaveSnapshotFailure>();

// Give the async operations time to complete
await Task.Delay(100);
var testScheduler = (TestScheduler)Sys.Scheduler;

// Advance time past reset-timeout to transition to half-open
testScheduler.Advance(TimeSpan.FromSeconds(1));

// Give the transition time to complete
await Task.Delay(100);

using var cts = new CancellationTokenSource(RemainingOrDefault);
var pluginHealth = await Extension.CheckSnapshotStoreHealthAsync("akka.persistence.snapshot-store.failing-half-open", cts.Token);
await AwaitAssertAsync(async () =>
{
using var cts = new CancellationTokenSource(RemainingOrDefault);
var pluginHealth = await Extension.CheckSnapshotStoreHealthAsync("akka.persistence.snapshot-store.failing-half-open", cts.Token);

Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status);
Assert.Contains("Circuit breaker is half-open", pluginHealth.Description);
Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status);
Assert.Contains("Circuit breaker is half-open", pluginHealth.Description);
}, RemainingOrDefault);
}
}

Expand All @@ -124,4 +125,4 @@ protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot, Ca
{
throw new InvalidOperationException("Simulated snapshot store save failure");
}
}
}
10 changes: 9 additions & 1 deletion src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,15 @@ private async Task ExecuteBatch(WriteMessages message, int atomicWriteCount, IAc
try
{
var writeResult =
await _breaker.WithCircuitBreaker((prepared, awj: this), (state, ct) => state.awj.WriteMessagesAsync(state.prepared, ct)).ConfigureAwait(false);
await _breaker.WithCircuitBreaker(
(prepared, awj: this),
async (state, ct) =>
{
// Ensure WriteMessagesAsync is not called in AsyncWriteJournal
// actor context and so doesn't block message handling
await Task.Yield();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks fine to me - does exactly what is advertised.

return await state.awj.WriteMessagesAsync(state.prepared, ct);
}).ConfigureAwait(false);

ProcessResults(writeResult, atomicWriteCount, message, _resequencer, resequencerCounter, self);
}
Expand Down
10 changes: 8 additions & 2 deletions src/core/Akka.Persistence/Snapshot/SnapshotStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,14 @@ private bool ReceiveSnapshotStore(object message)
timestamp: saveSnapshot.Metadata.Timestamp == DateTime.MinValue
? DateTime.UtcNow
: saveSnapshot.Metadata.Timestamp);
_breaker.WithCircuitBreaker(ct => SaveAsync(metadata, saveSnapshot.Snapshot, ct))
.ContinueWith(t => (!t.IsFaulted && !t.IsCanceled)
_breaker.WithCircuitBreaker(
async (ct) =>
{
// Ensure SaveAsync is not called in SnapshotStore
// actor context and so doesn't block message handling
await Task.Yield();
await SaveAsync(metadata, saveSnapshot.Snapshot, ct);
}).ContinueWith(t => (!t.IsFaulted && !t.IsCanceled)
? new SaveSnapshotSuccess(metadata) as ISnapshotResponse
: new SaveSnapshotFailure(saveSnapshot.Metadata,
t.IsFaulted
Expand Down
Loading