Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,13 @@ public async Task A_Cluster_with_DistributedPubSub_must_handle_restart_of_nodes_
await WithinAsync(30.Seconds(), async () =>
{
Mediator.Tell(new Subscribe("topic1", TestActor));
ExpectMsg<SubscribeAck>();
await ExpectMsgAsync<SubscribeAck>();
await CountAsync(3);

RunOn(() =>
await RunOnAsync(() =>
{
Mediator.Tell(new Publish("topic1", "msg1"));
return Task.CompletedTask;
}, _config.First);
await EnterBarrierAsync("pub-msg1");

Expand All @@ -112,8 +113,10 @@ await RunOnAsync(async () =>
{
await EnterBarrierAsync("end");

Mediator.Tell(DeltaCount.Instance);
var deltaCount = await ExpectMsgAsync<long>();
// Use a probe to isolate DeltaCount query from any stray messages in TestActor mailbox
var probe = CreateTestProbe();
Mediator.Tell(DeltaCount.Instance, probe.Ref);
var deltaCount = await probe.ExpectMsgAsync<long>(5.Seconds());
deltaCount.Should().Be(oldDeltaCount);
}, _config.Second);

Expand All @@ -122,21 +125,26 @@ await RunOnAsync(async () =>
var thirdAddress = (await NodeAsync(_config.Third)).Address;
await TestConductor.Shutdown(_config.Third).WaitAsync(30.Seconds());

await WithinAsync(20.Seconds(), async () =>
// Use a probe for Identify to avoid polluting TestActor mailbox with stray responses
var identifyProbe = CreateTestProbe();
await WithinAsync(25.Seconds(), async () =>
{
await AwaitAssertAsync(async () =>
{
Sys.ActorSelection(new RootActorPath(thirdAddress) / "user" / "shutdown").Tell(new Identify(null));
(await ExpectMsgAsync<ActorIdentity>(1.Seconds())).Subject.Should().NotBeNull();
Sys.ActorSelection(new RootActorPath(thirdAddress) / "user" / "shutdown").Tell(new Identify(null), identifyProbe.Ref);
(await identifyProbe.ExpectMsgAsync<ActorIdentity>(2.Seconds())).Subject.Should().NotBeNull();
});
});

Sys.ActorSelection(new RootActorPath(thirdAddress) / "user" / "shutdown").Tell("shutdown");

await EnterBarrierAsync("end");

Mediator.Tell(DeltaCount.Instance);
var deltaCount = await ExpectMsgAsync<long>();
// Use a probe to isolate DeltaCount query from stray ActorIdentity messages
// that may still be arriving from AwaitAssertAsync Identify retries
var deltaProbe = CreateTestProbe();
Mediator.Tell(DeltaCount.Instance, deltaProbe.Ref);
var deltaCount = await deltaProbe.ExpectMsgAsync<long>(5.Seconds());
deltaCount.Should().Be(oldDeltaCount);
}, _config.First);

Expand All @@ -156,6 +164,12 @@ await RunOnAsync(async () =>
await Cluster.Get(newSystem).JoinAsync(Cluster.Get(newSystem).SelfAddress);
var newMediator = DistributedPubSub.Get(newSystem).Mediator;
var probe = CreateTestProbe(newSystem);

// Create shutdown actor FIRST so First node can find it while we verify gossip isolation
// This fixes the race condition where First node times out waiting for this actor
newSystem.Log.Info("Creating shutdown actor on {0}", node3Address);
newSystem.ActorOf<DistributedPubSubRestartSpecConfig.Shutdown>("shutdown");

newMediator.Tell(new Subscribe("topic2", probe.Ref), probe.Ref);
await probe.ExpectMsgAsync<SubscribeAck>();

Expand All @@ -164,8 +178,6 @@ await RunOnAsync(async () =>
newMediator.Tell(DeltaCount.Instance, probe.Ref);
await probe.ExpectMsgAsync(0L);

newSystem.Log.Info("Shutdown actor started on {0}",node3Address);
newSystem.ActorOf<DistributedPubSubRestartSpecConfig.Shutdown>("shutdown");
await newSystem.WhenTerminated.WaitAsync(30.Seconds());
}
finally
Expand Down Expand Up @@ -193,10 +205,11 @@ private IActorRef Mediator

/// <summary>
/// Has the node playing role <paramref name="from"/> join the cluster at the address of
/// <paramref name="to"/> and create its mediator, then enters the
/// "<c>{from.Name}-joined</c>" barrier.
/// </summary>
/// <param name="from">Role whose node performs the join and creates the mediator.</param>
/// <param name="to">Role whose node address is used as the join target.</param>
/// <returns>A task that completes once the barrier has been entered.</returns>
private async Task JoinAsync(RoleName from, RoleName to)
{
    // Only the awaited RunOnAsync form is kept; the earlier synchronous `RunOn(() =>`
    // opener was a superseded line that left the lambda unbalanced.
    await RunOnAsync(() =>
    {
        Cluster.Get(Sys).Join(Node(to).Address);
        CreateMediator();

        // The work above is synchronous; RunOnAsync expects a Task-returning delegate.
        return Task.CompletedTask;
    }, from);

    // Called outside the RunOnAsync delegate, so it is not restricted to the `from` role.
    await EnterBarrierAsync(from.Name + "-joined");
}
Expand Down