Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,102 @@ public RestartMessage(object message)
}
}

/// <summary>
/// Test actor that uses DeferAsync with an async handler inside CommandAsync.
/// This reproduces the bug where DeferAsync throws "RunTask calls cannot be nested"
/// when called from within a CommandAsync handler after an await, with no pending persist operations.
/// See: https://github.com/akkadotnet/akka.net/issues/7998
/// </summary>
public sealed class DeferAsyncFromCommandAsyncActor : ReceivePersistentActor
{
public override string PersistenceId { get; }

public DeferAsyncFromCommandAsyncActor(string persistenceId)
{
PersistenceId = persistenceId;

RecoverAny(_ => { });

// This pattern triggers the bug: CommandAsync -> await -> DeferAsync(async handler)
// with no prior Persist/PersistAsync calls
CommandAsync<string>(async msg =>
{
// Any await causes us to be in a RunTask context
await Task.Delay(10);

// DeferAsync with async handler, no pending persist operations
// BUG: This throws "RunTask calls cannot be nested" because
// _pendingInvocations.Count == 0 triggers immediate RunTask execution
DeferAsync(msg, async evt =>
{
await Task.CompletedTask;
Sender.Tell("deferred-" + evt);
});
});
}
}

/// <summary>
/// Test actor that uses DeferAsync with a sync handler inside CommandAsync.
/// This should work correctly (sync handler doesn't use RunTask).
/// </summary>
public sealed class DeferAsyncSyncHandlerFromCommandAsyncActor : ReceivePersistentActor
{
public override string PersistenceId { get; }

public DeferAsyncSyncHandlerFromCommandAsyncActor(string persistenceId)
{
PersistenceId = persistenceId;

RecoverAny(_ => { });

CommandAsync<string>(async msg =>
{
await Task.Delay(10);

// DeferAsync with sync handler - this should work
DeferAsync(msg, evt =>
{
Sender.Tell("deferred-" + evt);
});
});
}
}

/// <summary>
/// Test actor that uses DeferAsync with an async handler after PersistAsync.
/// This should work because _pendingInvocations.Count > 0.
/// </summary>
public sealed class DeferAsyncAfterPersistAsyncActor : ReceivePersistentActor
{
public override string PersistenceId { get; }

public DeferAsyncAfterPersistAsyncActor(string persistenceId)
{
PersistenceId = persistenceId;

RecoverAny(_ => { });

CommandAsync<string>(async msg =>
{
await Task.Delay(10);

// PersistAsync first - this populates _pendingInvocations
PersistAsync(msg, evt =>
{
Sender.Tell("persisted-" + evt);
});

// DeferAsync with async handler - should queue and work
DeferAsync(msg, async evt =>
{
await Task.CompletedTask;
Sender.Tell("deferred-" + evt);
});
});
}
}

public class ReceivePersistentActorAsyncAwaitSpec : AkkaSpec
{
public ReceivePersistentActorAsyncAwaitSpec(ITestOutputHelper output = null)
Expand Down Expand Up @@ -677,6 +773,52 @@ public Task Actor_receiveasync_overloads_should_work()
"Expected 'handled' for double via CommandAsync(typeof(double))");
return Task.CompletedTask;
}

/// <summary>
/// Regression test for https://github.com/akkadotnet/akka.net/issues/7998
/// DeferAsync with async handler should work when called from CommandAsync,
/// even without prior Persist/PersistAsync calls.
/// </summary>
[Fact(DisplayName = "DeferAsync with async handler should work from CommandAsync without prior persist calls")]
public async Task DeferAsync_with_async_handler_should_work_from_CommandAsync_without_prior_persist_calls()
{
var actor = Sys.ActorOf(Props.Create(() => new DeferAsyncFromCommandAsyncActor("defer-async-pid")));

actor.Tell("hello");
var response = await ExpectMsgAsync<string>(TimeSpan.FromSeconds(5));
response.ShouldBe("deferred-hello");
}

/// <summary>
/// Verify that DeferAsync with sync handler still works from CommandAsync.
/// </summary>
[Fact(DisplayName = "DeferAsync with sync handler should work from CommandAsync")]
public async Task DeferAsync_with_sync_handler_should_work_from_CommandAsync()
{
var actor = Sys.ActorOf(Props.Create(() => new DeferAsyncSyncHandlerFromCommandAsyncActor("defer-sync-pid")));

actor.Tell("world");
var response = await ExpectMsgAsync<string>(TimeSpan.FromSeconds(5));
response.ShouldBe("deferred-world");
}

/// <summary>
/// Verify that DeferAsync with async handler works after PersistAsync
/// (this path was already working since _pendingInvocations.Count > 0).
/// </summary>
[Fact(DisplayName = "DeferAsync with async handler should work after PersistAsync")]
public async Task DeferAsync_with_async_handler_should_work_after_PersistAsync()
{
var actor = Sys.ActorOf(Props.Create(() => new DeferAsyncAfterPersistAsyncActor("defer-after-persist-pid")));

actor.Tell("test");
// Should receive both persist and defer responses in order
var first = await ExpectMsgAsync<string>(TimeSpan.FromSeconds(5));
first.ShouldBe("persisted-test");

var second = await ExpectMsgAsync<string>(TimeSpan.FromSeconds(5));
second.ShouldBe("deferred-test");
}
}
}

21 changes: 10 additions & 11 deletions src/core/Akka.Persistence/Eventsourced.cs
Original file line number Diff line number Diff line change
Expand Up @@ -800,8 +800,8 @@ public void DeferAsync<TEvent>(TEvent evt, Action<TEvent> handler)
///
/// This call will NOT result in <paramref name="evt"/> being persisted.
///
/// If there are no pending persist handler calls, the <paramref name="handler"/> will be called immediately
/// via <see cref="RunTask"/>.
/// The handler is always queued and will be executed after the current command handler completes.
/// This avoids "RunTask calls cannot be nested" errors when called from within CommandAsync handlers.
///
/// If persistence of an earlier event fails, the persistent actor will stop, and the
/// <paramref name="handler"/> will not be run.
Expand All @@ -816,15 +816,14 @@ public void DeferAsync<TEvent>(TEvent evt, Func<TEvent, Task> handler)
throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.");
}

if (_pendingInvocations.Count == 0)
{
RunTask(() => handler(evt));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was my bad - added this as part of #7937 , but this was wrong

}
else
{
_pendingInvocations.AddLast(new AsyncAsyncHandlerInvocation(evt, o => handler((TEvent)o)));
_eventBatch.AddLast(new NonPersistentMessage(evt, Sender));
}
// Always queue the async handler - do not use RunTask directly here.
// This avoids "RunTask calls cannot be nested" errors when DeferAsync
// is called from within a CommandAsync handler (which is already in a RunTask context).
// The handler will be executed via PeekApplyHandler when the batch is processed
// after the command handler completes.
// See: https://github.com/akkadotnet/akka.net/issues/7998
_pendingInvocations.AddLast(new AsyncAsyncHandlerInvocation(evt, o => handler((TEvent)o)));
_eventBatch.AddLast(new NonPersistentMessage(evt, Sender));
}

/// <summary>
Expand Down