From 4f8677b17ac0c2af46a12b13b01ba4e1d51233b4 Mon Sep 17 00:00:00 2001 From: Rolf Kristensen Date: Wed, 17 Dec 2025 21:20:58 +0100 Subject: [PATCH 01/13] Skip parsing PropertyNames when empty Parameters (#7960) * Skip parsing PropertyNames when not Parameters * Simplify code --------- Co-authored-by: Gregorius Soedharmo --- src/core/Akka/Event/LogMessage.cs | 37 ++----------------------------- 1 file changed, 2 insertions(+), 35 deletions(-) diff --git a/src/core/Akka/Event/LogMessage.cs b/src/core/Akka/Event/LogMessage.cs index 64eb3634c36..ba2531af548 100644 --- a/src/core/Akka/Event/LogMessage.cs +++ b/src/core/Akka/Event/LogMessage.cs @@ -75,22 +75,16 @@ public IReadOnlyDictionary GetProperties() { if (_properties == null) { - var names = PropertyNames; var parameters = Parameters(); - // Optimize: avoid ToArray() if Parameters() already returns IReadOnlyList if (parameters is IReadOnlyList readOnlyList) { - _properties = CreatePropertyDictionary(names, readOnlyList); - } - else if (parameters is object[] array) - { - _properties = CreatePropertyDictionary(names, array); + _properties = CreatePropertyDictionary(PropertyNames, readOnlyList); } else { // Fallback: convert to array - _properties = CreatePropertyDictionary(names, parameters.ToArray()); + _properties = CreatePropertyDictionary(PropertyNames, parameters.ToArray()); } } return _properties; @@ -115,33 +109,6 @@ private static IReadOnlyDictionary CreatePropertyDictionary( dict[names[i]] = values[i]; } -#if NET8_0_OR_GREATER - // Use FrozenDictionary for optimal read performance on .NET 8+ - return System.Collections.Frozen.FrozenDictionary.ToFrozenDictionary(dict); -#else - return dict; -#endif - } - - private static IReadOnlyDictionary CreatePropertyDictionary( - IReadOnlyList names, - object[] values) - { - // Handle empty case - if (names.Count == 0) - return EmptyDictionary; - - // Handle mismatched counts (more values than names, or vice versa) - var count = Math.Min(names.Count, values.Length); - if (count == 0) - return EmptyDictionary; - - var dict = new Dictionary(count); - for (int i = 0; i < count; i++) - { - dict[names[i]] = values[i]; - } - #if NET8_0_OR_GREATER // Use FrozenDictionary for optimal read performance on .NET 8+ return System.Collections.Frozen.FrozenDictionary.ToFrozenDictionary(dict); From f65bfe0b5325fc44e1891400ebbdaa4fa4064573 Mon Sep 17 00:00:00 2001 From: Petri Kero Date: Wed, 7 Jan 2026 19:20:54 +0200 Subject: [PATCH 02/13] Fix `TcpListener` to not stop from accepting further connections on transient accept errors (#7970) * Minimal fix to handle SocketError.ConnectionAborted in TcpListener. * Handle more transient error cases. Handle known fatal errors explicitly. * Join code paths for known and unknown fatal errors. --------- Co-authored-by: Aaron Stannard --- src/core/Akka/IO/TcpListener.cs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/core/Akka/IO/TcpListener.cs b/src/core/Akka/IO/TcpListener.cs index 01dd20f3486..12d035f9be1 100644 --- a/src/core/Akka/IO/TcpListener.cs +++ b/src/core/Akka/IO/TcpListener.cs @@ -142,7 +142,7 @@ private bool HandleStatsMessages(object msg) { subscriber.Tell(stats); } - } + } return true; case Tcp.SubscribeToTcpListenerStats subscribe: @@ -277,10 +277,17 @@ private void HandleAccept(SocketAsyncEventArgs saea) break; case SocketError.ConnectionReset: + case SocketError.ConnectionAborted: case SocketError.NoBufferSpaceAvailable: case SocketError.TryAgain: case SocketError.TimedOut: case SocketError.WouldBlock: + case SocketError.Interrupted: + case SocketError.TooManyOpenSockets: + case SocketError.NetworkUnreachable: + case SocketError.HostDown: + case SocketError.HostUnreachable: + case SocketError.ConnectionRefused: _retryCount++; _log.Warning("Retriable socket error in TcpListener: {0} - retrying accept operation in 10ms", saea.SocketError); @@ -289,9 +296,11 @@ private void HandleAccept(SocketAsyncEventArgs saea) Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromMilliseconds(10), Self, new RetryAccept(saea), ActorRefs.NoSender); break; + + // Fatal errors - the listener socket itself is broken default: _failedCount++; - _log.Error("Fatal socket error in TcpListener: {0}", saea.SocketError); + _log.Error("Fatal socket error in TcpListener: {0} - stopping listener", saea.SocketError); Context.Stop(Self); break; } From f61b8d431cbe314b9515667b411845e151541c6b Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 29 Dec 2025 11:13:20 -0600 Subject: [PATCH 03/13] Fix race condition in cluster sharding when entity constructor fails (#7981) When a sharded entity's constructor throws an exception with remember-entities enabled, HandleSupervisorStop was calling RemoveEntity() which immediately set the entity state to NoState. However, the actor was still in the ActorCell's TerminatingChildrenContainer. If a new message arrived before EntityTerminated was processed, the shard would see NoState and try to create a new entity, causing "Actor name is not unique!" exception and crashing the shard. Fix: Use EntityPassivating instead of RemoveEntity, which: - Buffers messages arriving during termination - Persists removal from store when termination completes - Restarts entity if messages arrived (transient failure recovery) - Stays forgotten if no messages arrive Closes #7979 Co-authored-by: Gregorius Soedharmo --- .../cluster/Akka.Cluster.Sharding/Shard.cs | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs index 6d9d3b2de5f..c3209f3eb44 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs @@ -1856,21 +1856,31 @@ private void HandleSupervisorStop(SupervisorStopDirectivePassivation msg) // We only have to do this if we have R-E enabled if (!_rememberEntities) return; - + var id = _entities.EntityId(msg.Child); // Just return if the child actor is not a recorded shard entity - if (id is null) + if (id is null) return; - - // Remove the child actor from the entity list - _entities.RemoveEntity(id); - + + // Only transition to Passivating if entity is Active. + // If already Passivating (entity requested passivation then threw), the normal flow handles it. + // + // Using Passivating state (instead of RemoveEntity which sets NoState): + // 1. Buffers messages arriving during termination (fixes race condition crash) + // 2. Persists removal from store when EntityTerminated arrives + // 3. Restarts entity if messages arrive (transient failure, try again) + // 4. Stays forgotten if no messages arrive + if (_entities.EntityState(id) is Active) + { + _entities.EntityPassivating(id); + } + // Force stop the child actor, it might have been restarted Context.Stop(msg.Child); - + Log.Error( - msg.LastCause, - "{0}: Remembered entity {1} was stopped: {2}", + msg.LastCause, + "{0}: Remembered entity {1} was stopped: {2}", _typeName, id, msg.Reason); } From 113b476e17cbdd1b1bbd7237e0096412dc4e3cbc Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Dec 2025 16:44:48 -0600 Subject: [PATCH 04/13] fix(cluster.tools): fix race condition in DistributedPubSubRestartSpec actor creation (#7980) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move shutdown actor creation to execute immediately after the new ActorSystem joins, before the 5-second gossip isolation verification. This eliminates the race where First node times out waiting for Third node to create the shutdown actor. Changes: - Create shutdown actor before ExpectNoMsgAsync delay on Third node - Migrate sync-over-async methods to proper async (ExpectMsgAsync, RunOnAsync) - Increase timeout margins (WithinAsync 20s→25s, ExpectMsgAsync 1s→2s) for better CI stability --- .../DistributedPubSubRestartSpec.cs | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs index fd755c0fa9b..7e504235644 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs @@ -91,12 +91,13 @@ public async Task A_Cluster_with_DistributedPubSub_must_handle_restart_of_nodes_ await WithinAsync(30.Seconds(), async () => { Mediator.Tell(new Subscribe("topic1", TestActor)); - ExpectMsg(); + await ExpectMsgAsync(); await CountAsync(3); - RunOn(() => + await RunOnAsync(() => { Mediator.Tell(new Publish("topic1", "msg1")); + return Task.CompletedTask; }, _config.First); await EnterBarrierAsync("pub-msg1"); @@ -110,8 +111,10 @@ await RunOnAsync(async () => await EnterBarrierAsync("end"); - Mediator.Tell(DeltaCount.Instance); - var deltaCount = await ExpectMsgAsync(); + // Use a probe to isolate DeltaCount query from any stray messages in TestActor mailbox + var probe = CreateTestProbe(); + Mediator.Tell(DeltaCount.Instance, probe.Ref); + var deltaCount = await probe.ExpectMsgAsync(5.Seconds()); deltaCount.Should().Be(oldDeltaCount); }, _config.Second); @@ -123,12 +126,14 @@ await RunOnAsync(async () => var thirdAddress = (await NodeAsync(_config.Third)).Address; await TestConductor.Shutdown(_config.Third).WaitAsync(30.Seconds()); - await WithinAsync(20.Seconds(), async () => + // Use a probe for Identify to avoid polluting TestActor mailbox with stray responses + var identifyProbe = CreateTestProbe(); + await WithinAsync(25.Seconds(), async () => { await AwaitAssertAsync(async () => { - Sys.ActorSelection(new RootActorPath(thirdAddress) / "user" / "shutdown").Tell(new Identify(null)); - (await ExpectMsgAsync(1.Seconds())).Subject.Should().NotBeNull(); + Sys.ActorSelection(new RootActorPath(thirdAddress) / "user" / "shutdown").Tell(new Identify(null), identifyProbe.Ref); + (await identifyProbe.ExpectMsgAsync(2.Seconds())).Subject.Should().NotBeNull(); }); }); @@ -136,8 +141,11 @@ await AwaitAssertAsync(async () => await EnterBarrierAsync("end"); - Mediator.Tell(DeltaCount.Instance); - var deltaCount = await ExpectMsgAsync(); + // Use a probe to isolate DeltaCount query from stray ActorIdentity messages + // that may still be arriving from AwaitAssertAsync Identify retries + var deltaProbe = CreateTestProbe(); + Mediator.Tell(DeltaCount.Instance, deltaProbe.Ref); + var deltaCount = await deltaProbe.ExpectMsgAsync(5.Seconds()); deltaCount.Should().Be(oldDeltaCount); }, _config.First); @@ -157,6 +165,12 @@ await RunOnAsync(async () => await Cluster.Get(newSystem).JoinAsync(Cluster.Get(newSystem).SelfAddress); var newMediator = DistributedPubSub.Get(newSystem).Mediator; var probe = CreateTestProbe(newSystem); + + // Create shutdown actor FIRST so First node can find it while we verify gossip isolation + // This fixes the race condition where First node times out waiting for this actor + newSystem.Log.Info("Creating shutdown actor on {0}", node3Address); + newSystem.ActorOf("shutdown"); + newMediator.Tell(new Subscribe("topic2", probe.Ref), probe.Ref); await probe.ExpectMsgAsync(); @@ -165,8 +179,6 @@ await RunOnAsync(async () => newMediator.Tell(DeltaCount.Instance, probe.Ref); await probe.ExpectMsgAsync(0L); - newSystem.Log.Info("Shutdown actor started on {0}",node3Address); - newSystem.ActorOf("shutdown"); await newSystem.WhenTerminated.WaitAsync(30.Seconds()); } finally @@ -194,10 +206,11 @@ private IActorRef Mediator private async Task JoinAsync(RoleName from, RoleName to) { - RunOn(() => + await RunOnAsync(() => { Cluster.Get(Sys).Join(Node(to).Address); CreateMediator(); + return Task.CompletedTask; }, from); await EnterBarrierAsync(from.Name + "-joined"); } From fdf8f6cf7a77127a1e612f6bd6ca055aafdb3bf6 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 29 Dec 2025 14:43:03 -0600 Subject: [PATCH 05/13] Fix WithinAsync timeout not propagating to EventFilter across async boundaries (#7977) * Fix WithinAsync timeout not propagating to EventFilter across async boundaries Use AsyncLocal to properly propagate WithinAsync timeout to EventFilter and other async operations. The previous implementation used a plain instance field (_testState.End) which could have memory visibility issues when accessed from different threads across await boundaries. The fix: - Adds AsyncLocal field that flows through async contexts - WithinAsync sets/restores both instance field and AsyncLocal - Remaining and RemainingOr check AsyncLocal first, then instance field - Preserves backward compatibility with sync code paths * remove `static` --------- Co-authored-by: Gregorius Soedharmo --- src/core/Akka.TestKit/TestKitBase.cs | 30 ++++++++++++++++++++- src/core/Akka.TestKit/TestKitBase_Within.cs | 4 +++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.TestKit/TestKitBase.cs b/src/core/Akka.TestKit/TestKitBase.cs index a30e4a58eae..1fc1b62b084 100644 --- a/src/core/Akka.TestKit/TestKitBase.cs +++ b/src/core/Akka.TestKit/TestKitBase.cs @@ -28,6 +28,10 @@ namespace Akka.TestKit /// public abstract partial class TestKitBase : IActorRefFactory { + // AsyncLocal for proper timeout propagation across async boundaries. + // This ensures WithinAsync timeout flows correctly to EventFilter and other async operations. + private readonly AsyncLocal _asyncLocalEnd = new(); + private class TestState { public TestState() @@ -430,9 +434,21 @@ public TimeSpan Remaining { get { + // Check AsyncLocal first (async context takes precedence) + var asyncEnd = _asyncLocalEnd.Value; + if (asyncEnd.HasValue) + { + if (asyncEnd < TimeSpan.Zero) + throw new InvalidOperationException($"End can not be negative, was: {asyncEnd}"); + + var asyncRemaining = asyncEnd.Value - Now; + return asyncRemaining < TimeSpan.Zero ? TimeSpan.Zero : asyncRemaining; + } + + // Fallback to instance field if(_testState.End is null) throw new InvalidOperationException(@"Remaining may not be called outside of ""within"""); - + if (_testState.End < TimeSpan.Zero) throw new InvalidOperationException($"End can not be negative, was: {_testState.End}"); @@ -451,6 +467,18 @@ public TimeSpan Remaining /// TBD protected TimeSpan RemainingOr(TimeSpan duration) { + // Check AsyncLocal first (async context takes precedence for proper timeout propagation) + var asyncEnd = _asyncLocalEnd.Value; + if (asyncEnd.HasValue) + { + if (asyncEnd < TimeSpan.Zero) + throw new InvalidOperationException($"End can not be negative, was: {asyncEnd}"); + + var asyncRemaining = asyncEnd.Value - Now; + return asyncRemaining < TimeSpan.Zero ? TimeSpan.Zero : asyncRemaining; + } + + // Fallback to instance field for backward compatibility with sync code paths if (!_testState.End.HasValue) return duration; if (_testState.End < TimeSpan.Zero) throw new InvalidOperationException($"End can not be negative, was: {_testState.End}"); diff --git a/src/core/Akka.TestKit/TestKitBase_Within.cs b/src/core/Akka.TestKit/TestKitBase_Within.cs index b73ad0e93f0..bc9732a45c7 100644 --- a/src/core/Akka.TestKit/TestKitBase_Within.cs +++ b/src/core/Akka.TestKit/TestKitBase_Within.cs @@ -294,7 +294,10 @@ public async Task WithinAsync( var maxDiff = max.Min(rem); var prevEnd = _testState.End; + var prevAsyncEnd = _asyncLocalEnd.Value; // Save previous AsyncLocal value for nesting support + _testState.End = start + maxDiff; + _asyncLocalEnd.Value = start + maxDiff; // Set AsyncLocal for proper async propagation T ret = default; using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) @@ -321,6 +324,7 @@ public async Task WithinAsync( // Make sure we stop the delay task cts.Cancel(); _testState.End = prevEnd; + _asyncLocalEnd.Value = prevAsyncEnd; // Restore previous AsyncLocal value } } From b8bdf7f771383dc01377e1040f3e473569230d1e Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Dec 2025 14:05:34 -0600 Subject: [PATCH 06/13] Fix race condition in multi-producer sharding delivery test (#7975) The test ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage_with_several_producers was experiencing intermittent failures due to a premature entity termination race condition. Root cause: When multiple producers send messages to the same sharded entities, each producer-entity pair maintains independent sequence numbers (1-42). The test's end condition (seqNr >= 42) would trigger when ANY producer reached seqNr 42, causing the entity to stop immediately, before other producers could complete their message delivery. This resulted in the test expecting 3 Collected messages (one per entity) containing all 6 producer IDs (p1-entity-{0,1,2} and p2-entity-{0,1,2}), but entities would stop after only receiving messages from the first producer to complete, causing timeouts. Solution: Modified TestConsumer to track per-producer completion: - Added expectedProducerCount parameter (defaults to 1 for backward compatibility) - Track which producers have met the end condition in _completedProducers set - Only stop the entity when ALL expected producers have completed - Updated multi-producer test to pass expectedProducerCount: 2 This ensures entities properly wait for all producers to complete message delivery before terminating, eliminating the race condition without relying on timeout adjustments. Verified with 20 consecutive successful test runs and full test suite validation. --- .../Delivery/ReliableDeliveryShardingSpec.cs | 4 +- src/core/Akka.Tests/Delivery/TestConsumer.cs | 39 ++++++++++++++----- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs index 4f79a40454e..d0ff5baf440 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs @@ -88,7 +88,7 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage_ var consumerEndProbe = CreateTestProbe(); var region = await ClusterSharding.Get(Sys).StartAsync($"TestConsumer-{_idCount}", _ => ShardingConsumerController.Create(c => - PropsFor(DefaultConsumerDelay, 42, consumerEndProbe.Ref, c), + PropsFor(DefaultConsumerDelay, 42, consumerEndProbe.Ref, c, expectedProducerCount: 2), ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys), HashCodeMessageExtractor.Create(10, o => string.Empty, o => o)); @@ -108,7 +108,7 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage_ $"p2-{_idCount}"); // expecting 3 end messages, one for each entity: "entity-0", "entity-1", "entity-2" - var endMessages = await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(5)).ToListAsync(); + var endMessages = await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(10)).ToListAsync(); var producerIds = endMessages.Cast().SelectMany(c => c.ProducerIds).ToList(); producerIds diff --git a/src/core/Akka.Tests/Delivery/TestConsumer.cs b/src/core/Akka.Tests/Delivery/TestConsumer.cs index 02eee48f6c4..84f4b28c00e 100644 --- a/src/core/Akka.Tests/Delivery/TestConsumer.cs +++ b/src/core/Akka.Tests/Delivery/TestConsumer.cs @@ -36,16 +36,19 @@ public sealed class TestConsumer : ReceiveActor, IWithTimers private ImmutableHashSet<(string, long)> _processed = ImmutableHashSet<(string, long)>.Empty; private readonly bool _supportRestarts = false; private int _messageCount = 0; + private readonly int _expectedProducerCount; + private ImmutableHashSet _completedProducers = ImmutableHashSet.Empty; public TestConsumer(TimeSpan delay, Func endCondition, IActorRef endReplyTo, - IActorRef consumerController, bool supportRestarts = false) + IActorRef consumerController, int expectedProducerCount = 1, bool supportRestarts = false) { Delay = delay; EndCondition = endCondition; EndReplyTo = endReplyTo; ConsumerController = consumerController; + _expectedProducerCount = expectedProducerCount; _supportRestarts = supportRestarts; - + Active(); } @@ -77,9 +80,27 @@ private void Active() if (EndCondition(job) && (_messageCount > 0 || _supportRestarts)) { - _log.Debug("End at [{0}]", job.SeqNr); - EndReplyTo.Tell(new Collected(_processed.Select(c => c.Item1).ToImmutableHashSet(), _messageCount + 1)); - Context.Stop(Self); + // Track that this producer has completed + if (!_completedProducers.Contains(job.ProducerId)) + { + _completedProducers = _completedProducers.Add(job.ProducerId); + _log.Debug("Producer [{0}] completed at seqNr [{1}]. {2}/{3} producers completed.", + job.ProducerId, job.SeqNr, _completedProducers.Count, _expectedProducerCount); + } + + // Only stop when all expected producers have completed + if (_completedProducers.Count >= _expectedProducerCount) + { + _log.Debug("All {0} producers completed. Stopping consumer.", _expectedProducerCount); + EndReplyTo.Tell(new Collected(_processed.Select(c => c.Item1).ToImmutableHashSet(), _messageCount + 1)); + Context.Stop(Self); + } + else + { + // Continue processing messages from other producers + _processed = cleanProcessed.Add(nextMsg); + _messageCount++; + } } else if (!_supportRestarts && EndCondition(job)) { @@ -188,12 +209,12 @@ public static ConsumerController.SequencedMessage SequencedMessage(string p private static Func ConsumerEndCondition(long seqNr) => msg => msg.SeqNr >= seqNr; - public static Props PropsFor(TimeSpan delay, long seqNr, IActorRef endReplyTo, IActorRef consumerController, bool supportsRestarts = false) => - Props.Create(() => new TestConsumer(delay, ConsumerEndCondition(seqNr), endReplyTo, consumerController, supportsRestarts)); + public static Props PropsFor(TimeSpan delay, long seqNr, IActorRef endReplyTo, IActorRef consumerController, int expectedProducerCount = 1, bool supportsRestarts = false) => + Props.Create(() => new TestConsumer(delay, ConsumerEndCondition(seqNr), endReplyTo, consumerController, expectedProducerCount, supportsRestarts)); public static Props PropsFor(TimeSpan delay, Func endCondition, IActorRef endReplyTo, - IActorRef consumerController, bool supportsRestarts = false) => - Props.Create(() => new TestConsumer(delay, endCondition, endReplyTo, consumerController, supportsRestarts)); + IActorRef consumerController, int expectedProducerCount = 1, bool supportsRestarts = false) => + Props.Create(() => new TestConsumer(delay, endCondition, endReplyTo, consumerController, expectedProducerCount, supportsRestarts)); public ITimerScheduler Timers { get; set; } = null!; } From 841e83ac8533c04b04d413731d4ebf0db54aa354 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 22 Dec 2025 12:48:02 -0600 Subject: [PATCH 07/13] Fix race condition in QueueSink causing async enumerable timeout (#7973) Fixed a timing-dependent race condition where RunAsAsyncEnumerable would timeout when the materializer shut down while a PullAsync() request was pending. The QueueSink.Logic.PostStop() method now properly completes any pending TaskCompletionSource stored in _currentRequest with a StreamDetachedException before shutting down, preventing orphaned Tasks that would never complete. This resolves intermittent test failures in AsyncEnumerableSpec.RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination where the test would hang waiting for an exception that never arrived. --- src/core/Akka.Streams/Implementation/Sinks.cs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Streams/Implementation/Sinks.cs b/src/core/Akka.Streams/Implementation/Sinks.cs index da7ef7f2208..0215327246c 100644 --- a/src/core/Akka.Streams/Implementation/Sinks.cs +++ b/src/core/Akka.Streams/Implementation/Sinks.cs @@ -743,8 +743,16 @@ public override void PreStart() Pull(_stage.In); } - public override void PostStop() => + public override void PostStop() + { + // Complete any pending request before shutting down to prevent orphaned Tasks + if (_currentRequest.HasValue) + { + _currentRequest.Value.SetException(new StreamDetachedException()); + _currentRequest = Option>>.None; + } StopCallback(promise => promise.SetException(new StreamDetachedException())); + } private Action>> Callback() { From 40dcbc5e7347ee847e19334e6cc720cd2927ee4c Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 22 Dec 2025 12:13:44 -0600 Subject: [PATCH 08/13] Fix timing race in BackpressureTimeout_must_succeed_if_subscriber_demand_arrives test (#7972) The test was failing intermittently in CI/CD with "No demand signalled in the last 00:00:01" because accumulated delays (750ms) left insufficient safety margin with the 1-second timeout. Changes: - Increased backpressure timeout from 1s to 2s - Reduced ExpectNoMsg delays from 250ms to 100ms per iteration This provides 1700ms safety margin (vs 250ms previously) to handle: - Async/await scheduling overhead - Thread pool contention in CI/CD environments - Timer check granularity and imprecision - Test infrastructure processing delays The test semantics remain unchanged - it still verifies that demand arriving within the timeout window prevents backpressure timeout errors. --- src/core/Akka.Streams.Tests/Implementation/TimeoutsSpec.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Implementation/TimeoutsSpec.cs b/src/core/Akka.Streams.Tests/Implementation/TimeoutsSpec.cs index f108467fd71..87a974207c3 100644 --- a/src/core/Akka.Streams.Tests/Implementation/TimeoutsSpec.cs +++ b/src/core/Akka.Streams.Tests/Implementation/TimeoutsSpec.cs @@ -220,14 +220,14 @@ await this.AssertAllStagesStoppedAsync(async () => var subscriber = this.CreateSubscriberProbe(); Source.From(new[] { 1, 2, 3, 4 }) - .BackpressureTimeout(TimeSpan.FromSeconds(1)) + .BackpressureTimeout(TimeSpan.FromSeconds(2)) .RunWith(Sink.FromSubscriber(subscriber), Materializer); for (var i = 1; i < 4; i++) { await subscriber.AsyncBuilder() .RequestNext(i) - .ExpectNoMsg(TimeSpan.FromMilliseconds(250)) + .ExpectNoMsg(TimeSpan.FromMilliseconds(100)) .ExecuteAsync(); } From 69bed78f801a481dfbffbb2e356e659549cf388c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jarkko=20P=C3=B6yry?= Date: Thu, 18 Dec 2025 06:12:50 +0200 Subject: [PATCH 09/13] Make RemotingTerminator non-FSM to avoid racy FSM log init. (#7967) RemotingTerminator is initialized during Provider Start. FSM constructor initializes the _log with `context.GetLogger()` which eventually leads into `LogSource.FromActorRef()`. This method attempts to access `Provider.DefaultAddress`, which is being initialized. If Provider `Init()` loses the race, either its `RemoteInternals` is not set, causing NRE, or if `Remoting.Start()` is not completed, the `_defaultAddress` is not set and returns null. This commit removes the FSM from `RemotingTerminator` and manually implements it using `Become()` API. Co-authored-by: Aaron Stannard --- .../Akka.Remote/RemoteActorRefProvider.cs | 90 ++++++++----------- 1 file changed, 37 insertions(+), 53 deletions(-) diff --git a/src/core/Akka.Remote/RemoteActorRefProvider.cs b/src/core/Akka.Remote/RemoteActorRefProvider.cs index 9e2b545de8f..8401f28c89a 100644 --- a/src/core/Akka.Remote/RemoteActorRefProvider.cs +++ b/src/core/Akka.Remote/RemoteActorRefProvider.cs @@ -681,38 +681,11 @@ public Internals(RemoteTransport transport, Akka.Serialization.Serialization ser #region RemotingTerminator - /// - /// Describes the FSM states of the - /// - private enum TerminatorState - { - /// - /// TBD - /// - Uninitialized, - /// - /// TBD - /// - Idle, - /// - /// TBD - /// - WaitDaemonShutdown, - /// - /// TBD - /// - WaitTransportShutdown, - /// - /// TBD - /// - Finished - } - /// /// Responsible for shutting down the and all transports /// when the is being shutdown. /// - private class RemotingTerminator : FSM, IRequiresMessageQueue + private class RemotingTerminator : ActorBase, IRequiresMessageQueue { private readonly IActorRef _systemGuardian; private readonly ILoggingAdapter _log; @@ -723,56 +696,67 @@ public RemotingTerminator(IActorRef systemGuardian) // can't use normal Logger.GetLogger(this IActorContext) here due to https://github.com/akkadotnet/akka.net/issues/4530 _log = Logging.GetLogger(Context.System.EventStream, "remoting-terminator"); - InitFSM(); + Become(Uninitialized()); } - private void InitFSM() + protected override bool Receive(object message) => EmptyReceive(message); + + private Receive Uninitialized() { - When(TerminatorState.Uninitialized, @event => + return (object message) => { - if (@event.FsmEvent is Internals internals) + if (message is Internals internals) { _systemGuardian.Tell(RegisterTerminationHook.Instance); - return GoTo(TerminatorState.Idle).Using(internals); + Become(Idle(internals)); } - return null; - }); + return false; + }; + } - When(TerminatorState.Idle, @event => + private Receive Idle(Internals internals) + { + return (object message) => { - if (@event.StateData != null && @event.FsmEvent is TerminationHook) + if (message is TerminationHook) { _log.Info("Shutting down remote daemon."); - @event.StateData.RemoteDaemon.Tell(TerminationHook.Instance); - return GoTo(TerminatorState.WaitDaemonShutdown); + internals.RemoteDaemon.Tell(TerminationHook.Instance); + Become(WaitDaemonShutdown(internals)); + return true; } - return null; - }); + return false; + }; + } + private Receive WaitDaemonShutdown(Internals internals) + { // TODO: state timeout - When(TerminatorState.WaitDaemonShutdown, @event => + return (object message) => { - if (@event.StateData != null && @event.FsmEvent is TerminationHookDone) + if (message is TerminationHookDone) { _log.Info("Remote daemon shut down; proceeding with flushing remote transports."); - @event.StateData.Transport.Shutdown() + internals.Transport.Shutdown() .ContinueWith(_ => TransportShutdown.Instance, TaskContinuationOptions.ExecuteSynchronously) .PipeTo(Self); - return GoTo(TerminatorState.WaitTransportShutdown); + Become(WaitTransportShutdown()); + return true; } + return false; + }; + } - return null; - }); - - When(TerminatorState.WaitTransportShutdown, _ => + private Receive WaitTransportShutdown() + { + return (object message) => { _log.Info("Remoting shut down."); _systemGuardian.Tell(TerminationHookDone.Instance); - return Stop(); - }); - - StartWith(TerminatorState.Uninitialized, null); + Context.Stop(Self); + return true; + }; } public sealed class TransportShutdown From d8136dbe1589dc76ee433d14681c27a521a971a9 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sat, 20 Dec 2025 14:50:11 +0700 Subject: [PATCH 10/13] Fix .NET 10 CLR shutdown hook breaking change (#7964) * Fix .NET 10 CLR shutdown hook breaking change * fix global.json * Fix warning as errors * Install .NET SDK 8.0 * Add very targetted .NET 10 CI/CD run * Fix CI/CD script * Code fix * Revert .NET 10 CI/CD integration (will be moved to a new PR) * whitespace cleanup --------- Co-authored-by: Aaron Stannard --- .../Actor/TerminationSignalHandlerSpec.cs | 295 ++++++++++++++++++ src/core/Akka.Tests/Akka.Tests.csproj | 11 +- src/core/Akka/Actor/CoordinatedShutdown.cs | 75 +++-- .../Akka/Actor/TerminationSignalHandler.cs | 117 +++++++ 4 files changed, 463 insertions(+), 35 deletions(-) create mode 100644 src/core/Akka.Tests/Actor/TerminationSignalHandlerSpec.cs create mode 100644 src/core/Akka/Actor/TerminationSignalHandler.cs diff --git a/src/core/Akka.Tests/Actor/TerminationSignalHandlerSpec.cs b/src/core/Akka.Tests/Actor/TerminationSignalHandlerSpec.cs new file mode 100644 index 00000000000..e7616bac00a --- /dev/null +++ b/src/core/Akka.Tests/Actor/TerminationSignalHandlerSpec.cs @@ -0,0 +1,295 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.TestKit; +using Akka.TestKit.Extensions; +using Akka.Util.Internal; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; +using static Akka.Actor.CoordinatedShutdown; + +namespace Akka.Tests.Actor; + +/// +/// Tests for the CLR termination signal handling in . +/// +public class TerminationSignalHandlerSpec : AkkaSpec +{ + public TerminationSignalHandlerSpec(ITestOutputHelper output) : base(output) + { + } + + public ExtendedActorSystem ExtSys => Sys.AsInstanceOf(); + + private static readonly Phase EmptyPhase = new(ImmutableHashSet.Empty, TimeSpan.FromSeconds(10), true); + + /// + /// Test double for that allows simulating termination signals. + /// + private class TestTerminationSignalHandler : ITerminationSignalHandler + { + public Action RegisteredCallback { get; private set; } + public bool IsDisposed { get; private set; } + public int RegisterCallCount { get; private set; } + + public void Register(Action onTerminationSignal) + { + RegisterCallCount++; + RegisteredCallback = onTerminationSignal; + } + + public void SimulateTerminationSignal() + { + RegisteredCallback?.Invoke(); + } + + public void Dispose() + { + IsDisposed = true; + } + } + + [Fact(DisplayName = "CoordinatedShutdown should register handler when run-by-clr-shutdown-hook is enabled")] + public void CoordinatedShutdown_should_register_handler_when_enabled() + { + // Arrange + var phases = new Dictionary { { "a", EmptyPhase } }; + var coord = new CoordinatedShutdown(ExtSys, phases); + var testHandler = new TestTerminationSignalHandler(); + var conf = ConfigurationFactory.ParseString("run-by-clr-shutdown-hook = on"); + + // Act + CoordinatedShutdown.InitClrHook(Sys, conf, coord, testHandler); + + // Assert + testHandler.RegisterCallCount.Should().Be(1); + testHandler.RegisteredCallback.Should().NotBeNull(); + } + + [Fact(DisplayName = "CoordinatedShutdown should not register handler when run-by-clr-shutdown-hook is disabled")] + public void CoordinatedShutdown_should_not_register_handler_when_disabled() + { + // Arrange + var phases = new Dictionary { { "a", EmptyPhase } }; + var coord = new CoordinatedShutdown(ExtSys, phases); + var testHandler = new TestTerminationSignalHandler(); + var conf = ConfigurationFactory.ParseString("run-by-clr-shutdown-hook = off"); + + // Act + CoordinatedShutdown.InitClrHook(Sys, conf, coord, testHandler); + + // Assert + testHandler.RegisterCallCount.Should().Be(0); + testHandler.RegisteredCallback.Should().BeNull(); + } + + [Fact(DisplayName = "CoordinatedShutdown should run shutdown tasks when termination signal is received")] + public async Task CoordinatedShutdown_should_run_when_termination_signal_received() + { + // Arrange + var sys = ActorSystem.Create( + "TerminationSignalTest", + ConfigurationFactory.ParseString(@" + akka.coordinated-shutdown.terminate-actor-system = on + akka.coordinated-shutdown.run-by-clr-shutdown-hook = on + akka.coordinated-shutdown.run-by-actor-system-terminate = off")); + + try + { + var testHandler = new TestTerminationSignalHandler(); + var coord = CoordinatedShutdown.Get(sys); + + var taskExecuted = new TaskCompletionSource(); + coord.AddTask(PhaseBeforeServiceUnbind, "test-task", () => + { + taskExecuted.SetResult(true); + return Task.FromResult(Done.Instance); + }); + + // Re-initialize with test handler + var conf = sys.Settings.Config.GetConfig("akka.coordinated-shutdown"); + CoordinatedShutdown.InitClrHook(sys, conf, coord, testHandler); + + // Act - simulate termination signal + testHandler.SimulateTerminationSignal(); + + // Assert + var result = await taskExecuted.Task.AwaitWithTimeout(TimeSpan.FromSeconds(10)); + result.Should().BeTrue(); + coord.ShutdownReason.Should().Be(ClrExitReason.Instance); + } + finally + { + await sys.Terminate(); + } + } + + [Fact(DisplayName = "CoordinatedShutdown should set _runningClrHook flag during CLR shutdown")] + public async Task CoordinatedShutdown_should_set_running_flag_during_clr_shutdown() + { + // Arrange + var sys = ActorSystem.Create( + "RunningFlagTest", + ConfigurationFactory.ParseString(@" + akka.coordinated-shutdown.terminate-actor-system = on + akka.coordinated-shutdown.run-by-clr-shutdown-hook = on + akka.coordinated-shutdown.run-by-actor-system-terminate = off")); + + try + { + var testHandler = new TestTerminationSignalHandler(); + var coord = CoordinatedShutdown.Get(sys); + + var flagObserved = new TaskCompletionSource(); + coord.AddTask(PhaseBeforeServiceUnbind, "flag-check-task", () => + { + // The _runningClrHook flag should be set by now + // We can't directly access the private field, but we can verify + // the shutdown is running with ClrExitReason + flagObserved.SetResult(coord.ShutdownReason == ClrExitReason.Instance); + return Task.FromResult(Done.Instance); + }); + + var conf = sys.Settings.Config.GetConfig("akka.coordinated-shutdown"); + CoordinatedShutdown.InitClrHook(sys, conf, coord, testHandler); + + // Act + testHandler.SimulateTerminationSignal(); + + // Assert + var result = await flagObserved.Task.AwaitWithTimeout(TimeSpan.FromSeconds(10)); + result.Should().BeTrue(); + } + finally + { + await sys.Terminate(); + } + } + + [Fact(DisplayName = "CoordinatedShutdown should dispose handler when ActorSystem terminates normally")] + public async Task CoordinatedShutdown_should_dispose_handler_on_normal_termination() + { + // Arrange + var sys = ActorSystem.Create( + "DisposeTest", + ConfigurationFactory.ParseString(@" + akka.coordinated-shutdown.terminate-actor-system = on + akka.coordinated-shutdown.run-by-clr-shutdown-hook = on + akka.coordinated-shutdown.run-by-actor-system-terminate = on")); + + var testHandler = new TestTerminationSignalHandler(); + var coord = CoordinatedShutdown.Get(sys); + var conf = sys.Settings.Config.GetConfig("akka.coordinated-shutdown"); + CoordinatedShutdown.InitClrHook(sys, conf, coord, testHandler); + + // Act - terminate system normally (not via signal) + await sys.Terminate(); + + // Give continuation time to run + await Task.Delay(100); + + // Assert + testHandler.IsDisposed.Should().BeTrue(); + } + + [Fact(DisplayName = "CoordinatedShutdown CLR hooks should only execute once even if signal fires multiple times")] + public async Task CoordinatedShutdown_clr_hooks_should_only_execute_once() + { + // Arrange + var sys = ActorSystem.Create( + "IdempotencyTest", + ConfigurationFactory.ParseString(@" + akka.coordinated-shutdown.terminate-actor-system = on + akka.coordinated-shutdown.run-by-clr-shutdown-hook = on + akka.coordinated-shutdown.run-by-actor-system-terminate = off")); + + try + { + var testHandler = new TestTerminationSignalHandler(); + var coord = CoordinatedShutdown.Get(sys); + + var executionCount = 0; + coord.AddTask(PhaseBeforeServiceUnbind, "count-task", () => + { + executionCount++; + return Task.FromResult(Done.Instance); + }); + + var conf = sys.Settings.Config.GetConfig("akka.coordinated-shutdown"); + CoordinatedShutdown.InitClrHook(sys, conf, coord, testHandler); + + // Act - simulate multiple termination signals + testHandler.SimulateTerminationSignal(); + testHandler.SimulateTerminationSignal(); + testHandler.SimulateTerminationSignal(); + + // Wait for shutdown to complete + await sys.WhenTerminated.AwaitWithTimeout(TimeSpan.FromSeconds(10)); + + // Assert - task should only have executed once + executionCount.Should().Be(1); + } + finally + { + if (!sys.WhenTerminated.IsCompleted) + await sys.Terminate(); + } + } + + [Fact(DisplayName = "CoordinatedShutdown should handle exceptions in shutdown tasks gracefully")] + public async Task CoordinatedShutdown_should_handle_task_exceptions_gracefully() + { + // Arrange + var sys = ActorSystem.Create( + "ExceptionTest", + ConfigurationFactory.ParseString(@" + akka.coordinated-shutdown.terminate-actor-system = on + akka.coordinated-shutdown.run-by-clr-shutdown-hook = on + akka.coordinated-shutdown.run-by-actor-system-terminate = off")); + + try + { + var testHandler = new TestTerminationSignalHandler(); + var coord = CoordinatedShutdown.Get(sys); + + var secondTaskExecuted = new TaskCompletionSource(); + + coord.AddTask(PhaseBeforeServiceUnbind, "failing-task", () => + { + throw new Exception("Test exception"); + }); + + coord.AddTask(PhaseServiceUnbind, "second-task", () => + { + secondTaskExecuted.SetResult(true); + return Task.FromResult(Done.Instance); + }); + + var conf = sys.Settings.Config.GetConfig("akka.coordinated-shutdown"); + CoordinatedShutdown.InitClrHook(sys, conf, coord, testHandler); + + // Act + testHandler.SimulateTerminationSignal(); + + // Assert - second task should still execute despite first task throwing + var result = await secondTaskExecuted.Task.AwaitWithTimeout(TimeSpan.FromSeconds(10)); + result.Should().BeTrue(); + } + finally + { + if (!sys.WhenTerminated.IsCompleted) + await sys.Terminate(); + } + } +} \ No newline at end of file diff --git a/src/core/Akka.Tests/Akka.Tests.csproj b/src/core/Akka.Tests/Akka.Tests.csproj index 5d5455f948e..b04bed82bb9 100644 --- a/src/core/Akka.Tests/Akka.Tests.csproj +++ b/src/core/Akka.Tests/Akka.Tests.csproj @@ -20,13 +20,20 @@ - - + + + + + $(DefineConstants);CORECLR + + $(DefineConstants);CORECLR + + diff --git a/src/core/Akka/Actor/CoordinatedShutdown.cs b/src/core/Akka/Actor/CoordinatedShutdown.cs index f0008de8981..5476dd152d1 100644 --- a/src/core/Akka/Actor/CoordinatedShutdown.cs +++ b/src/core/Akka/Actor/CoordinatedShutdown.cs @@ -706,56 +706,65 @@ internal static void InitPhaseActorSystemTerminate(ActorSystem system, Config co } } - // TODO: do we need to check for null or empty config here? /// - /// Initializes the CLR hook + /// Initializes the CLR hook for handling process termination signals. /// /// The actor system for this extension. /// The HOCON configuration. /// The plugin instance. - internal static void InitClrHook(ActorSystem system, Config conf, CoordinatedShutdown coord) + /// Optional signal handler for testing. If null, creates platform-appropriate handler. + internal static void InitClrHook(ActorSystem system, Config conf, CoordinatedShutdown coord, ITerminationSignalHandler signalHandler = null) { var runByClrShutdownHook = conf.GetBoolean("run-by-clr-shutdown-hook", false); - if (runByClrShutdownHook) + if (!runByClrShutdownHook) + return; + + // Use injected handler or create platform-appropriate one + signalHandler ??= CreateDefaultTerminationHandler(); + + // Register the signal handler to run CLR hooks when termination signal is received + signalHandler.Register(() => { - var exitTask = TerminateOnClrExit(coord); - // run all hooks during termination sequence - AppDomain.CurrentDomain.ProcessExit += exitTask; - system.WhenTerminated.ContinueWith(_ => - { - AppDomain.CurrentDomain.ProcessExit -= exitTask; - }); + // Must block - if this returns, process exits + coord.RunClrHooks().Wait(coord.TotalTimeout); + }); - coord.AddClrShutdownHook(() => + // Add the actual shutdown hook that performs coordinated shutdown + coord.AddClrShutdownHook(() => + { + coord._runningClrHook = true; + return Task.Run(() => { - coord._runningClrHook = true; - return Task.Run(() => + if (!system.WhenTerminated.IsCompleted) { - if (!system.WhenTerminated.IsCompleted) + coord.Log.Info("Starting coordinated shutdown from CLR termination hook."); + try { - coord.Log.Info("Starting coordinated shutdown from CLR termination hook."); - try - { - coord.Run(ClrExitReason.Instance).Wait(coord.TotalTimeout); - } - catch (Exception ex) - { - coord.Log.Warning("CoordinatedShutdown from CLR shutdown failed: {0}", ex.Message); - } + coord.Run(ClrExitReason.Instance).Wait(coord.TotalTimeout); + } + catch (Exception ex) + { + coord.Log.Warning("CoordinatedShutdown from CLR shutdown failed: {0}", ex.Message); } - return Done.Instance; - }); + } + return Done.Instance; }); - } + }); + + // Cleanup handler when system terminates normally + system.WhenTerminated.ContinueWith(_ => signalHandler.Dispose()); } - private static EventHandler TerminateOnClrExit(CoordinatedShutdown coord) + /// + /// Creates the appropriate termination signal handler for the current platform. + /// + private static ITerminationSignalHandler CreateDefaultTerminationHandler() { - return (_, _) => - { - // have to block, because if this method exits the process exits. - coord.RunClrHooks().Wait(coord.TotalTimeout); - }; +#if NET6_0_OR_GREATER + return new PosixTerminationSignalHandler(); +#else + return new LegacyTerminationSignalHandler(); +#endif } } } diff --git a/src/core/Akka/Actor/TerminationSignalHandler.cs b/src/core/Akka/Actor/TerminationSignalHandler.cs new file mode 100644 index 00000000000..7083e4ccd1a --- /dev/null +++ b/src/core/Akka/Actor/TerminationSignalHandler.cs @@ -0,0 +1,117 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Threading; +#if NET6_0_OR_GREATER +using System.Runtime.InteropServices; +#endif + +namespace Akka.Actor +{ + /// + /// Abstraction for handling process termination signals (SIGTERM, SIGHUP, ProcessExit). + /// Required for .NET 10 compatibility where ProcessExit no longer fires on SIGTERM. + /// + internal interface ITerminationSignalHandler : IDisposable + { + /// + /// Registers a callback to be invoked when a termination signal is received. + /// The callback will only be invoked once, even if multiple signals are received. + /// + /// The callback to invoke on termination signal. + void Register(Action onTerminationSignal); + } + +#if NET6_0_OR_GREATER + /// + /// .NET 6+ implementation using PosixSignalRegistration for proper signal handling. + /// Handles SIGTERM and SIGHUP signals, plus ProcessExit as fallback. + /// This is required for .NET 10 compatibility where ProcessExit no longer fires on SIGTERM. + /// + internal sealed class PosixTerminationSignalHandler : ITerminationSignalHandler + { + private PosixSignalRegistration? _sigtermRegistration; + private PosixSignalRegistration? _sighupRegistration; + private EventHandler? _processExitHandler; + private Action? _callback; + private int _invoked; + + /// + public void Register(Action onTerminationSignal) + { + _callback = onTerminationSignal; + + // Register POSIX signals (works on Unix/macOS/Windows in .NET 6+) + _sigtermRegistration = PosixSignalRegistration.Create( + PosixSignal.SIGTERM, OnSignalReceived); + + _sighupRegistration = PosixSignalRegistration.Create( + PosixSignal.SIGINT, OnSignalReceived); + + // Keep ProcessExit as fallback for non-signal termination scenarios + _processExitHandler = (_, _) => InvokeCallback(); + AppDomain.CurrentDomain.ProcessExit += _processExitHandler; + } + + private void OnSignalReceived(PosixSignalContext context) + { + // Cancel default termination to allow graceful shutdown + context.Cancel = true; + InvokeCallback(); + } + + private void InvokeCallback() + { + // Ensure callback only runs once + if (Interlocked.CompareExchange(ref _invoked, 1, 0) == 0) + { + _callback?.Invoke(); + } + } + + /// + public void Dispose() + { + _sigtermRegistration?.Dispose(); + _sighupRegistration?.Dispose(); + if (_processExitHandler != null) + AppDomain.CurrentDomain.ProcessExit -= _processExitHandler; + } + } +#else + /// + /// Legacy implementation for .NET Standard 2.0 / .NET Framework. + /// Uses ProcessExit only (no POSIX signal support available). + /// + internal sealed class LegacyTerminationSignalHandler : ITerminationSignalHandler + { + private EventHandler? _processExitHandler; + private int _invoked; + + /// + public void Register(Action onTerminationSignal) + { + _processExitHandler = (_, _) => + { + if (Interlocked.CompareExchange(ref _invoked, 1, 0) == 0) + { + onTerminationSignal(); + } + }; + AppDomain.CurrentDomain.ProcessExit += _processExitHandler; + } + + /// + public void Dispose() + { + if (_processExitHandler != null) + AppDomain.CurrentDomain.ProcessExit -= _processExitHandler; + } + } +#endif +} From 712ad983ca548e567026f7a638f38415b5610ce3 Mon Sep 17 00:00:00 2001 From: Yaroslav Paslavskiy Date: Thu, 18 Dec 2025 18:30:04 +0000 Subject: [PATCH 11/13] Akka.TestKit: configurable expect-no-message-default value #6675 (#7006) * * added config expect-no-message-default = 3s * added this config into TestKit setting as ExpectNoMessageDefault property * in TestKit_Expect, bounded relevant overloads of ExpectNoMsg and ExpectNoMsgAsync to the new config * updated unit test * Add custom testkit config validation test * Update API Approval list --------- Co-authored-by: Gregorius Soedharmo --- ...APISpec.ApproveTestKit.DotNet.verified.txt | 22 ++++---- ...oreAPISpec.ApproveTestKit.Net.verified.txt | 22 ++++---- .../TestKit_Config_Tests.cs | 51 +++++++++++++++++++ src/core/Akka.TestKit/Internal/Reference.conf | 4 ++ src/core/Akka.TestKit/TestKitBase.cs | 15 ++++++ src/core/Akka.TestKit/TestKitBase_Expect.cs | 4 +- src/core/Akka.TestKit/TestKitSettings.cs | 4 ++ 7 files changed, 100 insertions(+), 22 deletions(-) diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt index 8ea957fa160..97054f84d64 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt @@ -309,6 +309,7 @@ namespace Akka.TestKit public object LastMessage { get; } public Akka.Actor.IActorRef LastSender { get; } public Akka.Event.ILoggingAdapter Log { get; } + public System.TimeSpan NoMessageRemainingOrDefault { get; } public static System.TimeSpan Now { get; } public System.TimeSpan Remaining { get; } public System.TimeSpan RemainingOrDefault { get; } @@ -397,17 +398,17 @@ namespace Akka.TestKit public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.Collections.Generic.IReadOnlyCollection messages, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.TimeSpan max, params T[] messages) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection messages, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__153))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__157))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfAsync(System.Collections.Generic.IReadOnlyCollection messages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__156))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__160))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfAsync(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection messages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(params Akka.TestKit.PredicateInfo[] predicates) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.Collections.Generic.IReadOnlyCollection predicates, System.Threading.CancellationToken cancellationToken) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.TimeSpan max, params Akka.TestKit.PredicateInfo[] predicates) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection predicates, System.Threading.CancellationToken cancellationToken) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__163))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__167))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfMatchingPredicatesAsync(System.Collections.Generic.IReadOnlyCollection predicates, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__166))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__170))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfMatchingPredicatesAsync(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection predicates, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public T ExpectMsgAnyOf(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } public System.Threading.Tasks.ValueTask ExpectMsgAnyOfAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } @@ -462,9 +463,9 @@ namespace Akka.TestKit public System.Threading.Tasks.ValueTask PeekOneAsync(System.Threading.CancellationToken cancellationToken) { } public System.Collections.Generic.IReadOnlyCollection ReceiveN(int numberOfMessages, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ReceiveN(int numberOfMessages, System.TimeSpan max, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__212))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__216))] public System.Collections.Generic.IAsyncEnumerable ReceiveNAsync(int numberOfMessages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__214))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__218))] public System.Collections.Generic.IAsyncEnumerable ReceiveNAsync(int numberOfMessages, System.TimeSpan max, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public object ReceiveOne(System.Nullable max = null, System.Threading.CancellationToken cancellationToken = null) { } public System.Threading.Tasks.ValueTask ReceiveOneAsync(System.Nullable max = null, System.Threading.CancellationToken cancellationToken = null) { } @@ -472,13 +473,13 @@ namespace Akka.TestKit public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Nullable max, System.Nullable idle, System.Func filter, int msgs = 2147483647, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Func filter, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Predicate shouldContinue, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, bool shouldIgnoreOtherMessageTypes = True, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__203))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__207))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Nullable max, System.Func filter, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__205))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__209))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Nullable max, System.Nullable idle, System.Func filter, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__207))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__211))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Func filter, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__209))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__213))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Predicate shouldContinue, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, bool shouldIgnoreOtherMessageTypes = True, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } protected System.TimeSpan RemainingOr(System.TimeSpan duration) { } public System.TimeSpan RemainingOrDilated(System.Nullable duration) { } @@ -519,6 +520,7 @@ namespace Akka.TestKit { public TestKitSettings(Akka.Configuration.Config config) { } public System.TimeSpan DefaultTimeout { get; } + public System.TimeSpan ExpectNoMessageDefault { get; } public bool LogTestKitCalls { get; } public System.TimeSpan SingleExpectDefault { get; } public System.TimeSpan TestEventFilterLeeway { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt index 92d61d46c37..fde4e8d2de0 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt @@ -309,6 +309,7 @@ namespace Akka.TestKit public object LastMessage { get; } public Akka.Actor.IActorRef LastSender { get; } public Akka.Event.ILoggingAdapter Log { get; } + public System.TimeSpan NoMessageRemainingOrDefault { get; } public static System.TimeSpan Now { get; } public System.TimeSpan Remaining { get; } public System.TimeSpan RemainingOrDefault { get; } @@ -397,17 +398,17 @@ namespace Akka.TestKit public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.Collections.Generic.IReadOnlyCollection messages, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.TimeSpan max, params T[] messages) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection messages, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__153))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__157))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfAsync(System.Collections.Generic.IReadOnlyCollection messages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__156))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__160))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfAsync(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection messages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(params Akka.TestKit.PredicateInfo[] predicates) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.Collections.Generic.IReadOnlyCollection predicates, System.Threading.CancellationToken cancellationToken) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.TimeSpan max, params Akka.TestKit.PredicateInfo[] predicates) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection predicates, System.Threading.CancellationToken cancellationToken) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__163))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__167))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfMatchingPredicatesAsync(System.Collections.Generic.IReadOnlyCollection predicates, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__166))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__170))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfMatchingPredicatesAsync(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection predicates, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public T ExpectMsgAnyOf(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } public System.Threading.Tasks.ValueTask ExpectMsgAnyOfAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } @@ -462,9 +463,9 @@ namespace Akka.TestKit public System.Threading.Tasks.ValueTask PeekOneAsync(System.Threading.CancellationToken cancellationToken) { } public System.Collections.Generic.IReadOnlyCollection ReceiveN(int numberOfMessages, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ReceiveN(int numberOfMessages, System.TimeSpan max, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__212))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__216))] public System.Collections.Generic.IAsyncEnumerable ReceiveNAsync(int numberOfMessages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__214))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__218))] public System.Collections.Generic.IAsyncEnumerable ReceiveNAsync(int numberOfMessages, System.TimeSpan max, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public object ReceiveOne(System.Nullable max = null, System.Threading.CancellationToken cancellationToken = null) { } public System.Threading.Tasks.ValueTask ReceiveOneAsync(System.Nullable max = null, System.Threading.CancellationToken cancellationToken = null) { } @@ -472,13 +473,13 @@ namespace Akka.TestKit public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Nullable max, System.Nullable idle, System.Func filter, int msgs = 2147483647, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Func filter, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Predicate shouldContinue, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, bool shouldIgnoreOtherMessageTypes = True, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__203))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__207))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Nullable max, System.Func filter, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__205))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__209))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Nullable max, System.Nullable idle, System.Func filter, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__207))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__211))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Func filter, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__209))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__213))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Predicate shouldContinue, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, bool shouldIgnoreOtherMessageTypes = True, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } protected System.TimeSpan RemainingOr(System.TimeSpan duration) { } public System.TimeSpan RemainingOrDilated(System.Nullable duration) { } @@ -519,6 +520,7 @@ namespace Akka.TestKit { public TestKitSettings(Akka.Configuration.Config config) { } public System.TimeSpan DefaultTimeout { get; } + public System.TimeSpan ExpectNoMessageDefault { get; } public bool LogTestKitCalls { get; } public System.TimeSpan SingleExpectDefault { get; } public System.TimeSpan TestEventFilterLeeway { get; } diff --git a/src/core/Akka.TestKit.Tests/TestKit_Config_Tests.cs b/src/core/Akka.TestKit.Tests/TestKit_Config_Tests.cs index fcd2b46dcee..155db12493a 100644 --- a/src/core/Akka.TestKit.Tests/TestKit_Config_Tests.cs +++ b/src/core/Akka.TestKit.Tests/TestKit_Config_Tests.cs @@ -7,6 +7,8 @@ using System; using System.Reflection; +using Akka.Configuration; +using Akka.Dispatch; using Xunit; namespace Akka.TestKit.Tests.Xunit2 @@ -19,6 +21,7 @@ public void DefaultValues_should_be_correct() { TestKitSettings.DefaultTimeout.ShouldBe(TimeSpan.FromSeconds(5)); TestKitSettings.SingleExpectDefault.ShouldBe(TimeSpan.FromSeconds(3)); + TestKitSettings.ExpectNoMessageDefault.ShouldBe(TimeSpan.FromSeconds(3)); TestKitSettings.TestEventFilterLeeway.ShouldBe(TimeSpan.FromSeconds(3)); TestKitSettings.TestTimeFactor.ShouldBe(1); TestKitSettings.TestKitStartupTimeout.ShouldBe(TimeSpan.FromSeconds(5)); @@ -29,5 +32,53 @@ public void DefaultValues_should_be_correct() CallingThreadDispatcher.Id.ShouldBe("akka.test.calling-thread-dispatcher"); } } + + public class TestKitCustomConfigTests : TestKit.Xunit2.TestKit + { + private static readonly Config Config = """ + akka.test { + timefactor = 2.0 + filter-leeway = 4s + single-expect-default = 5s + expect-no-message-default = 6s + default-timeout = 7s + startup-timeout = 8s + calling-thread-dispatcher { + type = "Akka.Dispatch.DispatcherConfigurator, Akka" + throughput = 2147483647 + } + + test-actor.dispatcher { + type = "Akka.Dispatch.DispatcherConfigurator, Akka" + throughput = 2147483647 + } + + # Disable batching in order to prevent flakiness with Akka.Remote tests (since they have low message frequency) + remote.dot-netty.tcp.batching.enabled = false + } + """; + + public TestKitCustomConfigTests(): base(Config) + { + } + + [Fact] + public void DefaultValues_should_be_correct() + { + TestKitSettings.TestTimeFactor.ShouldBe(2); + TestKitSettings.TestEventFilterLeeway.ShouldBe(TimeSpan.FromSeconds(4)); + TestKitSettings.SingleExpectDefault.ShouldBe(TimeSpan.FromSeconds(5)); + TestKitSettings.ExpectNoMessageDefault.ShouldBe(TimeSpan.FromSeconds(6)); + TestKitSettings.DefaultTimeout.ShouldBe(TimeSpan.FromSeconds(7)); + TestKitSettings.TestKitStartupTimeout.ShouldBe(TimeSpan.FromSeconds(8)); + + var callingThreadDispatcherTypeName = typeof(DispatcherConfigurator).FullName + ", " + typeof(DispatcherConfigurator).Assembly.GetName().Name; + Assert.False(Sys.Settings.Config.IsEmpty); + + Sys.Settings.Config.GetString("akka.test.calling-thread-dispatcher.type", null).ShouldBe(callingThreadDispatcherTypeName); + Sys.Settings.Config.GetString("akka.test.test-actor.dispatcher.type", null).ShouldBe(callingThreadDispatcherTypeName); + CallingThreadDispatcher.Id.ShouldBe("akka.test.calling-thread-dispatcher"); + } + } } diff --git a/src/core/Akka.TestKit/Internal/Reference.conf b/src/core/Akka.TestKit/Internal/Reference.conf index cf079e06962..0052c4f5c0c 100644 --- a/src/core/Akka.TestKit/Internal/Reference.conf +++ b/src/core/Akka.TestKit/Internal/Reference.conf @@ -23,6 +23,10 @@ akka { # by default single-expect-default = 3s + # duration to wait in expectNoMsg + # by default + expect-no-message-default = 3s + # The timeout that is added as an implicit by DefaultTimeout trait # This is used for Ask-pattern default-timeout = 5s diff --git a/src/core/Akka.TestKit/TestKitBase.cs b/src/core/Akka.TestKit/TestKitBase.cs index 1fc1b62b084..28df99051a9 100644 --- a/src/core/Akka.TestKit/TestKitBase.cs +++ b/src/core/Akka.TestKit/TestKitBase.cs @@ -208,6 +208,7 @@ protected void InitializeTest(ActorSystem system, Config config, string actorSys } private TimeSpan SingleExpectDefaultTimeout { get { return _testState.TestKitSettings.SingleExpectDefault; } } + private TimeSpan ExpectNoMessageDefaultTimeout { get { return _testState.TestKitSettings.ExpectNoMessageDefault; } } /// /// The that is recreated and used for each test. @@ -420,6 +421,20 @@ public TimeSpan RemainingOrDefault get { return RemainingOr(Dilated(SingleExpectDefaultTimeout)); } } + /// + /// + /// Retrieves the time remaining for execution of the innermost enclosing + /// Within block. + /// If missing that, then it returns the properly dilated default for this + /// case from settings (key: "akka.test.expect-no-message-default"). + /// + /// The returned value is always finite. + /// + public TimeSpan NoMessageRemainingOrDefault + { + get { return RemainingOr(Dilated(ExpectNoMessageDefaultTimeout)); } + } + /// /// /// Retrieves the time remaining for execution of the innermost enclosing diff --git a/src/core/Akka.TestKit/TestKitBase_Expect.cs b/src/core/Akka.TestKit/TestKitBase_Expect.cs index 85e70f81310..f7838964d85 100644 --- a/src/core/Akka.TestKit/TestKitBase_Expect.cs +++ b/src/core/Akka.TestKit/TestKitBase_Expect.cs @@ -456,7 +456,7 @@ private async ValueTask InternalExpectMsgEnvelopeAsync( /// /// Wait time is bounded by remaining time for execution of the innermost enclosing 'within' /// block, if inside a 'within' block; otherwise by the config value - /// "akka.test.single-expect-default". + /// "akka.test.expect-no-message-default". /// public void ExpectNoMsg(CancellationToken cancellationToken = default) { @@ -466,7 +466,7 @@ public void ExpectNoMsg(CancellationToken cancellationToken = default) /// public ValueTask ExpectNoMsgAsync(CancellationToken cancellationToken = default) { - return InternalExpectNoMsgAsync(RemainingOrDefault, cancellationToken); + return InternalExpectNoMsgAsync(NoMessageRemainingOrDefault, cancellationToken); } /// diff --git a/src/core/Akka.TestKit/TestKitSettings.cs b/src/core/Akka.TestKit/TestKitSettings.cs index 71ea14c0694..4bb581bbe59 100644 --- a/src/core/Akka.TestKit/TestKitSettings.cs +++ b/src/core/Akka.TestKit/TestKitSettings.cs @@ -30,6 +30,7 @@ public TestKitSettings(Config config) DefaultTimeout = config.GetTimeSpan("akka.test.default-timeout", null, allowInfinite:false); SingleExpectDefault = config.GetTimeSpan("akka.test.single-expect-default", null, allowInfinite: false); + ExpectNoMessageDefault = config.GetTimeSpan("akka.test.expect-no-message-default", null, allowInfinite: false); TestKitStartupTimeout = config.GetTimeSpan("akka.test.startup-timeout", null, allowInfinite: false); TestEventFilterLeeway = config.GetTimeSpan("akka.test.filter-leeway", null, allowInfinite: false); TestTimeFactor = config.GetDouble("akka.test.timefactor", 0); @@ -49,6 +50,9 @@ public TestKitSettings(Config config) /// Gets the config value "akka.test.single-expect-default". It is always finite. public TimeSpan SingleExpectDefault { get; } + /// Gets the config value "akka.test.expect-no-message-default". It is always finite. + public TimeSpan ExpectNoMessageDefault { get; } + /// Gets the config value "akka.test.filter-leeway". It is always finite. public TimeSpan TestEventFilterLeeway { get; } From cf997f34511fb59ba7b1b9e7c32a817fb464f01e Mon Sep 17 00:00:00 2001 From: Rolf Kristensen Date: Wed, 17 Dec 2025 23:06:42 +0100 Subject: [PATCH 12/13] LogMessage GetProperties without FrozenDictionary (#7968) --- src/core/Akka/Event/LogMessage.cs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/core/Akka/Event/LogMessage.cs b/src/core/Akka/Event/LogMessage.cs index ba2531af548..229e63e1108 100644 --- a/src/core/Akka/Event/LogMessage.cs +++ b/src/core/Akka/Event/LogMessage.cs @@ -79,7 +79,15 @@ public IReadOnlyDictionary GetProperties() // Optimize: avoid ToArray() if Parameters() already returns IReadOnlyList if (parameters is IReadOnlyList readOnlyList) { - _properties = CreatePropertyDictionary(PropertyNames, readOnlyList); + if (readOnlyList.Count == 0) + { + // Optimize: Skips parsing PropertyNames when empty Parameters() + _properties = EmptyDictionary; + } + else + { + _properties = CreatePropertyDictionary(PropertyNames, readOnlyList); + } } else { @@ -109,12 +117,7 @@ private static IReadOnlyDictionary CreatePropertyDictionary( dict[names[i]] = values[i]; } -#if NET8_0_OR_GREATER - // Use FrozenDictionary for optimal read performance on .NET 8+ - return System.Collections.Frozen.FrozenDictionary.ToFrozenDictionary(dict); -#else return dict; -#endif } private static readonly IReadOnlyDictionary EmptyDictionary = From 0216ad48a2e5451e09f0c802b4be24706a64b7e2 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 8 Jan 2026 04:58:47 +0700 Subject: [PATCH 13/13] Update API approval list --- ...APISpec.ApproveTestKit.DotNet.verified.txt | 20 +++++++++---------- ...oreAPISpec.ApproveTestKit.Net.verified.txt | 20 +++++++++---------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt index 97054f84d64..0ecde733e61 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt @@ -398,17 +398,17 @@ namespace Akka.TestKit public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.Collections.Generic.IReadOnlyCollection messages, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.TimeSpan max, params T[] messages) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection messages, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__157))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__158))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfAsync(System.Collections.Generic.IReadOnlyCollection messages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__160))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__161))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfAsync(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection messages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(params Akka.TestKit.PredicateInfo[] predicates) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.Collections.Generic.IReadOnlyCollection predicates, System.Threading.CancellationToken cancellationToken) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.TimeSpan max, params Akka.TestKit.PredicateInfo[] predicates) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection predicates, System.Threading.CancellationToken cancellationToken) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__167))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__168))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfMatchingPredicatesAsync(System.Collections.Generic.IReadOnlyCollection predicates, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__170))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__171))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfMatchingPredicatesAsync(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection predicates, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public T ExpectMsgAnyOf(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } public System.Threading.Tasks.ValueTask ExpectMsgAnyOfAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } @@ -463,9 +463,9 @@ namespace Akka.TestKit public System.Threading.Tasks.ValueTask PeekOneAsync(System.Threading.CancellationToken cancellationToken) { } public System.Collections.Generic.IReadOnlyCollection ReceiveN(int numberOfMessages, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ReceiveN(int numberOfMessages, System.TimeSpan max, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__216))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__217))] public System.Collections.Generic.IAsyncEnumerable ReceiveNAsync(int numberOfMessages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__218))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__219))] public System.Collections.Generic.IAsyncEnumerable ReceiveNAsync(int numberOfMessages, System.TimeSpan max, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public object ReceiveOne(System.Nullable max = null, System.Threading.CancellationToken cancellationToken = null) { } public System.Threading.Tasks.ValueTask ReceiveOneAsync(System.Nullable max = null, System.Threading.CancellationToken cancellationToken = null) { } @@ -473,13 +473,13 @@ namespace Akka.TestKit public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Nullable max, System.Nullable idle, System.Func filter, int msgs = 2147483647, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Func filter, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Predicate shouldContinue, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, bool shouldIgnoreOtherMessageTypes = True, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__207))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__208))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Nullable max, System.Func filter, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__209))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__210))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Nullable max, System.Nullable idle, System.Func filter, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__211))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__212))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Func filter, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__213))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__214))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Predicate shouldContinue, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, bool shouldIgnoreOtherMessageTypes = True, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } protected System.TimeSpan RemainingOr(System.TimeSpan duration) { } public System.TimeSpan RemainingOrDilated(System.Nullable duration) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt index fde4e8d2de0..7903d26784d 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt @@ -398,17 +398,17 @@ namespace Akka.TestKit public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.Collections.Generic.IReadOnlyCollection messages, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.TimeSpan max, params T[] messages) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOf(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection messages, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__157))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__158))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfAsync(System.Collections.Generic.IReadOnlyCollection messages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__160))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__161))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfAsync(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection messages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(params Akka.TestKit.PredicateInfo[] predicates) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.Collections.Generic.IReadOnlyCollection predicates, System.Threading.CancellationToken cancellationToken) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.TimeSpan max, params Akka.TestKit.PredicateInfo[] predicates) { } public System.Collections.Generic.IReadOnlyCollection ExpectMsgAllOfMatchingPredicates(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection predicates, System.Threading.CancellationToken cancellationToken) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__167))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__168))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfMatchingPredicatesAsync(System.Collections.Generic.IReadOnlyCollection predicates, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__170))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__171))] public System.Collections.Generic.IAsyncEnumerable ExpectMsgAllOfMatchingPredicatesAsync(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection predicates, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public T ExpectMsgAnyOf(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } public System.Threading.Tasks.ValueTask ExpectMsgAnyOfAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } @@ -463,9 +463,9 @@ namespace Akka.TestKit public System.Threading.Tasks.ValueTask PeekOneAsync(System.Threading.CancellationToken cancellationToken) { } public System.Collections.Generic.IReadOnlyCollection ReceiveN(int numberOfMessages, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyCollection ReceiveN(int numberOfMessages, System.TimeSpan max, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__216))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__217))] public System.Collections.Generic.IAsyncEnumerable ReceiveNAsync(int numberOfMessages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__218))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__219))] public System.Collections.Generic.IAsyncEnumerable ReceiveNAsync(int numberOfMessages, System.TimeSpan max, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } public object ReceiveOne(System.Nullable max = null, System.Threading.CancellationToken cancellationToken = null) { } public System.Threading.Tasks.ValueTask ReceiveOneAsync(System.Nullable max = null, System.Threading.CancellationToken cancellationToken = null) { } @@ -473,13 +473,13 @@ namespace Akka.TestKit public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Nullable max, System.Nullable idle, System.Func filter, int msgs = 2147483647, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Func filter, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, System.Threading.CancellationToken cancellationToken = null) { } public System.Collections.Generic.IReadOnlyList ReceiveWhile(System.Predicate shouldContinue, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, bool shouldIgnoreOtherMessageTypes = True, System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__207))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__208))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Nullable max, System.Func filter, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__209))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__210))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Nullable max, System.Nullable idle, System.Func filter, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__211))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__212))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Func filter, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } - [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__213))] + [System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.d__214))] public System.Collections.Generic.IAsyncEnumerable ReceiveWhileAsync(System.Predicate shouldContinue, System.Nullable max = null, System.Nullable idle = null, int msgs = 2147483647, bool shouldIgnoreOtherMessageTypes = True, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { } protected System.TimeSpan RemainingOr(System.TimeSpan duration) { } public System.TimeSpan RemainingOrDilated(System.Nullable duration) { }