Skip to content
83 changes: 83 additions & 0 deletions src/Akka.Hosting.TestKit.Tests/ParallelAmbientContextSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// -----------------------------------------------------------------------
// <copyright file="ParallelAmbientContextSpec.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.TestKit;
using Xunit;

namespace Akka.Hosting.TestKit.Tests;

public abstract class ParallelAmbientContextSpecBase : TestKit
{
protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider)
{
}

[Fact]
public async Task Implicit_sender_should_resolve_to_own_TestActor()
{
TestActor.Tell("ping");
await ExpectMsgAsync<string>(
"ping",
TimeSpan.FromSeconds(5),
cancellationToken: TestContext.Current.CancellationToken);
Assert.Equal(TestActor, LastSender);

await Task.Yield();
TestActor.Tell("ping-after-yield");
await ExpectMsgAsync<string>(
"ping-after-yield",
TimeSpan.FromSeconds(5),
cancellationToken: TestContext.Current.CancellationToken);
Assert.Equal(TestActor, LastSender);
}
}

public class ParallelAmbientContextSpec01 : ParallelAmbientContextSpecBase { }
public class ParallelAmbientContextSpec02 : ParallelAmbientContextSpecBase { }
public class ParallelAmbientContextSpec03 : ParallelAmbientContextSpecBase { }
public class ParallelAmbientContextSpec04 : ParallelAmbientContextSpecBase { }
public class ParallelAmbientContextSpec05 : ParallelAmbientContextSpecBase { }
public class ParallelAmbientContextSpec06 : ParallelAmbientContextSpecBase { }
public class ParallelAmbientContextSpec07 : ParallelAmbientContextSpecBase { }
public class ParallelAmbientContextSpec08 : ParallelAmbientContextSpecBase { }
public class ParallelAmbientContextSpec09 : ParallelAmbientContextSpecBase { }
public class ParallelAmbientContextSpec10 : ParallelAmbientContextSpecBase { }
public class ParallelAmbientContextSpec11 : ParallelAmbientContextSpecBase { }
public class ParallelAmbientContextSpec12 : ParallelAmbientContextSpecBase { }
public class ParallelAmbientContextSpec13 : ParallelAmbientContextSpecBase { }
public class ParallelAmbientContextSpec14 : ParallelAmbientContextSpecBase { }
public class ParallelAmbientContextSpec15 : ParallelAmbientContextSpecBase { }
public class ParallelAmbientContextSpec16 : ParallelAmbientContextSpecBase { }

public abstract class ParallelNoImplicitSenderSpecBase : TestKit, INoImplicitSender
{
protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider)
{
}

[Fact]
public async Task Current_should_be_null_both_pre_and_post_await()
{
Assert.Null(InternalCurrentActorCellKeeper.Current);
await Task.Yield();
Assert.Null(InternalCurrentActorCellKeeper.Current);
await Task.Yield();
Assert.Null(InternalCurrentActorCellKeeper.Current);
}
}

public class ParallelNoImplicitSenderSpec01 : ParallelNoImplicitSenderSpecBase { }
public class ParallelNoImplicitSenderSpec02 : ParallelNoImplicitSenderSpecBase { }
public class ParallelNoImplicitSenderSpec03 : ParallelNoImplicitSenderSpecBase { }
public class ParallelNoImplicitSenderSpec04 : ParallelNoImplicitSenderSpecBase { }
public class ParallelNoImplicitSenderSpec05 : ParallelNoImplicitSenderSpecBase { }
public class ParallelNoImplicitSenderSpec06 : ParallelNoImplicitSenderSpecBase { }
public class ParallelNoImplicitSenderSpec07 : ParallelNoImplicitSenderSpecBase { }
public class ParallelNoImplicitSenderSpec08 : ParallelNoImplicitSenderSpecBase { }
2 changes: 1 addition & 1 deletion src/Akka.Hosting.TestKit.Tests/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]

[assembly: CollectionBehavior(CollectionBehavior.CollectionPerAssembly, DisableTestParallelization = true)]
[assembly: CollectionBehavior(CollectionBehavior.CollectionPerClass)]
4 changes: 2 additions & 2 deletions src/Akka.Hosting.TestKit.Tests/xunit.runner.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://xunit.github.io/schema/current/xunit.runner.schema.json",
"longRunningTestSeconds": 60,
"parallelizeAssembly": false,
"parallelizeTestCollections": false
"parallelizeAssembly": true,
"parallelizeTestCollections": true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// -----------------------------------------------------------------------
// <copyright file="ActorCellKeepingSynchronizationContext.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Internal;

