From 702623f60e5bdc0fd8592eb7608e3931c217cb87 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 11 Aug 2025 19:51:48 -0500 Subject: [PATCH 1/5] Fix excessive exception nesting when cancelling ExpectMsgAsync Fixes #7743 by preventing double AggregateException wrapping in cancelled async operations --- .../TestKitAsyncCancellationSpec.cs | 147 ++++++++++++++++++ src/core/Akka.TestKit/TestKitBase_Receive.cs | 39 +++-- 2 files changed, 174 insertions(+), 12 deletions(-) create mode 100644 src/core/Akka.TestKit.Tests/TestKitAsyncCancellationSpec.cs diff --git a/src/core/Akka.TestKit.Tests/TestKitAsyncCancellationSpec.cs b/src/core/Akka.TestKit.Tests/TestKitAsyncCancellationSpec.cs new file mode 100644 index 00000000000..072c04773f3 --- /dev/null +++ b/src/core/Akka.TestKit.Tests/TestKitAsyncCancellationSpec.cs @@ -0,0 +1,147 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Akka.TestKit; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Tests.TestKit +{ + /// + /// Tests for GitHub issue #7743 - Excessive exception nesting when cancelling ExpectMsgAsync + /// + public class TestKitAsyncCancellationSpec : AkkaSpec + { + public TestKitAsyncCancellationSpec(ITestOutputHelper output) : base(output) + { + } + + [Fact(DisplayName = "ExpectMsgAsync should not have excessive exception nesting when cancelled")] + public async Task ExpectMsgAsync_Should_Not_Have_Excessive_Exception_Nesting_When_Cancelled() + { + var probe = CreateTestProbe(); + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + Exception caughtException = null; + try + { + await probe.ExpectMsgAsync(cancellationToken: cts.Token); + } + catch (Exception ex) + { + caughtException = ex; + } + + Assert.NotNull(caughtException); + + // The key issue from #7743 is that we should NOT have AggregateException containing AggregateException + // We should have at most one level of wrapping + if (caughtException is AggregateException aggEx) + { + // The inner exception should NOT be another AggregateException + Assert.IsNotType(aggEx.InnerException); + // It should be some form of OperationCanceledException + Assert.IsAssignableFrom(aggEx.InnerException); + } + else + { + // Or it could be OperationCanceledException directly (which is fine) + Assert.IsAssignableFrom(caughtException); + } + } + + [Fact(DisplayName = "ExpectMsgAsync accessed via Task.Exception should not have excessive nesting")] + public async Task ExpectMsgAsync_Task_Exception_Should_Not_Have_Excessive_Nesting() + { + var probe = CreateTestProbe(); + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + var task = probe.ExpectMsgAsync(cancellationToken: cts.Token).AsTask(); + + // Wait for the task to complete + await Task.Delay(100); + + // The task should be either Canceled or Faulted + Assert.True(task.IsCanceled || task.IsFaulted); + + if (task.IsFaulted) + { + Assert.NotNull(task.Exception); + + // The key issue: we should NOT have nested AggregateExceptions + var aggEx = Assert.IsType(task.Exception); + + // Verify no excessive nesting - inner should not be AggregateException + Assert.IsNotType(aggEx.InnerException); + + // It should be some form of OperationCanceledException + Assert.IsAssignableFrom(aggEx.InnerException); + } + // If task.IsCanceled is true, Task.Exception will be null, which is also valid + } + + [Fact(DisplayName = "Synchronous ExpectMsg with cancellation should not have excessive nesting")] + public void ExpectMsg_Synchronous_Should_Not_Have_Excessive_Nesting() + { + var probe = CreateTestProbe(); + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + Exception syncException = null; + try + { + probe.ExpectMsg(cancellationToken: cts.Token); + } + catch (Exception ex) + { + syncException = ex; + } + + Assert.NotNull(syncException); + + // The key issue: verify no excessive nesting + if (syncException is AggregateException aggEx) + { + // The inner exception should NOT be another AggregateException + Assert.IsNotType(aggEx.InnerException); + // It should be some form of OperationCanceledException + Assert.IsAssignableFrom(aggEx.InnerException); + } + else + { + // Or it could be OperationCanceledException directly + Assert.IsAssignableFrom(syncException); + } + } + + [Fact(DisplayName = "Original bug report scenario from issue #7743")] + public async Task Original_Bug_Report_Scenario_Should_Pass() + { + // This is the exact test from the bug report that was failing + var probe = CreateTestProbe(); + + using var stopper = new CancellationTokenSource(); + stopper.Cancel(); + + var task = probe.ExpectMsgAsync( + cancellationToken: stopper.Token).AsTask(); + + await Task.Delay(TimeSpan.FromSeconds(1)); // default timeout is 3 seconds + + // Original test expected double nesting - we're verifying this is fixed + if (task.IsFaulted && task.Exception != null) + { + var outer = task.Exception; + Assert.IsType(outer); + + // The bug was that InnerException was ALSO an AggregateException + // Now it should be OperationCanceledException directly + Assert.IsNotType(outer.InnerException); + Assert.IsAssignableFrom(outer.InnerException); + } + // Task might also be in Canceled state, which is valid + } + } +} \ No newline at end of file diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index fa77e20687e..881dc37a2af 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -247,6 +247,9 @@ public bool TryReceiveOne( bool shouldLog, CancellationToken cancellationToken) { + // Check for cancellation upfront to avoid extra wrapping + cancellationToken.ThrowIfCancellationRequested(); + (bool didTake, MessageEnvelope env) take; var maxDuration = GetTimeoutOrDefault(max); var start = Now; @@ -259,26 +262,38 @@ public bool TryReceiveOne( else if (maxDuration.IsPositiveFinite()) { ConditionalLog(shouldLog, "Trying to receive message from TestActor queue within {0}", maxDuration); - var delayTask = Task.Delay(maxDuration, cancellationToken); - var readTask = _testState.Queue.Reader.WaitToReadAsync(cancellationToken).AsTask(); - var completedTask = await Task.WhenAny(readTask, delayTask); - - if (completedTask == readTask && readTask.Result) + + try { - // Data is available within the timeout. - var didTake = _testState.Queue.Reader.TryRead(out var item); - take = (didTake, item); + var delayTask = Task.Delay(maxDuration, cancellationToken); + var readTask = _testState.Queue.Reader.WaitToReadAsync(cancellationToken).AsTask(); + var completedTask = await Task.WhenAny(readTask, delayTask).ConfigureAwait(false); + + // Check if cancellation was requested during the wait + cancellationToken.ThrowIfCancellationRequested(); + + if (completedTask == readTask && readTask.Result) + { + // Data is available within the timeout. + var didTake = _testState.Queue.Reader.TryRead(out var item); + take = (didTake, item); + } + else + { + // Timeout occurred before data was available. + take = (false, null); + } } - else + catch (OperationCanceledException) { - // Timeout occurred before data was available. - take = (false, null); + // Re-throw without extra wrapping + throw; } } else if (maxDuration == Timeout.InfiniteTimeSpan) { Log.Warning("Trying to receive message from TestActor queue with infinite timeout! Will wait indefinitely!"); - var readItem = await _testState.Queue.Reader.ReadAsync(cancellationToken); + var readItem = await _testState.Queue.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); take = (true, readItem); } else From b2fd59932c50df42eecabdd34ca36e26299ed434 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 11 Aug 2025 19:58:49 -0500 Subject: [PATCH 2/5] Refine cancellation handling to distinguish timeout from user cancellation - Timeout scenarios return (false, null) following Try pattern - User cancellation throws OperationCanceledException as expected - Uses linked cancellation token for cleaner timeout/cancellation logic --- src/core/Akka.TestKit/TestKitBase_Receive.cs | 27 +++++++++----------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index 881dc37a2af..47e4dc2a81e 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -247,9 +247,6 @@ public bool TryReceiveOne( bool shouldLog, CancellationToken cancellationToken) { - // Check for cancellation upfront to avoid extra wrapping - cancellationToken.ThrowIfCancellationRequested(); - (bool didTake, MessageEnvelope env) take; var maxDuration = GetTimeoutOrDefault(max); var start = Now; @@ -263,16 +260,15 @@ public bool TryReceiveOne( { ConditionalLog(shouldLog, "Trying to receive message from TestActor queue within {0}", maxDuration); + // Use a combined cancellation token to handle both timeout and user cancellation + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(maxDuration); + try { - var delayTask = Task.Delay(maxDuration, cancellationToken); - var readTask = _testState.Queue.Reader.WaitToReadAsync(cancellationToken).AsTask(); - var completedTask = await Task.WhenAny(readTask, delayTask).ConfigureAwait(false); - - // Check if cancellation was requested during the wait - cancellationToken.ThrowIfCancellationRequested(); - - if (completedTask == readTask && readTask.Result) + // This will throw OperationCanceledException if cancelled + var canRead = await _testState.Queue.Reader.WaitToReadAsync(cts.Token).ConfigureAwait(false); + if (canRead) { // Data is available within the timeout. var didTake = _testState.Queue.Reader.TryRead(out var item); @@ -280,15 +276,16 @@ public bool TryReceiveOne( } else { - // Timeout occurred before data was available. + // Channel was completed take = (false, null); } } - catch (OperationCanceledException) + catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) { - // Re-throw without extra wrapping - throw; + // This was a timeout, not user cancellation - return false + take = (false, null); } + // If cancellationToken.IsCancellationRequested is true, let the exception propagate } else if (maxDuration == Timeout.InfiniteTimeSpan) { From 8a292e70da31db7955db6ac8afb952a653f291cf Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 12 Aug 2025 09:11:50 -0500 Subject: [PATCH 3/5] Fix cancellation token handling in InternalTryReceiveOneAsync The previous implementation could incorrectly trigger cancellation when using CreateLinkedTokenSource with a default (non-cancellable) token. This was causing test failures in Akka.Cluster.Metrics.Tests where test initialization would timeout incorrectly. Changes: - Create timeout CancellationTokenSource directly with the timeout value - Only create linked token source when user provides a cancellable token - Properly distinguish between timeout and user cancellation in exception handling - User cancellation exceptions are re-thrown, timeout exceptions return false This fixes the MetricValuesSpec constructor failures where CreateTestData was incorrectly timing out during metrics collection. --- src/core/Akka.TestKit/TestKitBase_Receive.cs | 26 ++++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index 47e4dc2a81e..1221c51e60b 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -260,14 +260,20 @@ public bool TryReceiveOne( { ConditionalLog(shouldLog, "Trying to receive message from TestActor queue within {0}", maxDuration); - // Use a combined cancellation token to handle both timeout and user cancellation - using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - cts.CancelAfter(maxDuration); - try { + // Create timeout task + using var cts = new CancellationTokenSource(maxDuration); + + // Combine with user cancellation token if provided + using var linkedCts = cancellationToken.CanBeCanceled + ? CancellationTokenSource.CreateLinkedTokenSource(cts.Token, cancellationToken) + : null; + + var effectiveToken = linkedCts?.Token ?? cts.Token; + // This will throw OperationCanceledException if cancelled - var canRead = await _testState.Queue.Reader.WaitToReadAsync(cts.Token).ConfigureAwait(false); + var canRead = await _testState.Queue.Reader.WaitToReadAsync(effectiveToken).ConfigureAwait(false); if (canRead) { // Data is available within the timeout. @@ -280,12 +286,16 @@ public bool TryReceiveOne( take = (false, null); } } - catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) + catch (OperationCanceledException) when (cancellationToken.CanBeCanceled && cancellationToken.IsCancellationRequested) + { + // User cancellation - let it propagate + throw; + } + catch (OperationCanceledException) { - // This was a timeout, not user cancellation - return false + // This was a timeout - return false take = (false, null); } - // If cancellationToken.IsCancellationRequested is true, let the exception propagate } else if (maxDuration == Timeout.InfiniteTimeSpan) { From 9e010bfff534ce70e20f645d45b9e72c1816aebd Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 12 Aug 2025 12:51:27 -0500 Subject: [PATCH 4/5] remove `ConfigureAwait(false)` calls --- src/core/Akka.TestKit/TestKitBase_Receive.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index 1221c51e60b..3e0be70e522 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -273,7 +273,7 @@ public bool TryReceiveOne( var effectiveToken = linkedCts?.Token ?? cts.Token; // This will throw OperationCanceledException if cancelled - var canRead = await _testState.Queue.Reader.WaitToReadAsync(effectiveToken).ConfigureAwait(false); + var canRead = await _testState.Queue.Reader.WaitToReadAsync(effectiveToken); if (canRead) { // Data is available within the timeout. @@ -300,7 +300,7 @@ public bool TryReceiveOne( else if (maxDuration == Timeout.InfiniteTimeSpan) { Log.Warning("Trying to receive message from TestActor queue with infinite timeout! Will wait indefinitely!"); - var readItem = await _testState.Queue.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); + var readItem = await _testState.Queue.Reader.ReadAsync(cancellationToken); take = (true, readItem); } else From b6ce90abd996a91829251d72cc433239a413df24 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 12 Aug 2025 15:12:56 -0500 Subject: [PATCH 5/5] Fix flaky MetricsCollector test and platform-specific issues - Convert test to async to avoid Thread.Sleep blocking - Increase timeout from 10s to 30s for resource-constrained CI environments - Add exception handling for MaxWorkingSet access on Linux/Mono - Remove ConfigureAwait(false) from test code --- .../Base/AkkaSpecWithCollector.cs | 23 +++++++++++++++++++ .../MetricsCollectorSpec.cs | 4 ++-- .../Collectors/DefaultCollector.cs | 16 +++++++++---- 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Metrics.Tests/Base/AkkaSpecWithCollector.cs b/src/contrib/cluster/Akka.Cluster.Metrics.Tests/Base/AkkaSpecWithCollector.cs index 7b127a8a584..ccf91f04c86 100644 --- a/src/contrib/cluster/Akka.Cluster.Metrics.Tests/Base/AkkaSpecWithCollector.cs +++ b/src/contrib/cluster/Akka.Cluster.Metrics.Tests/Base/AkkaSpecWithCollector.cs @@ -66,6 +66,29 @@ protected NodeMetrics CreateTestData(string[] requiredMetrics, CancellationToken } while (!HasRequiredMetrics(metrics.Metrics, requiredMetrics)); return metrics; } + + protected async Task CreateTestDataAsync(TimeSpan timeout, string[] requiredMetrics) + { + using var cts = new CancellationTokenSource(timeout); + NodeMetrics metrics; + + do + { + cts.Token.ThrowIfCancellationRequested(); + metrics = Collector.Sample(); + + if (HasRequiredMetrics(metrics.Metrics, requiredMetrics)) + { + return metrics; + } + + // Small delay between attempts to avoid tight loop + await Task.Delay(100, cts.Token); + + } while (!cts.Token.IsCancellationRequested); + + throw new OperationCanceledException($"Could not collect required metrics {string.Join(", ", requiredMetrics)} within {timeout}"); + } private static bool HasRequiredMetrics(ImmutableHashSet metrics, string[] requiredMetrics) { diff --git a/src/contrib/cluster/Akka.Cluster.Metrics.Tests/MetricsCollectorSpec.cs b/src/contrib/cluster/Akka.Cluster.Metrics.Tests/MetricsCollectorSpec.cs index 007ca27abfb..374432b5c27 100644 --- a/src/contrib/cluster/Akka.Cluster.Metrics.Tests/MetricsCollectorSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Metrics.Tests/MetricsCollectorSpec.cs @@ -47,12 +47,12 @@ public void Metric_should_merge_2_metrics_that_are_tracking_the_same_metric() } [Fact] - public void MetricsCollector_should_collector_accurate_metrics_for_node() + public async Task MetricsCollector_should_collector_accurate_metrics_for_node() { NodeMetrics sample; try { - sample = CreateTestData(10.Seconds(), [ + sample = await CreateTestDataAsync(30.Seconds(), [ StandardMetrics.MemoryAvailable, StandardMetrics.MemoryUsed ]); diff --git a/src/contrib/cluster/Akka.Cluster.Metrics/Collectors/DefaultCollector.cs b/src/contrib/cluster/Akka.Cluster.Metrics/Collectors/DefaultCollector.cs index 94b7d515736..990e8c2b8d7 100644 --- a/src/contrib/cluster/Akka.Cluster.Metrics/Collectors/DefaultCollector.cs +++ b/src/contrib/cluster/Akka.Cluster.Metrics/Collectors/DefaultCollector.cs @@ -70,11 +70,19 @@ public NodeMetrics Sample() if(processorCount.HasValue) metrics.Add(processorCount.Value); - if (process.MaxWorkingSet != IntPtr.Zero) + try { - var workingSet = NodeMetrics.Types.Metric.Create(StandardMetrics.MaxMemoryRecommended, process.MaxWorkingSet.ToInt64()); - if(workingSet.HasValue) - metrics.Add(workingSet.Value); + if (process.MaxWorkingSet != IntPtr.Zero) + { + var workingSet = NodeMetrics.Types.Metric.Create(StandardMetrics.MaxMemoryRecommended, process.MaxWorkingSet.ToInt64()); + if(workingSet.HasValue) + metrics.Add(workingSet.Value); + } + } + catch (Exception) + { + // MaxWorkingSet may throw on some platforms (e.g., Linux/Mono) + // Ignore and continue without this metric } var (processCpuUsage, totalCpuUsage) = GetCpuUsages(process.Id);