Ensure WriteMessagesAsync/SaveAsync is called asynchronously in Async…#8163
Conversation
…WriteJournal/SnapshotStore.
|
Looks like these changes are tripping the circuit breaker under some conditions, at least in the tests we're running on the health checks. Let me dig into this. |
There was a problem hiding this comment.
Had to harden the timing assertions in these specs due to the new Task.Yield behavior on the journal / snapshot stores, but the health check implementations themselves look to be unaffected by this.
There was a problem hiding this comment.
Same timing issues with the health checks here - replaced all of the fragile test scheduler stuff with AwaitAssertAsync
| { | ||
| // Ensure WriteMessagesAsync is not called in AsyncWriteJournal | ||
| // actor context and so doesn't block message handling | ||
| await Task.Yield(); |
There was a problem hiding this comment.
Looks fine to me - does exactly what is advertised.
|
nice work @schdooz - this looks like a genuinely good perf improvement for all Akka.Persistence plugins. We'll ship it in the next v1.5 release. |
#8163) * Ensure WriteMessagesAsync/SaveAsync is called asynchronously in AsyncWriteJournal/SnapshotStore. * Fix persistence health check timing tests. --------- Co-authored-by: Mark Dinh <mark.dinh@youlend.com> Co-authored-by: Aaron Stannard <aaron@petabridge.com> Co-authored-by: Aaron Stannard <aaron@aaronstannard.com>
…napshotStore (#8163) This reverts the Task.Yield() additions from PR #8163 in AsyncWriteJournal.ExecuteBatch and SnapshotStore.ReceiveSnapshotStore, while preserving the health check test improvements from that same PR. PR #8163 added `await Task.Yield()` before calling `WriteMessagesAsync` and `SaveAsync` inside their respective circuit breaker lambdas. The intent was to move expensive byte serialization off the actor's message-processing thread, which showed ~45% throughput improvement in benchmarks. However, this silently broke the implicit contract that persistence plugins relied on: that the synchronous preamble of `WriteMessagesAsync`/`SaveAsync` executes in actor context. Moving execution to the thread pool caused: 1. Plugins that access `Self` inside `WriteMessagesAsync` (e.g. Akka.Persistence.Sql, Akka.Persistence.EventStore) throw `NotSupportedException` because there is no active ActorContext on a thread pool thread. 2. Plugins that use non-thread-safe collections like `Dictionary<string, Task>` for write tracking (e.g. Akka.Persistence.Sql, Akka.Persistence.EventStore) are now subject to concurrent access from both the actor thread and thread pool threads, causing `InvalidOperationException` or silent data corruption. 3. Plugins that send messages to subscribers after writes complete (e.g. Akka.Persistence.Redis) access shared actor state off the actor thread. The change was too blunt an instrument — it applied uniformly to all plugins via the base class, removing their ability to do any actor-thread setup before async work begins. Ironically, the plugins that benefit most from off-thread serialization (MongoDB, Azure Table Storage) don't access actor context at all, while the plugins that break (SQL, EventStore, Redis) already perform serialization off-thread in their async pipelines. A future version may reintroduce this optimization with a more surgical approach (e.g. opt-in property or Template Method pattern) that preserves the plugin threading contract.
…napshotStore (#8163) (#8189) This reverts the Task.Yield() additions from PR #8163 in AsyncWriteJournal.ExecuteBatch and SnapshotStore.ReceiveSnapshotStore, while preserving the health check test improvements from that same PR. PR #8163 added `await Task.Yield()` before calling `WriteMessagesAsync` and `SaveAsync` inside their respective circuit breaker lambdas. The intent was to move expensive byte serialization off the actor's message-processing thread, which showed ~45% throughput improvement in benchmarks. However, this silently broke the implicit contract that persistence plugins relied on: that the synchronous preamble of `WriteMessagesAsync`/`SaveAsync` executes in actor context. Moving execution to the thread pool caused: 1. Plugins that access `Self` inside `WriteMessagesAsync` (e.g. Akka.Persistence.Sql, Akka.Persistence.EventStore) throw `NotSupportedException` because there is no active ActorContext on a thread pool thread. 2. Plugins that use non-thread-safe collections like `Dictionary<string, Task>` for write tracking (e.g. Akka.Persistence.Sql, Akka.Persistence.EventStore) are now subject to concurrent access from both the actor thread and thread pool threads, causing `InvalidOperationException` or silent data corruption. 3. Plugins that send messages to subscribers after writes complete (e.g. Akka.Persistence.Redis) access shared actor state off the actor thread. The change was too blunt an instrument — it applied uniformly to all plugins via the base class, removing their ability to do any actor-thread setup before async work begins. Ironically, the plugins that benefit most from off-thread serialization (MongoDB, Azure Table Storage) don't access actor context at all, while the plugins that break (SQL, EventStore, Redis) already perform serialization off-thread in their async pipelines. A future version may reintroduce this optimization with a more surgical approach (e.g. opt-in property or Template Method pattern) that preserves the plugin threading contract.
Fixes #8162
Changes
Call
await Task.Yield()before callingAsyncWriteJournal.WriteMessagesAsync()/SnapshotStore.SaveAsync()to ensure that these methods are executed entirely off the actor context.Without the
await Task.Yield(), the beginning portion of theAsyncWriteJournal.WriteMessagesAsync()/SnapshotStore.SaveAsync()methods will execute synchronously in theAsyncWriteJournal/SnapshotStoreactor context and will block further message processing by the actor until execution has completed. This can cause performance issues if expensive work must be done at the beginning of the method (e.g. serialization).Checklist
For significant changes, please ensure that the following have been completed (delete if not relevant):
Latest
devBenchmarksBenchmarked with this repo
This PR's Benchmarks
Benchmarked with this repo