namespace Akka.Hosting.TestKit.Internals
{
/// <summary>
/// A decorator <see cref="SynchronizationContext"/> that preserves the outer
/// SC's scheduling (e.g. xUnit v3's MaxConcurrencySyncContext) while pinning
/// <see cref="InternalCurrentActorCellKeeper.Current"/> around every callback.
/// When no outer SC exists, falls back to <see cref="ThreadPool"/> dispatch.
/// </summary>
internal sealed class ActorCellKeepingSynchronizationContext : SynchronizationContext
{
private readonly ActorCell? _cell;
private readonly SynchronizationContext? _inner;

internal ActorCellKeepingSynchronizationContext(ActorCell? cell, SynchronizationContext? inner)
{
_cell = cell;
_inner = inner;
}

public override void Post(SendOrPostCallback d, object? state)
{
void WrappedCallback(object? s)
{
var oldCell = InternalCurrentActorCellKeeper.Current;
var oldCtx = Current;
SetSynchronizationContext(this);
InternalCurrentActorCellKeeper.Current = _cell;
try
{
d(s);
}
finally
{
InternalCurrentActorCellKeeper.Current = oldCell;
SetSynchronizationContext(oldCtx);
}
}

if (_inner != null)
_inner.Post(WrappedCallback, state);
else
ThreadPool.QueueUserWorkItem(WrappedCallback, state);
}

public override void Send(SendOrPostCallback d, object? state)
{
if (_inner != null)
{
_inner.Send(_ =>
{
var oldCell = InternalCurrentActorCellKeeper.Current;
var oldCtx = Current;
SetSynchronizationContext(this);
InternalCurrentActorCellKeeper.Current = _cell;
try
{
d(state);
}
finally
{
InternalCurrentActorCellKeeper.Current = oldCell;
SetSynchronizationContext(oldCtx);
}
}, state);
}
else
{
var tcs = new TaskCompletionSource<int>();
Post(_ =>
{
try
{
d(state);
tcs.SetResult(0);
}
catch (Exception e)
{
tcs.TrySetException(e);
}
}, state);
tcs.Task.Wait();
}
}

public override SynchronizationContext CreateCopy()
=> new ActorCellKeepingSynchronizationContext(_cell, _inner);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// -----------------------------------------------------------------------
// <copyright file="HostingCleanAmbientContextAttribute.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Reflection;
using System.Threading;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.TestKit;
using Xunit;
using Xunit.v3;

namespace Akka.Hosting.TestKit.Internals
{
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = true)]
internal sealed class HostingCleanAmbientContextAttribute : BeforeAfterTestAttribute
{
// AsyncLocal flows across await boundaries via ExecutionContext, unlike [ThreadStatic].
// This is critical because xUnit v3's runner awaits the test body between Before() and After(),
// so After() can resume on a different thread than Before() ran on.
private static readonly AsyncLocal<SynchronizationContext?> _previousContext = new();
private static readonly AsyncLocal<bool> _applied = new();

public override void Before(MethodInfo methodUnderTest, IXunitTest test)
{
var instance = TestContext.Current.TestClassInstance;
if (instance is not TestKitBase testKit)
{
_applied.Value = false;
return;
}

_applied.Value = true;
var cell = testKit is INoImplicitSender ? null : TryGetCell(testKit);

InternalCurrentActorCellKeeper.Current = cell;
_previousContext.Value = SynchronizationContext.Current;
SynchronizationContext.SetSynchronizationContext(
new ActorCellKeepingSynchronizationContext(cell, _previousContext.Value));
}

public override void After(MethodInfo methodUnderTest, IXunitTest test)
{
if (!_applied.Value)
return;

_applied.Value = false;
InternalCurrentActorCellKeeper.Current = null;
SynchronizationContext.SetSynchronizationContext(_previousContext.Value);
_previousContext.Value = null;
}

private static ActorCell? TryGetCell(TestKitBase testKit)
=> testKit.TestActor is ActorRefWithCell withCell
? withCell.Underlying as ActorCell
: null;
}
}
13 changes: 1 addition & 12 deletions src/Akka.Hosting.TestKit/TestKit.Shared.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public IHost Host
/// </summary>
private void EnsureImplicitSender()
{
if (this is not INoImplicitSender && TestActor != null)
if (this is not INoImplicitSender && InternalCurrentActorCellKeeper.Current == null && TestActor != null)
InternalCurrentActorCellKeeper.Current = (ActorCell)((ActorRefWithCell)TestActor).Underlying;
}

Expand Down Expand Up @@ -190,12 +190,6 @@ private async Task InitializeAsyncCore()
// TestActor initialization and registration now happens in AddStartup
// before user actors are created, preventing race conditions

// ALWAYS set the implicit sender context on the current thread after initialization
// This ensures it's available on the thread where tests will run
// This is critical for tests using DI-created actors
if (this is not INoImplicitSender && TestActor != null)
InternalCurrentActorCellKeeper.Current = (ActorCell)((ActorRefWithCell)TestActor).Underlying;

await BeforeTestStart();
}

Expand All @@ -219,11 +213,6 @@ protected sealed override void InitializeTest(ActorSystem system, ActorSystemSet

protected virtual Task BeforeTestStart()
{
// Ensure the implicit sender is set on the current thread before each test
// This is critical because tests may run on different threads than initialization
if (this is not INoImplicitSender)
InternalCurrentActorCellKeeper.Current = (ActorCell)((ActorRefWithCell)TestActor).Underlying;

return Task.CompletedTask;
}

Expand Down
2 changes: 2 additions & 0 deletions src/Akka.Hosting.TestKit/TestKit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
using System;
using System.Threading.Tasks;
using Akka.Annotations;
using Akka.Hosting.TestKit.Internals;
using Xunit;

namespace Akka.Hosting.TestKit
{
[HostingCleanAmbientContext]
public abstract partial class TestKit : IAsyncLifetime, IAsyncDisposable
{
[InternalApi]
Expand Down