From 8aa296c744202c00014edfd50e893c73ea1cf4e1 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 22 Aug 2025 02:20:36 +0700 Subject: [PATCH] Port #7772 - TestKit: synchronous TestActor start * Force synchronous start for `TestActor` fix #7770 * separate creation of implicit, default `TestActor` from additional ones * force `TestActor` to start via CTD tweak instead * don't wait for `TestActor` to start * Revert "don't wait for `TestActor` to start" This reverts commit bdd77f9. * run default `TestActor` without `CallingThreadDispatcher` * fix TestKit deadlock during parallel test execution This commit resolves a deadlock that occurs when running tests in parallel, where the initial TestActor creation gets stuck during async initialization with CallingThreadDispatcher. The root cause was that SystemActorOf hardcodes async=true initialization, creating a RepointableActorRef that requires processing a Supervise system message. With CallingThreadDispatcher, this creates a circular dependency: - TestKit constructor blocks waiting for TestActor initialization - CallingThreadDispatcher only runs on the calling thread - The calling thread is blocked, so Supervise message never gets processed The solution bypasses SystemActorOf and directly calls AttachChild with async=false, enabling true synchronous initialization while preserving full system integration including supervision tree and mailbox configuration. This maintains compatibility with CallingThreadDispatcher for deterministic testing while eliminating startup deadlocks in parallel test scenarios. Resolves issue where TestProbe child actor creation and implicit sender functionality would fail due to incomplete TestActor initialization. 
* Fix TestKit serialization issue - Use AttachChild with isSystemService=true to exempt TestActor from serialization verification - Resolves 700+ test failures caused by UnboundedChannelWriter serialization errors * still working on synchronous `TestActor` startup * Fix TestKit deadlock during parallel test execution Resolves deadlock that occurs when TestKit instances are created in parallel and actors try to interact with TestActor during initialization. The issue was caused by CallingThreadDispatcher creating RepointableActorRef which requires async initialization, leading to deadlocks. Changes: - Add AttachChildWithAsync internal method to ActorCell to control sync/async actor creation - Modify TestKitBase to create TestActor synchronously (LocalActorRef) instead of async (RepointableActorRef) - Update Xunit/Xunit2 TestKits to create logger actors synchronously - Replace Ask with Tell for logger initialization to avoid synchronous wait deadlocks - Add InternalsVisibleTo for Xunit TestKits to access internal Akka methods - Maintain LoggerInitialized response for protocol compatibility (has IDeadLetterSuppression) Fixes #7770 * added API approvals * remove `EnsureTestActorReady` method * API approvals * ensure calls can't get contaminated with references * fix API approvals * Fix race condition in ParallelTestActorDeadlockSpec The test had a race condition where the PingerActor sends 'ping' to TestActor during PreStart, but the test was expecting 'test-message' first. This could cause ExpectMsgAsync to receive the wrong message and fail. Fixed by properly expecting the 'ping' message first before sending and expecting the 'test-message'. 
--- .gitignore | 3 + .../Akka.TestKit.Xunit/Internals/Loggers.cs | 2 + .../testkits/Akka.TestKit.Xunit/TestKit.cs | 36 +++++-- .../Akka.TestKit.Xunit2/Internals/Loggers.cs | 2 + .../testkits/Akka.TestKit.Xunit2/TestKit.cs | 40 +++++--- ...oreAPISpec.ApproveCore.DotNet.verified.txt | 2 + .../CoreAPISpec.ApproveCore.Net.verified.txt | 2 + .../ParallelTestActorDeadlockSpec.cs | 93 +++++++++++++++++++ src/core/Akka.TestKit/TestKitBase.cs | 68 ++++++-------- src/core/Akka/Actor/ActorCell.Children.cs | 16 ++++ src/core/Akka/Properties/AssemblyInfo.cs | 2 + 11 files changed, 200 insertions(+), 66 deletions(-) create mode 100644 src/core/Akka.TestKit.Tests/TestActorRefTests/ParallelTestActorDeadlockSpec.cs diff --git a/.gitignore b/.gitignore index 38ed7e158f9..8cb764e486e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ ## Ignore Visual Studio temporary files, build results, and ## files generated by popular Visual Studio add-ons. +# Claude configuration +.claude/ + # User-specific files *.suo *.user diff --git a/src/contrib/testkits/Akka.TestKit.Xunit/Internals/Loggers.cs b/src/contrib/testkits/Akka.TestKit.Xunit/Internals/Loggers.cs index a95e352c646..a66ec1a3dc3 100644 --- a/src/contrib/testkits/Akka.TestKit.Xunit/Internals/Loggers.cs +++ b/src/contrib/testkits/Akka.TestKit.Xunit/Internals/Loggers.cs @@ -35,6 +35,8 @@ public TestOutputLogger(ITestOutputHelper output) Receive(e => { e.LoggingBus.Subscribe(Self, typeof (LogEvent)); + // Send response to maintain protocol - LoggerInitialized implements IDeadLetterSuppression + // so it won't interfere with dead letter detection or TestActor message expectations Sender.Tell(new LoggerInitialized()); }); } diff --git a/src/contrib/testkits/Akka.TestKit.Xunit/TestKit.cs b/src/contrib/testkits/Akka.TestKit.Xunit/TestKit.cs index 26272f21045..f81c18b39ca 100644 --- a/src/contrib/testkits/Akka.TestKit.Xunit/TestKit.cs +++ b/src/contrib/testkits/Akka.TestKit.Xunit/TestKit.cs @@ -7,6 +7,7 @@ using System; using 
Akka.Actor; +using Akka.Actor.Internal; using Akka.Actor.Setup; using Akka.Configuration; using Akka.Event; @@ -170,10 +171,19 @@ protected void InitializeLogger(ActorSystem system) if (Output == null) return; - var extSystem = (ExtendedActorSystem)system; - var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(Output)), "log-test"); - logger.Ask(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout) - .ConfigureAwait(false).GetAwaiter().GetResult(); + var systemImpl = system as ActorSystemImpl ?? throw new InvalidOperationException("Expected ActorSystemImpl"); + + // Create logger actor synchronously to avoid deadlock during parallel test execution + // Use AttachChildWithAsync with isAsync:false to create LocalActorRef instead of RepointableActorRef + var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync( + Props.Create(() => new TestOutputLogger(Output)), + isSystemService: true, // Mark as system service + isAsync: false, // Create synchronously to avoid deadlock + name: "log-test"); + + // Send the initialization message without waiting for response to avoid deadlock + // The logger will subscribe to the event stream when it processes this message + logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender); } protected void InitializeLogger(ActorSystem system, string prefix) @@ -181,11 +191,19 @@ protected void InitializeLogger(ActorSystem system, string prefix) if (Output == null) return; - var extSystem = (ExtendedActorSystem)system; - var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger( - string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))), "log-test"); - logger.Ask(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout) - .ConfigureAwait(false).GetAwaiter().GetResult(); + var systemImpl = system as ActorSystemImpl ?? 
throw new InvalidOperationException("Expected ActorSystemImpl"); + + // Create logger actor synchronously to avoid deadlock during parallel test execution + var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync( + Props.Create(() => new TestOutputLogger( + string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))), + isSystemService: true, // Mark as system service + isAsync: false, // Create synchronously to avoid deadlock + name: "log-test"); + + // Send the initialization message without waiting for response to avoid deadlock + // The logger will subscribe to the event stream when it processes this message + logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender); } /// diff --git a/src/contrib/testkits/Akka.TestKit.Xunit2/Internals/Loggers.cs b/src/contrib/testkits/Akka.TestKit.Xunit2/Internals/Loggers.cs index 903c6e0fc99..0b6413bfc2a 100644 --- a/src/contrib/testkits/Akka.TestKit.Xunit2/Internals/Loggers.cs +++ b/src/contrib/testkits/Akka.TestKit.Xunit2/Internals/Loggers.cs @@ -35,6 +35,8 @@ public TestOutputLogger(ITestOutputHelper output) Receive(e => { e.LoggingBus.Subscribe(Self, typeof (LogEvent)); + // Send response to maintain protocol - LoggerInitialized implements IDeadLetterSuppression + // so it won't interfere with dead letter detection or TestActor message expectations Sender.Tell(new LoggerInitialized()); }); } diff --git a/src/contrib/testkits/Akka.TestKit.Xunit2/TestKit.cs b/src/contrib/testkits/Akka.TestKit.Xunit2/TestKit.cs index 4f73708548f..3af97e60445 100644 --- a/src/contrib/testkits/Akka.TestKit.Xunit2/TestKit.cs +++ b/src/contrib/testkits/Akka.TestKit.Xunit2/TestKit.cs @@ -8,6 +8,7 @@ using System; using System.Threading.Tasks; using Akka.Actor; +using Akka.Actor.Internal; using Akka.Actor.Setup; using Akka.Configuration; using Akka.Event; @@ -140,14 +141,19 @@ protected void InitializeLogger(ActorSystem system) { if (Output != null) { - var extSystem = 
(ExtendedActorSystem)system; - var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(Output)), "log-test"); - // Start the logger initialization task but don't wait for it yet - var loggerTask = logger.Ask(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout); + var systemImpl = system as ActorSystemImpl ?? throw new InvalidOperationException("Expected ActorSystemImpl"); - // By the time TestActor is ready (which happens in base constructor), - // the logger is likely ready too. Now we can safely wait. - loggerTask.ConfigureAwait(false).GetAwaiter().GetResult(); + // Create logger actor synchronously to avoid deadlock during parallel test execution + // Use AttachChildWithAsync with isAsync:false to create LocalActorRef instead of RepointableActorRef + var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync( + Props.Create(() => new TestOutputLogger(Output)), + isSystemService: true, // Mark as system service + isAsync: false, // Create synchronously to avoid deadlock + name: "log-test"); + + // Send the initialization message without waiting for response to avoid deadlock + // The logger will subscribe to the event stream when it processes this message + logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender); } } @@ -155,15 +161,19 @@ protected void InitializeLogger(ActorSystem system, string prefix) { if (Output != null) { - var extSystem = (ExtendedActorSystem)system; - var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger( - string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))), "log-test"); - // Start the logger initialization task but don't wait for it yet - var loggerTask = logger.Ask(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout); + var systemImpl = system as ActorSystemImpl ?? 
throw new InvalidOperationException("Expected ActorSystemImpl"); + + // Create logger actor synchronously to avoid deadlock during parallel test execution + var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync( + Props.Create(() => new TestOutputLogger( + string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))), + isSystemService: true, // Mark as system service + isAsync: false, // Create synchronously to avoid deadlock + name: "log-test"); - // By the time TestActor is ready (which happens in base constructor), - // the logger is likely ready too. Now we can safely wait. - loggerTask.ConfigureAwait(false).GetAwaiter().GetResult(); + // Send the initialization message without waiting for response to avoid deadlock + // The logger will subscribe to the event stream when it processes this message + logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender); } } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index 7d8f3de5a78..84f53c066a1 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -27,6 +27,8 @@ [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Streams.Tests")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Tests")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit2")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests.Performance")] [assembly: 
System.Runtime.InteropServices.ComVisibleAttribute(false)] diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 02e6be3fb03..6a23954802c 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -27,6 +27,8 @@ [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Streams.Tests")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Tests")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit2")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests.Performance")] [assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)] diff --git a/src/core/Akka.TestKit.Tests/TestActorRefTests/ParallelTestActorDeadlockSpec.cs b/src/core/Akka.TestKit.Tests/TestActorRefTests/ParallelTestActorDeadlockSpec.cs new file mode 100644 index 00000000000..3f4b33bf657 --- /dev/null +++ b/src/core/Akka.TestKit.Tests/TestActorRefTests/ParallelTestActorDeadlockSpec.cs @@ -0,0 +1,93 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.TestKit.Tests.TestActorRefTests +{ + public class ParallelTestActorDeadlockSpec + { + private readonly ITestOutputHelper _output; + + public ParallelTestActorDeadlockSpec(ITestOutputHelper output) + { + _output = output; + } + + // This test reproduces the deadlock that occurs in Akka.Hosting.TestKit + // when multiple TestKits start up in parallel and actors 
try to interact + // with TestActor during initialization. + // + // Related issues: + // - https://github.com/akkadotnet/akka.net/issues/7770 + // - https://github.com/akkadotnet/Akka.Hosting/pull/643 + [Fact(Timeout = 20000)] + public async Task Parallel_TestKit_startup_should_not_deadlock() + { + var concurrentTests = 40; // High parallelism to trigger the issue + + var tasks = Enumerable.Range(0, concurrentTests) + .Select(_ => Task.Run(RunOneTestKit)) + .ToArray(); + + await Task.WhenAll(tasks); + + async Task RunOneTestKit() + { + await Task.Run(async () => + { + var id = Guid.NewGuid().ToString("N").Substring(0, 8); + try + { + _output.WriteLine($"[{id}] Creating TestKit..."); + // Create TestKit synchronously like a normal test would + using var testKit = new Akka.TestKit.Xunit2.TestKit($"test-{id}", output: _output); + _output.WriteLine($"[{id}] TestKit created"); + + // Simulate what happens in Akka.Hosting - actor creation during startup + // that tries to interact with TestActor + _output.WriteLine($"[{id}] Creating PingerActor..."); + var actor = testKit.Sys.ActorOf(Props.Create(() => new PingerActor(testKit.TestActor))); + _output.WriteLine($"[{id}] PingerActor created"); + + // Expect the "ping" message from PingerActor's PreStart + await testKit.ExpectMsgAsync("ping", TimeSpan.FromSeconds(2)); + _output.WriteLine($"[{id}] Received ping from PingerActor"); + + // Now verify the TestKit is working normally + _output.WriteLine($"[{id}] Sending test message..."); + testKit.TestActor.Tell("test-message"); + await testKit.ExpectMsgAsync("test-message", TimeSpan.FromSeconds(2)); + _output.WriteLine($"[{id}] Test completed successfully"); + } + catch (Exception ex) + { + _output.WriteLine($"[{id}] Failed: {ex.Message}"); + throw; + } + }); + } + } + + private class PingerActor : ActorBase + { + private readonly IActorRef _testActor; + + public PingerActor(IActorRef testActor) + { + _testActor = testActor; + } + + protected override bool Receive(object 
message) => false; + + protected override void PreStart() + { + // This simulates what StartupPinger does in Akka.Hosting + // Sending a message to TestActor during actor initialization + _testActor.Tell("ping"); + } + } + } +} \ No newline at end of file diff --git a/src/core/Akka.TestKit/TestKitBase.cs b/src/core/Akka.TestKit/TestKitBase.cs index 2fb8dd7847b..a30e4a58eae 100644 --- a/src/core/Akka.TestKit/TestKitBase.cs +++ b/src/core/Akka.TestKit/TestKitBase.cs @@ -116,6 +116,7 @@ protected TestKitBase(ITestKitAssertions assertions, ActorSystem system, ActorSy { _assertions = assertions ?? throw new ArgumentNullException(nameof(assertions), "The supplied assertions must not be null."); + // ReSharper disable once VirtualMemberCallInConstructor InitializeTest(system, config, actorSystemName, testActorName); } @@ -170,10 +171,11 @@ protected virtual void InitializeTest(ActorSystem system, ActorSystemSetup confi if (string.IsNullOrEmpty(testActorName)) testActorName = "testActor" + _testActorId.IncrementAndGet(); - var testActor = CreateTestActor(system, testActorName); + var testActor = CreateInitialTestActor(system, testActorName); - // Wait for the testactor to start - WaitUntilTestActorIsReady(testActor, _testState.TestKitSettings); + // The initial TestActor is created synchronously (isAsync: false), so it is already started here; + // no startup wait is needed in the constructor, which avoids the CallingThreadDispatcher deadlock + _testState.TestActor = testActor; if (this is not INoImplicitSender) { @@ -187,45 +189,6 @@ protected virtual void InitializeTest(ActorSystem system, ActorSystemSetup confi } SynchronizationContext.SetSynchronizationContext( new ActorCellKeepingSynchronizationContext(InternalCurrentActorCellKeeper.Current)); - - _testState.TestActor = testActor; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - // Do not convert this method to async, it is being called inside the constructor. 
- private static void WaitUntilTestActorIsReady(IActorRef testActor, TestKitSettings settings) - { - var deadline = settings.TestKitStartupTimeout; - var stopwatch = Stopwatch.StartNew(); - var ready = false; - - try - { - // TestActor should start almost instantly (microseconds). - // Use SpinWait which will spin for ~10-20 microseconds then yield. - var spinWait = new SpinWait(); - - while (stopwatch.Elapsed < deadline) - { - ready = testActor is not IRepointableRef repRef || repRef.IsStarted; - if (ready) break; - - // SpinWait automatically handles the progression: - // - First ~10 iterations: tight spin loop (microseconds) - // - Next iterations: Thread.Yield() - // - Later: Thread.Sleep(0) - // - Finally: Thread.Sleep(1) - // This is optimal for both fast startup and system under load - spinWait.SpinOnce(); - } - } - finally - { - stopwatch.Stop(); - } - - if (!ready) - throw new Exception("Timeout waiting for test actor to be ready"); } /// @@ -710,10 +673,31 @@ public IActorRef CreateTestActor(string name) return CreateTestActor(_testState.System, name); } + private IActorRef CreateInitialTestActor(ActorSystem system, string name) + { + // Fix both serialization and deadlock issues: + // 1. Use isSystemService=true to skip serialization checks + // 2. 
Use isAsync=false to create LocalActorRef synchronously (avoids RepointableActorRef deadlock) + var testActorProps = Props.Create(() => new InternalTestActor(_testState.Queue)) + .WithDispatcher("akka.test.test-actor.dispatcher"); + + var systemImpl = system.AsInstanceOf<ActorSystemImpl>(); + // Use the new AttachChildWithAsync method to create TestActor synchronously + var testActor = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync( + testActorProps, + isSystemService: true, // Skip serialization checks + isAsync: false, // Create synchronously to avoid deadlock + name: name); + + return testActor; + } + private IActorRef CreateTestActor(ActorSystem system, string name) { var testActorProps = Props.Create(() => new InternalTestActor(_testState.Queue)) .WithDispatcher("akka.test.test-actor.dispatcher"); + + // For additional test actors, always use the standard SystemActorOf var testActor = system.AsInstanceOf<ExtendedActorSystem>().SystemActorOf(testActorProps, name); return testActor; } diff --git a/src/core/Akka/Actor/ActorCell.Children.cs b/src/core/Akka/Actor/ActorCell.Children.cs index 5b56c397ad4..c83e172857c 100644 --- a/src/core/Akka/Actor/ActorCell.Children.cs +++ b/src/core/Akka/Actor/ActorCell.Children.cs @@ -98,6 +98,22 @@ public virtual IActorRef AttachChild(Props props, bool isSystemService, string? { return MakeChild(props, name == null ? GetRandomActorName() : CheckName(name), true, isSystemService); } + + /// <summary> + /// INTERNAL API + /// + /// Attaches a child actor with explicit control over async initialization. + /// Used by TestKit to create TestActors synchronously to avoid deadlocks. + /// </summary> + /// <param name="props">The <see cref="Props"/> this child actor will use.</param> + /// <param name="isSystemService">If true, then this actor is a system actor and skips serialization checks.</param> + /// <param name="isAsync">If true, creates RepointableActorRef with async init. If false, creates LocalActorRef synchronously.</param> + /// <param name="name">The name of the actor being started. Can be null for auto-generated name.</param> + /// <returns>A reference to the initialized child actor.</returns> 
+ internal IActorRef AttachChildWithAsync(Props props, bool isSystemService, bool isAsync, string? name = null) + { + return MakeChild(props, name == null ? GetRandomActorName() : CheckName(name), isAsync, isSystemService); + } /// /// TBD diff --git a/src/core/Akka/Properties/AssemblyInfo.cs b/src/core/Akka/Properties/AssemblyInfo.cs index 5ecfbb2329d..377877ae038 100644 --- a/src/core/Akka/Properties/AssemblyInfo.cs +++ b/src/core/Akka/Properties/AssemblyInfo.cs @@ -28,6 +28,8 @@ [assembly: InternalsVisibleTo("Akka.Tests.Performance")] [assembly: InternalsVisibleTo("Akka.TestKit")] [assembly: InternalsVisibleTo("Akka.TestKit.Tests")] +[assembly: InternalsVisibleTo("Akka.TestKit.Xunit")] +[assembly: InternalsVisibleTo("Akka.TestKit.Xunit2")] [assembly: InternalsVisibleTo("Akka.Remote")] [assembly: InternalsVisibleTo("Akka.Remote.TestKit")] [assembly: InternalsVisibleTo("Akka.Remote.Tests")]