diff --git a/src/contrib/cluster/Akka.Cluster.Metrics.Tests/Base/AkkaSpecWithCollector.cs b/src/contrib/cluster/Akka.Cluster.Metrics.Tests/Base/AkkaSpecWithCollector.cs
index 7b127a8a584..ccf91f04c86 100644
--- a/src/contrib/cluster/Akka.Cluster.Metrics.Tests/Base/AkkaSpecWithCollector.cs
+++ b/src/contrib/cluster/Akka.Cluster.Metrics.Tests/Base/AkkaSpecWithCollector.cs
@@ -66,6 +66,45 @@ protected NodeMetrics CreateTestData(string[] requiredMetrics, CancellationToken
             } while (!HasRequiredMetrics(metrics.Metrics, requiredMetrics));
             return metrics;
         }
+
+        /// <summary>
+        /// Samples <c>Collector</c> every 100 ms until <c>HasRequiredMetrics</c> accepts a sample
+        /// for <paramref name="requiredMetrics"/>, or until <paramref name="timeout"/> elapses.
+        /// </summary>
+        /// <param name="timeout">Total time allowed for collecting an acceptable sample.</param>
+        /// <param name="requiredMetrics">Metric names passed to <c>HasRequiredMetrics</c>.</param>
+        /// <returns>The first sample that <c>HasRequiredMetrics</c> accepts.</returns>
+        /// <exception cref="OperationCanceledException">The timeout elapsed first.</exception>
+        protected async Task<NodeMetrics> CreateTestDataAsync(TimeSpan timeout, string[] requiredMetrics)
+        {
+            using var cts = new CancellationTokenSource(timeout);
+
+            try
+            {
+                while (true)
+                {
+                    cts.Token.ThrowIfCancellationRequested();
+
+                    var metrics = Collector.Sample();
+                    if (HasRequiredMetrics(metrics.Metrics, requiredMetrics))
+                    {
+                        return metrics;
+                    }
+
+                    // Small delay between attempts to avoid tight loop
+                    await Task.Delay(100, cts.Token);
+                }
+            }
+            catch (OperationCanceledException ex) when (cts.IsCancellationRequested)
+            {
+                // Task.Delay reports the timeout as a generic TaskCanceledException, so
+                // rethrow with the description of what could not be collected.
+                throw new OperationCanceledException(
+                    $"Could not collect required metrics {string.Join(", ", requiredMetrics)} within {timeout}",
+                    ex,
+                    cts.Token);
+            }
+        }
 
         private static bool HasRequiredMetrics(ImmutableHashSet metrics, string[] requiredMetrics)
         {
diff --git a/src/contrib/cluster/Akka.Cluster.Metrics.Tests/MetricsCollectorSpec.cs b/src/contrib/cluster/Akka.Cluster.Metrics.Tests/MetricsCollectorSpec.cs
index 007ca27abfb..374432b5c27 100644
--- a/src/contrib/cluster/Akka.Cluster.Metrics.Tests/MetricsCollectorSpec.cs
+++ b/src/contrib/cluster/Akka.Cluster.Metrics.Tests/MetricsCollectorSpec.cs
@@ -47,12 +47,12 @@ public void Metric_should_merge_2_metrics_that_are_tracking_the_same_metric()
         }
 
         [Fact]
