diff --git a/src/contrib/testkits/Akka.TestKit.Xunit.Tests/ActorCellKeepingSynchronizationContextSpec.cs b/src/contrib/testkits/Akka.TestKit.Xunit.Tests/ActorCellKeepingSynchronizationContextSpec.cs new file mode 100644 index 00000000000..1e7f71ad4fc --- /dev/null +++ b/src/contrib/testkits/Akka.TestKit.Xunit.Tests/ActorCellKeepingSynchronizationContextSpec.cs @@ -0,0 +1,100 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Reflection; +using System.Threading; +using Akka.TestKit; +using Xunit; + +namespace Akka.TestKit.Xunit.Tests; + +public class ActorCellKeepingSynchronizationContextSpec +{ + [Fact] + public void Post_should_delegate_to_outer_synchronization_context() + { + var inner = new RecordingSynchronizationContext(); + var wrapper = CreateWrapper(inner); + var state = new object(); + object? callbackState = null; + SynchronizationContext? callbackContext = null; + + wrapper.Post(s => + { + callbackState = s; + callbackContext = SynchronizationContext.Current; + }, state); + + Assert.Equal(1, inner.PostCalls); + Assert.Same(state, callbackState); + Assert.Same(wrapper, callbackContext); + } + + [Fact] + public void Send_should_delegate_to_outer_synchronization_context() + { + var inner = new RecordingSynchronizationContext(); + var wrapper = CreateWrapper(inner); + var state = new object(); + object? callbackState = null; + SynchronizationContext? callbackContext = null; + + wrapper.Send(s => + { + callbackState = s; + callbackContext = SynchronizationContext.Current; + }, state); + + Assert.Equal(1, inner.SendCalls); + Assert.Same(state, callbackState); + Assert.Same(wrapper, callbackContext); + } + + [Fact] + public void CreateCopy_should_preserve_outer_synchronization_context() + { + var inner = new RecordingSynchronizationContext(); + var wrapper = CreateWrapper(inner); + var copy = wrapper.CreateCopy(); + + copy.Post(_ => { }, null); + copy.Send(_ => { }, null); + + Assert.Equal(1, inner.PostCalls); + Assert.Equal(1, inner.SendCalls); + } + + private static SynchronizationContext CreateWrapper(SynchronizationContext? inner) + { + var wrapperType = typeof(TestKitBase).Assembly.GetType("Akka.TestKit.ActorCellKeepingSynchronizationContext", throwOnError: true)!; + return (SynchronizationContext)Activator.CreateInstance( + wrapperType, + BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic, + binder: null, + args: [null, inner], + culture: null)!; + } + + private sealed class RecordingSynchronizationContext : SynchronizationContext + { + public int PostCalls { get; private set; } + public int SendCalls { get; private set; } + + public override void Post(SendOrPostCallback d, object? state) + { + PostCalls++; + d(state); + } + + public override void Send(SendOrPostCallback d, object? state) + { + SendCalls++; + d(state); + } + } +} diff --git a/src/contrib/testkits/Akka.TestKit.Xunit.Tests/Akka.TestKit.Xunit.Tests.csproj b/src/contrib/testkits/Akka.TestKit.Xunit.Tests/Akka.TestKit.Xunit.Tests.csproj index 8c84bc24667..5daa88c690c 100644 --- a/src/contrib/testkits/Akka.TestKit.Xunit.Tests/Akka.TestKit.Xunit.Tests.csproj +++ b/src/contrib/testkits/Akka.TestKit.Xunit.Tests/Akka.TestKit.Xunit.Tests.csproj @@ -1,6 +1,4 @@ - - $(NetFrameworkTestVersion);$(NetTestVersion) Exe @@ -27,4 +25,10 @@ + + + Always + + + diff --git a/src/contrib/testkits/Akka.TestKit.Xunit.Tests/ParallelAmbientContextSpec.cs b/src/contrib/testkits/Akka.TestKit.Xunit.Tests/ParallelAmbientContextSpec.cs new file mode 100644 index 00000000000..bcc60b7298f --- /dev/null +++ b/src/contrib/testkits/Akka.TestKit.Xunit.Tests/ParallelAmbientContextSpec.cs @@ -0,0 +1,98 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Actor.Internal; +using Akka.TestKit.Xunit.Attributes; +using Xunit; + +namespace Akka.TestKit.Xunit.Tests; + +// Regression guard for the xUnit v3 parallel-class implicit-sender leak +// (see ActorCellKeepingSynchronizationContext for the mechanism). +// +// This project provides its own xunit.runner.json with parallel collections +// enabled so CI and local runs exercise the reported failure mode by default. + +public abstract class ParallelAmbientContextSpecBase : TestKit, IAsyncLifetime +{ + // Forces the post-ctor continuation onto a different SC worker — the + // thread pollution only manifests when the body thread differs from the + // ctor thread. + public async ValueTask InitializeAsync() => await Task.Yield(); + + public ValueTask DisposeAsync() => default; + + [Fact] + public async Task Implicit_sender_should_resolve_to_own_TestActor() + { + // Pre-await prefix — before the fix, this window read whatever cell a + // sibling ctor pinned on the body thread. + TestActor.Tell("ping"); + await ExpectMsgAsync("ping", TimeSpan.FromSeconds(5), cancellationToken: TestContext.Current.CancellationToken); + Assert.Equal(TestActor, LastSender); + + await Task.Yield(); + TestActor.Tell("ping-after-yield"); + await ExpectMsgAsync("ping-after-yield", TimeSpan.FromSeconds(5), cancellationToken: TestContext.Current.CancellationToken); + Assert.Equal(TestActor, LastSender); + } +} + +public class ParallelAmbientContextSpec01 : ParallelAmbientContextSpecBase { } +public class ParallelAmbientContextSpec02 : ParallelAmbientContextSpecBase { } +public class ParallelAmbientContextSpec03 : ParallelAmbientContextSpecBase { } +public class ParallelAmbientContextSpec04 : ParallelAmbientContextSpecBase { } +public class ParallelAmbientContextSpec05 : ParallelAmbientContextSpecBase { } +public class ParallelAmbientContextSpec06 : ParallelAmbientContextSpecBase { } +public class ParallelAmbientContextSpec07 : ParallelAmbientContextSpecBase { } +public class ParallelAmbientContextSpec08 : ParallelAmbientContextSpecBase { } +public class ParallelAmbientContextSpec09 : ParallelAmbientContextSpecBase { } +public class ParallelAmbientContextSpec10 : ParallelAmbientContextSpecBase { } +public class ParallelAmbientContextSpec11 : ParallelAmbientContextSpecBase { } +public class ParallelAmbientContextSpec12 : ParallelAmbientContextSpecBase { } +public class ParallelAmbientContextSpec13 : ParallelAmbientContextSpecBase { } +public class ParallelAmbientContextSpec14 : ParallelAmbientContextSpecBase { } +public class ParallelAmbientContextSpec15 : ParallelAmbientContextSpecBase { } +public class ParallelAmbientContextSpec16 : ParallelAmbientContextSpecBase { } + +// Regression test for INoImplicitSender under the same xUnit v3 parallel +// scheduling. INoImplicitSender tests contractually have no implicit sender — +// Current must be null both at body entry (pre-await prefix) and across any +// await continuations resumed on a reused worker thread that a sibling's +// Before hook may have pinned with a non-null cell. +public abstract class ParallelNoImplicitSenderSpecBase : TestKit, IAsyncLifetime, INoImplicitSender +{ + public async ValueTask InitializeAsync() => await Task.Yield(); + + public ValueTask DisposeAsync() => default; + + [Fact] + public async Task Current_should_be_null_both_pre_and_post_await() + { + // Invariant: body must enter with Current == null. + Assert.Null(InternalCurrentActorCellKeeper.Current); + + // Force continuation onto a potentially polluted worker. + await Task.Yield(); + Assert.Null(InternalCurrentActorCellKeeper.Current); + + await Task.Yield(); + Assert.Null(InternalCurrentActorCellKeeper.Current); + } +} + +public class ParallelNoImplicitSenderSpec01 : ParallelNoImplicitSenderSpecBase { } +public class ParallelNoImplicitSenderSpec02 : ParallelNoImplicitSenderSpecBase { } +public class ParallelNoImplicitSenderSpec03 : ParallelNoImplicitSenderSpecBase { } +public class ParallelNoImplicitSenderSpec04 : ParallelNoImplicitSenderSpecBase { } +public class ParallelNoImplicitSenderSpec05 : ParallelNoImplicitSenderSpecBase { } +public class ParallelNoImplicitSenderSpec06 : ParallelNoImplicitSenderSpecBase { } +public class ParallelNoImplicitSenderSpec07 : ParallelNoImplicitSenderSpecBase { } +public class ParallelNoImplicitSenderSpec08 : ParallelNoImplicitSenderSpecBase { } diff --git a/src/contrib/testkits/Akka.TestKit.Xunit.Tests/xunit.runner.json b/src/contrib/testkits/Akka.TestKit.Xunit.Tests/xunit.runner.json new file mode 100644 index 00000000000..52d4cff76c3 --- /dev/null +++ b/src/contrib/testkits/Akka.TestKit.Xunit.Tests/xunit.runner.json @@ -0,0 +1,6 @@ +{ + "$schema": "https://xunit.github.io/schema/current/xunit.runner.schema.json", + "longRunningTestSeconds": 60, + "parallelizeAssembly": true, + "parallelizeTestCollections": true +} diff --git a/src/contrib/testkits/Akka.TestKit.Xunit/Attributes/AkkaCleanAmbientContextAttribute.cs b/src/contrib/testkits/Akka.TestKit.Xunit/Attributes/AkkaCleanAmbientContextAttribute.cs new file mode 100644 index 00000000000..5e29e133474 --- /dev/null +++ b/src/contrib/testkits/Akka.TestKit.Xunit/Attributes/AkkaCleanAmbientContextAttribute.cs @@ -0,0 +1,99 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +//----------------------------------------------------------------------- + +#nullable enable + +using System; +using System.Reflection; +using System.Threading; +using Akka.Actor; +using Akka.Actor.Internal; +using Xunit; +using Xunit.v3; + +namespace Akka.TestKit.Xunit.Attributes; + +/// +/// Makes a test class parallel-safe under xUnit v3's parallel-collection +/// scheduling by pinning +/// to the running test's TestActor cell on the body thread, and installing +/// an that re-pins the +/// cell across await continuations. +/// +/// Intended for xUnit v3 test kits built on . +/// Applied to (and inherited by derived test +/// classes) so users get parallel-safe behavior automatically, and can also +/// be applied by downstream test kits that derive directly from +/// . See +/// for the underlying +/// mechanism and the ThreadStatic-vs-ExecutionContext rationale. +/// +[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = true)] +public sealed class AkkaCleanAmbientContextAttribute : BeforeAfterTestAttribute +{ + private sealed record AmbientContextState + { + public required bool Applied { get; init; } + public SynchronizationContext? PreviousContext { get; init; } + public ActorCell? PreviousCell { get; init; } + } + + // AsyncLocal flows across await boundaries via ExecutionContext, unlike [ThreadStatic]. + // This is critical because xUnit v3's runner awaits the test body between Before() and After(), + // so After() can resume on a different thread than Before() ran on. + private static readonly AsyncLocal _state = new(); + + /// + public override void Before(MethodInfo methodUnderTest, IXunitTest test) + { + var instance = TestContext.Current.TestClassInstance; + if (instance is not TestKitBase testKit) + { + _state.Value = new AmbientContextState + { + Applied = false, + PreviousContext = SynchronizationContext.Current, + PreviousCell = InternalCurrentActorCellKeeper.Current + }; + return; + } + + // Null cell for INoImplicitSender mirrors TestKitBase.InitializeTest: + // the Post wrapper will pin Current = null so no sibling cell leaks in. + var cell = testKit is INoImplicitSender ? null : TryGetCell(testKit); + + _state.Value = new AmbientContextState + { + Applied = true, + PreviousContext = SynchronizationContext.Current, + PreviousCell = InternalCurrentActorCellKeeper.Current + }; + + InternalCurrentActorCellKeeper.Current = cell; + SynchronizationContext.SetSynchronizationContext( + new ActorCellKeepingSynchronizationContext(cell, _state.Value.PreviousContext)); + } + + /// + public override void After(MethodInfo methodUnderTest, IXunitTest test) + { + var state = _state.Value; + if (state is null || !state.Applied) + return; + + InternalCurrentActorCellKeeper.Current = state.PreviousCell; + SynchronizationContext.SetSynchronizationContext(state.PreviousContext); + _state.Value = null; + } + + private static ActorCell? TryGetCell(TestKitBase testKit) + { + return testKit.TestActor is ActorRefWithCell withCell + ? withCell.Underlying as ActorCell + : null; + } +} diff --git a/src/contrib/testkits/Akka.TestKit.Xunit/TestKit.cs b/src/contrib/testkits/Akka.TestKit.Xunit/TestKit.cs index 57690b599f9..213442df4c0 100644 --- a/src/contrib/testkits/Akka.TestKit.Xunit/TestKit.cs +++ b/src/contrib/testkits/Akka.TestKit.Xunit/TestKit.cs @@ -11,6 +11,7 @@ using Akka.Actor.Setup; using Akka.Configuration; using Akka.Event; +using Akka.TestKit.Xunit.Attributes; using Akka.TestKit.Xunit.Internals; using Xunit; @@ -20,6 +21,7 @@ namespace Akka.TestKit.Xunit; /// This class represents an Akka.NET TestKit that uses xUnit /// as its testing framework. /// +[AkkaCleanAmbientContext] public class TestKit : TestKitBase, IDisposable { private class PrefixedOutput : ITestOutputHelper diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt index f07a80792f6..8dd5a2fe34d 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt @@ -1,4 +1,5 @@ [assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit")] [assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)] [assembly: System.Runtime.InteropServices.GuidAttribute("ae01b790-1478-4917-9299-b4855ba997cb")] [assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETCoreApp,Version=v10.0", FrameworkDisplayName=".NET 10.0")] @@ -849,4 +850,4 @@ namespace Akka.TestKit.TestEvent public Unmute(System.Collections.Generic.IReadOnlyCollection filters) { } public System.Collections.Generic.IReadOnlyCollection Filters { get; } } -} \ No newline at end of file +} diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt index 692ecc8345b..f575332de5a 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt @@ -1,4 +1,5 @@ [assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit")] [assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)] [assembly: System.Runtime.InteropServices.GuidAttribute("ae01b790-1478-4917-9299-b4855ba997cb")] [assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETStandard,Version=v2.0", FrameworkDisplayName=".NET Standard 2.0")] @@ -849,4 +850,4 @@ namespace Akka.TestKit.TestEvent public Unmute(System.Collections.Generic.IReadOnlyCollection filters) { } public System.Collections.Generic.IReadOnlyCollection Filters { get; } } -} \ No newline at end of file +} diff --git a/src/core/Akka.TestKit/ActorCellKeepingSynchronizationContext.cs b/src/core/Akka.TestKit/ActorCellKeepingSynchronizationContext.cs index 80fc5e63e0f..a70e0bbded0 100644 --- a/src/core/Akka.TestKit/ActorCellKeepingSynchronizationContext.cs +++ b/src/core/Akka.TestKit/ActorCellKeepingSynchronizationContext.cs @@ -1,85 +1,158 @@ -//----------------------------------------------------------------------- +//----------------------------------------------------------------------- // // Copyright (C) 2009-2022 Lightbend Inc. // Copyright (C) 2013-2025 .NET Foundation // //----------------------------------------------------------------------- +#nullable enable + using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Actor.Internal; +using Akka.Annotations; namespace Akka.TestKit { /// - /// TBD + /// INTERNAL API. + /// + /// A used by the test kits to pin + /// the ambient + /// across await continuations that + /// originate from a test body. + /// + /// is a + /// slot — it does not flow through + /// . When a test awaits, + /// the continuation can resume on an arbitrary + /// thread whose slot is either empty + /// or polluted by unrelated work. Installing this SC on the test-body + /// thread causes every posted continuation to run inside a + /// save/pin/restore window, so the test's cell is visible to + /// IActorRef.Tell(message) implicit-sender resolution and + /// to anything else reading + /// from the continuation. + /// + /// Not intended for use outside the test kits. /// - class ActorCellKeepingSynchronizationContext : SynchronizationContext + [InternalApi] + internal class ActorCellKeepingSynchronizationContext : SynchronizationContext { - private readonly ActorCell _cell; + private readonly ActorCell? _cell; + private readonly SynchronizationContext? _inner; /// - /// TBD + /// Creates a new + /// that pins the given as + /// for the + /// duration of every callback posted through it. /// - /// TBD - public ActorCellKeepingSynchronizationContext(ActorCell cell) + /// + /// The to pin as the ambient current cell, + /// or to pin "no implicit sender" (mirrors + /// the branch of + /// ). + /// + /// + /// An optional outer to delegate + /// scheduling to. When non-null, and + /// dispatch through the outer SC (preserving its scheduling, e.g. xUnit v3's + /// MaxConcurrencySyncContext) while wrapping callbacks with the cell-pinning + /// window. When null, falls back to dispatch. + /// + public ActorCellKeepingSynchronizationContext(ActorCell? cell, SynchronizationContext? inner = null) { _cell = cell; + _inner = inner; } /// - /// TBD + /// Queues the given callback with + /// pinned to the cell this SC was constructed with, then restores the + /// previously pinned value when the callback returns. Delegates scheduling + /// to the inner when available, otherwise + /// falls back to . /// - /// TBD - /// TBD - public override void Post(SendOrPostCallback d, object state) + /// The delegate to invoke. + /// The state object to pass to . + public override void Post(SendOrPostCallback d, object? state) { - ThreadPool.QueueUserWorkItem(_ => + void WrappedCallback(object? s) { var oldCell = InternalCurrentActorCellKeeper.Current; var oldContext = Current; SetSynchronizationContext(this); - InternalCurrentActorCellKeeper.Current = _cell; - try { - d(state); + d(s); } finally { InternalCurrentActorCellKeeper.Current = oldCell; SetSynchronizationContext(oldContext); } - }, state); + } + + if (_inner != null) + _inner.Post(WrappedCallback, state); + else + ThreadPool.QueueUserWorkItem(WrappedCallback, state); } /// - /// TBD + /// Synchronously dispatches the given callback with cell pinning. + /// Delegates to the inner when + /// available, otherwise falls back to with a + /// blocking wait. /// - /// TBD - /// TBD - public override void Send(SendOrPostCallback d, object state) + /// The delegate to invoke. + /// The state object to pass to . + public override void Send(SendOrPostCallback d, object? state) { - var tcs = new TaskCompletionSource(); - Post(_ => + if (_inner != null) { - try + _inner.Send(_ => { - d(state); - tcs.SetResult(0); - } - catch (Exception e) + var oldCell = InternalCurrentActorCellKeeper.Current; + var oldContext = Current; + SetSynchronizationContext(this); + InternalCurrentActorCellKeeper.Current = _cell; + try + { + d(state); + } + finally + { + InternalCurrentActorCellKeeper.Current = oldCell; + SetSynchronizationContext(oldContext); + } + }, state); + } + else + { + var tcs = new TaskCompletionSource(); + Post(_ => { - tcs.TrySetException(e); - } - }, state); - tcs.Task.Wait(); + try + { + d(state); + tcs.SetResult(0); + } + catch (Exception e) + { + tcs.TrySetException(e); + } + }, state); + tcs.Task.Wait(); + } } + + /// + public override SynchronizationContext CreateCopy() + => new ActorCellKeepingSynchronizationContext(_cell, _inner); } } diff --git a/src/core/Akka.TestKit/Properties/AssemblyInfo.cs b/src/core/Akka.TestKit/Properties/AssemblyInfo.cs index 7e6c06f1f00..3af94376b14 100644 --- a/src/core/Akka.TestKit/Properties/AssemblyInfo.cs +++ b/src/core/Akka.TestKit/Properties/AssemblyInfo.cs @@ -6,6 +6,7 @@ //----------------------------------------------------------------------- using System.Reflection; +using System.Runtime.CompilerServices; using System.Runtime.InteropServices; // General Information about an assembly is controlled through the following @@ -19,3 +20,4 @@ // The following GUID is for the ID of the typelib if this project is exposed to COM [assembly: Guid("ae01b790-1478-4917-9299-b4855ba997cb")] +[assembly: InternalsVisibleTo("Akka.TestKit.Xunit")]