diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt index 110606c472d..9150919ff0a 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt @@ -7,12 +7,6 @@ [assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETCoreApp,Version=v6.0", FrameworkDisplayName=".NET 6.0")] namespace Akka.Persistence { - public sealed class AsyncHandlerInvocation : Akka.Persistence.IPendingHandlerInvocation - { - public AsyncHandlerInvocation(object evt, System.Action handler) { } - public object Event { get; } - public System.Action Handler { get; } - } public abstract class AtLeastOnceDeliveryActor : Akka.Persistence.PersistentActor { protected AtLeastOnceDeliveryActor() { } @@ -246,6 +240,7 @@ namespace Akka.Persistence public override void AroundPreStart() { } protected override bool AroundReceive(Akka.Actor.Receive receive, object message) { } public void DeferAsync(TEvent evt, System.Action handler) { } + public void DeferAsync(TEvent evt, System.Func handler) { } public void DeleteMessages(long toSequenceNr) { } public void DeleteSnapshot(long sequenceNr) { } public void DeleteSnapshots(Akka.Persistence.SnapshotSelectionCriteria criteria) { } @@ -255,9 +250,21 @@ namespace Akka.Persistence protected virtual void OnRecoveryFailure(System.Exception reason, object message = null) { } protected virtual void OnReplaySuccess() { } public void Persist(TEvent @event, System.Action handler) { } + public void Persist(TEvent @event, System.Func handler) { } public void PersistAll(System.Collections.Generic.IEnumerable events, System.Action handler) { } + public void PersistAll(System.Collections.Generic.IEnumerable events, System.Action handler, System.Action onComplete) { } + public void PersistAll(System.Collections.Generic.IEnumerable events, System.Action handler, System.Func onCompleteAsync) { } + public void PersistAll(System.Collections.Generic.IEnumerable events, System.Func handler) { } + public void PersistAll(System.Collections.Generic.IEnumerable events, System.Func handler, System.Action onComplete) { } + public void PersistAll(System.Collections.Generic.IEnumerable events, System.Func handler, System.Func onCompleteAsync) { } public void PersistAllAsync(System.Collections.Generic.IEnumerable events, System.Action handler) { } + public void PersistAllAsync(System.Collections.Generic.IEnumerable events, System.Action handler, System.Action onComplete) { } + public void PersistAllAsync(System.Collections.Generic.IEnumerable events, System.Action handler, System.Func onCompleteAsync) { } + public void PersistAllAsync(System.Collections.Generic.IEnumerable events, System.Func handler) { } + public void PersistAllAsync(System.Collections.Generic.IEnumerable events, System.Func handler, System.Action onComplete) { } + public void PersistAllAsync(System.Collections.Generic.IEnumerable events, System.Func handler, System.Func onCompleteAsync) { } public void PersistAsync(TEvent @event, System.Action handler) { } + public void PersistAsync(TEvent @event, System.Func handler) { } protected abstract bool ReceiveCommand(object message); protected abstract bool ReceiveRecover(object message); protected void RunTask(System.Func action) { } @@ -272,11 +279,6 @@ namespace Akka.Persistence } public interface IJournalRequest : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IPersistenceMessage { } public interface IJournalResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IPersistenceMessage { } - public interface IPendingHandlerInvocation - { - object Event { get; } - System.Action Handler { get; } - } public interface IPersistenceMessage : Akka.Actor.INoSerializationVerificationNeeded { } public interface IPersistenceRecovery { @@ -676,12 +678,6 @@ namespace Akka.Persistence public Akka.Persistence.PersistenceHealthCheckResult Result { get; } public override string ToString() { } } - public sealed class StashingHandlerInvocation : Akka.Persistence.IPendingHandlerInvocation - { - public StashingHandlerInvocation(object evt, System.Action handler) { } - public object Event { get; } - public System.Action Handler { get; } - } public sealed class ThrowExceptionConfigurator : Akka.Persistence.IStashOverflowStrategyConfigurator { public ThrowExceptionConfigurator() { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt index f33ef77425a..bc49f0158d3 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt @@ -7,12 +7,6 @@ [assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETStandard,Version=v2.0", FrameworkDisplayName=".NET Standard 2.0")] namespace Akka.Persistence { - public sealed class AsyncHandlerInvocation : Akka.Persistence.IPendingHandlerInvocation - { - public AsyncHandlerInvocation(object evt, System.Action handler) { } - public object Event { get; } - public System.Action Handler { get; } - } public abstract class AtLeastOnceDeliveryActor : Akka.Persistence.PersistentActor { protected AtLeastOnceDeliveryActor() { } @@ -246,6 +240,7 @@ namespace Akka.Persistence public override void AroundPreStart() { } protected override bool AroundReceive(Akka.Actor.Receive receive, object message) { } public void DeferAsync(TEvent evt, System.Action handler) { } + public void DeferAsync(TEvent evt, System.Func handler) { } public void DeleteMessages(long toSequenceNr) { } public void DeleteSnapshot(long sequenceNr) { } public void DeleteSnapshots(Akka.Persistence.SnapshotSelectionCriteria criteria) { } @@ -255,9 +250,21 @@ namespace Akka.Persistence protected virtual void OnRecoveryFailure(System.Exception reason, object message = null) { } protected virtual void OnReplaySuccess() { } public void Persist(TEvent @event, System.Action handler) { } + public void Persist(TEvent @event, System.Func handler) { } public void PersistAll(System.Collections.Generic.IEnumerable events, System.Action handler) { } + public void PersistAll(System.Collections.Generic.IEnumerable events, System.Action handler, System.Action onComplete) { } + public void PersistAll(System.Collections.Generic.IEnumerable events, System.Action handler, System.Func onCompleteAsync) { } + public void PersistAll(System.Collections.Generic.IEnumerable events, System.Func handler) { } + public void PersistAll(System.Collections.Generic.IEnumerable events, System.Func handler, System.Action onComplete) { } + public void PersistAll(System.Collections.Generic.IEnumerable events, System.Func handler, System.Func onCompleteAsync) { } public void PersistAllAsync(System.Collections.Generic.IEnumerable events, System.Action handler) { } + public void PersistAllAsync(System.Collections.Generic.IEnumerable events, System.Action handler, System.Action onComplete) { } + public void PersistAllAsync(System.Collections.Generic.IEnumerable events, System.Action handler, System.Func onCompleteAsync) { } + public void PersistAllAsync(System.Collections.Generic.IEnumerable events, System.Func handler) { } + public void PersistAllAsync(System.Collections.Generic.IEnumerable events, System.Func handler, System.Action onComplete) { } + public void PersistAllAsync(System.Collections.Generic.IEnumerable events, System.Func handler, System.Func onCompleteAsync) { } public void PersistAsync(TEvent @event, System.Action handler) { } + public void PersistAsync(TEvent @event, System.Func handler) { } protected abstract bool ReceiveCommand(object message); protected abstract bool ReceiveRecover(object message); protected void RunTask(System.Func action) { } @@ -272,11 +279,6 @@ namespace Akka.Persistence } public interface IJournalRequest : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IPersistenceMessage { } public interface IJournalResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IPersistenceMessage { } - public interface IPendingHandlerInvocation - { - object Event { get; } - System.Action Handler { get; } - } public interface IPersistenceMessage : Akka.Actor.INoSerializationVerificationNeeded { } public interface IPersistenceRecovery { @@ -675,12 +677,6 @@ namespace Akka.Persistence public Akka.Persistence.PersistenceHealthCheckResult Result { get; } public override string ToString() { } } - public sealed class StashingHandlerInvocation : Akka.Persistence.IPendingHandlerInvocation - { - public StashingHandlerInvocation(object evt, System.Action handler) { } - public object Event { get; } - public System.Action Handler { get; } - } public sealed class ThrowExceptionConfigurator : Akka.Persistence.IStashOverflowStrategyConfigurator { public ThrowExceptionConfigurator() { } diff --git a/src/core/Akka.Persistence.Tests/PersistenceCompletionCallbackSpec.cs b/src/core/Akka.Persistence.Tests/PersistenceCompletionCallbackSpec.cs new file mode 100644 index 00000000000..c80787023ff --- /dev/null +++ b/src/core/Akka.Persistence.Tests/PersistenceCompletionCallbackSpec.cs @@ -0,0 +1,886 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.TestKit; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Tests +{ + /// + /// Tests for persistence completion callbacks and async handler support. + /// + public class PersistenceCompletionCallbackSpec : PersistenceSpec + { + public PersistenceCompletionCallbackSpec(ITestOutputHelper output) + : base(Configuration("PersistenceCompletionCallbackSpec"), output) + { + } + + #region Test Actors + + private class TestEvent + { + public string Data { get; } + public TestEvent(string data) => Data = data; + } + + private class GetEvents + { + public static readonly GetEvents Instance = new(); + private GetEvents() { } + } + + private class GetCompletionOrder + { + public static readonly GetCompletionOrder Instance = new(); + private GetCompletionOrder() { } + } + + /// + /// Actor that tests PersistAll with sync completion callback + /// + private class PersistAllWithCompletionActor : UntypedPersistentActor + { + private readonly List _events = new(); + private readonly List _completionOrder = new(); + private readonly IActorRef _probe; + + public override string PersistenceId { get; } + + public PersistAllWithCompletionActor(string persistenceId, IActorRef probe) + { + PersistenceId = persistenceId; + _probe = probe; + } + + protected override void OnRecover(object message) + { + if (message is TestEvent evt) + _events.Add(evt.Data); + } + + protected override void OnCommand(object message) + { + switch (message) + { + case string[] events: + var testEvents = new List(); + foreach (var e in events) + testEvents.Add(new TestEvent(e)); + + PersistAll(testEvents, evt => + { + _events.Add(evt.Data); + _completionOrder.Add($"handler:{evt.Data}"); + }, () => + { + _completionOrder.Add("completion"); + _probe.Tell("completed"); + }); + break; + + case GetEvents: + Sender.Tell(_events.ToArray()); + break; + + case GetCompletionOrder: + Sender.Tell(_completionOrder.ToArray()); + break; + } + } + } + + /// + /// Actor that tests PersistAll with async completion callback + /// + private class PersistAllWithAsyncCompletionActor : UntypedPersistentActor + { + private readonly List _events = new(); + private readonly List _completionOrder = new(); + private readonly IActorRef _probe; + + public override string PersistenceId { get; } + + public PersistAllWithAsyncCompletionActor(string persistenceId, IActorRef probe) + { + PersistenceId = persistenceId; + _probe = probe; + } + + protected override void OnRecover(object message) + { + if (message is TestEvent evt) + _events.Add(evt.Data); + } + + protected override void OnCommand(object message) + { + switch (message) + { + case string[] events: + var testEvents = new List(); + foreach (var e in events) + testEvents.Add(new TestEvent(e)); + + PersistAll(testEvents, evt => + { + _events.Add(evt.Data); + _completionOrder.Add($"handler:{evt.Data}"); + }, async () => + { + await Task.Delay(10); + _completionOrder.Add("async-completion"); + _probe.Tell("completed"); + }); + break; + + case GetEvents: + Sender.Tell(_events.ToArray()); + break; + + case GetCompletionOrder: + Sender.Tell(_completionOrder.ToArray()); + break; + } + } + } + + /// + /// Actor that tests PersistAllAsync with sync completion callback + /// + private class PersistAllAsyncWithCompletionActor : UntypedPersistentActor + { + private readonly List _events = new(); + private readonly List _completionOrder = new(); + private readonly IActorRef _probe; + + public override string PersistenceId { get; } + + public PersistAllAsyncWithCompletionActor(string persistenceId, IActorRef probe) + { + PersistenceId = persistenceId; + _probe = probe; + } + + protected override void OnRecover(object message) + { + if (message is TestEvent evt) + _events.Add(evt.Data); + } + + protected override void OnCommand(object message) + { + switch (message) + { + case string[] events: + var testEvents = new List(); + foreach (var e in events) + testEvents.Add(new TestEvent(e)); + + PersistAllAsync(testEvents, evt => + { + _events.Add(evt.Data); + _completionOrder.Add($"handler:{evt.Data}"); + }, () => + { + _completionOrder.Add("completion"); + _probe.Tell("completed"); + }); + break; + + case GetEvents: + Sender.Tell(_events.ToArray()); + break; + + case GetCompletionOrder: + Sender.Tell(_completionOrder.ToArray()); + break; + } + } + } + + /// + /// Actor that tests Persist with async handler + /// + private class PersistWithAsyncHandlerActor : UntypedPersistentActor + { + private readonly List _events = new(); + private readonly List _completionOrder = new(); + private readonly IActorRef _probe; + + public override string PersistenceId { get; } + + public PersistWithAsyncHandlerActor(string persistenceId, IActorRef probe) + { + PersistenceId = persistenceId; + _probe = probe; + } + + protected override void OnRecover(object message) + { + if (message is TestEvent evt) + _events.Add(evt.Data); + } + + protected override void OnCommand(object message) + { + switch (message) + { + case string eventData: + Persist(new TestEvent(eventData), async evt => + { + await Task.Delay(10); + _events.Add(evt.Data); + _completionOrder.Add($"async-handler:{evt.Data}"); + _probe.Tell("handled"); + }); + break; + + case GetEvents: + Sender.Tell(_events.ToArray()); + break; + + case GetCompletionOrder: + Sender.Tell(_completionOrder.ToArray()); + break; + } + } + } + + /// + /// Actor that tests PersistAsync with async handler + /// + private class PersistAsyncWithAsyncHandlerActor : UntypedPersistentActor + { + private readonly List _events = new(); + private readonly List _completionOrder = new(); + private readonly IActorRef _probe; + + public override string PersistenceId { get; } + + public PersistAsyncWithAsyncHandlerActor(string persistenceId, IActorRef probe) + { + PersistenceId = persistenceId; + _probe = probe; + } + + protected override void OnRecover(object message) + { + if (message is TestEvent evt) + _events.Add(evt.Data); + } + + protected override void OnCommand(object message) + { + switch (message) + { + case string eventData: + PersistAsync(new TestEvent(eventData), async evt => + { + await Task.Delay(10); + _events.Add(evt.Data); + _completionOrder.Add($"async-handler:{evt.Data}"); + _probe.Tell("handled"); + }); + break; + + case GetEvents: + Sender.Tell(_events.ToArray()); + break; + + case GetCompletionOrder: + Sender.Tell(_completionOrder.ToArray()); + break; + } + } + } + + /// + /// Actor that tests DeferAsync with async handler + /// + private class DeferAsyncWithAsyncHandlerActor : UntypedPersistentActor + { + private readonly List _events = new(); + private readonly List _completionOrder = new(); + private readonly IActorRef _probe; + + public override string PersistenceId { get; } + + public DeferAsyncWithAsyncHandlerActor(string persistenceId, IActorRef probe) + { + PersistenceId = persistenceId; + _probe = probe; + } + + protected override void OnRecover(object message) + { + if (message is TestEvent evt) + _events.Add(evt.Data); + } + + protected override void OnCommand(object message) + { + switch (message) + { + case string[] events: + // First persist events, then defer async + var testEvents = new List(); + foreach (var e in events) + testEvents.Add(new TestEvent(e)); + + PersistAllAsync(testEvents, evt => + { + _events.Add(evt.Data); + _completionOrder.Add($"handler:{evt.Data}"); + }); + + DeferAsync("deferred", async _ => + { + await Task.Delay(10); + _completionOrder.Add("async-deferred"); + _probe.Tell("deferred"); + }); + break; + + case GetEvents: + Sender.Tell(_events.ToArray()); + break; + + case GetCompletionOrder: + Sender.Tell(_completionOrder.ToArray()); + break; + } + } + } + + /// + /// Actor that tests PersistAll with async handlers + /// + private class PersistAllWithAsyncHandlerActor : UntypedPersistentActor + { + private readonly List _events = new(); + private readonly List _completionOrder = new(); + private readonly IActorRef _probe; + + public override string PersistenceId { get; } + + public PersistAllWithAsyncHandlerActor(string persistenceId, IActorRef probe) + { + PersistenceId = persistenceId; + _probe = probe; + } + + protected override void OnRecover(object message) + { + if (message is TestEvent evt) + _events.Add(evt.Data); + } + + protected override void OnCommand(object message) + { + switch (message) + { + case string[] events: + var testEvents = new List(); + foreach (var e in events) + testEvents.Add(new TestEvent(e)); + + PersistAll(testEvents, async evt => + { + await Task.Delay(10); + _events.Add(evt.Data); + _completionOrder.Add($"async-handler:{evt.Data}"); + }, () => + { + _completionOrder.Add("completion"); + _probe.Tell("completed"); + }); + break; + + case GetEvents: + Sender.Tell(_events.ToArray()); + break; + + case GetCompletionOrder: + Sender.Tell(_completionOrder.ToArray()); + break; + } + } + } + + /// + /// Actor that tests stashing behavior - commands should be stashed during PersistAll + /// + private class StashingBehaviorTestActor : UntypedPersistentActor + { + private readonly List _commandOrder = new(); + private readonly IActorRef _probe; + + public override string PersistenceId { get; } + + public StashingBehaviorTestActor(string persistenceId, IActorRef probe) + { + PersistenceId = persistenceId; + _probe = probe; + } + + protected override void OnRecover(object message) { } + + protected override void OnCommand(object message) + { + switch (message) + { + case "persist": + _commandOrder.Add("persist-start"); + PersistAll(new[] { new TestEvent("a"), new TestEvent("b") }, evt => + { + _commandOrder.Add($"handler:{evt.Data}"); + }, () => + { + _commandOrder.Add("completion"); + }); + _commandOrder.Add("persist-end"); + break; + + case "other": + _commandOrder.Add("other-command"); + _probe.Tell("other-processed"); + break; + + case "get-order": + Sender.Tell(_commandOrder.ToArray()); + break; + } + } + } + + /// + /// Actor that tests sequential persist operations to verify ordering is maintained + /// even when empty events are involved + /// + private class SequentialPersistOrderingActor : UntypedPersistentActor + { + private readonly List _executionOrder = new(); + private readonly IActorRef _probe; + + public override string PersistenceId { get; } + + public SequentialPersistOrderingActor(string persistenceId, IActorRef probe) + { + PersistenceId = persistenceId; + _probe = probe; + } + + protected override void OnRecover(object message) { } + + protected override void OnCommand(object message) + { + switch (message) + { + // Test: Persist followed by PersistAll with empty events + // The empty PersistAll completion should run AFTER the Persist handler + case "persist-then-empty": + Persist(new TestEvent("first"), evt => + { + _executionOrder.Add($"persist-handler:{evt.Data}"); + }); + PersistAll(Array.Empty(), _ => { }, () => + { + _executionOrder.Add("empty-completion"); + _probe.Tell("done"); + }); + break; + + // Test: Multiple PersistAll calls where middle one is empty + case "persist-empty-persist": + PersistAll(new[] { new TestEvent("first") }, evt => + { + _executionOrder.Add($"first-handler:{evt.Data}"); + }, () => + { + _executionOrder.Add("first-completion"); + }); + PersistAll(Array.Empty(), _ => { }, () => + { + _executionOrder.Add("empty-completion"); + }); + PersistAll(new[] { new TestEvent("last") }, evt => + { + _executionOrder.Add($"last-handler:{evt.Data}"); + }, () => + { + _executionOrder.Add("last-completion"); + _probe.Tell("done"); + }); + break; + + case "get-order": + Sender.Tell(_executionOrder.ToArray()); + break; + } + } + } + + /// + /// Actor that tests empty event list with various completion callback overloads + /// + private class EmptyEventsWithCompletionActor : UntypedPersistentActor + { + private readonly IActorRef _probe; + private bool _completionCalled; + + public override string PersistenceId { get; } + + public EmptyEventsWithCompletionActor(string persistenceId, IActorRef probe) + { + PersistenceId = persistenceId; + _probe = probe; + } + + protected override void OnRecover(object message) { } + + protected override void OnCommand(object message) + { + switch (message) + { + // PersistAll with sync completion callback + case "persist-empty-sync": + PersistAll(Array.Empty(), _ => { }, () => + { + _completionCalled = true; + _probe.Tell("completed"); + }); + break; + + // PersistAll with async completion callback + case "persist-empty-async": + PersistAll(Array.Empty(), _ => { }, async () => + { + await Task.Yield(); + _completionCalled = true; + _probe.Tell("completed"); + }); + break; + + // PersistAllAsync with sync completion callback + case "persist-async-empty-sync": + PersistAllAsync(Array.Empty(), _ => { }, () => + { + _completionCalled = true; + _probe.Tell("completed"); + }); + break; + + // PersistAllAsync with async completion callback + case "persist-async-empty-async": + PersistAllAsync(Array.Empty(), _ => { }, async () => + { + await Task.Yield(); + _completionCalled = true; + _probe.Tell("completed"); + }); + break; + + case "check": + Sender.Tell(_completionCalled); + break; + + case "reset": + _completionCalled = false; + Sender.Tell("reset-done"); + break; + } + } + } + + #endregion + + #region Tests + + [Fact(DisplayName = "PersistAll with sync completion callback should invoke callback after all handlers")] + public async Task PersistAll_WithSyncCompletion_Should_InvokeAfterAllHandlers() + { + var probe = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => + new PersistAllWithCompletionActor(Name, probe))); + + actor.Tell(new[] { "event1", "event2", "event3" }); + await probe.ExpectMsgAsync("completed"); + + actor.Tell(GetCompletionOrder.Instance); + var order = await ExpectMsgAsync(); + + order.Should().BeEquivalentTo(new[] + { + "handler:event1", + "handler:event2", + "handler:event3", + "completion" + }, options => options.WithStrictOrdering()); + } + + [Fact(DisplayName = "PersistAll with async completion callback should invoke callback after all handlers")] + public async Task PersistAll_WithAsyncCompletion_Should_InvokeAfterAllHandlers() + { + var probe = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => + new PersistAllWithAsyncCompletionActor(Name, probe))); + + actor.Tell(new[] { "event1", "event2", "event3" }); + await probe.ExpectMsgAsync("completed"); + + actor.Tell(GetCompletionOrder.Instance); + var order = await ExpectMsgAsync(); + + order.Should().BeEquivalentTo(new[] + { + "handler:event1", + "handler:event2", + "handler:event3", + "async-completion" + }, options => options.WithStrictOrdering()); + } + + [Fact(DisplayName = "PersistAllAsync with sync completion callback should invoke callback after all handlers")] + public async Task PersistAllAsync_WithSyncCompletion_Should_InvokeAfterAllHandlers() + { + var probe = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => + new PersistAllAsyncWithCompletionActor(Name, probe))); + + actor.Tell(new[] { "event1", "event2", "event3" }); + await probe.ExpectMsgAsync("completed"); + + actor.Tell(GetCompletionOrder.Instance); + var order = await ExpectMsgAsync(); + + order.Should().BeEquivalentTo(new[] + { + "handler:event1", + "handler:event2", + "handler:event3", + "completion" + }, options => options.WithStrictOrdering()); + } + + [Fact(DisplayName = "Persist with async handler should execute handler asynchronously")] + public async Task Persist_WithAsyncHandler_Should_ExecuteAsynchronously() + { + var probe = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => + new PersistWithAsyncHandlerActor(Name, probe))); + + actor.Tell("event1"); + await probe.ExpectMsgAsync("handled"); + + actor.Tell(GetCompletionOrder.Instance); + var order = await ExpectMsgAsync(); + + order.Should().Contain("async-handler:event1"); + } + + [Fact(DisplayName = "PersistAsync with async handler should execute handler asynchronously")] + public async Task PersistAsync_WithAsyncHandler_Should_ExecuteAsynchronously() + { + var probe = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => + new PersistAsyncWithAsyncHandlerActor(Name, probe))); + + actor.Tell("event1"); + await probe.ExpectMsgAsync("handled"); + + actor.Tell(GetCompletionOrder.Instance); + var order = await ExpectMsgAsync(); + + order.Should().Contain("async-handler:event1"); + } + + [Fact(DisplayName = "DeferAsync with async handler should execute after pending invocations")] + public async Task DeferAsync_WithAsyncHandler_Should_ExecuteAfterPending() + { + var probe = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => + new DeferAsyncWithAsyncHandlerActor(Name, probe))); + + actor.Tell(new[] { "event1", "event2" }); + await probe.ExpectMsgAsync("deferred"); + + actor.Tell(GetCompletionOrder.Instance); + var order = await ExpectMsgAsync(); + + order.Should().BeEquivalentTo(new[] + { + "handler:event1", + "handler:event2", + "async-deferred" + }, options => options.WithStrictOrdering()); + } + + [Fact(DisplayName = "PersistAll with async handlers should execute handlers and completion in order")] + public async Task PersistAll_WithAsyncHandlers_Should_ExecuteInOrder() + { + var probe = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => + new PersistAllWithAsyncHandlerActor(Name, probe))); + + actor.Tell(new[] { "event1", "event2" }); + await probe.ExpectMsgAsync("completed"); + + actor.Tell(GetCompletionOrder.Instance); + var order = await ExpectMsgAsync(); + + order.Should().BeEquivalentTo(new[] + { + "async-handler:event1", + "async-handler:event2", + "completion" + }, options => options.WithStrictOrdering()); + } + + [Fact(DisplayName = "PersistAll should stash commands until completion callback finishes")] + public async Task PersistAll_Should_StashCommandsUntilCompletion() + { + var probe = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => + new StashingBehaviorTestActor(Name, probe))); + + // Send persist command followed immediately by another command + actor.Tell("persist"); + actor.Tell("other"); + + // Wait for the other command to be processed (after completion) + await probe.ExpectMsgAsync("other-processed"); + + actor.Tell("get-order"); + var order = await ExpectMsgAsync(); + + // The "other" command should be processed after the completion callback + order.Should().BeEquivalentTo(new[] + { + "persist-start", + "persist-end", + "handler:a", + "handler:b", + "completion", + "other-command" + }, options => options.WithStrictOrdering()); + } + + [Fact(DisplayName = "PersistAll with empty events and sync completion should invoke completion callback immediately")] + public async Task PersistAll_WithEmptyEvents_SyncCompletion_Should_InvokeCompletionImmediately() + { + var probe = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => + new EmptyEventsWithCompletionActor(Name, probe))); + + actor.Tell("persist-empty-sync"); + await probe.ExpectMsgAsync("completed"); + + actor.Tell("check"); + var completionCalled = await ExpectMsgAsync(); + completionCalled.Should().BeTrue(); + } + + [Fact(DisplayName = "PersistAll with empty events and async completion should invoke completion callback immediately")] + public async Task PersistAll_WithEmptyEvents_AsyncCompletion_Should_InvokeCompletionImmediately() + { + var probe = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => + new EmptyEventsWithCompletionActor(Name, probe))); + + actor.Tell("persist-empty-async"); + await probe.ExpectMsgAsync("completed"); + + actor.Tell("check"); + var completionCalled = await ExpectMsgAsync(); + completionCalled.Should().BeTrue(); + } + + [Fact(DisplayName = "PersistAllAsync with empty events and sync completion should invoke completion callback immediately")] + public async Task PersistAllAsync_WithEmptyEvents_SyncCompletion_Should_InvokeCompletionImmediately() + { + var probe = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => + new EmptyEventsWithCompletionActor(Name, probe))); + + actor.Tell("persist-async-empty-sync"); + await probe.ExpectMsgAsync("completed"); + + actor.Tell("check"); + var completionCalled = await ExpectMsgAsync(); + completionCalled.Should().BeTrue(); + } + + [Fact(DisplayName = "PersistAllAsync with empty events and async completion should invoke completion callback immediately")] + public async Task PersistAllAsync_WithEmptyEvents_AsyncCompletion_Should_InvokeCompletionImmediately() + { + var probe = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => + new EmptyEventsWithCompletionActor(Name, probe))); + + actor.Tell("persist-async-empty-async"); + await probe.ExpectMsgAsync("completed"); + + actor.Tell("check"); + var completionCalled = await ExpectMsgAsync(); + completionCalled.Should().BeTrue(); + } + + [Fact(DisplayName = "Persist followed by PersistAll with empty events should maintain execution order")] + public async Task Persist_ThenEmptyPersistAll_Should_MaintainOrder() + { + var probe = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => + new SequentialPersistOrderingActor(Name, probe))); + + actor.Tell("persist-then-empty"); + await probe.ExpectMsgAsync("done"); + + actor.Tell("get-order"); + var order = await ExpectMsgAsync(); + + // The empty PersistAll completion must run AFTER the Persist handler + order.Should().BeEquivalentTo(new[] + { + "persist-handler:first", + "empty-completion" + }, options => options.WithStrictOrdering()); + } + + [Fact(DisplayName = "Sequential PersistAll with empty events in middle should maintain execution order")] + public async Task SequentialPersistAll_WithEmptyInMiddle_Should_MaintainOrder() + { + var probe = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => + new SequentialPersistOrderingActor(Name, probe))); + + actor.Tell("persist-empty-persist"); + await probe.ExpectMsgAsync("done"); + + actor.Tell("get-order"); + var order = await ExpectMsgAsync(); + + // All callbacks should execute in the order they were queued + order.Should().BeEquivalentTo(new[] + { + "first-handler:first", + "first-completion", + "empty-completion", + "last-handler:last", + "last-completion" + }, options => options.WithStrictOrdering()); + } + + #endregion + } +} diff --git a/src/core/Akka.Persistence/Eventsourced.Recovery.cs b/src/core/Akka.Persistence/Eventsourced.Recovery.cs index 4a1a1c59d5a..34600abbc30 100644 --- a/src/core/Akka.Persistence/Eventsourced.Recovery.cs +++ b/src/core/Akka.Persistence/Eventsourced.Recovery.cs @@ -383,7 +383,7 @@ private EventsourcedState PersistingEvents() // enables an early return to `processingCommands`, because if this counter hits `0`, // we know the remaining pendingInvocations are all `persistAsync` created, which // means we can go back to processing commands also - and these callbacks will be called as soon as possible - if (invocation is StashingHandlerInvocation) + if (invocation is IStashingInvocation) _pendingStashingPersistInvocations--; if (_pendingStashingPersistInvocations == 0) @@ -398,15 +398,54 @@ private EventsourcedState PersistingEvents() }); } - private void PeekApplyHandler(object payload) + /// + /// Applies the handler for the first pending invocation. + /// For sync handlers, invokes directly. For async handlers, uses RunTask. + /// + /// The event payload to pass to the handler. + /// Callback invoked when the handler completes (true if error). + private void PeekApplyHandler(object payload, Action onComplete) { - try + var invocation = _pendingInvocations.First.Value; + + if (invocation is IAsyncHandlerInvocation asyncInv) { - _pendingInvocations.First.Value.Handler(payload); + // Async handler - run via RunTask + RunTask(async () => + { + try + { + await asyncInv.AsyncHandler(payload); + onComplete(false); + } + catch + { + onComplete(true); + throw; + } + finally + { + FlushBatch(); + } + }); } - finally + else if (invocation is ISyncHandlerInvocation syncInv) { - FlushBatch(); + // Sync handler - invoke directly + try + { + syncInv.Handler(payload); + onComplete(false); + } + catch + { + onComplete(true); + throw; + } + finally + { + FlushBatch(); + } } } @@ -421,16 +460,7 @@ private bool CommonProcessingStateBehavior(object message, Action onWriteM if (m1.ActorInstanceId == _instanceId) { UpdateLastSequenceNr(m1.Persistent); - try - { - PeekApplyHandler(m1.Persistent.Payload); - onWriteMessageComplete(false); - } - catch - { - onWriteMessageComplete(true); - throw; - } + PeekApplyHandler(m1.Persistent.Payload, onWriteMessageComplete); } break; @@ -469,16 +499,7 @@ private bool CommonProcessingStateBehavior(object message, Action onWriteM { if (m.ActorInstanceId == _instanceId) { - try - { - PeekApplyHandler(m.Message); - onWriteMessageComplete(false); - } - catch (Exception) - { - onWriteMessageComplete(true); - throw; - } + PeekApplyHandler(m.Message, onWriteMessageComplete); } break; diff --git a/src/core/Akka.Persistence/Eventsourced.cs b/src/core/Akka.Persistence/Eventsourced.cs index 252addaf197..8e1d8ca6ce2 100644 --- a/src/core/Akka.Persistence/Eventsourced.cs +++ b/src/core/Akka.Persistence/Eventsourced.cs @@ -16,16 +16,43 @@ namespace Akka.Persistence { - public interface IPendingHandlerInvocation + /// + /// Base interface for pending handler invocations. + /// + internal interface IPendingHandlerInvocation { object Event { get; } + } + + /// + /// Interface for invocations with synchronous handlers. + /// + internal interface ISyncHandlerInvocation : IPendingHandlerInvocation + { Action Handler { get; } } + /// + /// Interface for invocations with asynchronous handlers. + /// + internal interface IAsyncHandlerInvocation : IPendingHandlerInvocation + { + Func AsyncHandler { get; } + } + + /// + /// Marker interface for stashing invocations that increment the stashing counter. + /// + internal interface IStashingInvocation : IPendingHandlerInvocation + { + } + /// /// Forces actor to stash incoming commands until all invocations are handled. + /// Used by and + /// . /// - public sealed class StashingHandlerInvocation : IPendingHandlerInvocation + internal sealed class StashingHandlerInvocation : ISyncHandlerInvocation, IStashingInvocation { public StashingHandlerInvocation(object evt, Action handler) { @@ -34,16 +61,32 @@ public StashingHandlerInvocation(object evt, Action handler) } public object Event { get; } - public Action Handler { get; } } + /// + /// Stashing invocation with an asynchronous handler. + /// Used by and + /// . + /// + internal sealed class StashingAsyncHandlerInvocation : IAsyncHandlerInvocation, IStashingInvocation + { + public StashingAsyncHandlerInvocation(object evt, Func asyncHandler) + { + Event = evt; + AsyncHandler = asyncHandler; + } + + public object Event { get; } + public Func AsyncHandler { get; } + } + /// /// Unlike this one does not force actor to stash commands. /// Originates from - /// or method calls. + /// or method calls. /// - public sealed class AsyncHandlerInvocation : IPendingHandlerInvocation + internal sealed class AsyncHandlerInvocation : ISyncHandlerInvocation { public AsyncHandlerInvocation(object evt, Action handler) { @@ -52,10 +95,26 @@ public AsyncHandlerInvocation(object evt, Action handler) } public object Event { get; } - public Action Handler { get; } } + /// + /// Non-stashing invocation with an asynchronous handler. + /// Used by and + /// . + /// + internal sealed class AsyncAsyncHandlerInvocation : IAsyncHandlerInvocation + { + public AsyncAsyncHandlerInvocation(object evt, Func asyncHandler) + { + Event = evt; + AsyncHandler = asyncHandler; + } + + public object Event { get; } + public Func AsyncHandler { get; } + } + /// /// Message used to detect that recovery timed out. /// @@ -310,6 +369,27 @@ public void Persist(TEvent @event, Action handler) sequenceNr: NextSequenceNr(), writerGuid: _writerGuid, sender: Sender))); } + /// + /// Asynchronously persists an with an async handler. + /// This method guarantees that no new commands will be received by a persistent actor + /// between a call to and execution of its handler. + /// + /// The event type. + /// The event to persist. + /// The async handler to invoke after persistence. + public void Persist(TEvent @event, Func handler) + { + if (IsRecovering) + { + throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later."); + } + + _pendingStashingPersistInvocations++; + _pendingInvocations.AddLast(new StashingAsyncHandlerInvocation(@event, o => handler((TEvent)o))); + _eventBatch.AddLast(new AtomicWrite(new Persistent(@event, persistenceId: PersistenceId, + sequenceNr: NextSequenceNr(), writerGuid: _writerGuid, sender: Sender))); + } + /// /// Asynchronously persists series of in specified order. /// This is equivalent of multiple calls of calls @@ -341,6 +421,129 @@ public void PersistAll(IEnumerable events, Action handle _eventBatch.AddLast(new AtomicWrite(persistents.ToImmutable())); } + /// + /// Asynchronously persists series of in specified order with a completion callback. + /// The callback is invoked after all events have been persisted and their handlers executed. + /// This method guarantees that no new commands will be received until all handlers and the completion callback have finished. + /// + /// The event type. + /// The events to persist. + /// The handler to invoke for each persisted event. + /// The callback to invoke after all events have been persisted and handled. + public void PersistAll(IEnumerable events, Action handler, Action onComplete) + { + if (events == null || !events.Any()) + { + if (onComplete != null) + Defer(null, _ => onComplete()); + return; + } + + PersistAll(events, handler); + if (onComplete != null) + Defer(null, _ => onComplete()); + } + + /// + /// Asynchronously persists series of in specified order with an async completion callback. + /// The callback is invoked after all events have been persisted and their handlers executed. + /// This method guarantees that no new commands will be received until all handlers and the completion callback have finished. + /// + /// The event type. + /// The events to persist. + /// The handler to invoke for each persisted event. + /// The async callback to invoke after all events have been persisted and handled. + public void PersistAll(IEnumerable events, Action handler, Func onCompleteAsync) + { + if (events == null || !events.Any()) + { + if (onCompleteAsync != null) + Defer(null, async _ => await onCompleteAsync()); + return; + } + + PersistAll(events, handler); + if (onCompleteAsync != null) + Defer(null, async _ => await onCompleteAsync()); + } + + /// + /// Asynchronously persists series of with an async handler. + /// This method guarantees that no new commands will be received until all handlers have finished. + /// + /// The event type. + /// The events to persist. + /// The async handler to invoke for each persisted event. + public void PersistAll(IEnumerable events, Func handler) + { + if (IsRecovering) + { + throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later."); + } + + if (events == null) return; + + Func Inv(Func h) => o => h((TEvent)o); + var asyncInv = Inv(handler); + var persistents = ImmutableList.CreateBuilder(); + foreach (var @event in events) + { + _pendingStashingPersistInvocations++; + _pendingInvocations.AddLast(new StashingAsyncHandlerInvocation(@event, asyncInv)); + persistents.Add(new Persistent(@event, persistenceId: PersistenceId, + sequenceNr: NextSequenceNr(), writerGuid: _writerGuid, sender: Sender)); + } + + if (persistents.Count > 0) + _eventBatch.AddLast(new AtomicWrite(persistents.ToImmutable())); + } + + /// + /// Asynchronously persists series of with an async handler and completion callback. + /// The callback is invoked after all events have been persisted and their handlers executed. + /// This method guarantees that no new commands will be received until all handlers and the completion callback have finished. + /// + /// The event type. + /// The events to persist. + /// The async handler to invoke for each persisted event. + /// The callback to invoke after all events have been persisted and handled. + public void PersistAll(IEnumerable events, Func handler, Action onComplete) + { + if (events == null || !events.Any()) + { + if (onComplete != null) + Defer(null, _ => onComplete()); + return; + } + + PersistAll(events, handler); + if (onComplete != null) + Defer(null, _ => onComplete()); + } + + /// + /// Asynchronously persists series of with an async handler and async completion callback. + /// The callback is invoked after all events have been persisted and their handlers executed. + /// This method guarantees that no new commands will be received until all handlers and the completion callback have finished. + /// + /// The event type. + /// The events to persist. + /// The async handler to invoke for each persisted event. + /// The async callback to invoke after all events have been persisted and handled. + public void PersistAll(IEnumerable events, Func handler, Func onCompleteAsync) + { + if (events == null || !events.Any()) + { + if (onCompleteAsync != null) + Defer(null, async _ => await onCompleteAsync()); + return; + } + + PersistAll(events, handler); + if (onCompleteAsync != null) + Defer(null, async _ => await onCompleteAsync()); + } + /// /// Asynchronously persists an . On successful persistence, the /// is called with the persisted event. Unlike method, @@ -381,6 +584,26 @@ public void PersistAsync(TEvent @event, Action handler) sequenceNr: NextSequenceNr(), writerGuid: _writerGuid, sender: Sender))); } + /// + /// Asynchronously persists an with an async handler. + /// Unlike , this method allows + /// commands to be processed between the persist call and handler execution. + /// + /// The event type. + /// The event to persist. + /// The async handler to invoke after persistence. + public void PersistAsync(TEvent @event, Func handler) + { + if (IsRecovering) + { + throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later."); + } + + _pendingInvocations.AddLast(new AsyncAsyncHandlerInvocation(@event, o => handler((TEvent)o))); + _eventBatch.AddLast(new AtomicWrite(new Persistent(@event, persistenceId: PersistenceId, + sequenceNr: NextSequenceNr(), writerGuid: _writerGuid, sender: Sender))); + } + /// /// Asynchronously persists series of in specified order. /// This is equivalent of multiple calls of calls @@ -408,12 +631,136 @@ public void PersistAllAsync(IEnumerable events, Action h .ToImmutableList())); } + /// + /// Asynchronously persists series of in specified order with a completion callback. + /// Unlike , this method allows + /// commands to be processed between event handler executions. + /// The callback is invoked after all events have been persisted and their handlers executed. + /// + /// The event type. + /// The events to persist. + /// The handler to invoke for each persisted event. + /// The callback to invoke after all events have been persisted and handled. + public void PersistAllAsync(IEnumerable events, Action handler, Action onComplete) + { + if (events == null || !events.Any()) + { + if (onComplete != null) + DeferAsync(null, _ => onComplete()); + return; + } + + PersistAllAsync(events, handler); + if (onComplete != null) + DeferAsync(null, _ => onComplete()); + } + + /// + /// Asynchronously persists series of in specified order with an async completion callback. + /// Unlike , this method allows + /// commands to be processed between event handler executions. + /// The callback is invoked after all events have been persisted and their handlers executed. + /// + /// The event type. + /// The events to persist. + /// The handler to invoke for each persisted event. + /// The async callback to invoke after all events have been persisted and handled. + public void PersistAllAsync(IEnumerable events, Action handler, Func onCompleteAsync) + { + if (events == null || !events.Any()) + { + if (onCompleteAsync != null) + DeferAsync(null, async _ => await onCompleteAsync()); + return; + } + + PersistAllAsync(events, handler); + if (onCompleteAsync != null) + DeferAsync(null, async _ => await onCompleteAsync()); + } + + /// + /// Asynchronously persists series of with an async handler. + /// Unlike , this method allows + /// commands to be processed between event handler executions. + /// + /// The event type. + /// The events to persist. + /// The async handler to invoke for each persisted event. + public void PersistAllAsync(IEnumerable events, Func handler) + { + if (IsRecovering) + { + throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later."); + } + + Func Inv(Func h) => o => h((TEvent)o); + var asyncInv = Inv(handler); + var enumerable = events as TEvent[] ?? events.ToArray(); + foreach (var @event in enumerable) + { + _pendingInvocations.AddLast(new AsyncAsyncHandlerInvocation(@event, asyncInv)); + } + + _eventBatch.AddLast(new AtomicWrite(enumerable.Select(e => new Persistent(e, persistenceId: PersistenceId, + sequenceNr: NextSequenceNr(), writerGuid: _writerGuid, sender: Sender)) + .ToImmutableList())); + } + + /// + /// Asynchronously persists series of with an async handler and completion callback. + /// Unlike , this method allows + /// commands to be processed between event handler executions. + /// The callback is invoked after all events have been persisted and their handlers executed. + /// + /// The event type. + /// The events to persist. + /// The async handler to invoke for each persisted event. + /// The callback to invoke after all events have been persisted and handled. + public void PersistAllAsync(IEnumerable events, Func handler, Action onComplete) + { + if (events == null || !events.Any()) + { + if (onComplete != null) + DeferAsync(null, _ => onComplete()); + return; + } + + PersistAllAsync(events, handler); + if (onComplete != null) + DeferAsync(null, _ => onComplete()); + } + + /// + /// Asynchronously persists series of with an async handler and async completion callback. + /// Unlike , this method allows + /// commands to be processed between event handler executions. + /// The callback is invoked after all events have been persisted and their handlers executed. + /// + /// The event type. + /// The events to persist. + /// The async handler to invoke for each persisted event. + /// The async callback to invoke after all events have been persisted and handled. + public void PersistAllAsync(IEnumerable events, Func handler, Func onCompleteAsync) + { + if (events == null || !events.Any()) + { + if (onCompleteAsync != null) + DeferAsync(null, async _ => await onCompleteAsync()); + return; + } + + PersistAllAsync(events, handler); + if (onCompleteAsync != null) + DeferAsync(null, async _ => await onCompleteAsync()); + } + /// /// Defer the execution until all pending handlers have been executed. /// Allows to define logic within the actor, which will respect the invocation-order-guarantee /// in respect to calls. /// That is, if was invoked before - /// , the corresponding handlers will be + /// , the corresponding handlers will be /// invoked in the same order as they were registered in. /// /// This call will NOT result in being persisted, use @@ -447,6 +794,79 @@ public void DeferAsync(TEvent evt, Action handler) } } + /// + /// Defer the execution until all pending handlers have been executed. + /// This is the async variant that accepts an async handler. + /// + /// This call will NOT result in being persisted. + /// + /// If there are no pending persist handler calls, the will be called immediately + /// via . + /// + /// If persistence of an earlier event fails, the persistent actor will stop, and the + /// will not be run. + /// + /// The event type. + /// The event to pass to the handler. + /// The async handler to invoke. + public void DeferAsync(TEvent evt, Func handler) + { + if (IsRecovering) + { + throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later."); + } + + if (_pendingInvocations.Count == 0) + { + RunTask(() => handler(evt)); + } + else + { + _pendingInvocations.AddLast(new AsyncAsyncHandlerInvocation(evt, o => handler((TEvent)o))); + _eventBatch.AddLast(new NonPersistentMessage(evt, Sender)); + } + } + + /// + /// Internal stashing variant of Defer. Increments _pendingStashingPersistInvocations + /// to ensure commands remain stashed until this handler completes. + /// Used internally for completion callbacks on . + /// + /// The event type. + /// The event to pass to the handler. + /// The handler to invoke. + internal void Defer(TEvent evt, Action handler) + { + if (IsRecovering) + { + throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later."); + } + + _pendingStashingPersistInvocations++; + _pendingInvocations.AddLast(new StashingHandlerInvocation(evt, o => handler((TEvent)o))); + _eventBatch.AddLast(new NonPersistentMessage(evt, Sender)); + } + + /// + /// Internal stashing variant of Defer with async handler. Increments _pendingStashingPersistInvocations + /// to ensure commands remain stashed until this handler completes. + /// Used internally for async completion callbacks on . + /// + /// The event type. + /// The event to pass to the handler. + /// The async handler to invoke. + internal void Defer(TEvent evt, Func handler) + { + if (IsRecovering) + { + throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later."); + } + + _pendingStashingPersistInvocations++; + _pendingInvocations.AddLast(new StashingAsyncHandlerInvocation(evt, o => handler((TEvent)o))); + _eventBatch.AddLast(new NonPersistentMessage(evt, Sender)); + } + /// /// Permanently deletes all persistent messages with sequence numbers less than or equal . /// If the delete is successful a will be sent to the actor.