-        public void MetricsCollector_should_collector_accurate_metrics_for_node()
+        public async Task MetricsCollector_should_collector_accurate_metrics_for_node()
         {
             NodeMetrics sample;
             try
             {
-                sample = CreateTestData(10.Seconds(), [
+                sample = await CreateTestDataAsync(30.Seconds(), [
                     StandardMetrics.MemoryAvailable,
                     StandardMetrics.MemoryUsed
                 ]);
diff --git a/src/contrib/cluster/Akka.Cluster.Metrics/Collectors/DefaultCollector.cs b/src/contrib/cluster/Akka.Cluster.Metrics/Collectors/DefaultCollector.cs
index 94b7d515736..990e8c2b8d7 100644
--- a/src/contrib/cluster/Akka.Cluster.Metrics/Collectors/DefaultCollector.cs
+++ b/src/contrib/cluster/Akka.Cluster.Metrics/Collectors/DefaultCollector.cs
@@ -70,11 +70,19 @@ public NodeMetrics Sample()
             if(processorCount.HasValue)
                 metrics.Add(processorCount.Value);
 
-            if (process.MaxWorkingSet != IntPtr.Zero)
+            try
             {
-                var workingSet = NodeMetrics.Types.Metric.Create(StandardMetrics.MaxMemoryRecommended, process.MaxWorkingSet.ToInt64());
-                if(workingSet.HasValue)
-                    metrics.Add(workingSet.Value);
+                if (process.MaxWorkingSet != IntPtr.Zero)
+                {
+                    var workingSet = NodeMetrics.Types.Metric.Create(StandardMetrics.MaxMemoryRecommended, process.MaxWorkingSet.ToInt64());
+                    if(workingSet.HasValue)
+                        metrics.Add(workingSet.Value);
+                }
+            }
+            catch (Exception)
+            {
+                // MaxWorkingSet may throw on some platforms (e.g., Linux/Mono)
+                // Ignore and continue without this metric
             }
 
             var (processCpuUsage, totalCpuUsage) = GetCpuUsages(process.Id);
diff --git a/src/core/Akka.TestKit.Tests/TestKitAsyncCancellationSpec.cs b/src/core/Akka.TestKit.Tests/TestKitAsyncCancellationSpec.cs
new file mode 100644
index 00000000000..072c04773f3
--- /dev/null
+++ b/src/core/Akka.TestKit.Tests/TestKitAsyncCancellationSpec.cs
@@ -0,0 +1,144 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Akka.TestKit;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Akka.Tests.TestKit
+{
+    /// <summary>
+    /// Tests for GitHub issue #7743 - Excessive exception nesting when cancelling ExpectMsgAsync
+    /// </summary>
+    public class TestKitAsyncCancellationSpec : AkkaSpec
+    {
+        // Upper bound on how long a cancelled expectation may take to complete. It is longer than
+        // the 3 second default expect timeout mentioned in the original bug report, so a run that
+        // ignores the token still finishes and fails the cancellation assertions instead of hanging.
+        private static readonly TimeSpan CompletionTimeout = TimeSpan.FromSeconds(10);
+
+        public TestKitAsyncCancellationSpec(ITestOutputHelper output) : base(output)
+        {
+        }
+
+        [Fact(DisplayName = "ExpectMsgAsync should not have excessive exception nesting when cancelled")]
+        public async Task ExpectMsgAsync_Should_Not_Have_Excessive_Exception_Nesting_When_Cancelled()
+        {
+            var probe = CreateTestProbe();
+            using var cts = new CancellationTokenSource();
+            cts.Cancel();
+
+            Exception caughtException = null;
+            try
+            {
+                // The message type is arbitrary: nothing is ever sent to the probe
+                await probe.ExpectMsgAsync<string>(cancellationToken: cts.Token);
+            }
+            catch (Exception ex)
+            {
+                caughtException = ex;
+            }
+
+            AssertCancellationIsNotNested(caughtException);
+        }
+
+        [Fact(DisplayName = "ExpectMsgAsync accessed via Task.Exception should not have excessive nesting")]
+        public async Task ExpectMsgAsync_Task_Exception_Should_Not_Have_Excessive_Nesting()
+        {
+            var probe = CreateTestProbe();
+            using var cts = new CancellationTokenSource();
+            cts.Cancel();
+
+            var task = probe.ExpectMsgAsync<string>(cancellationToken: cts.Token).AsTask();
+
+            await AssertCancelledOrFaultedAsync(task);
+
+            // If task.IsCanceled is true, Task.Exception is null, which is also valid
+            if (task.IsFaulted)
+            {
+                AssertCancellationIsNotNested(task.Exception);
+            }
+        }
+
+        [Fact(DisplayName = "Synchronous ExpectMsg with cancellation should not have excessive nesting")]
+        public void ExpectMsg_Synchronous_Should_Not_Have_Excessive_Nesting()
+        {
+            var probe = CreateTestProbe();
+            using var cts = new CancellationTokenSource();
+            cts.Cancel();
+
+            Exception syncException = null;
+            try
+            {
+                probe.ExpectMsg<string>(cancellationToken: cts.Token);
+            }
+            catch (Exception ex)
+            {
+                syncException = ex;
+            }
+
+            AssertCancellationIsNotNested(syncException);
+        }
+
+        [Fact(DisplayName = "Original bug report scenario from issue #7743")]
+        public async Task Original_Bug_Report_Scenario_Should_Pass()
+        {
+            // This is the scenario from the bug report that was failing
+            var probe = CreateTestProbe();
+
+            using var stopper = new CancellationTokenSource();
+            stopper.Cancel();
+
+            var task = probe.ExpectMsgAsync<string>(
+                cancellationToken: stopper.Token).AsTask();
+
+            // Wait for completion instead of sleeping for a fixed second, and fail if the task
+            // never completes rather than passing without having checked anything.
+            await AssertCancelledOrFaultedAsync(task);
+
+            // The bug was that InnerException was ALSO an AggregateException;
+            // now it should be OperationCanceledException directly.
+            // The task might also be in the Canceled state, which is valid.
+            if (task.IsFaulted)
+            {
+                AssertCancellationIsNotNested(task.Exception);
+            }
+        }
+
+        /// <summary>
+        /// Waits up to <see cref="CompletionTimeout"/> for <paramref name="task"/> to finish and
+        /// asserts that it ended up either canceled or faulted.
+        /// </summary>
+        private static async Task AssertCancelledOrFaultedAsync(Task task)
+        {
+            using var delayCts = new CancellationTokenSource();
+            var finished = await Task.WhenAny(task, Task.Delay(CompletionTimeout, delayCts.Token));
+            delayCts.Cancel(); // stop the timer once the race is decided
+
+            Assert.Same(task, finished);
+            Assert.True(task.IsCanceled || task.IsFaulted);
+        }
+
+        /// <summary>
+        /// Asserts that <paramref name="exception"/> is a cancellation wrapped in at most one
+        /// <see cref="AggregateException"/>.
+        /// </summary>
+        private static void AssertCancellationIsNotNested(Exception exception)
+        {
+            Assert.NotNull(exception);
+
+            if (exception is AggregateException aggEx)
+            {
+                // The inner exception should NOT be another AggregateException
+                Assert.IsNotType<AggregateException>(aggEx.InnerException);
+                // It should be some form of OperationCanceledException
+                Assert.IsAssignableFrom<OperationCanceledException>(aggEx.InnerException);
+            }
+            else
+            {
+                // Or it could be OperationCanceledException directly (which is fine)
+                Assert.IsAssignableFrom<OperationCanceledException>(exception);
+            }
+        }
+    }
+}
diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs
index fa77e20687e..3e0be70e522 100644
--- a/src/core/Akka.TestKit/TestKitBase_Receive.cs
+++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs
@@ -259,19 +259,41 @@ public bool TryReceiveOne(
             else if (maxDuration.IsPositiveFinite())
             {
                 ConditionalLog(shouldLog, "Trying to receive message from TestActor queue within {0}", maxDuration);
-                var delayTask = Task.Delay(maxDuration, cancellationToken);
-                var readTask = _testState.Queue.Reader.WaitToReadAsync(cancellationToken).AsTask();
-                var completedTask = await Task.WhenAny(readTask, delayTask);
-
-                if (completedTask == readTask && readTask.Result)
+
+                try
                 {
-                    // Data is available within the timeout.
-                    var didTake = _testState.Queue.Reader.TryRead(out var item);
-                    take = (didTake, item);
+                    // Create timeout task
+                    using var cts = new CancellationTokenSource(maxDuration);
+
+                    // Combine with user cancellation token if provided
+                    using var linkedCts = cancellationToken.CanBeCanceled
+                        ? CancellationTokenSource.CreateLinkedTokenSource(cts.Token, cancellationToken)
+                        : null;
+
+                    var effectiveToken = linkedCts?.Token ?? cts.Token;
+
+                    // This will throw OperationCanceledException if cancelled
+                    var canRead = await _testState.Queue.Reader.WaitToReadAsync(effectiveToken);
+                    if (canRead)
+                    {
+                        // Data is available within the timeout.
+                        var didTake = _testState.Queue.Reader.TryRead(out var item);
+                        take = (didTake, item);
+                    }
+                    else
+                    {
+                        // Channel was completed
+                        take = (false, null);
+                    }
                 }
-                else
+                catch (OperationCanceledException) when (cancellationToken.CanBeCanceled && cancellationToken.IsCancellationRequested)
                 {
-                    // Timeout occurred before data was available.
+                    // User cancellation - let it propagate
+                    throw;
+                }
+                catch (OperationCanceledException)
+                {
+                    // This was a timeout - return false
                     take = (false, null);
                 }
             }