diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs index 01f7b95fa15..f87fc465172 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs @@ -1790,6 +1790,31 @@ internal bool Active(object message) } return true; + case ShardStopped m: + // Ported from Pekko: clean up unAckedHostShards when shard reports stopped + if (_unAckedHostShards.TryGetValue(m.Shard, out var stopCancel)) + { + stopCancel.Cancel(); + _unAckedHostShards = _unAckedHostShards.Remove(m.Shard); + } + + // Safety net: if no rebalance is in progress for this shard (RebalanceWorker + // already timed out), deallocate the shard so it can be reallocated elsewhere. + // This prevents the shard from being endlessly recreated via GetShardHome/ShardHome. + if (!_rebalanceInProgress.ContainsKey(m.Shard) && State.Shards.ContainsKey(m.Shard)) + { + Log.Info("{0}: Shard [{1}] stopped - performing late deallocation (rebalance worker timed out).", + TypeName, m.Shard); + Update(new ShardHomeDeallocated(m.Shard), evt => + { + State = State.Updated(evt); + Log.Debug("{0}: Shard [{1}] deallocated (late)", TypeName, m.Shard); + AllocateShardHomesForRememberEntities(); + _context.Self.Tell(new GetShardHome(m.Shard), _ignoreRef); + }); + } + return true; + case ResendShardHost m: { if (State.Shards.TryGetValue(m.Shard, out var region) && region.Equals(m.Region)) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index 7cc92c2cf14..f18a3891f76 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -1410,6 +1410,12 @@ private void HandleTerminated(Terminated terminated) { _handingOff = _handingOff.Remove(terminated.ActorRef); _log.Debug("{0}: Shard [{1}] handoff complete", _typeName, shard); + + // Send backup ShardStopped to coordinator in case the RebalanceWorker + // has already timed out and missed the ShardStopped from HandOffStopper. + // The coordinator's Active handler will only deallocate if no rebalance + // is currently in progress for this shard. + _coordinator?.Tell(new ShardCoordinator.ShardStopped(shard)); } else {