Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,29 @@ protected NodeMetrics CreateTestData(string[] requiredMetrics, CancellationToken
} while (!HasRequiredMetrics(metrics.Metrics, requiredMetrics));
return metrics;
}

protected async Task<NodeMetrics> CreateTestDataAsync(TimeSpan timeout, string[] requiredMetrics)
{
using var cts = new CancellationTokenSource(timeout);
NodeMetrics metrics;

do
{
cts.Token.ThrowIfCancellationRequested();
metrics = Collector.Sample();

if (HasRequiredMetrics(metrics.Metrics, requiredMetrics))
{
return metrics;
}

// Small delay between attempts to avoid tight loop
await Task.Delay(100, cts.Token);

} while (!cts.Token.IsCancellationRequested);

throw new OperationCanceledException($"Could not collect required metrics {string.Join(", ", requiredMetrics)} within {timeout}");
}

private static bool HasRequiredMetrics(ImmutableHashSet<NodeMetrics.Types.Metric> metrics, string[] requiredMetrics)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ public void Metric_should_merge_2_metrics_that_are_tracking_the_same_metric()
}

[Fact]
public void MetricsCollector_should_collector_accurate_metrics_for_node()
public async Task MetricsCollector_should_collector_accurate_metrics_for_node()
{
NodeMetrics sample;
try
{
sample = CreateTestData(10.Seconds(), [
sample = await CreateTestDataAsync(30.Seconds(), [
StandardMetrics.MemoryAvailable,
StandardMetrics.MemoryUsed
]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,19 @@ public NodeMetrics Sample()
if(processorCount.HasValue)
metrics.Add(processorCount.Value);

if (process.MaxWorkingSet != IntPtr.Zero)
try
{
var workingSet = NodeMetrics.Types.Metric.Create(StandardMetrics.MaxMemoryRecommended, process.MaxWorkingSet.ToInt64());
if(workingSet.HasValue)
metrics.Add(workingSet.Value);
if (process.MaxWorkingSet != IntPtr.Zero)
{
var workingSet = NodeMetrics.Types.Metric.Create(StandardMetrics.MaxMemoryRecommended, process.MaxWorkingSet.ToInt64());
if(workingSet.HasValue)
metrics.Add(workingSet.Value);
}
}
catch (Exception)
{
// MaxWorkingSet may throw on some platforms (e.g., Linux/Mono)
// Ignore and continue without this metric
}

var (processCpuUsage, totalCpuUsage) = GetCpuUsages(process.Id);
Expand Down
147 changes: 147 additions & 0 deletions src/core/Akka.TestKit.Tests/TestKitAsyncCancellationSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Tests.TestKit
{
/// <summary>
/// Tests for GitHub issue #7743 - Excessive exception nesting when cancelling ExpectMsgAsync
/// </summary>
public class TestKitAsyncCancellationSpec : AkkaSpec
{
public TestKitAsyncCancellationSpec(ITestOutputHelper output) : base(output)
{
}

[Fact(DisplayName = "ExpectMsgAsync should not have excessive exception nesting when cancelled")]
public async Task ExpectMsgAsync_Should_Not_Have_Excessive_Exception_Nesting_When_Cancelled()
{
var probe = CreateTestProbe();
using var cts = new CancellationTokenSource();
cts.Cancel();

Exception caughtException = null;
try
{
await probe.ExpectMsgAsync<int>(cancellationToken: cts.Token);
}
catch (Exception ex)
{
caughtException = ex;
}

Assert.NotNull(caughtException);

// The key issue from #7743 is that we should NOT have AggregateException containing AggregateException
// We should have at most one level of wrapping
if (caughtException is AggregateException aggEx)
{
// The inner exception should NOT be another AggregateException
Assert.IsNotType<AggregateException>(aggEx.InnerException);
// It should be some form of OperationCanceledException
Assert.IsAssignableFrom<OperationCanceledException>(aggEx.InnerException);
}
else
{
// Or it could be OperationCanceledException directly (which is fine)
Assert.IsAssignableFrom<OperationCanceledException>(caughtException);
}
}

[Fact(DisplayName = "ExpectMsgAsync accessed via Task.Exception should not have excessive nesting")]
public async Task ExpectMsgAsync_Task_Exception_Should_Not_Have_Excessive_Nesting()
{
var probe = CreateTestProbe();
using var cts = new CancellationTokenSource();
cts.Cancel();

var task = probe.ExpectMsgAsync<int>(cancellationToken: cts.Token).AsTask();

// Wait for the task to complete
await Task.Delay(100);

// The task should be either Canceled or Faulted
Assert.True(task.IsCanceled || task.IsFaulted);

if (task.IsFaulted)
{
Assert.NotNull(task.Exception);

// The key issue: we should NOT have nested AggregateExceptions
var aggEx = Assert.IsType<AggregateException>(task.Exception);

// Verify no excessive nesting - inner should not be AggregateException
Assert.IsNotType<AggregateException>(aggEx.InnerException);

// It should be some form of OperationCanceledException
Assert.IsAssignableFrom<OperationCanceledException>(aggEx.InnerException);
}
// If task.IsCanceled is true, Task.Exception will be null, which is also valid
}

[Fact(DisplayName = "Synchronous ExpectMsg with cancellation should not have excessive nesting")]
public void ExpectMsg_Synchronous_Should_Not_Have_Excessive_Nesting()
{
var probe = CreateTestProbe();
using var cts = new CancellationTokenSource();
cts.Cancel();

Exception syncException = null;
try
{
probe.ExpectMsg<int>(cancellationToken: cts.Token);
}
catch (Exception ex)
{
syncException = ex;
}

Assert.NotNull(syncException);

// The key issue: verify no excessive nesting
if (syncException is AggregateException aggEx)
{
// The inner exception should NOT be another AggregateException
Assert.IsNotType<AggregateException>(aggEx.InnerException);
// It should be some form of OperationCanceledException
Assert.IsAssignableFrom<OperationCanceledException>(aggEx.InnerException);
}
else
{
// Or it could be OperationCanceledException directly
Assert.IsAssignableFrom<OperationCanceledException>(syncException);
}
}

[Fact(DisplayName = "Original bug report scenario from issue #7743")]
public async Task Original_Bug_Report_Scenario_Should_Pass()
{
// This is the exact test from the bug report that was failing
var probe = CreateTestProbe();

using var stopper = new CancellationTokenSource();
stopper.Cancel();

var task = probe.ExpectMsgAsync<int>(
cancellationToken: stopper.Token).AsTask();

await Task.Delay(TimeSpan.FromSeconds(1)); // default timeout is 3 seconds

// Original test expected double nesting - we're verifying this is fixed
if (task.IsFaulted && task.Exception != null)
{
var outer = task.Exception;
Assert.IsType<AggregateException>(outer);

// The bug was that InnerException was ALSO an AggregateException
// Now it should be OperationCanceledException directly
Assert.IsNotType<AggregateException>(outer.InnerException);
Assert.IsAssignableFrom<OperationCanceledException>(outer.InnerException);
}
// Task might also be in Canceled state, which is valid
}
}
}
42 changes: 32 additions & 10 deletions src/core/Akka.TestKit/TestKitBase_Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -259,19 +259,41 @@ public bool TryReceiveOne(
else if (maxDuration.IsPositiveFinite())
{
ConditionalLog(shouldLog, "Trying to receive message from TestActor queue within {0}", maxDuration);
var delayTask = Task.Delay(maxDuration, cancellationToken);
var readTask = _testState.Queue.Reader.WaitToReadAsync(cancellationToken).AsTask();
var completedTask = await Task.WhenAny(readTask, delayTask);

if (completedTask == readTask && readTask.Result)

try
{
// Data is available within the timeout.
var didTake = _testState.Queue.Reader.TryRead(out var item);
take = (didTake, item);
// Create timeout task
using var cts = new CancellationTokenSource(maxDuration);

// Combine with user cancellation token if provided
using var linkedCts = cancellationToken.CanBeCanceled
? CancellationTokenSource.CreateLinkedTokenSource(cts.Token, cancellationToken)
: null;

var effectiveToken = linkedCts?.Token ?? cts.Token;

// This will throw OperationCanceledException if cancelled
var canRead = await _testState.Queue.Reader.WaitToReadAsync(effectiveToken);
if (canRead)
{
// Data is available within the timeout.
var didTake = _testState.Queue.Reader.TryRead(out var item);
take = (didTake, item);
}
else
{
// Channel was completed
take = (false, null);
}
}
else
catch (OperationCanceledException) when (cancellationToken.CanBeCanceled && cancellationToken.IsCancellationRequested)
{
// Timeout occurred before data was available.
// User cancellation - let it propagate
throw;
}
catch (OperationCanceledException)
{
// This was a timeout - return false
take = (false, null);
}
}
Expand Down