Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage_
var consumerEndProbe = CreateTestProbe();
var region = await ClusterSharding.Get(Sys).StartAsync($"TestConsumer-{_idCount}", _ =>
ShardingConsumerController.Create<Job>(c =>
PropsFor(DefaultConsumerDelay, 42, consumerEndProbe.Ref, c),
PropsFor(DefaultConsumerDelay, 42, consumerEndProbe.Ref, c, expectedProducerCount: 2),
ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys),
HashCodeMessageExtractor.Create(10,
o => string.Empty, o => o));
Expand All @@ -108,7 +108,7 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage_
$"p2-{_idCount}");

// expecting 3 end messages, one for each entity: "entity-0", "entity-1", "entity-2"
var endMessages = await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(5)).ToListAsync();
var endMessages = await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(10)).ToListAsync();

var producerIds = endMessages.Cast<Collected>().SelectMany(c => c.ProducerIds).ToList();
producerIds
Expand Down
28 changes: 19 additions & 9 deletions src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1856,21 +1856,31 @@ private void HandleSupervisorStop(SupervisorStopDirectivePassivation msg)
// We only have to do this if we have R-E enabled
if (!_rememberEntities)
return;

var id = _entities.EntityId(msg.Child);
// Just return if the child actor is not a recorded shard entity
if (id is null)
if (id is null)
return;

// Remove the child actor from the entity list
_entities.RemoveEntity(id);


// Only transition to Passivating if entity is Active.
// If already Passivating (entity requested passivation then threw), the normal flow handles it.
//
// Using Passivating state (instead of RemoveEntity which sets NoState):
// 1. Buffers messages arriving during termination (fixes race condition crash)
// 2. Persists removal from store when EntityTerminated arrives
// 3. Restarts entity if messages arrive (transient failure, try again)
// 4. Stays forgotten if no messages arrive
if (_entities.EntityState(id) is Active)
{
_entities.EntityPassivating(id);
}

// Force stop the child actor, it might have been restarted
Context.Stop(msg.Child);

