Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ public void ClusterShardingDeadShardRegionTest()
}

[Fact]
public void ClusterSharding_must()
public async Task ClusterSharding_must()
{
ClusterSharding_must_initialize_cluster_and_allocate_sharded_actors();
ClusterSharding_must_only_deliver_buffered_RestartShard_to_the_local_region();
await ClusterSharding_must_only_deliver_buffered_RestartShard_to_the_local_region();
}

public void ClusterSharding_must_initialize_cluster_and_allocate_sharded_actors()
Expand Down Expand Up @@ -186,7 +186,7 @@ public void ClusterSharding_must_initialize_cluster_and_allocate_sharded_actors(
p2.ExpectMsg(3);
}

public void ClusterSharding_must_only_deliver_buffered_RestartShard_to_the_local_region()
public async Task ClusterSharding_must_only_deliver_buffered_RestartShard_to_the_local_region()
{
ImmutableHashSet<string> StatesFor(IActorRef region, TestProbe probe, int expect)
{
Expand All @@ -203,14 +203,14 @@ ImmutableHashSet<string> StatesFor(IActorRef region, TestProbe probe, int expect
}, msgs: expect).SelectMany(i => i).ToImmutableHashSet();
}

bool AwaitRebalance(IActorRef region, int msg, TestProbe probe)
async Task<bool> AwaitRebalanceAsync(IActorRef region, int msg, TestProbe probe)
{
region.Tell(msg, probe.Ref);
var m = probe.ExpectMsg<int>(TimeSpan.FromSeconds(2));
var m = await probe.ExpectMsgAsync<int>(TimeSpan.FromSeconds(2));
if (m == msg)
return true;
else
return AwaitRebalance(region, msg, probe);
return await AwaitRebalanceAsync(region, msg, probe);
}

void Swap<T>(ref T v1, ref T v2)
Expand Down Expand Up @@ -244,9 +244,9 @@ void Swap<T>(ref T v1, ref T v2)
});

// Difficult to raise the RestartShard in conjunction with the rebalance for mode=ddata
AwaitAssert(() =>
await AwaitAssertAsync(async () =>
{
AwaitRebalance(region1, shardIdToMove, p1).Should().BeTrue();
(await AwaitRebalanceAsync(region1, shardIdToMove, p1)).Should().BeTrue();
});

var rebalancedOnRegion1 = StatesFor(region1, p1, expect: numberOfShards);
Expand Down
16 changes: 14 additions & 2 deletions src/core/Akka.Tests/Actor/InboxSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,29 @@ public async Task Inbox_have_a_default_and_custom_timeouts()
{
await WithinAsync(TimeSpan.FromSeconds(4), TimeSpan.FromSeconds(6), () =>
{
Assert.Throws<TimeoutException>(() => _inbox.Receive());
// Inbox.Receive() may throw TimeoutException directly or wrapped in AggregateException
// depending on internal implementation (sync vs async path)
var ex = Record.Exception(() => _inbox.Receive());
Assert.NotNull(ex);
Assert.True(IsTimeoutException(ex), $"Expected TimeoutException but got: {ex.GetType().Name}");
return Task.CompletedTask;
});

await WithinAsync(TimeSpan.FromSeconds(1), () =>
{
Assert.Throws<TimeoutException>(() => _inbox.Receive(TimeSpan.FromMilliseconds(100)));
var ex = Record.Exception(() => _inbox.Receive(TimeSpan.FromMilliseconds(100)));
Assert.NotNull(ex);
Assert.True(IsTimeoutException(ex), $"Expected TimeoutException but got: {ex.GetType().Name}");
return Task.CompletedTask;
});
}

private static bool IsTimeoutException(Exception ex)
{
return ex is TimeoutException ||
(ex is AggregateException agg && agg.Flatten().InnerExceptions.Any(e => e is TimeoutException));
}

[Fact]
public void Select_WithClient_should_update_Client_and_copy_the_rest_of_the_properties_BUG_427()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,21 @@ public async Task ScheduleRepeatedly_in_milliseconds_Tests_and_verify_the_interv
//Expect to get a list from receiver after it has received three messages
var dateTimeOffsets = await ExpectMsgAsync<List<DateTimeOffset>>();
dateTimeOffsets.ShouldHaveCount(3);
// CI machines can have significant timing variability due to CPU contention,
// virtualization overhead, and GC pauses. Use 30% tolerance to accommodate
// this while still catching gross scheduler bugs.
const double maxDeviation = 0.30;
Action<int, int> validate = (a, b) =>
{
var valA = dateTimeOffsets[a];
var valB = dateTimeOffsets[b];
var diffBetweenMessages = Math.Abs((valB - valA).TotalMilliseconds);
var diffInMs = Math.Abs(diffBetweenMessages - interval);
var deviate = (diffInMs/interval);
deviate.Should(val => val < 0.1,
deviate.Should(val => val < maxDeviation,
string.Format(
"Expected the interval between message {1} and {2} to deviate maximum 10% from {0}. It was {3} ms between the messages. It deviated {4}%",
interval, a + 1, b + 1, diffBetweenMessages, deviate*100));
"Expected the interval between message {1} and {2} to deviate maximum {5}% from {0}. It was {3} ms between the messages. It deviated {4}%",
interval, a + 1, b + 1, diffBetweenMessages, deviate*100, maxDeviation*100));
};
validate(0, 1);
validate(1, 2);
Expand Down