diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs index 985ef0ad9d4..231ba36ccd8 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs @@ -91,12 +91,13 @@ public async Task A_Cluster_with_DistributedPubSub_must_handle_restart_of_nodes_ await WithinAsync(30.Seconds(), async () => { Mediator.Tell(new Subscribe("topic1", TestActor)); - ExpectMsg(); + await ExpectMsgAsync(); await CountAsync(3); - RunOn(() => + await RunOnAsync(() => { Mediator.Tell(new Publish("topic1", "msg1")); + return Task.CompletedTask; }, _config.First); await EnterBarrierAsync("pub-msg1"); @@ -112,8 +113,10 @@ await RunOnAsync(async () => { await EnterBarrierAsync("end"); - Mediator.Tell(DeltaCount.Instance); - var deltaCount = await ExpectMsgAsync(); + // Use a probe to isolate DeltaCount query from any stray messages in TestActor mailbox + var probe = CreateTestProbe(); + Mediator.Tell(DeltaCount.Instance, probe.Ref); + var deltaCount = await probe.ExpectMsgAsync(5.Seconds()); deltaCount.Should().Be(oldDeltaCount); }, _config.Second); @@ -122,12 +125,14 @@ await RunOnAsync(async () => var thirdAddress = (await NodeAsync(_config.Third)).Address; await TestConductor.Shutdown(_config.Third).WaitAsync(30.Seconds()); - await WithinAsync(20.Seconds(), async () => + // Use a probe for Identify to avoid polluting TestActor mailbox with stray responses + var identifyProbe = CreateTestProbe(); + await WithinAsync(25.Seconds(), async () => { await AwaitAssertAsync(async () => { - Sys.ActorSelection(new RootActorPath(thirdAddress) / "user" / "shutdown").Tell(new Identify(null)); - (await ExpectMsgAsync(1.Seconds())).Subject.Should().NotBeNull(); + Sys.ActorSelection(new RootActorPath(thirdAddress) / "user" / "shutdown").Tell(new Identify(null), identifyProbe.Ref); + (await identifyProbe.ExpectMsgAsync(2.Seconds())).Subject.Should().NotBeNull(); }); }); @@ -135,8 +140,11 @@ await AwaitAssertAsync(async () => await EnterBarrierAsync("end"); - Mediator.Tell(DeltaCount.Instance); - var deltaCount = await ExpectMsgAsync(); + // Use a probe to isolate DeltaCount query from stray ActorIdentity messages + // that may still be arriving from AwaitAssertAsync Identify retries + var deltaProbe = CreateTestProbe(); + Mediator.Tell(DeltaCount.Instance, deltaProbe.Ref); + var deltaCount = await deltaProbe.ExpectMsgAsync(5.Seconds()); deltaCount.Should().Be(oldDeltaCount); }, _config.First); @@ -156,6 +164,12 @@ await RunOnAsync(async () => await Cluster.Get(newSystem).JoinAsync(Cluster.Get(newSystem).SelfAddress); var newMediator = DistributedPubSub.Get(newSystem).Mediator; var probe = CreateTestProbe(newSystem); + + // Create shutdown actor FIRST so First node can find it while we verify gossip isolation + // This fixes the race condition where First node times out waiting for this actor + newSystem.Log.Info("Creating shutdown actor on {0}", node3Address); + newSystem.ActorOf("shutdown"); + newMediator.Tell(new Subscribe("topic2", probe.Ref), probe.Ref); await probe.ExpectMsgAsync(); @@ -164,8 +178,6 @@ await RunOnAsync(async () => newMediator.Tell(DeltaCount.Instance, probe.Ref); await probe.ExpectMsgAsync(0L); - newSystem.Log.Info("Shutdown actor started on {0}",node3Address); - newSystem.ActorOf("shutdown"); await newSystem.WhenTerminated.WaitAsync(30.Seconds()); } finally @@ -193,10 +205,11 @@ private IActorRef Mediator private async Task JoinAsync(RoleName from, RoleName to) { - RunOn(() => + await RunOnAsync(() => { Cluster.Get(Sys).Join(Node(to).Address); CreateMediator(); + return Task.CompletedTask; }, from); await EnterBarrierAsync(from.Name + "-joined"); }