Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IService
builder
.WithRemoting()
.WithClustering()
// Join the cluster during host startup (matching the other cluster specs) rather than in
// the test body, so cluster formation completes before the test body runs.
.WithActors(async (system, _) =>
{
var cluster = Cluster.Get(system);
await cluster.JoinAsync(cluster.SelfAddress);
})
.WithDistributedData(opt =>
{
opt.Name = ReplicatorName;
Expand All @@ -32,8 +39,7 @@ protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IService
public async Task WithDistributedDataStartsAutomaticallyTest()
{
var cluster = Cluster.Get(Sys);
await cluster.JoinAsync(cluster.SelfAddress);
await AwaitAssertAsync(() =>
await AwaitAssertAsync(() =>
Assert.Equal(1, cluster.State.Members.Count(m => m.Status == MemberStatus.Up)),
interval: TimeSpan.FromMilliseconds(200),
duration: TimeSpan.FromSeconds(10));
Expand Down
165 changes: 130 additions & 35 deletions src/Akka.Hosting.TestKit/TestKit.Shared.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,30 @@ private void InternalConfigureServices(HostBuilderContext context, IServiceColle
// This ensures TestProbe is available for any actors that depend on IRequiredActor<TestProbe>
builder.StartActors((actorSystem, actorRegistry) =>
{
// Initialize TestActor here to ensure it's available before user actors start
base.InitializeTest(actorSystem, (ActorSystemSetup)null!, null, null);
actorRegistry.Register<TestProbe>(TestActor);
// base.InitializeTest -> Akka.TestKit.TestKitBase.InitializeTest unconditionally calls
// SynchronizationContext.SetSynchronizationContext(new ActorCellKeepingSynchronizationContext(...)).
// This delegate runs on a host-startup thread inside _host.StartAsync(); SetSynchronizationContext
// is per-thread and is NOT unwound by await, and nothing here scrubs it. Left unbracketed, that
// SynchronizationContext leaks onto pool threads, escapes InitializeAsyncCore, and is captured by
// xUnit v3's CreateTestClassInstance -> [AkkaCleanAmbientContext].Before, which then pins the
// next sequentially-run test's continuations onto this (disposed) test's ActorCell.
// This delegate is synchronous, so same-thread save/restore fully contains the mutation. The
// correct per-test SynchronizationContext is installed later by [AkkaCleanAmbientContext].Before.
var savedContext = SynchronizationContext.Current;
try
{
// Initialize TestActor here to ensure it's available before user actors start
base.InitializeTest(actorSystem, (ActorSystemSetup)null!, null, null);
actorRegistry.Register<TestProbe>(TestActor);

// Set implicit sender on initialization thread
if (this is not INoImplicitSender)
InternalCurrentActorCellKeeper.Current = (ActorCell)((ActorRefWithCell)TestActor).Underlying;
// Set implicit sender on initialization thread
if (this is not INoImplicitSender)
InternalCurrentActorCellKeeper.Current = (ActorCell)((ActorRefWithCell)TestActor).Underlying;
}
finally
{
SynchronizationContext.SetSynchronizationContext(savedContext);
}
});

// User configuration comes AFTER TestProbe registration
Expand Down Expand Up @@ -153,46 +170,124 @@ protected virtual void ConfigureLogging(ILoggingBuilder builder)
[InternalApi]
private async Task InitializeAsyncCore()
{
var hostBuilder = new HostBuilder();
if (Output != null)
// Defense-in-depth for the SynchronizationContext leak contained at the base.InitializeTest
// call site above: SetSynchronizationContext is per-thread and not unwound by await, so this
// continuation can return to xUnit's CreateTestClassInstance on a thread carrying a stale SC.
// Restoring the entry SC on exit guarantees [AkkaCleanAmbientContext].Before captures a clean
// PreviousContext regardless of what the host-startup pipeline installs.
var entryContext = SynchronizationContext.Current;
try
{
hostBuilder.ConfigureLogging(logger =>
var hostBuilder = new HostBuilder();
if (Output != null)
{
logger.ClearProviders();
logger.AddProvider(new XUnitLoggerProvider(Output, LogLevel));
logger.AddFilter("Akka.*", LogLevel);
ConfigureLogging(logger);
});
}
hostBuilder.ConfigureLogging(logger =>
{
logger.ClearProviders();
logger.AddProvider(new XUnitLoggerProvider(Output, LogLevel));
logger.AddFilter("Akka.*", LogLevel);
ConfigureLogging(logger);
});
}

hostBuilder
.ConfigureHostConfiguration(ConfigureHostConfiguration)
.ConfigureAppConfiguration(ConfigureAppConfiguration);
ConfigureHostBuilder(hostBuilder);
hostBuilder.ConfigureServices(InternalConfigureServices);
hostBuilder
.ConfigureHostConfiguration(ConfigureHostConfiguration)
.ConfigureAppConfiguration(ConfigureAppConfiguration);
ConfigureHostBuilder(hostBuilder);
hostBuilder.ConfigureServices(InternalConfigureServices);

_host = hostBuilder.Build();
_host = hostBuilder.Build();

using var cts = new CancellationTokenSource(StartupTimeout);
try
{
await _host.StartAsync(cts.Token);
using var cts = new CancellationTokenSource(StartupTimeout);
try
{
await _host.StartAsync(cts.Token);
}
catch (OperationCanceledException) when (cts.IsCancellationRequested)
{
throw new TimeoutException($"Host failed to start within {StartupTimeout.TotalSeconds} seconds");
}

// Wait for Akka initialization with timeout
var initializedTask = _initialized.Task;
var timeoutTask = Task.Delay(StartupTimeout, CancellationToken.None);
if (await Task.WhenAny(initializedTask, timeoutTask) == timeoutTask)
throw new TimeoutException($"Akka.NET failed to initialize within {StartupTimeout.TotalSeconds} seconds");

// The TestActor is created (via base.InitializeTest) inside a StartActors callback while
// remoting/clustering/etc. are concurrently spinning up their own /system actors. That
// concurrent startup intermittently terminates the freshly-created TestActor. Host startup
// is now complete, so the system is quiet — verify the TestActor survived and re-create it
// here (race-free) if it did not. See EnsureTestActorAliveAsync.
await EnsureTestActorAliveAsync();

await BeforeTestStart();
}
catch (OperationCanceledException) when (cts.IsCancellationRequested)
finally
{
throw new TimeoutException($"Host failed to start within {StartupTimeout.TotalSeconds} seconds");
SynchronizationContext.SetSynchronizationContext(entryContext);
}
}

// Wait for Akka initialization with timeout
var initializedTask = _initialized.Task;
var timeoutTask = Task.Delay(StartupTimeout, CancellationToken.None);
if (await Task.WhenAny(initializedTask, timeoutTask) == timeoutTask)
throw new TimeoutException($"Akka.NET failed to initialize within {StartupTimeout.TotalSeconds} seconds");
/// <summary>
/// Verifies that the <see cref="TestKitBase.TestActor"/> survived host startup, and re-creates it
/// if it did not.
/// <para>
/// The TestActor is an <c>InternalTestActor</c> created under <c>/system</c> on the
/// <c>CallingThreadDispatcher</c> via <see cref="TestKitBase.InitializeTest(ActorSystem, ActorSystemSetup, string, string)"/>,
/// which runs inside a <see cref="AkkaConfigurationBuilder.StartActors(Akka.Hosting.ActorStarter)"/> callback while
/// remoting, clustering and other extensions are concurrently creating their own <c>/system</c> actors.
/// That concurrent startup intermittently terminates the freshly-created TestActor, after which every
/// message sent to it dead-letters and <c>ExpectMsg</c> calls time out.
/// </para>
/// <para>
/// By the time this runs host startup has completed and the system is quiet, so re-creating the
/// TestActor here is race-free and deterministic.
/// </para>
/// </summary>
private async Task EnsureTestActorAliveAsync()
{
const int maxAttempts = 3;
for (var attempt = 0; attempt < maxAttempts; attempt++)
{
if (await IsTestActorAliveAsync())
return;

Sys.Log.Warning(
"TestActor [{0}] did not survive host startup; re-creating it (attempt {1}/{2}).",
TestActor.Path, attempt + 1, maxAttempts);

// TestActor initialization and registration now happens in AddStartup
// before user actors are created, preventing race conditions
// base.InitializeTest installs an ActorCellKeepingSynchronizationContext on the current
// thread; bracket it the same way the original call site does so it cannot leak.
var savedContext = SynchronizationContext.Current;
try
{
base.InitializeTest(Sys, (ActorSystemSetup)null!, null, null);
}
finally
{
SynchronizationContext.SetSynchronizationContext(savedContext);
}

await BeforeTestStart();
ActorRegistry.Register<TestProbe>(TestActor, overwrite: true);
}

if (!await IsTestActorAliveAsync())
throw new InvalidOperationException(
$"TestActor could not be kept alive across host startup after {maxAttempts} attempts.");
}

private async Task<bool> IsTestActorAliveAsync()
{
try
{
await Sys.ActorSelection(TestActor.Path).ResolveOne(TimeSpan.FromSeconds(1));
return true;
}
catch (ActorNotFoundException)
{
return false;
}
}

protected sealed override void InitializeTest(ActorSystem system, ActorSystemSetup config, string actorSystemName,
Expand Down
Loading