diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs index 4f79a40454e..d0ff5baf440 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs @@ -88,7 +88,7 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage_ var consumerEndProbe = CreateTestProbe(); var region = await ClusterSharding.Get(Sys).StartAsync($"TestConsumer-{_idCount}", _ => ShardingConsumerController.Create(c => - PropsFor(DefaultConsumerDelay, 42, consumerEndProbe.Ref, c), + PropsFor(DefaultConsumerDelay, 42, consumerEndProbe.Ref, c, expectedProducerCount: 2), ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys), HashCodeMessageExtractor.Create(10, o => string.Empty, o => o)); @@ -108,7 +108,7 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage_ $"p2-{_idCount}"); // expecting 3 end messages, one for each entity: "entity-0", "entity-1", "entity-2" - var endMessages = await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(5)).ToListAsync(); + var endMessages = await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(10)).ToListAsync(); var producerIds = endMessages.Cast().SelectMany(c => c.ProducerIds).ToList(); producerIds diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs index 6d9d3b2de5f..c3209f3eb44 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs @@ -1856,21 +1856,31 @@ private void HandleSupervisorStop(SupervisorStopDirectivePassivation msg) // We only have to do this if we have R-E enabled if (!_rememberEntities) return; - + var id = _entities.EntityId(msg.Child); // Just return if the child actor is not a recorded shard entity - if (id is null) + if (id is null) return; - - // Remove the child actor from the entity list - _entities.RemoveEntity(id); - + + // Only transition to Passivating if entity is Active. + // If already Passivating (entity requested passivation then threw), the normal flow handles it. + // + // Using Passivating state (instead of RemoveEntity which sets NoState): + // 1. Buffers messages arriving during termination (fixes race condition crash) + // 2. Persists removal from store when EntityTerminated arrives + // 3. Restarts entity if messages arrive (transient failure, try again) + // 4. Stays forgotten if no messages arrive + if (_entities.EntityState(id) is Active) + { + _entities.EntityPassivating(id); + } + // Force stop the child actor, it might have been restarted Context.Stop(msg.Child); - + Log.Error( - msg.LastCause, - "{0}: Remembered entity {1} was stopped: {2}", + msg.LastCause, + "{0}: Remembered entity {1} was stopped: {2}", _typeName, id, msg.Reason); } diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs index fd755c0fa9b..7e504235644 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs @@ -91,12 +91,13 @@ public async Task A_Cluster_with_DistributedPubSub_must_handle_restart_of_nodes_ await WithinAsync(30.Seconds(), async () => { Mediator.Tell(new Subscribe("topic1", TestActor)); - ExpectMsg(); + await ExpectMsgAsync(); await CountAsync(3); - RunOn(() => + await RunOnAsync(() => { Mediator.Tell(new Publish("topic1", "msg1")); + return Task.CompletedTask; }, _config.First); await EnterBarrierAsync("pub-msg1"); @@ -110,8 +111,10 @@ await RunOnAsync(async () => await EnterBarrierAsync("end"); - Mediator.Tell(DeltaCount.Instance); - var deltaCount = await ExpectMsgAsync(); + // Use a probe to isolate DeltaCount query from any stray messages in TestActor mailbox + var probe = CreateTestProbe(); + Mediator.Tell(DeltaCount.Instance, probe.Ref); + var deltaCount = await probe.ExpectMsgAsync(5.Seconds()); deltaCount.Should().Be(oldDeltaCount); }, _config.Second); @@ -123,12 +126,14 @@ await RunOnAsync(async () => var thirdAddress = (await NodeAsync(_config.Third)).Address; await TestConductor.Shutdown(_config.Third).WaitAsync(30.Seconds()); - await WithinAsync(20.Seconds(), async () => + // Use a probe for Identify to avoid polluting TestActor mailbox with stray responses + var identifyProbe = CreateTestProbe(); + await WithinAsync(25.Seconds(), async () => { await AwaitAssertAsync(async () => { - Sys.ActorSelection(new RootActorPath(thirdAddress) / "user" / "shutdown").Tell(new Identify(null)); - (await ExpectMsgAsync(1.Seconds())).Subject.Should().NotBeNull(); + Sys.ActorSelection(new RootActorPath(thirdAddress) / "user" / "shutdown").Tell(new Identify(null), identifyProbe.Ref); + (await identifyProbe.ExpectMsgAsync(2.Seconds())).Subject.Should().NotBeNull(); }); }); @@ -136,8 +141,11 @@ await AwaitAssertAsync(async () => await EnterBarrierAsync("end"); - Mediator.Tell(DeltaCount.Instance); - var deltaCount = await ExpectMsgAsync(); + // Use a probe to isolate DeltaCount query from stray ActorIdentity messages + // that may still be arriving from AwaitAssertAsync Identify retries + var deltaProbe = CreateTestProbe(); + Mediator.Tell(DeltaCount.Instance, deltaProbe.Ref); + var deltaCount = await deltaProbe.ExpectMsgAsync(5.Seconds()); deltaCount.Should().Be(oldDeltaCount); }, _config.First); @@ -157,6 +165,12 @@ await RunOnAsync(async () => await Cluster.Get(newSystem).JoinAsync(Cluster.Get(newSystem).SelfAddress); var newMediator = DistributedPubSub.Get(newSystem).Mediator; var probe = CreateTestProbe(newSystem); + + // Create shutdown actor FIRST so First node can find it while we verify gossip isolation + // This fixes the race condition where First node times out waiting for this actor + newSystem.Log.Info("Creating shutdown actor on {0}", node3Address); + newSystem.ActorOf("shutdown"); + newMediator.Tell(new Subscribe("topic2", probe.Ref), probe.Ref); await probe.ExpectMsgAsync(); @@ -165,8 +179,6 @@ await RunOnAsync(async () => newMediator.Tell(DeltaCount.Instance, probe.Ref); await probe.ExpectMsgAsync(0L); - newSystem.Log.Info("Shutdown actor started on {0}",node3Address); - newSystem.ActorOf("shutdown"); await newSystem.WhenTerminated.WaitAsync(30.Seconds()); } finally @@ -194,10 +206,11 @@ private IActorRef Mediator private async Task JoinAsync(RoleName from, RoleName to) { - RunOn(() => + await RunOnAsync(() => { Cluster.Get(Sys).Join(Node(to).Address); CreateMediator(); + return Task.CompletedTask; }, from); await EnterBarrierAsync(from.Name + "-joined"); } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt index 8ea957fa160..0ecde733e61 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt @@ -309,6 +309,7 @@ namespace Akka.TestKit public object LastMessage { get; } public Akka.Actor.IActorRef LastSender { get; } public Akka.Event.ILoggingAdapter Log { get; } + public System.TimeSpan NoMessageRemainingOrDefault { get; } public static System.TimeSpan Now { get; } public System.TimeSpan Remaining { get; } public System.TimeSpan RemainingOrDefault { get; } @@ -397,17 +398,17 @@ namespace Akka.TestKit public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.Collections.Generic.IReadOnlyCollection messages, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.TimeSpan max, params T[] messages) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection messages, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__153))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__158))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfAsync(System.Collections.Generic.IReadOnlyCollection messages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__156))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__161))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfAsync(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection messages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(params Akka.TestKit.PredicateInfo[] predicates) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.Collections.Generic.IReadOnlyCollection predicates, System.Threading.CancellationToken cancellationToken) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.TimeSpan max, params Akka.TestKit.PredicateInfo[] predicates) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection predicates, System.Threading.CancellationToken cancellationToken) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__163))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__168))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfMatchingPredicatesAsync(System.Collections.Generic.IReadOnlyCollection predicates, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__166))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__171))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfMatchingPredicatesAsync(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection predicates, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public T ExpectMsgAnyOf(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } public System.Threading.Tasks.ValueTask ExpectMsgAnyOfAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } @@ -462,9 +463,9 @@ namespace Akka.TestKit public System.Threading.Tasks.ValueTask PeekOneAsync(System.Threading.CancellationToken cancellationToken) { } public System.Collections.Generic.IReadOnlyCollection ReceiveN(int numberOfMessages, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ReceiveN(int numberOfMessages, System.TimeSpan max, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__212))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__217))] public System.Collections.Generic.IAsyncEnumerable ReceiveNAsync(int numberOfMessages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__214))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__219))] public System.Collections.Generic.IAsyncEnumerable ReceiveNAsync(int numberOfMessages, System.TimeSpan max, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public object ReceiveOne(System.Nullable max = null, System.Threading.CancellationToken cancellationToken = null) { } public System.Threading.Tasks.ValueTask ReceiveOneAsync(System.Nullable max = null, System.Threading.CancellationToken cancellationToken = null) { } @@ -472,13 +473,13 @@ namespace Akka.TestKit public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Nullable max, System.Nullable idle, System.Func filter, int msgs = 2147483647, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Func filter, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Predicate shouldContinue, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, bool shouldIgnoreOtherMessageTypes = True, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__203))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__208))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Nullable max, System.Func filter, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__205))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__210))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Nullable max, System.Nullable idle, System.Func filter, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__207))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__212))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Func filter, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__209))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__214))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Predicate shouldContinue, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, bool shouldIgnoreOtherMessageTypes = True, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } protected System.TimeSpan RemainingOr(System.TimeSpan duration) { } public System.TimeSpan RemainingOrDilated(System.Nullable duration) { } @@ -519,6 +520,7 @@ namespace Akka.TestKit { public TestKitSettings(Akka.Configuration.Config config) { } public System.TimeSpan DefaultTimeout { get; } + public System.TimeSpan ExpectNoMessageDefault { get; } public bool LogTestKitCalls { get; } public System.TimeSpan SingleExpectDefault { get; } public System.TimeSpan TestEventFilterLeeway { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt index 92d61d46c37..7903d26784d 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt @@ -309,6 +309,7 @@ namespace Akka.TestKit public object LastMessage { get; } public Akka.Actor.IActorRef LastSender { get; } public Akka.Event.ILoggingAdapter Log { get; } + public System.TimeSpan NoMessageRemainingOrDefault { get; } public static System.TimeSpan Now { get; } public System.TimeSpan Remaining { get; } public System.TimeSpan RemainingOrDefault { get; } @@ -397,17 +398,17 @@ namespace Akka.TestKit public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.Collections.Generic.IReadOnlyCollection messages, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.TimeSpan max, params T[] messages) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection messages, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__153))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__158))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfAsync(System.Collections.Generic.IReadOnlyCollection messages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__156))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__161))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfAsync(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection messages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(params Akka.TestKit.PredicateInfo[] predicates) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.Collections.Generic.IReadOnlyCollection predicates, System.Threading.CancellationToken cancellationToken) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.TimeSpan max, params Akka.TestKit.PredicateInfo[] predicates) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection predicates, System.Threading.CancellationToken cancellationToken) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__163))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__168))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfMatchingPredicatesAsync(System.Collections.Generic.IReadOnlyCollection predicates, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__166))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__171))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfMatchingPredicatesAsync(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection predicates, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public T ExpectMsgAnyOf(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } public System.Threading.Tasks.ValueTask ExpectMsgAnyOfAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } @@ -462,9 +463,9 @@ namespace Akka.TestKit public System.Threading.Tasks.ValueTask PeekOneAsync(System.Threading.CancellationToken cancellationToken) { } public System.Collections.Generic.IReadOnlyCollection ReceiveN(int numberOfMessages, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ReceiveN(int numberOfMessages, System.TimeSpan max, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__212))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__217))] public System.Collections.Generic.IAsyncEnumerable ReceiveNAsync(int numberOfMessages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__214))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__219))] public System.Collections.Generic.IAsyncEnumerable ReceiveNAsync(int numberOfMessages, System.TimeSpan max, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public object ReceiveOne(System.Nullable max = null, System.Threading.CancellationToken cancellationToken = null) { } public System.Threading.Tasks.ValueTask ReceiveOneAsync(System.Nullable max = null, System.Threading.CancellationToken cancellationToken = null) { } @@ -472,13 +473,13 @@ namespace Akka.TestKit public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Nullable max, System.Nullable idle, System.Func filter, int msgs = 2147483647, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Func filter, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Predicate shouldContinue, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, bool shouldIgnoreOtherMessageTypes = True, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__203))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__208))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Nullable max, System.Func filter, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__205))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__210))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Nullable max, System.Nullable idle, System.Func filter, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__207))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__212))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Func filter, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__209))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__214))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Predicate shouldContinue, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, bool shouldIgnoreOtherMessageTypes = True, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } protected System.TimeSpan RemainingOr(System.TimeSpan duration) { } public System.TimeSpan RemainingOrDilated(System.Nullable duration) { } @@ -519,6 +520,7 @@ namespace Akka.TestKit { public TestKitSettings(Akka.Configuration.Config config) { } public System.TimeSpan DefaultTimeout { get; } + public System.TimeSpan ExpectNoMessageDefault { get; } public bool LogTestKitCalls { get; } public System.TimeSpan SingleExpectDefault { get; } public System.TimeSpan TestEventFilterLeeway { get; } diff --git a/src/core/Akka.Remote/RemoteActorRefProvider.cs b/src/core/Akka.Remote/RemoteActorRefProvider.cs index 9e2b545de8f..8401f28c89a 100644 --- a/src/core/Akka.Remote/RemoteActorRefProvider.cs +++ b/src/core/Akka.Remote/RemoteActorRefProvider.cs @@ -681,38 +681,11 @@ public Internals(RemoteTransport transport, Akka.Serialization.Serialization ser #region RemotingTerminator - /// - /// Describes the FSM states of the - /// - private enum TerminatorState - { - /// - /// TBD - /// - Uninitialized, - /// - /// TBD - /// - Idle, - /// - /// TBD - /// - WaitDaemonShutdown, - /// - /// TBD - /// - WaitTransportShutdown, - /// - /// TBD - /// - Finished - } - /// /// Responsible for shutting down the and all transports /// when the is being shutdown. /// - private class RemotingTerminator : FSM, IRequiresMessageQueue + private class RemotingTerminator : ActorBase, IRequiresMessageQueue { private readonly IActorRef _systemGuardian; private readonly ILoggingAdapter _log; @@ -723,56 +696,67 @@ public RemotingTerminator(IActorRef systemGuardian) // can't use normal Logger.GetLogger(this IActorContext) here due to https://github.com/akkadotnet/akka.net/issues/4530 _log = Logging.GetLogger(Context.System.EventStream, "remoting-terminator"); - InitFSM(); + Become(Uninitialized()); } - private void InitFSM() + protected override bool Receive(object message) => EmptyReceive(message); + + private Receive Uninitialized() { - When(TerminatorState.Uninitialized, @event => + return (object message) => { - if (@event.FsmEvent is Internals internals) + if (message is Internals internals) { _systemGuardian.Tell(RegisterTerminationHook.Instance); - return GoTo(TerminatorState.Idle).Using(internals); + Become(Idle(internals)); } - return null; - }); + return false; + }; + } - When(TerminatorState.Idle, @event => + private Receive Idle(Internals internals) + { + return (object message) => { - if (@event.StateData != null && @event.FsmEvent is TerminationHook) + if (message is TerminationHook) { _log.Info("Shutting down remote daemon."); - @event.StateData.RemoteDaemon.Tell(TerminationHook.Instance); - return GoTo(TerminatorState.WaitDaemonShutdown); + internals.RemoteDaemon.Tell(TerminationHook.Instance); + Become(WaitDaemonShutdown(internals)); + return true; } - return null; - }); + return false; + }; + } + private Receive WaitDaemonShutdown(Internals internals) + { // TODO: state timeout - When(TerminatorState.WaitDaemonShutdown, @event => + return (object message) => { - if (@event.StateData != null && @event.FsmEvent is TerminationHookDone) + if (message is TerminationHookDone) { _log.Info("Remote daemon shut down; proceeding with flushing remote transports."); - @event.StateData.Transport.Shutdown() + internals.Transport.Shutdown() .ContinueWith(_ => TransportShutdown.Instance, TaskContinuationOptions.ExecuteSynchronously) .PipeTo(Self); - return GoTo(TerminatorState.WaitTransportShutdown); + Become(WaitTransportShutdown()); + return true; } + return false; + }; + } - return null; - }); - - When(TerminatorState.WaitTransportShutdown, _ => + private Receive WaitTransportShutdown() + { + return (object message) => { _log.Info("Remoting shut down."); _systemGuardian.Tell(TerminationHookDone.Instance); - return Stop(); - }); - - StartWith(TerminatorState.Uninitialized, null); + Context.Stop(Self); + return true; + }; } public sealed class TransportShutdown diff --git a/src/core/Akka.Streams.Tests/Implementation/TimeoutsSpec.cs b/src/core/Akka.Streams.Tests/Implementation/TimeoutsSpec.cs index f108467fd71..87a974207c3 100644 --- a/src/core/Akka.Streams.Tests/Implementation/TimeoutsSpec.cs +++ b/src/core/Akka.Streams.Tests/Implementation/TimeoutsSpec.cs @@ -220,14 +220,14 @@ await this.AssertAllStagesStoppedAsync(async () => var subscriber = this.CreateSubscriberProbe(); Source.From(new[] { 1, 2, 3, 4 }) - .BackpressureTimeout(TimeSpan.FromSeconds(1)) + .BackpressureTimeout(TimeSpan.FromSeconds(2)) .RunWith(Sink.FromSubscriber(subscriber), Materializer); for (var i = 1; i < 4; i++) { await subscriber.AsyncBuilder() .RequestNext(i) - .ExpectNoMsg(TimeSpan.FromMilliseconds(250)) + .ExpectNoMsg(TimeSpan.FromMilliseconds(100)) .ExecuteAsync(); } diff --git a/src/core/Akka.Streams/Implementation/Sinks.cs b/src/core/Akka.Streams/Implementation/Sinks.cs index da7ef7f2208..0215327246c 100644 --- a/src/core/Akka.Streams/Implementation/Sinks.cs +++ b/src/core/Akka.Streams/Implementation/Sinks.cs @@ -743,8 +743,16 @@ public override void PreStart() Pull(_stage.In); } - public override void PostStop() => + public override void PostStop() + { + // Complete any pending request before shutting down to prevent orphaned Tasks + if (_currentRequest.HasValue) + { + _currentRequest.Value.SetException(new StreamDetachedException()); + _currentRequest = Option>>.None; + } StopCallback(promise => promise.SetException(new StreamDetachedException())); + } private Action>> Callback() { diff --git a/src/core/Akka.TestKit.Tests/TestKit_Config_Tests.cs b/src/core/Akka.TestKit.Tests/TestKit_Config_Tests.cs index fcd2b46dcee..155db12493a 100644 --- a/src/core/Akka.TestKit.Tests/TestKit_Config_Tests.cs +++ b/src/core/Akka.TestKit.Tests/TestKit_Config_Tests.cs @@ -7,6 +7,8 @@ using System; using System.Reflection; +using Akka.Configuration; +using Akka.Dispatch; using Xunit; namespace Akka.TestKit.Tests.Xunit2 @@ -19,6 +21,7 @@ public void DefaultValues_should_be_correct() { TestKitSettings.DefaultTimeout.ShouldBe(TimeSpan.FromSeconds(5)); TestKitSettings.SingleExpectDefault.ShouldBe(TimeSpan.FromSeconds(3)); + TestKitSettings.ExpectNoMessageDefault.ShouldBe(TimeSpan.FromSeconds(3)); TestKitSettings.TestEventFilterLeeway.ShouldBe(TimeSpan.FromSeconds(3)); TestKitSettings.TestTimeFactor.ShouldBe(1); TestKitSettings.TestKitStartupTimeout.ShouldBe(TimeSpan.FromSeconds(5)); @@ -29,5 +32,53 @@ public void DefaultValues_should_be_correct() CallingThreadDispatcher.Id.ShouldBe("akka.test.calling-thread-dispatcher"); } } + + public class TestKitCustomConfigTests : TestKit.Xunit2.TestKit + { + private static readonly Config Config = """ + akka.test { + timefactor = 2.0 + filter-leeway = 4s + single-expect-default = 5s + expect-no-message-default = 6s + default-timeout = 7s + startup-timeout = 8s + calling-thread-dispatcher { + type = "Akka.Dispatch.DispatcherConfigurator, Akka" + throughput = 2147483647 + } + + test-actor.dispatcher { + type = "Akka.Dispatch.DispatcherConfigurator, Akka" + throughput = 2147483647 + } + + # Disable batching in order to prevent flakiness with Akka.Remote tests (since they have low message frequency) + remote.dot-netty.tcp.batching.enabled = false + } + """; + + public TestKitCustomConfigTests(): base(Config) + { + } + + [Fact] + public void DefaultValues_should_be_correct() + { + TestKitSettings.TestTimeFactor.ShouldBe(2); + TestKitSettings.TestEventFilterLeeway.ShouldBe(TimeSpan.FromSeconds(4)); + TestKitSettings.SingleExpectDefault.ShouldBe(TimeSpan.FromSeconds(5)); + TestKitSettings.ExpectNoMessageDefault.ShouldBe(TimeSpan.FromSeconds(6)); + TestKitSettings.DefaultTimeout.ShouldBe(TimeSpan.FromSeconds(7)); + TestKitSettings.TestKitStartupTimeout.ShouldBe(TimeSpan.FromSeconds(8)); + + var callingThreadDispatcherTypeName = typeof(DispatcherConfigurator).FullName + ", " + typeof(DispatcherConfigurator).Assembly.GetName().Name; + Assert.False(Sys.Settings.Config.IsEmpty); + + Sys.Settings.Config.GetString("akka.test.calling-thread-dispatcher.type", null).ShouldBe(callingThreadDispatcherTypeName); + Sys.Settings.Config.GetString("akka.test.test-actor.dispatcher.type", null).ShouldBe(callingThreadDispatcherTypeName); + CallingThreadDispatcher.Id.ShouldBe("akka.test.calling-thread-dispatcher"); + } + } } diff --git a/src/core/Akka.TestKit/Internal/Reference.conf b/src/core/Akka.TestKit/Internal/Reference.conf index cf079e06962..0052c4f5c0c 100644 --- a/src/core/Akka.TestKit/Internal/Reference.conf +++ b/src/core/Akka.TestKit/Internal/Reference.conf @@ -23,6 +23,10 @@ akka { # by default single-expect-default = 3s + # duration to wait in expectNoMsg + # by default + expect-no-message-default = 3s + # The timeout that is added as an implicit by DefaultTimeout trait # This is used for Ask-pattern default-timeout = 5s diff --git a/src/core/Akka.TestKit/TestKitBase.cs b/src/core/Akka.TestKit/TestKitBase.cs index a30e4a58eae..28df99051a9 100644 --- a/src/core/Akka.TestKit/TestKitBase.cs +++ b/src/core/Akka.TestKit/TestKitBase.cs @@ -28,6 +28,10 @@ namespace Akka.TestKit /// public abstract partial class TestKitBase : IActorRefFactory { + // AsyncLocal for proper timeout propagation across async boundaries. + // This ensures WithinAsync timeout flows correctly to EventFilter and other async operations. + private readonly AsyncLocal _asyncLocalEnd = new(); + private class TestState { public TestState() @@ -204,6 +208,7 @@ protected void InitializeTest(ActorSystem system, Config config, string actorSys } private TimeSpan SingleExpectDefaultTimeout { get { return _testState.TestKitSettings.SingleExpectDefault; } } + private TimeSpan ExpectNoMessageDefaultTimeout { get { return _testState.TestKitSettings.ExpectNoMessageDefault; } } /// /// The that is recreated and used for each test. @@ -416,6 +421,20 @@ public TimeSpan RemainingOrDefault get { return RemainingOr(Dilated(SingleExpectDefaultTimeout)); } } + /// + /// + /// Retrieves the time remaining for execution of the innermost enclosing + /// Within block. + /// If missing that, then it returns the properly dilated default for this + /// case from settings (key: "akka.test.expect-no-message-default"). + /// + /// The returned value is always finite. + /// + public TimeSpan NoMessageRemainingOrDefault + { + get { return RemainingOr(Dilated(ExpectNoMessageDefaultTimeout)); } + } + /// /// /// Retrieves the time remaining for execution of the innermost enclosing @@ -430,9 +449,21 @@ public TimeSpan Remaining { get { + // Check AsyncLocal first (async context takes precedence) + var asyncEnd = _asyncLocalEnd.Value; + if (asyncEnd.HasValue) + { + if (asyncEnd < TimeSpan.Zero) + throw new InvalidOperationException($"End can not be negative, was: {asyncEnd}"); + + var asyncRemaining = asyncEnd.Value - Now; + return asyncRemaining < TimeSpan.Zero ? TimeSpan.Zero : asyncRemaining; + } + + // Fallback to instance field if(_testState.End is null) throw new InvalidOperationException(@"Remaining may not be called outside of ""within"""); - + if (_testState.End < TimeSpan.Zero) throw new InvalidOperationException($"End can not be negative, was: {_testState.End}"); @@ -451,6 +482,18 @@ public TimeSpan Remaining /// TBD protected TimeSpan RemainingOr(TimeSpan duration) { + // Check AsyncLocal first (async context takes precedence for proper timeout propagation) + var asyncEnd = _asyncLocalEnd.Value; + if (asyncEnd.HasValue) + { + if (asyncEnd < TimeSpan.Zero) + throw new InvalidOperationException($"End can not be negative, was: {asyncEnd}"); + + var asyncRemaining = asyncEnd.Value - Now; + return asyncRemaining < TimeSpan.Zero ? TimeSpan.Zero : asyncRemaining; + } + + // Fallback to instance field for backward compatibility with sync code paths if (!_testState.End.HasValue) return duration; if (_testState.End < TimeSpan.Zero) throw new InvalidOperationException($"End can not be negative, was: {_testState.End}"); diff --git a/src/core/Akka.TestKit/TestKitBase_Expect.cs b/src/core/Akka.TestKit/TestKitBase_Expect.cs index 85e70f81310..f7838964d85 100644 --- a/src/core/Akka.TestKit/TestKitBase_Expect.cs +++ b/src/core/Akka.TestKit/TestKitBase_Expect.cs @@ -456,7 +456,7 @@ private async ValueTask InternalExpectMsgEnvelopeAsync( /// /// Wait time is bounded by remaining time for execution of the innermost enclosing 'within' /// block, if inside a 'within' block; otherwise by the config value - /// "akka.test.single-expect-default". + /// "akka.test.expect-no-message-default". /// public void ExpectNoMsg(CancellationToken cancellationToken = default) { @@ -466,7 +466,7 @@ public void ExpectNoMsg(CancellationToken cancellationToken = default) /// public ValueTask ExpectNoMsgAsync(CancellationToken cancellationToken = default) { - return InternalExpectNoMsgAsync(RemainingOrDefault, cancellationToken); + return InternalExpectNoMsgAsync(NoMessageRemainingOrDefault, cancellationToken); } /// diff --git a/src/core/Akka.TestKit/TestKitBase_Within.cs b/src/core/Akka.TestKit/TestKitBase_Within.cs index b73ad0e93f0..bc9732a45c7 100644 --- a/src/core/Akka.TestKit/TestKitBase_Within.cs +++ b/src/core/Akka.TestKit/TestKitBase_Within.cs @@ -294,7 +294,10 @@ public async Task WithinAsync( var maxDiff = max.Min(rem); var prevEnd = _testState.End; + var prevAsyncEnd = _asyncLocalEnd.Value; // Save previous AsyncLocal value for nesting support + _testState.End = start + maxDiff; + _asyncLocalEnd.Value = start + maxDiff; // Set AsyncLocal for proper async propagation T ret = default; using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) @@ -321,6 +324,7 @@ public async Task WithinAsync( // Make sure we stop the delay task cts.Cancel(); _testState.End = prevEnd; + _asyncLocalEnd.Value = prevAsyncEnd; // Restore previous AsyncLocal value } } diff --git a/src/core/Akka.TestKit/TestKitSettings.cs b/src/core/Akka.TestKit/TestKitSettings.cs index 71ea14c0694..4bb581bbe59 100644 --- a/src/core/Akka.TestKit/TestKitSettings.cs +++ b/src/core/Akka.TestKit/TestKitSettings.cs @@ -30,6 +30,7 @@ public TestKitSettings(Config config) DefaultTimeout = config.GetTimeSpan("akka.test.default-timeout", null, allowInfinite:false); SingleExpectDefault = config.GetTimeSpan("akka.test.single-expect-default", null, allowInfinite: false); + ExpectNoMessageDefault = config.GetTimeSpan("akka.test.expect-no-message-default", null, allowInfinite: false); TestKitStartupTimeout = config.GetTimeSpan("akka.test.startup-timeout", null, allowInfinite: false); TestEventFilterLeeway = config.GetTimeSpan("akka.test.filter-leeway", null, allowInfinite: false); TestTimeFactor = config.GetDouble("akka.test.timefactor", 0); @@ -49,6 +50,9 @@ public TestKitSettings(Config config) /// Gets the config value "akka.test.single-expect-default". It is always finite. public TimeSpan SingleExpectDefault { get; } + /// Gets the config value "akka.test.expect-no-message-default". It is always finite. + public TimeSpan ExpectNoMessageDefault { get; } + /// Gets the config value "akka.test.filter-leeway". It is always finite. public TimeSpan TestEventFilterLeeway { get; } diff --git a/src/core/Akka.Tests/Actor/TerminationSignalHandlerSpec.cs b/src/core/Akka.Tests/Actor/TerminationSignalHandlerSpec.cs new file mode 100644 index 00000000000..e7616bac00a --- /dev/null +++ b/src/core/Akka.Tests/Actor/TerminationSignalHandlerSpec.cs @@ -0,0 +1,295 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.TestKit; +using Akka.TestKit.Extensions; +using Akka.Util.Internal; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; +using static Akka.Actor.CoordinatedShutdown; + +namespace Akka.Tests.Actor; + +/// +/// Tests for the CLR termination signal handling in . +/// +public class TerminationSignalHandlerSpec : AkkaSpec +{ + public TerminationSignalHandlerSpec(ITestOutputHelper output) : base(output) + { + } + + public ExtendedActorSystem ExtSys => Sys.AsInstanceOf(); + + private static readonly Phase EmptyPhase = new(ImmutableHashSet.Empty, TimeSpan.FromSeconds(10), true); + + /// + /// Test double for that allows simulating termination signals. + /// + private class TestTerminationSignalHandler : ITerminationSignalHandler + { + public Action RegisteredCallback { get; private set; } + public bool IsDisposed { get; private set; } + public int RegisterCallCount { get; private set; } + + public void Register(Action onTerminationSignal) + { + RegisterCallCount++; + RegisteredCallback = onTerminationSignal; + } + + public void SimulateTerminationSignal() + { + RegisteredCallback?.Invoke(); + } + + public void Dispose() + { + IsDisposed = true; + } + } + + [Fact(DisplayName = "CoordinatedShutdown should register handler when run-by-clr-shutdown-hook is enabled")] + public void CoordinatedShutdown_should_register_handler_when_enabled() + { + // Arrange + var phases = new Dictionary { { "a", EmptyPhase } }; + var coord = new CoordinatedShutdown(ExtSys, phases); + var testHandler = new TestTerminationSignalHandler(); + var conf = ConfigurationFactory.ParseString("run-by-clr-shutdown-hook = on"); + + // Act + CoordinatedShutdown.InitClrHook(Sys, conf, coord, testHandler); + + // Assert + testHandler.RegisterCallCount.Should().Be(1); + testHandler.RegisteredCallback.Should().NotBeNull(); + } + + [Fact(DisplayName = "CoordinatedShutdown should not register handler when run-by-clr-shutdown-hook is disabled")] + public void CoordinatedShutdown_should_not_register_handler_when_disabled() + { + // Arrange + var phases = new Dictionary { { "a", EmptyPhase } }; + var coord = new CoordinatedShutdown(ExtSys, phases); + var testHandler = new TestTerminationSignalHandler(); + var conf = ConfigurationFactory.ParseString("run-by-clr-shutdown-hook = off"); + + // Act + CoordinatedShutdown.InitClrHook(Sys, conf, coord, testHandler); + + // Assert + testHandler.RegisterCallCount.Should().Be(0); + testHandler.RegisteredCallback.Should().BeNull(); + } + + [Fact(DisplayName = "CoordinatedShutdown should run shutdown tasks when termination signal is received")] + public async Task CoordinatedShutdown_should_run_when_termination_signal_received() + { + // Arrange + var sys = ActorSystem.Create( + "TerminationSignalTest", + ConfigurationFactory.ParseString(@" + akka.coordinated-shutdown.terminate-actor-system = on + akka.coordinated-shutdown.run-by-clr-shutdown-hook = on + akka.coordinated-shutdown.run-by-actor-system-terminate = off")); + + try + { + var testHandler = new TestTerminationSignalHandler(); + var coord = CoordinatedShutdown.Get(sys); + + var taskExecuted = new TaskCompletionSource(); + coord.AddTask(PhaseBeforeServiceUnbind, "test-task", () => + { + taskExecuted.SetResult(true); + return Task.FromResult(Done.Instance); + }); + + // Re-initialize with test handler + var conf = sys.Settings.Config.GetConfig("akka.coordinated-shutdown"); + CoordinatedShutdown.InitClrHook(sys, conf, coord, testHandler); + + // Act - simulate termination signal + testHandler.SimulateTerminationSignal(); + + // Assert + var result = await taskExecuted.Task.AwaitWithTimeout(TimeSpan.FromSeconds(10)); + result.Should().BeTrue(); + coord.ShutdownReason.Should().Be(ClrExitReason.Instance); + } + finally + { + await sys.Terminate(); + } + } + + [Fact(DisplayName = "CoordinatedShutdown should set _runningClrHook flag during CLR shutdown")] + public async Task CoordinatedShutdown_should_set_running_flag_during_clr_shutdown() + { + // Arrange + var sys = ActorSystem.Create( + "RunningFlagTest", + ConfigurationFactory.ParseString(@" + akka.coordinated-shutdown.terminate-actor-system = on + akka.coordinated-shutdown.run-by-clr-shutdown-hook = on + akka.coordinated-shutdown.run-by-actor-system-terminate = off")); + + try + { + var testHandler = new TestTerminationSignalHandler(); + var coord = CoordinatedShutdown.Get(sys); + + var flagObserved = new TaskCompletionSource(); + coord.AddTask(PhaseBeforeServiceUnbind, "flag-check-task", () => + { + // The _runningClrHook flag should be set by now + // We can't directly access the private field, but we can verify + // the shutdown is running with ClrExitReason + flagObserved.SetResult(coord.ShutdownReason == ClrExitReason.Instance); + return Task.FromResult(Done.Instance); + }); + + var conf = sys.Settings.Config.GetConfig("akka.coordinated-shutdown"); + CoordinatedShutdown.InitClrHook(sys, conf, coord, testHandler); + + // Act + testHandler.SimulateTerminationSignal(); + + // Assert + var result = await flagObserved.Task.AwaitWithTimeout(TimeSpan.FromSeconds(10)); + result.Should().BeTrue(); + } + finally + { + await sys.Terminate(); + } + } + + [Fact(DisplayName = "CoordinatedShutdown should dispose handler when ActorSystem terminates normally")] + public async Task CoordinatedShutdown_should_dispose_handler_on_normal_termination() + { + // Arrange + var sys = ActorSystem.Create( + "DisposeTest", + ConfigurationFactory.ParseString(@" + akka.coordinated-shutdown.terminate-actor-system = on + akka.coordinated-shutdown.run-by-clr-shutdown-hook = on + akka.coordinated-shutdown.run-by-actor-system-terminate = on")); + + var testHandler = new TestTerminationSignalHandler(); + var coord = CoordinatedShutdown.Get(sys); + var conf = sys.Settings.Config.GetConfig("akka.coordinated-shutdown"); + CoordinatedShutdown.InitClrHook(sys, conf, coord, testHandler); + + // Act - terminate system normally (not via signal) + await sys.Terminate(); + + // Give continuation time to run + await Task.Delay(100); + + // Assert + testHandler.IsDisposed.Should().BeTrue(); + } + + [Fact(DisplayName = "CoordinatedShutdown CLR hooks should only execute once even if signal fires multiple times")] + public async Task CoordinatedShutdown_clr_hooks_should_only_execute_once() + { + // Arrange + var sys = ActorSystem.Create( + "IdempotencyTest", + ConfigurationFactory.ParseString(@" + akka.coordinated-shutdown.terminate-actor-system = on + akka.coordinated-shutdown.run-by-clr-shutdown-hook = on + akka.coordinated-shutdown.run-by-actor-system-terminate = off")); + + try + { + var testHandler = new TestTerminationSignalHandler(); + var coord = CoordinatedShutdown.Get(sys); + + var executionCount = 0; + coord.AddTask(PhaseBeforeServiceUnbind, "count-task", () => + { + executionCount++; + return Task.FromResult(Done.Instance); + }); + + var conf = sys.Settings.Config.GetConfig("akka.coordinated-shutdown"); + CoordinatedShutdown.InitClrHook(sys, conf, coord, testHandler); + + // Act - simulate multiple termination signals + testHandler.SimulateTerminationSignal(); + testHandler.SimulateTerminationSignal(); + testHandler.SimulateTerminationSignal(); + + // Wait for shutdown to complete + await sys.WhenTerminated.AwaitWithTimeout(TimeSpan.FromSeconds(10)); + + // Assert - task should only have executed once + executionCount.Should().Be(1); + } + finally + { + if (!sys.WhenTerminated.IsCompleted) + await sys.Terminate(); + } + } + + [Fact(DisplayName = "CoordinatedShutdown should handle exceptions in shutdown tasks gracefully")] + public async Task CoordinatedShutdown_should_handle_task_exceptions_gracefully() + { + // Arrange + var sys = ActorSystem.Create( + "ExceptionTest", + ConfigurationFactory.ParseString(@" + akka.coordinated-shutdown.terminate-actor-system = on + akka.coordinated-shutdown.run-by-clr-shutdown-hook = on + akka.coordinated-shutdown.run-by-actor-system-terminate = off")); + + try + { + var testHandler = new TestTerminationSignalHandler(); + var coord = CoordinatedShutdown.Get(sys); + + var secondTaskExecuted = new TaskCompletionSource(); + + coord.AddTask(PhaseBeforeServiceUnbind, "failing-task", () => + { + throw new Exception("Test exception"); + }); + + coord.AddTask(PhaseServiceUnbind, "second-task", () => + { + secondTaskExecuted.SetResult(true); + return Task.FromResult(Done.Instance); + }); + + var conf = sys.Settings.Config.GetConfig("akka.coordinated-shutdown"); + CoordinatedShutdown.InitClrHook(sys, conf, coord, testHandler); + + // Act + testHandler.SimulateTerminationSignal(); + + // Assert - second task should still execute despite first task throwing + var result = await secondTaskExecuted.Task.AwaitWithTimeout(TimeSpan.FromSeconds(10)); + result.Should().BeTrue(); + } + finally + { + if (!sys.WhenTerminated.IsCompleted) + await sys.Terminate(); + } + } +} \ No newline at end of file diff --git a/src/core/Akka.Tests/Akka.Tests.csproj b/src/core/Akka.Tests/Akka.Tests.csproj index 5d5455f948e..b04bed82bb9 100644 --- a/src/core/Akka.Tests/Akka.Tests.csproj +++ b/src/core/Akka.Tests/Akka.Tests.csproj @@ -20,13 +20,20 @@ - - + + + + + $(DefineConstants);CORECLR + + $(DefineConstants);CORECLR + + diff --git a/src/core/Akka.Tests/Delivery/TestConsumer.cs b/src/core/Akka.Tests/Delivery/TestConsumer.cs index 02eee48f6c4..84f4b28c00e 100644 --- a/src/core/Akka.Tests/Delivery/TestConsumer.cs +++ b/src/core/Akka.Tests/Delivery/TestConsumer.cs @@ -36,16 +36,19 @@ public sealed class TestConsumer : ReceiveActor, IWithTimers private ImmutableHashSet<(string, long)> _processed = ImmutableHashSet<(string, long)>.Empty; private readonly bool _supportRestarts = false; private int _messageCount = 0; + private readonly int _expectedProducerCount; + private ImmutableHashSet _completedProducers = ImmutableHashSet.Empty; public TestConsumer(TimeSpan delay, Func endCondition, IActorRef endReplyTo, - IActorRef consumerController, bool supportRestarts = false) + IActorRef consumerController, int expectedProducerCount = 1, bool supportRestarts = false) { Delay = delay; EndCondition = endCondition; EndReplyTo = endReplyTo; ConsumerController = consumerController; + _expectedProducerCount = expectedProducerCount; _supportRestarts = supportRestarts; - + Active(); } @@ -77,9 +80,27 @@ private void Active() if (EndCondition(job) && (_messageCount > 0 || _supportRestarts)) { - _log.Debug("End at [{0}]", job.SeqNr); - EndReplyTo.Tell(new Collected(_processed.Select(c => c.Item1).ToImmutableHashSet(), _messageCount + 1)); - Context.Stop(Self); + // Track that this producer has completed + if (!_completedProducers.Contains(job.ProducerId)) + { + _completedProducers = _completedProducers.Add(job.ProducerId); + _log.Debug("Producer [{0}] completed at seqNr [{1}]. {2}/{3} producers completed.", + job.ProducerId, job.SeqNr, _completedProducers.Count, _expectedProducerCount); + } + + // Only stop when all expected producers have completed + if (_completedProducers.Count >= _expectedProducerCount) + { + _log.Debug("All {0} producers completed. Stopping consumer.", _expectedProducerCount); + EndReplyTo.Tell(new Collected(_processed.Select(c => c.Item1).ToImmutableHashSet(), _messageCount + 1)); + Context.Stop(Self); + } + else + { + // Continue processing messages from other producers + _processed = cleanProcessed.Add(nextMsg); + _messageCount++; + } } else if (!_supportRestarts && EndCondition(job)) { @@ -188,12 +209,12 @@ public static ConsumerController.SequencedMessage SequencedMessage(string p private static Func ConsumerEndCondition(long seqNr) => msg => msg.SeqNr >= seqNr; - public static Props PropsFor(TimeSpan delay, long seqNr, IActorRef endReplyTo, IActorRef consumerController, bool supportsRestarts = false) => - Props.Create(() => new TestConsumer(delay, ConsumerEndCondition(seqNr), endReplyTo, consumerController, supportsRestarts)); + public static Props PropsFor(TimeSpan delay, long seqNr, IActorRef endReplyTo, IActorRef consumerController, int expectedProducerCount = 1, bool supportsRestarts = false) => + Props.Create(() => new TestConsumer(delay, ConsumerEndCondition(seqNr), endReplyTo, consumerController, expectedProducerCount, supportsRestarts)); public static Props PropsFor(TimeSpan delay, Func endCondition, IActorRef endReplyTo, - IActorRef consumerController, bool supportsRestarts = false) => - Props.Create(() => new TestConsumer(delay, endCondition, endReplyTo, consumerController, supportsRestarts)); + IActorRef consumerController, int expectedProducerCount = 1, bool supportsRestarts = false) => + Props.Create(() => new TestConsumer(delay, endCondition, endReplyTo, consumerController, expectedProducerCount, supportsRestarts)); public ITimerScheduler Timers { get; set; } = null!; } diff --git a/src/core/Akka/Actor/CoordinatedShutdown.cs b/src/core/Akka/Actor/CoordinatedShutdown.cs index f0008de8981..5476dd152d1 100644 --- a/src/core/Akka/Actor/CoordinatedShutdown.cs +++ b/src/core/Akka/Actor/CoordinatedShutdown.cs @@ -706,56 +706,65 @@ internal static void InitPhaseActorSystemTerminate(ActorSystem system, Config co } } - // TODO: do we need to check for null or empty config here? /// - /// Initializes the CLR hook + /// Initializes the CLR hook for handling process termination signals. /// /// The actor system for this extension. /// The HOCON configuration. /// The plugin instance. - internal static void InitClrHook(ActorSystem system, Config conf, CoordinatedShutdown coord) + /// Optional signal handler for testing. If null, creates platform-appropriate handler. + internal static void InitClrHook(ActorSystem system, Config conf, CoordinatedShutdown coord, ITerminationSignalHandler signalHandler = null) { var runByClrShutdownHook = conf.GetBoolean("run-by-clr-shutdown-hook", false); - if (runByClrShutdownHook) + if (!runByClrShutdownHook) + return; + + // Use injected handler or create platform-appropriate one + signalHandler ??= CreateDefaultTerminationHandler(); + + // Register the signal handler to run CLR hooks when termination signal is received + signalHandler.Register(() => { - var exitTask = TerminateOnClrExit(coord); - // run all hooks during termination sequence - AppDomain.CurrentDomain.ProcessExit += exitTask; - system.WhenTerminated.ContinueWith(_ => - { - AppDomain.CurrentDomain.ProcessExit -= exitTask; - }); + // Must block - if this returns, process exits + coord.RunClrHooks().Wait(coord.TotalTimeout); + }); - coord.AddClrShutdownHook(() => + // Add the actual shutdown hook that performs coordinated shutdown + coord.AddClrShutdownHook(() => + { + coord._runningClrHook = true; + return Task.Run(() => { - coord._runningClrHook = true; - return Task.Run(() => + if (!system.WhenTerminated.IsCompleted) { - if (!system.WhenTerminated.IsCompleted) + coord.Log.Info("Starting coordinated shutdown from CLR termination hook."); + try { - coord.Log.Info("Starting coordinated shutdown from CLR termination hook."); - try - { - coord.Run(ClrExitReason.Instance).Wait(coord.TotalTimeout); - } - catch (Exception ex) - { - coord.Log.Warning("CoordinatedShutdown from CLR shutdown failed: {0}", ex.Message); - } + coord.Run(ClrExitReason.Instance).Wait(coord.TotalTimeout); + } + catch (Exception ex) + { + coord.Log.Warning("CoordinatedShutdown from CLR shutdown failed: {0}", ex.Message); } - return Done.Instance; - }); + } + return Done.Instance; }); - } + }); + + // Cleanup handler when system terminates normally + system.WhenTerminated.ContinueWith(_ => signalHandler.Dispose()); } - private static EventHandler TerminateOnClrExit(CoordinatedShutdown coord) + /// + /// Creates the appropriate termination signal handler for the current platform. + /// + private static ITerminationSignalHandler CreateDefaultTerminationHandler() { - return (_, _) => - { - // have to block, because if this method exits the process exits. - coord.RunClrHooks().Wait(coord.TotalTimeout); - }; +#if NET6_0_OR_GREATER + return new PosixTerminationSignalHandler(); +#else + return new LegacyTerminationSignalHandler(); +#endif } } } diff --git a/src/core/Akka/Actor/TerminationSignalHandler.cs b/src/core/Akka/Actor/TerminationSignalHandler.cs new file mode 100644 index 00000000000..7083e4ccd1a --- /dev/null +++ b/src/core/Akka/Actor/TerminationSignalHandler.cs @@ -0,0 +1,117 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Threading; +#if NET6_0_OR_GREATER +using System.Runtime.InteropServices; +#endif + +namespace Akka.Actor +{ + /// + /// Abstraction for handling process termination signals (SIGTERM, SIGHUP, ProcessExit). + /// Required for .NET 10 compatibility where ProcessExit no longer fires on SIGTERM. + /// + internal interface ITerminationSignalHandler : IDisposable + { + /// + /// Registers a callback to be invoked when a termination signal is received. + /// The callback will only be invoked once, even if multiple signals are received. + /// + /// The callback to invoke on termination signal. + void Register(Action onTerminationSignal); + } + +#if NET6_0_OR_GREATER + /// + /// .NET 6+ implementation using PosixSignalRegistration for proper signal handling. + /// Handles SIGTERM and SIGHUP signals, plus ProcessExit as fallback. + /// This is required for .NET 10 compatibility where ProcessExit no longer fires on SIGTERM. + /// + internal sealed class PosixTerminationSignalHandler : ITerminationSignalHandler + { + private PosixSignalRegistration? _sigtermRegistration; + private PosixSignalRegistration? _sighupRegistration; + private EventHandler? _processExitHandler; + private Action? _callback; + private int _invoked; + + /// + public void Register(Action onTerminationSignal) + { + _callback = onTerminationSignal; + + // Register POSIX signals (works on Unix/macOS/Windows in .NET 6+) + _sigtermRegistration = PosixSignalRegistration.Create( + PosixSignal.SIGTERM, OnSignalReceived); + + _sighupRegistration = PosixSignalRegistration.Create( + PosixSignal.SIGINT, OnSignalReceived); + + // Keep ProcessExit as fallback for non-signal termination scenarios + _processExitHandler = (_, _) => InvokeCallback(); + AppDomain.CurrentDomain.ProcessExit += _processExitHandler; + } + + private void OnSignalReceived(PosixSignalContext context) + { + // Cancel default termination to allow graceful shutdown + context.Cancel = true; + InvokeCallback(); + } + + private void InvokeCallback() + { + // Ensure callback only runs once + if (Interlocked.CompareExchange(ref _invoked, 1, 0) == 0) + { + _callback?.Invoke(); + } + } + + /// + public void Dispose() + { + _sigtermRegistration?.Dispose(); + _sighupRegistration?.Dispose(); + if (_processExitHandler != null) + AppDomain.CurrentDomain.ProcessExit -= _processExitHandler; + } + } +#else + /// + /// Legacy implementation for .NET Standard 2.0 / .NET Framework. + /// Uses ProcessExit only (no POSIX signal support available). + /// + internal sealed class LegacyTerminationSignalHandler : ITerminationSignalHandler + { + private EventHandler? _processExitHandler; + private int _invoked; + + /// + public void Register(Action onTerminationSignal) + { + _processExitHandler = (_, _) => + { + if (Interlocked.CompareExchange(ref _invoked, 1, 0) == 0) + { + onTerminationSignal(); + } + }; + AppDomain.CurrentDomain.ProcessExit += _processExitHandler; + } + + /// + public void Dispose() + { + if (_processExitHandler != null) + AppDomain.CurrentDomain.ProcessExit -= _processExitHandler; + } + } +#endif +} diff --git a/src/core/Akka/Event/LogMessage.cs b/src/core/Akka/Event/LogMessage.cs index 64eb3634c36..229e63e1108 100644 --- a/src/core/Akka/Event/LogMessage.cs +++ b/src/core/Akka/Event/LogMessage.cs @@ -75,22 +75,24 @@ public IReadOnlyDictionary GetProperties() { if (_properties == null) { - var names = PropertyNames; var parameters = Parameters(); - // Optimize: avoid ToArray() if Parameters() already returns IReadOnlyList if (parameters is IReadOnlyList readOnlyList) { - _properties = CreatePropertyDictionary(names, readOnlyList); - } - else if (parameters is object[] array) - { - _properties = CreatePropertyDictionary(names, array); + if (readOnlyList.Count == 0) + { + // Optimize: Skips parsing PropertyNames when empty Parameters() + _properties = EmptyDictionary; + } + else + { + _properties = CreatePropertyDictionary(PropertyNames, readOnlyList); + } } else { // Fallback: convert to array - _properties = CreatePropertyDictionary(names, parameters.ToArray()); + _properties = CreatePropertyDictionary(PropertyNames, parameters.ToArray()); } } return _properties; @@ -115,39 +117,7 @@ private static IReadOnlyDictionary CreatePropertyDictionary( dict[names[i]] = values[i]; } -#if NET8_0_OR_GREATER - // Use FrozenDictionary for optimal read performance on .NET 8+ - return System.Collections.Frozen.FrozenDictionary.ToFrozenDictionary(dict); -#else return dict; -#endif - } - - private static IReadOnlyDictionary CreatePropertyDictionary( - IReadOnlyList names, - object[] values) - { - // Handle empty case - if (names.Count == 0) - return EmptyDictionary; - - // Handle mismatched counts (more values than names, or vice versa) - var count = Math.Min(names.Count, values.Length); - if (count == 0) - return EmptyDictionary; - - var dict = new Dictionary(count); - for (int i = 0; i < count; i++) - { - dict[names[i]] = values[i]; - } - -#if NET8_0_OR_GREATER - // Use FrozenDictionary for optimal read performance on .NET 8+ - return System.Collections.Frozen.FrozenDictionary.ToFrozenDictionary(dict); -#else - return dict; -#endif } private static readonly IReadOnlyDictionary EmptyDictionary = diff --git a/src/core/Akka/IO/TcpListener.cs b/src/core/Akka/IO/TcpListener.cs index 01dd20f3486..12d035f9be1 100644 --- a/src/core/Akka/IO/TcpListener.cs +++ b/src/core/Akka/IO/TcpListener.cs @@ -142,7 +142,7 @@ private bool HandleStatsMessages(object msg) { subscriber.Tell(stats); } - } + } return true; case Tcp.SubscribeToTcpListenerStats subscribe: @@ -277,10 +277,17 @@ private void HandleAccept(SocketAsyncEventArgs saea) break; case SocketError.ConnectionReset: + case SocketError.ConnectionAborted: case SocketError.NoBufferSpaceAvailable: case SocketError.TryAgain: case SocketError.TimedOut: case SocketError.WouldBlock: + case SocketError.Interrupted: + case SocketError.TooManyOpenSockets: + case SocketError.NetworkUnreachable: + case SocketError.HostDown: + case SocketError.HostUnreachable: + case SocketError.ConnectionRefused: _retryCount++; _log.Warning("Retriable socket error in TcpListener: {0} - retrying accept operation in 10ms", saea.SocketError); @@ -289,9 +296,11 @@ private void HandleAccept(SocketAsyncEventArgs saea) Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromMilliseconds(10), Self, new RetryAccept(saea), ActorRefs.NoSender); break; + + // Fatal errors - the listener socket itself is broken default: _failedCount++; - _log.Error("Fatal socket error in TcpListener: {0}", saea.SocketError); + _log.Error("Fatal socket error in TcpListener: {0} - stopping listener", saea.SocketError); Context.Stop(Self); break; }