From 137e9e500e50a4b1df6ae3e1006f1ae6e411b20d Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 18 May 2026 00:34:44 +0000 Subject: [PATCH 1/2] fix: contain SynchronizationContext leak in Akka.Hosting.TestKit startup Akka.TestKit's TestKitBase.InitializeTest unconditionally installs an ActorCellKeepingSynchronizationContext on the current thread. Akka.Hosting.TestKit calls base.InitializeTest from inside a StartActors delegate that runs during async host startup. SetSynchronizationContext is per-thread and is not unwound by await, and nothing scrubbed it, so that context leaked out of InitializeAsyncCore and was captured by xUnit v3's CreateTestClassInstance -> [AkkaCleanAmbientContext]. In a sequentially-run xUnit v3 suite (parallelizeTestCollections: false) this caused each test to inherit the previous (disposed) test's SynchronizationContext, pinning continuations onto a dead ActorCell. For Akka.Cluster.Hosting.Tests this cascaded into cluster-startup timeouts (ClusterJoinFailedException: "Cluster has already been terminated") and blew the suite runtime from ~30s to ~8min. Fix: - Bracket the base.InitializeTest call with a synchronous save/restore of SynchronizationContext.Current so the context it installs cannot escape the StartActors delegate. - Restore the entry SynchronizationContext in a finally around InitializeAsyncCore as defense-in-depth, guaranteeing a clean context is handed back to xUnit regardless of which continuation thread the startup chain returns on. Verified: Akka.Cluster.Hosting.Tests 34/34 green across 20+ sequential runs (24-35s), zero cluster-startup cascades; Akka.Hosting.TestKit.Tests 305/305 green. 
--- src/Akka.Hosting.TestKit/TestKit.Shared.cs | 106 +++++++++++++-------- 1 file changed, 68 insertions(+), 38 deletions(-) diff --git a/src/Akka.Hosting.TestKit/TestKit.Shared.cs b/src/Akka.Hosting.TestKit/TestKit.Shared.cs index 11f421e1..e532b540 100644 --- a/src/Akka.Hosting.TestKit/TestKit.Shared.cs +++ b/src/Akka.Hosting.TestKit/TestKit.Shared.cs @@ -115,13 +115,30 @@ private void InternalConfigureServices(HostBuilderContext context, IServiceColle // This ensures TestProbe is available for any actors that depend on IRequiredActor builder.StartActors((actorSystem, actorRegistry) => { - // Initialize TestActor here to ensure it's available before user actors start - base.InitializeTest(actorSystem, (ActorSystemSetup)null!, null, null); - actorRegistry.Register(TestActor); + // base.InitializeTest -> Akka.TestKit.TestKitBase.InitializeTest unconditionally calls + // SynchronizationContext.SetSynchronizationContext(new ActorCellKeepingSynchronizationContext(...)). + // This delegate runs on a host-startup thread inside _host.StartAsync(); SetSynchronizationContext + // is per-thread and is NOT unwound by await, and nothing here scrubs it. Left unbracketed, that + // SynchronizationContext leaks onto pool threads, escapes InitializeAsyncCore, and is captured by + // xUnit v3's CreateTestClassInstance -> [AkkaCleanAmbientContext].Before, which then pins the + // next sequentially-run test's continuations onto this (disposed) test's ActorCell. + // This delegate is synchronous, so same-thread save/restore fully contains the mutation. The + // correct per-test SynchronizationContext is installed later by [AkkaCleanAmbientContext].Before. 
+ var savedContext = SynchronizationContext.Current; + try + { + // Initialize TestActor here to ensure it's available before user actors start + base.InitializeTest(actorSystem, (ActorSystemSetup)null!, null, null); + actorRegistry.Register(TestActor); - // Set implicit sender on initialization thread - if (this is not INoImplicitSender) - InternalCurrentActorCellKeeper.Current = (ActorCell)((ActorRefWithCell)TestActor).Underlying; + // Set implicit sender on initialization thread + if (this is not INoImplicitSender) + InternalCurrentActorCellKeeper.Current = (ActorCell)((ActorRefWithCell)TestActor).Underlying; + } + finally + { + SynchronizationContext.SetSynchronizationContext(savedContext); + } }); // User configuration comes AFTER TestProbe registration @@ -153,46 +170,59 @@ protected virtual void ConfigureLogging(ILoggingBuilder builder) [InternalApi] private async Task InitializeAsyncCore() { - var hostBuilder = new HostBuilder(); - if (Output != null) + // Defense-in-depth for the SynchronizationContext leak contained at the base.InitializeTest + // call site above: SetSynchronizationContext is per-thread and not unwound by await, so this + // continuation can return to xUnit's CreateTestClassInstance on a thread carrying a stale SC. + // Restoring the entry SC on exit guarantees [AkkaCleanAmbientContext].Before captures a clean + // PreviousContext regardless of what the host-startup pipeline installs. 
+ var entryContext = SynchronizationContext.Current; + try { - hostBuilder.ConfigureLogging(logger => + var hostBuilder = new HostBuilder(); + if (Output != null) { - logger.ClearProviders(); - logger.AddProvider(new XUnitLoggerProvider(Output, LogLevel)); - logger.AddFilter("Akka.*", LogLevel); - ConfigureLogging(logger); - }); - } + hostBuilder.ConfigureLogging(logger => + { + logger.ClearProviders(); + logger.AddProvider(new XUnitLoggerProvider(Output, LogLevel)); + logger.AddFilter("Akka.*", LogLevel); + ConfigureLogging(logger); + }); + } - hostBuilder - .ConfigureHostConfiguration(ConfigureHostConfiguration) - .ConfigureAppConfiguration(ConfigureAppConfiguration); - ConfigureHostBuilder(hostBuilder); - hostBuilder.ConfigureServices(InternalConfigureServices); + hostBuilder + .ConfigureHostConfiguration(ConfigureHostConfiguration) + .ConfigureAppConfiguration(ConfigureAppConfiguration); + ConfigureHostBuilder(hostBuilder); + hostBuilder.ConfigureServices(InternalConfigureServices); - _host = hostBuilder.Build(); + _host = hostBuilder.Build(); - using var cts = new CancellationTokenSource(StartupTimeout); - try - { - await _host.StartAsync(cts.Token); - } - catch (OperationCanceledException) when (cts.IsCancellationRequested) - { - throw new TimeoutException($"Host failed to start within {StartupTimeout.TotalSeconds} seconds"); - } + using var cts = new CancellationTokenSource(StartupTimeout); + try + { + await _host.StartAsync(cts.Token); + } + catch (OperationCanceledException) when (cts.IsCancellationRequested) + { + throw new TimeoutException($"Host failed to start within {StartupTimeout.TotalSeconds} seconds"); + } - // Wait for Akka initialization with timeout - var initializedTask = _initialized.Task; - var timeoutTask = Task.Delay(StartupTimeout, CancellationToken.None); - if (await Task.WhenAny(initializedTask, timeoutTask) == timeoutTask) - throw new TimeoutException($"Akka.NET failed to initialize within {StartupTimeout.TotalSeconds} seconds"); + // 
Wait for Akka initialization with timeout + var initializedTask = _initialized.Task; + var timeoutTask = Task.Delay(StartupTimeout, CancellationToken.None); + if (await Task.WhenAny(initializedTask, timeoutTask) == timeoutTask) + throw new TimeoutException($"Akka.NET failed to initialize within {StartupTimeout.TotalSeconds} seconds"); - // TestActor initialization and registration now happens in AddStartup - // before user actors are created, preventing race conditions + // TestActor initialization and registration now happens in AddStartup + // before user actors are created, preventing race conditions - await BeforeTestStart(); + await BeforeTestStart(); + } + finally + { + SynchronizationContext.SetSynchronizationContext(entryContext); + } } protected sealed override void InitializeTest(ActorSystem system, ActorSystemSetup config, string actorSystemName, From 7c88e4e85285a7121031eace60c150bf1aa76041 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 18 May 2026 03:39:02 +0000 Subject: [PATCH 2/2] fix: recover TestActor terminated by the host-startup actor-creation race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Akka.Hosting.TestKit creates the TestActor (an InternalTestActor under /system on the CallingThreadDispatcher) via base.InitializeTest, inside a StartActors callback that runs during _host.StartAsync() — concurrently with remoting, clustering and other extensions creating their own /system actors. That concurrent startup storm intermittently terminates the freshly-created TestActor: it is created successfully, then cleanly terminated a few milliseconds later (confirmed by death-watch probe and actor-tree dump — the rest of the system stays healthy, only the TestActor dies). Once dead, every message sent to it dead-letters and ExpectMsg calls time out. This is Akka.Hosting.TestKit-specific: mainline Akka.TestKit creates the TestActor in a quiet constructor, not amid a host-startup storm. 
The exact Akka-core trigger could not be pinned without a debugger attached to core internals, so this is a recovery, not a prevention. Fix: - Akka.Hosting.TestKit: after host startup completes (system quiet), EnsureTestActorAliveAsync verifies the TestActor survived and re-creates it via base.InitializeTest if it did not. Re-creation in the now-quiet system is race-free. - ClusterShardingDistributedDataSpecs: join the cluster during host startup via WithActors (matching every other cluster spec) instead of in the test body, so the cluster-formation storm completes within the window EnsureTestActorAliveAsync covers. Verified: Akka.Cluster.Hosting.Tests 34/34 green across 25 consecutive runs (was ~10-30% flaky, plus a 5-test cascade); Akka.Hosting.TestKit.Tests 305/305 green. --- .../ClusterShardingDistributedDataSpecs.cs | 10 ++- src/Akka.Hosting.TestKit/TestKit.Shared.cs | 69 ++++++++++++++++++- 2 files changed, 75 insertions(+), 4 deletions(-) diff --git a/src/Akka.Cluster.Hosting.Tests/ClusterShardingDistributedDataSpecs.cs b/src/Akka.Cluster.Hosting.Tests/ClusterShardingDistributedDataSpecs.cs index 54f77a12..254a5708 100644 --- a/src/Akka.Cluster.Hosting.Tests/ClusterShardingDistributedDataSpecs.cs +++ b/src/Akka.Cluster.Hosting.Tests/ClusterShardingDistributedDataSpecs.cs @@ -22,6 +22,13 @@ protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IService builder .WithRemoting() .WithClustering() + // Join the cluster during host startup (matching the other cluster specs) rather than in + // the test body, so cluster formation completes before the test body runs. 
+ .WithActors(async (system, _) => + { + var cluster = Cluster.Get(system); + await cluster.JoinAsync(cluster.SelfAddress); + }) .WithDistributedData(opt => { opt.Name = ReplicatorName; @@ -32,8 +39,7 @@ protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IService public async Task WithDistributedDataStartsAutomaticallyTest() { var cluster = Cluster.Get(Sys); - await cluster.JoinAsync(cluster.SelfAddress); - await AwaitAssertAsync(() => + await AwaitAssertAsync(() => Assert.Equal(1, cluster.State.Members.Count(m => m.Status == MemberStatus.Up)), interval: TimeSpan.FromMilliseconds(200), duration: TimeSpan.FromSeconds(10)); diff --git a/src/Akka.Hosting.TestKit/TestKit.Shared.cs b/src/Akka.Hosting.TestKit/TestKit.Shared.cs index e532b540..2fc7cde1 100644 --- a/src/Akka.Hosting.TestKit/TestKit.Shared.cs +++ b/src/Akka.Hosting.TestKit/TestKit.Shared.cs @@ -214,8 +214,12 @@ private async Task InitializeAsyncCore() if (await Task.WhenAny(initializedTask, timeoutTask) == timeoutTask) throw new TimeoutException($"Akka.NET failed to initialize within {StartupTimeout.TotalSeconds} seconds"); - // TestActor initialization and registration now happens in AddStartup - // before user actors are created, preventing race conditions + // The TestActor is created (via base.InitializeTest) inside a StartActors callback while + // remoting/clustering/etc. are concurrently spinning up their own /system actors. That + // concurrent startup intermittently terminates the freshly-created TestActor. Host startup + // is now complete, so the system is quiet — verify the TestActor survived and re-create it + // here (race-free) if it did not. See EnsureTestActorAliveAsync. + await EnsureTestActorAliveAsync(); await BeforeTestStart(); } @@ -225,6 +229,67 @@ private async Task InitializeAsyncCore() } } + /// <summary> + /// Verifies that the <see cref="TestActor"/> survived host startup, and re-creates it + /// if it did not. 
+ /// <para> + /// The TestActor is an InternalTestActor created under /system on the + /// CallingThreadDispatcher via <c>base.InitializeTest</c>, + /// which runs inside a <c>StartActors</c> callback while + /// remoting, clustering and other extensions are concurrently creating their own /system actors. + /// That concurrent startup intermittently terminates the freshly-created TestActor, after which every + /// message sent to it dead-letters and ExpectMsg calls time out. + /// </para> + /// <para> + /// By the time this runs, host startup has completed and the system is quiet, so re-creating the + /// TestActor here is race-free and deterministic. + /// </para> + /// </summary> + private async Task EnsureTestActorAliveAsync() + { + const int maxAttempts = 3; + for (var attempt = 0; attempt < maxAttempts; attempt++) + { + if (await IsTestActorAliveAsync()) + return; + + Sys.Log.Warning( + "TestActor [{0}] did not survive host startup; re-creating it (attempt {1}/{2}).", + TestActor.Path, attempt + 1, maxAttempts); + + // base.InitializeTest installs an ActorCellKeepingSynchronizationContext on the current + // thread; bracket it the same way the original call site does so it cannot leak. + var savedContext = SynchronizationContext.Current; + try + { + base.InitializeTest(Sys, (ActorSystemSetup)null!, null, null); + } + finally + { + SynchronizationContext.SetSynchronizationContext(savedContext); + } + + ActorRegistry.Register(TestActor, overwrite: true); + } + + if (!await IsTestActorAliveAsync()) + throw new InvalidOperationException( + $"TestActor could not be kept alive across host startup after {maxAttempts} attempts."); + } + + private async Task<bool> IsTestActorAliveAsync() + { + try + { + await Sys.ActorSelection(TestActor.Path).ResolveOne(TimeSpan.FromSeconds(1)); + return true; + } + catch (ActorNotFoundException) + { + return false; + } + } + protected sealed override void InitializeTest(ActorSystem system, ActorSystemSetup config, string actorSystemName, string testActorName) {