feat(persistence): add completion callbacks and async handler support#7937
feat(persistence): add completion callbacks and async handler support#7937Aaronontheweb wants to merge 2 commits into
Conversation
Add completion callback support to PersistAll and PersistAllAsync methods to detect when all events in a batch have been persisted and their handlers executed. Add async handler support (Func<TEvent, Task>) for all persist methods (Persist, PersistAsync, PersistAll, PersistAllAsync). Changes: - Add 8 new invocation classes with interface segregation for sync/async handlers and sync/async completion callbacks - Add IStashingInvocation marker interface for type-safe detection of stashing invocations - Add 10 new method overloads across all 4 persist methods supporting completion callbacks and async handlers - Update PeekApplyHandler to chain async operations in a single RunTask call to avoid nested RunTask exceptions - Maintain backward compatibility and preserve actor semantics Test coverage: - 8 new tests covering completion callbacks and async handlers - All new tests pass consistently (100% pass rate) - No new failures introduced in existing test suite
Aaronontheweb
left a comment
There was a problem hiding this comment.
Detailed my changes - IMHO, I think I can simplify this by transforming all of the Action<object> callbacks into Func<object,Task> callbacks by just wrapping the delegate. That will cut down on the number of internal classes and interfaces pretty significantly.
Plus I identified some inconsistencies in how we're handling empty / null event arrays.
| namespace Akka.Persistence | ||
| { | ||
| public interface IPendingHandlerInvocation | ||
| internal interface IPendingHandlerInvocation |
There was a problem hiding this comment.
Moving all of the IPendingHandlerInvocations from public to internal - this is going to be part of the v1.6 API changes.
There was a problem hiding this comment.
These should have never been public APIs in the first place IMHO
| internal interface IPendingHandlerInvocation | ||
| { | ||
| object Event { get; } | ||
| bool IsLastInBatch { get; } |
There was a problem hiding this comment.
Used to signal if the OnCompletion handler should be fired. No-ops for individual Persist / PersistAsync calls.
| bool IsLastInBatch { get; } | ||
| } | ||
|
|
||
| internal interface ISyncHandlerInvocation : IPendingHandlerInvocation |
There was a problem hiding this comment.
Sync == uses Action<object>
Async == uses Func<object,Task>
| Func<object, Task> AsyncHandler { get; } | ||
| } | ||
|
|
||
| internal interface ISyncCompletionInvocation : IPendingHandlerInvocation |
There was a problem hiding this comment.
Completion callbacks don't take an event, so just Action or Func<Task> here
| /// <summary> | ||
| /// Marker interface for stashing invocations that require command stashing. | ||
| /// </summary> | ||
| internal interface IStashingInvocation : IPendingHandlerInvocation |
There was a problem hiding this comment.
Persistence Handler Invocation Types
This document describes the internal invocation types used by Eventsourced actors to manage persistence handlers and completion callbacks.
Interfaces
| Interface | Purpose | Key Property |
|---|---|---|
IPendingHandlerInvocation |
Base interface for all invocations | Event, IsLastInBatch |
ISyncHandlerInvocation |
Sync event handler (Action<object>) |
Handler |
IAsyncHandlerInvocation |
Async event handler (Func<object, Task>) |
AsyncHandler |
ISyncCompletionInvocation |
Sync completion callback (Action) |
CompletionCallback |
IAsyncCompletionInvocation |
Async completion callback (Func<Task>) |
AsyncCompletionCallback |
IStashingInvocation |
Marker for stashing behavior (blocks commands) | (none - marker only) |
Concrete Classes
| Class | Stashing | Handler | Completion | Used By |
|---|---|---|---|---|
StashingHandlerInvocation |
Yes | Sync | Sync | Persist, PersistAll |
StashingHandlerInvocationWithAsyncCompletion |
Yes | Sync | Async | PersistAll |
StashingAsyncHandlerInvocation |
Yes | Async | Sync | Persist, PersistAll |
StashingAsyncHandlerInvocationWithAsyncCompletion |
Yes | Async | Async | PersistAll |
NonStashingHandlerInvocation |
No | Sync | Sync | PersistAsync, PersistAllAsync, DeferAsync |
NonStashingHandlerInvocationWithAsyncCompletion |
No | Sync | Async | PersistAllAsync |
NonStashingAsyncHandlerInvocation |
No | Async | Sync | PersistAsync, PersistAllAsync |
NonStashingAsyncHandlerInvocationWithAsyncCompletion |
No | Async | Async | PersistAllAsync |
Interface Implementation Matrix
| Class | IStashingInvocation |
ISyncHandlerInvocation |
IAsyncHandlerInvocation |
ISyncCompletionInvocation |
IAsyncCompletionInvocation |
|---|---|---|---|---|---|
StashingHandlerInvocation |
✓ | ✓ | ✓ | ||
StashingHandlerInvocationWithAsyncCompletion |
✓ | ✓ | ✓ | ||
StashingAsyncHandlerInvocation |
✓ | ✓ | ✓ | ||
StashingAsyncHandlerInvocationWithAsyncCompletion |
✓ | ✓ | ✓ | ||
NonStashingHandlerInvocation |
✓ | ✓ | |||
NonStashingHandlerInvocationWithAsyncCompletion |
✓ | ✓ | |||
NonStashingAsyncHandlerInvocation |
✓ | ✓ | |||
NonStashingAsyncHandlerInvocationWithAsyncCompletion |
✓ | ✓ |
Key Concepts
- Stashing: When
IStashingInvocationis implemented, incoming commands are stashed until all persistence handlers complete. This ensures consistency during persistence operations. - Handler Type: Sync (
Action<TEvent>) vs Async (Func<TEvent, Task>) determines how event handlers execute. - Completion Type: Sync (
Action) vs Async (Func<Task>) determines how the batch completion callback executes.
Naming Convention
The class names follow a consistent pattern:
[Stashing|NonStashing][Async]HandlerInvocation[WithAsyncCompletion]
Stashing/NonStashing- Whether commands are stashed during persistenceAsync(before Handler) - Whether the event handler is asyncWithAsyncCompletion- Whether the completion callback is async (omitted for sync completion)
There was a problem hiding this comment.
We have a larger number of overloads here due to the combination of things we need to support both for backwards compatibility and ease of API use.... alllllllllllllthough..... this gives me an idea....
| throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later."); | ||
| } | ||
|
|
||
| if (events == null) return; |
There was a problem hiding this comment.
Should we also exit of the events collection is zero-length?
There was a problem hiding this comment.
Also, don't we need to invoke the task handler here just like we did in the above overload?
| throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later."); | ||
| } | ||
|
|
||
| if (events == null) |
There was a problem hiding this comment.
What about zero-length?
| var invocation = _pendingInvocations.First.Value; | ||
|
|
||
| // Check if we have any async work (handler or completion callback) | ||
| bool hasAsyncHandler = invocation is IAsyncHandlerInvocation; |
There was a problem hiding this comment.
Check to see what type of handler(s) we need to run
| // Invoke handler (async or sync) | ||
| if (hasAsyncHandler) | ||
| { | ||
| await ((IAsyncHandlerInvocation)invocation).AsyncHandler(payload); |
There was a problem hiding this comment.
event-specific handler always goes first
| // Invoke completion callback if this is the last event in the batch | ||
| if (invocation.IsLastInBatch) | ||
| { | ||
| if (invocation is IAsyncCompletionInvocation asyncCompInv && asyncCompInv.AsyncCompletionCallback != null) |
There was a problem hiding this comment.
completion handler always goes last
…ative to #7937 (#7954) * feat(persistence): add completion callbacks and async handler support Add completion callback overloads for PersistAll and PersistAllAsync that invoke a callback after all events have been persisted and their handlers executed. Also add async handler support (Func<TEvent, Task>) to all persist methods. Key changes: - Add IPendingHandlerInvocation, ISyncHandlerInvocation, IAsyncHandlerInvocation, and IStashingInvocation interfaces for type-safe handler invocation - Add StashingHandlerInvocation, StashingAsyncHandlerInvocation, AsyncHandlerInvocation, and AsyncAsyncHandlerInvocation classes - Add Persist<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAll overloads with completion callbacks (sync and async) - Add PersistAllAsync overloads with completion callbacks (sync and async) - Add DeferAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add internal stashing Defer methods for completion callback support - Update PeekApplyHandler to handle async handlers via RunTask - Update PersistingEvents to use IStashingInvocation marker interface Stashing semantics are preserved: PersistAll completion callbacks use internal stashing Defer (increments _pendingStashingPersistInvocations), while PersistAllAsync uses non-stashing DeferAsync. * chore: update API approval for persistence completion callbacks Update verified API file to reflect: - New public methods: Persist/PersistAsync/PersistAll/PersistAllAsync async handler overloads and completion callback overloads - New public method: DeferAsync with async handler - Internal invocation classes: AsyncHandlerInvocation, StashingHandlerInvocation, and IPendingHandlerInvocation are now internal (implementation detail) * chore: update .NET Framework API approval for persistence completion callbacks * fixed API approvals * test(persistence): add empty events tests and convert to async test methods - Convert all tests to use async/await with ExpectMsgAsync instead of sync-over-async ExpectMsg calls - Add tests for PersistAll/PersistAllAsync with empty events to verify completion callbacks are invoked immediately for all overloads: - PersistAll with sync completion callback (existing) - PersistAll with async completion callback (new) - PersistAllAsync with sync completion callback (new) - PersistAllAsync with async completion callback (new) - Update EmptyEventsWithCompletionActor to support all four scenarios * fix(persistence): use Defer for empty events completion callbacks to maintain ordering When PersistAll/PersistAllAsync is called with empty events, the completion callback must still be queued through Defer/DeferAsync to maintain the in-order execution guarantee. Previously, the callback was invoked immediately which could cause out-of-order execution if there were pending invocations from prior Persist/PersistAll calls. Changes: - Replace immediate invocation with Defer/DeferAsync for all 8 overloads that have completion callbacks when events collection is null or empty - Add SequentialPersistOrderingActor test actor for ordering verification - Add test: Persist followed by empty PersistAll maintains execution order - Add test: Sequential PersistAll with empty in middle maintains order
|
Superseded via #7954 |
…ative to akkadotnet#7937 (akkadotnet#7954) * feat(persistence): add completion callbacks and async handler support Add completion callback overloads for PersistAll and PersistAllAsync that invoke a callback after all events have been persisted and their handlers executed. Also add async handler support (Func<TEvent, Task>) to all persist methods. Key changes: - Add IPendingHandlerInvocation, ISyncHandlerInvocation, IAsyncHandlerInvocation, and IStashingInvocation interfaces for type-safe handler invocation - Add StashingHandlerInvocation, StashingAsyncHandlerInvocation, AsyncHandlerInvocation, and AsyncAsyncHandlerInvocation classes - Add Persist<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAll overloads with completion callbacks (sync and async) - Add PersistAllAsync overloads with completion callbacks (sync and async) - Add DeferAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add internal stashing Defer methods for completion callback support - Update PeekApplyHandler to handle async handlers via RunTask - Update PersistingEvents to use IStashingInvocation marker interface Stashing semantics are preserved: PersistAll completion callbacks use internal stashing Defer (increments _pendingStashingPersistInvocations), while PersistAllAsync uses non-stashing DeferAsync. * chore: update API approval for persistence completion callbacks Update verified API file to reflect: - New public methods: Persist/PersistAsync/PersistAll/PersistAllAsync async handler overloads and completion callback overloads - New public method: DeferAsync with async handler - Internal invocation classes: AsyncHandlerInvocation, StashingHandlerInvocation, and IPendingHandlerInvocation are now internal (implementation detail) * chore: update .NET Framework API approval for persistence completion callbacks * fixed API approvals * test(persistence): add empty events tests and convert to async test methods - Convert all tests to use async/await with ExpectMsgAsync instead of sync-over-async ExpectMsg calls - Add tests for PersistAll/PersistAllAsync with empty events to verify completion callbacks are invoked immediately for all overloads: - PersistAll with sync completion callback (existing) - PersistAll with async completion callback (new) - PersistAllAsync with sync completion callback (new) - PersistAllAsync with async completion callback (new) - Update EmptyEventsWithCompletionActor to support all four scenarios * fix(persistence): use Defer for empty events completion callbacks to maintain ordering When PersistAll/PersistAllAsync is called with empty events, the completion callback must still be queued through Defer/DeferAsync to maintain the in-order execution guarantee. Previously, the callback was invoked immediately which could cause out-of-order execution if there were pending invocations from prior Persist/PersistAll calls. Changes: - Replace immediate invocation with Defer/DeferAsync for all 8 overloads that have completion callbacks when events collection is null or empty - Add SequentialPersistOrderingActor test actor for ordering verification - Add test: Persist followed by empty PersistAll maintains execution order - Add test: Sequential PersistAll with empty in middle maintains order
…ative to #7937 (#7954) (#7957) * feat(persistence): add completion callbacks and async handler support Add completion callback overloads for PersistAll and PersistAllAsync that invoke a callback after all events have been persisted and their handlers executed. Also add async handler support (Func<TEvent, Task>) to all persist methods. Key changes: - Add IPendingHandlerInvocation, ISyncHandlerInvocation, IAsyncHandlerInvocation, and IStashingInvocation interfaces for type-safe handler invocation - Add StashingHandlerInvocation, StashingAsyncHandlerInvocation, AsyncHandlerInvocation, and AsyncAsyncHandlerInvocation classes - Add Persist<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAll overloads with completion callbacks (sync and async) - Add PersistAllAsync overloads with completion callbacks (sync and async) - Add DeferAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add internal stashing Defer methods for completion callback support - Update PeekApplyHandler to handle async handlers via RunTask - Update PersistingEvents to use IStashingInvocation marker interface Stashing semantics are preserved: PersistAll completion callbacks use internal stashing Defer (increments _pendingStashingPersistInvocations), while PersistAllAsync uses non-stashing DeferAsync. * chore: update API approval for persistence completion callbacks Update verified API file to reflect: - New public methods: Persist/PersistAsync/PersistAll/PersistAllAsync async handler overloads and completion callback overloads - New public method: DeferAsync with async handler - Internal invocation classes: AsyncHandlerInvocation, StashingHandlerInvocation, and IPendingHandlerInvocation are now internal (implementation detail) * chore: update .NET Framework API approval for persistence completion callbacks * fixed API approvals * test(persistence): add empty events tests and convert to async test methods - Convert all tests to use async/await with ExpectMsgAsync instead of sync-over-async ExpectMsg calls - Add tests for PersistAll/PersistAllAsync with empty events to verify completion callbacks are invoked immediately for all overloads: - PersistAll with sync completion callback (existing) - PersistAll with async completion callback (new) - PersistAllAsync with sync completion callback (new) - PersistAllAsync with async completion callback (new) - Update EmptyEventsWithCompletionActor to support all four scenarios * fix(persistence): use Defer for empty events completion callbacks to maintain ordering When PersistAll/PersistAllAsync is called with empty events, the completion callback must still be queued through Defer/DeferAsync to maintain the in-order execution guarantee. Previously, the callback was invoked immediately which could cause out-of-order execution if there were pending invocations from prior Persist/PersistAll calls. Changes: - Replace immediate invocation with Defer/DeferAsync for all 8 overloads that have completion callbacks when events collection is null or empty - Add SequentialPersistOrderingActor test actor for ordering verification - Add test: Persist followed by empty PersistAll maintains execution order - Add test: Sequential PersistAll with empty in middle maintains order
Summary
fixes #7935
This PR adds completion callback support and async handler support to Akka.NET persistence, enabling users to:
PersistAll/PersistAllAsyncbatch have been persisted and their handlers executedFunc<TEvent, Task>) with all persist methodsNote: This is a proof of concept implementation.
New API Reference
Async Handler Overloads (NEW)
Single event persistence with async handlers:
Batch Persistence with Async Handlers (NEW)
Completion Callbacks for Sync Handlers (NEW)
Usage Examples
Async handler with completion callback:
Async handler with async completion callback:
Empty batch handling:
Changes
New Features
PersistAllandPersistAllAsyncnow accept optionalonComplete(sync) oronCompleteAsync(async) callbacks that execute after all events in the batch are persistedPersist,PersistAsync,PersistAll,PersistAllAsync) now supportFunc<TEvent, Task>async handlersImplementation Details
IStashingInvocationmarker interface for type-safe detection of stashing invocationsPeekApplyHandlerto chain all async operations in a singleRunTaskcall to avoid nestedRunTaskexceptionsTest Coverage
Added comprehensive test suite in
PersistenceCompletionCallbackSpec.cs:PersistAllAsync_should_invoke_completion_callback_after_all_events_persistedPersistAll_should_invoke_completion_callback_after_all_events_persistedPersistAllAsync_should_invoke_async_completion_callbackPersistAll_should_invoke_async_completion_callbackPersistAllAsync_should_support_async_handlersPersistAsync_should_support_async_handlersCompletion_callback_should_fire_exactly_once_per_batchEmpty_event_list_should_invoke_completion_callback_immediatelyAll 8 new tests pass consistently (100% pass rate).
Test Results
Full test suite results match baseline stability (266-268 passing, 7-10 failing - matching pre-existing baseline variance). No new failures introduced by this implementation.