Log.Error(
msg.LastCause,
"{0}: Remembered entity {1} was stopped: {2}",
msg.LastCause,
"{0}: Remembered entity {1} was stopped: {2}",
_typeName, id, msg.Reason);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,13 @@ public async Task A_Cluster_with_DistributedPubSub_must_handle_restart_of_nodes_
await WithinAsync(30.Seconds(), async () =>
{
Mediator.Tell(new Subscribe("topic1", TestActor));
ExpectMsg<SubscribeAck>();
await ExpectMsgAsync<SubscribeAck>();
await CountAsync(3);

RunOn(() =>
await RunOnAsync(() =>
{
Mediator.Tell(new Publish("topic1", "msg1"));
return Task.CompletedTask;
}, _config.First);
await EnterBarrierAsync("pub-msg1");

Expand All @@ -110,8 +111,10 @@ await RunOnAsync(async () =>

await EnterBarrierAsync("end");

Mediator.Tell(DeltaCount.Instance);
var deltaCount = await ExpectMsgAsync<long>();
// Use a probe to isolate DeltaCount query from any stray messages in TestActor mailbox
var probe = CreateTestProbe();
Mediator.Tell(DeltaCount.Instance, probe.Ref);
var deltaCount = await probe.ExpectMsgAsync<long>(5.Seconds());
deltaCount.Should().Be(oldDeltaCount);
}, _config.Second);

Expand All @@ -123,21 +126,26 @@ await RunOnAsync(async () =>
var thirdAddress = (await NodeAsync(_config.Third)).Address;
await TestConductor.Shutdown(_config.Third).WaitAsync(30.Seconds());

await WithinAsync(20.Seconds(), async () =>
// Use a probe for Identify to avoid polluting TestActor mailbox with stray responses
var identifyProbe = CreateTestProbe();
await WithinAsync(25.Seconds(), async () =>
{
await AwaitAssertAsync(async () =>
{
Sys.ActorSelection(new RootActorPath(thirdAddress) / "user" / "shutdown").Tell(new Identify(null));
(await ExpectMsgAsync<ActorIdentity>(1.Seconds())).Subject.Should().NotBeNull();
Sys.ActorSelection(new RootActorPath(thirdAddress) / "user" / "shutdown").Tell(new Identify(null), identifyProbe.Ref);
(await identifyProbe.ExpectMsgAsync<ActorIdentity>(2.Seconds())).Subject.Should().NotBeNull();
});
});

Sys.ActorSelection(new RootActorPath(thirdAddress) / "user" / "shutdown").Tell("shutdown");

await EnterBarrierAsync("end");

Mediator.Tell(DeltaCount.Instance);
var deltaCount = await ExpectMsgAsync<long>();
// Use a probe to isolate DeltaCount query from stray ActorIdentity messages
// that may still be arriving from AwaitAssertAsync Identify retries
var deltaProbe = CreateTestProbe();
Mediator.Tell(DeltaCount.Instance, deltaProbe.Ref);
var deltaCount = await deltaProbe.ExpectMsgAsync<long>(5.Seconds());
deltaCount.Should().Be(oldDeltaCount);
}, _config.First);

Expand All @@ -157,6 +165,12 @@ await RunOnAsync(async () =>
await Cluster.Get(newSystem).JoinAsync(Cluster.Get(newSystem).SelfAddress);
var newMediator = DistributedPubSub.Get(newSystem).Mediator;
var probe = CreateTestProbe(newSystem);

// Create shutdown actor FIRST so First node can find it while we verify gossip isolation
// This fixes the race condition where First node times out waiting for this actor
newSystem.Log.Info("Creating shutdown actor on {0}", node3Address);
newSystem.ActorOf<DistributedPubSubRestartSpecConfig.Shutdown>("shutdown");

newMediator.Tell(new Subscribe("topic2", probe.Ref), probe.Ref);
await probe.ExpectMsgAsync<SubscribeAck>();

Expand All @@ -165,8 +179,6 @@ await RunOnAsync(async () =>
newMediator.Tell(DeltaCount.Instance, probe.Ref);
await probe.ExpectMsgAsync(0L);

newSystem.Log.Info("Shutdown actor started on {0}",node3Address);
newSystem.ActorOf<DistributedPubSubRestartSpecConfig.Shutdown>("shutdown");
await newSystem.WhenTerminated.WaitAsync(30.Seconds());
}
finally
Expand Down Expand Up @@ -194,10 +206,11 @@ private IActorRef Mediator

private async Task JoinAsync(RoleName from, RoleName to)
{
RunOn(() =>
await RunOnAsync(() =>
{
Cluster.Get(Sys).Join(Node(to).Address);
CreateMediator();
return Task.CompletedTask;
}, from);
await EnterBarrierAsync(from.Name + "-joined");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ namespace Akka.TestKit
public object LastMessage { get; }
public Akka.Actor.IActorRef LastSender { get; }
public Akka.Event.ILoggingAdapter Log { get; }
public System.TimeSpan NoMessageRemainingOrDefault { get; }
public static System.TimeSpan Now { get; }
public System.TimeSpan Remaining { get; }
public System.TimeSpan RemainingOrDefault { get; }
Expand Down Expand Up @@ -397,17 +398,17 @@ namespace Akka.TestKit
public System.Collections.Generic.IReadOnlyCollection<T> ExpectMsgAllOf<T>(System.Collections.Generic.IReadOnlyCollection<T> messages, System.Threading.CancellationToken cancellationToken = null) { }
public System.Collections.Generic.IReadOnlyCollection<T> ExpectMsgAllOf<T>(System.TimeSpan max, params T[] messages) { }
public System.Collections.Generic.IReadOnlyCollection<T> ExpectMsgAllOf<T>(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection<T> messages, System.Threading.CancellationToken cancellationToken = null) { }
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ExpectMsgAllOfAsync>d__153<T>))]
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ExpectMsgAllOfAsync>d__158<T>))]
public System.Collections.Generic.IAsyncEnumerable<T> ExpectMsgAllOfAsync<T>(System.Collections.Generic.IReadOnlyCollection<T> messages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { }
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ExpectMsgAllOfAsync>d__156<T>))]
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ExpectMsgAllOfAsync>d__161<T>))]
public System.Collections.Generic.IAsyncEnumerable<T> ExpectMsgAllOfAsync<T>(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection<T> messages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { }
public System.Collections.Generic.IReadOnlyCollection<object> ExpectMsgAllOfMatchingPredicates(params Akka.TestKit.PredicateInfo[] predicates) { }
public System.Collections.Generic.IReadOnlyCollection<object> ExpectMsgAllOfMatchingPredicates(System.Collections.Generic.IReadOnlyCollection<Akka.TestKit.PredicateInfo> predicates, System.Threading.CancellationToken cancellationToken) { }
public System.Collections.Generic.IReadOnlyCollection<object> ExpectMsgAllOfMatchingPredicates(System.TimeSpan max, params Akka.TestKit.PredicateInfo[] predicates) { }
public System.Collections.Generic.IReadOnlyCollection<object> ExpectMsgAllOfMatchingPredicates(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection<Akka.TestKit.PredicateInfo> predicates, System.Threading.CancellationToken cancellationToken) { }
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ExpectMsgAllOfMatchingPredicatesAsync>d__163))]
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ExpectMsgAllOfMatchingPredicatesAsync>d__168))]
public System.Collections.Generic.IAsyncEnumerable<object> ExpectMsgAllOfMatchingPredicatesAsync(System.Collections.Generic.IReadOnlyCollection<Akka.TestKit.PredicateInfo> predicates, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { }
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ExpectMsgAllOfMatchingPredicatesAsync>d__166))]
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ExpectMsgAllOfMatchingPredicatesAsync>d__171))]
public System.Collections.Generic.IAsyncEnumerable<object> ExpectMsgAllOfMatchingPredicatesAsync(System.TimeSpan max, System.Collections.Generic.IReadOnlyCollection<Akka.TestKit.PredicateInfo> predicates, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { }
public T ExpectMsgAnyOf<T>(System.Collections.Generic.IEnumerable<T> messages, System.Threading.CancellationToken cancellationToken = null) { }
public System.Threading.Tasks.ValueTask<T> ExpectMsgAnyOfAsync<T>(System.Collections.Generic.IEnumerable<T> messages, System.Threading.CancellationToken cancellationToken = null) { }
Expand Down Expand Up @@ -462,23 +463,23 @@ namespace Akka.TestKit
public System.Threading.Tasks.ValueTask<object> PeekOneAsync(System.Threading.CancellationToken cancellationToken) { }
public System.Collections.Generic.IReadOnlyCollection<object> ReceiveN(int numberOfMessages, System.Threading.CancellationToken cancellationToken = null) { }
public System.Collections.Generic.IReadOnlyCollection<object> ReceiveN(int numberOfMessages, System.TimeSpan max, System.Threading.CancellationToken cancellationToken = null) { }
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ReceiveNAsync>d__212))]
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ReceiveNAsync>d__217))]
public System.Collections.Generic.IAsyncEnumerable<object> ReceiveNAsync(int numberOfMessages, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { }
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ReceiveNAsync>d__214))]
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ReceiveNAsync>d__219))]
public System.Collections.Generic.IAsyncEnumerable<object> ReceiveNAsync(int numberOfMessages, System.TimeSpan max, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { }
public object ReceiveOne(System.Nullable<System.TimeSpan> max = null, System.Threading.CancellationToken cancellationToken = null) { }
public System.Threading.Tasks.ValueTask<object> ReceiveOneAsync(System.Nullable<System.TimeSpan> max = null, System.Threading.CancellationToken cancellationToken = null) { }
public System.Collections.Generic.IReadOnlyList<T> ReceiveWhile<T>(System.Nullable<System.TimeSpan> max, System.Func<object, T> filter, int msgs = 2147483647, System.Threading.CancellationToken cancellationToken = null) { }
public System.Collections.Generic.IReadOnlyList<T> ReceiveWhile<T>(System.Nullable<System.TimeSpan> max, System.Nullable<System.TimeSpan> idle, System.Func<object, T> filter, int msgs = 2147483647, System.Threading.CancellationToken cancellationToken = null) { }
public System.Collections.Generic.IReadOnlyList<T> ReceiveWhile<T>(System.Func<object, T> filter, System.Nullable<System.TimeSpan> max = null, System.Nullable<System.TimeSpan> idle = null, int msgs = 2147483647, System.Threading.CancellationToken cancellationToken = null) { }
public System.Collections.Generic.IReadOnlyList<T> ReceiveWhile<T>(System.Predicate<T> shouldContinue, System.Nullable<System.TimeSpan> max = null, System.Nullable<System.TimeSpan> idle = null, int msgs = 2147483647, bool shouldIgnoreOtherMessageTypes = True, System.Threading.CancellationToken cancellationToken = null) { }
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ReceiveWhileAsync>d__203<T>))]
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ReceiveWhileAsync>d__208<T>))]
public System.Collections.Generic.IAsyncEnumerable<T> ReceiveWhileAsync<T>(System.Nullable<System.TimeSpan> max, System.Func<object, T> filter, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { }
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ReceiveWhileAsync>d__205<T>))]
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ReceiveWhileAsync>d__210<T>))]
public System.Collections.Generic.IAsyncEnumerable<T> ReceiveWhileAsync<T>(System.Nullable<System.TimeSpan> max, System.Nullable<System.TimeSpan> idle, System.Func<object, T> filter, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { }
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ReceiveWhileAsync>d__207<T>))]
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ReceiveWhileAsync>d__212<T>))]
public System.Collections.Generic.IAsyncEnumerable<T> ReceiveWhileAsync<T>(System.Func<object, T> filter, System.Nullable<System.TimeSpan> max = null, System.Nullable<System.TimeSpan> idle = null, int msgs = 2147483647, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { }
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ReceiveWhileAsync>d__209<T>))]
[System.Runtime.CompilerServices.AsyncIteratorStateMachineAttribute(typeof(Akka.TestKit.TestKitBase.<ReceiveWhileAsync>d__214<T>))]
public System.Collections.Generic.IAsyncEnumerable<T> ReceiveWhileAsync<T>(System.Predicate<T> shouldContinue, System.Nullable<System.TimeSpan> max = null, System.Nullable<System.TimeSpan> idle = null, int msgs = 2147483647, bool shouldIgnoreOtherMessageTypes = True, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute()] System.Threading.CancellationToken cancellationToken = null) { }
protected System.TimeSpan RemainingOr(System.TimeSpan duration) { }
public System.TimeSpan RemainingOrDilated(System.Nullable<System.TimeSpan> duration) { }
Expand Down Expand Up @@ -519,6 +520,7 @@ namespace Akka.TestKit
{
public TestKitSettings(Akka.Configuration.Config config) { }
public System.TimeSpan DefaultTimeout { get; }
public System.TimeSpan ExpectNoMessageDefault { get; }
public bool LogTestKitCalls { get; }
public System.TimeSpan SingleExpectDefault { get; }
public System.TimeSpan TestEventFilterLeeway { get; }
Expand Down
Loading
Loading