From e55868686f64202b465096bfd5471cc0386758ef Mon Sep 17 00:00:00 2001
From: Aaron Stannard
Date: Tue, 24 Feb 2026 22:20:34 -0600
Subject: [PATCH] Port Pekko ShardStopped handler + handoff safety net (#7500)

Shards can fail to HandOff indefinitely during scale-up when the
RebalanceWorker times out before receiving ShardStopped. The coordinator
never deallocates the shard, causing an endless GetShardHome/ShardHome loop.

- Add ShardStopped handler to ShardCoordinator.Active() (Pekko port):
  cleans up unAckedHostShards and performs late deallocation when no
  rebalance is in progress for the shard and the shard is still homed on
  the region that reported the stop
- ShardRegion sends backup ShardStopped to coordinator on handoff
  completion, ensuring the coordinator learns about it even when the
  RebalanceWorker has already timed out
---
 .../Akka.Cluster.Sharding/ShardCoordinator.cs | 25 +++++++++++++++++++
 .../Akka.Cluster.Sharding/ShardRegion.cs      |  6 +++++
 2 files changed, 31 insertions(+)

diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs
index 1b39526b6cf..54d7cd49a4f 100644
--- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs
+++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs
@@ -1790,6 +1790,31 @@ internal bool Active(object message)
                     }
                     return true;
 
+                case ShardStopped m:
+                    // Ported from Pekko: clean up unAckedHostShards when shard reports stopped
+                    if (_unAckedHostShards.TryGetValue(m.Shard, out var stopCancel))
+                    {
+                        stopCancel.Cancel();
+                        _unAckedHostShards = _unAckedHostShards.Remove(m.Shard);
+                    }
+
+                    // Safety net: deallocate only when no rebalance is in progress for this shard (RebalanceWorker
+                    // already timed out) AND the shard is still homed on the region that reported the stop.
+                    // A late backup ShardStopped must never deallocate a shard that was already re-homed elsewhere.
+                    if (!_rebalanceInProgress.ContainsKey(m.Shard) && State.Shards.TryGetValue(m.Shard, out var stoppedHome) && stoppedHome.Equals(_context.Sender))
+                    {
+                        Log.Info("{0}: Shard [{1}] stopped - performing late deallocation (rebalance worker timed out).",
+                            TypeName, m.Shard);
+                        Update(new ShardHomeDeallocated(m.Shard), evt =>
+                        {
+                            State = State.Updated(evt);
+                            Log.Debug("{0}: Shard [{1}] deallocated (late)", TypeName, m.Shard);
+                            AllocateShardHomesForRememberEntities();
+                            _context.Self.Tell(new GetShardHome(m.Shard), _ignoreRef);
+                        });
+                    }
+                    return true;
+
                 case ResendShardHost m:
                     {
                         if (State.Shards.TryGetValue(m.Shard, out var region) && region.Equals(region))
diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
index 7cc92c2cf14..f18a3891f76 100644
--- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
+++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
@@ -1410,6 +1410,12 @@ private void HandleTerminated(Terminated terminated)
                 {
                     _handingOff = _handingOff.Remove(terminated.ActorRef);
                     _log.Debug("{0}: Shard [{1}] handoff complete", _typeName, shard);
+
+                    // Send backup ShardStopped to coordinator in case the RebalanceWorker
+                    // has already timed out and missed the ShardStopped from HandOffStopper.
+                    // The coordinator's Active handler will only deallocate if no rebalance is
+                    // in progress for this shard and this region is still its recorded home.
+                    _coordinator?.Tell(new ShardCoordinator.ShardStopped(shard));
                 }
                 else
                 {