From ff3eaa8d38b900b61b454f5a2016d136af5f6cc8 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 20 Jun 2025 23:11:24 +0700 Subject: [PATCH 1/3] Fix unclean ShardingConsumerControllerImpl shutdown --- .../ShardingConsumerControllerImpl.cs | 87 ++++++++++++++++++- 1 file changed, 85 insertions(+), 2 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs index fc4ad86254c..ff4d871f09b 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs @@ -20,8 +20,16 @@ namespace Akka.Cluster.Sharding.Delivery.Internal; /// INTERNAL API /// /// The types of messages handled by the ConsumerController -internal class ShardingConsumerController : ReceiveActor, IWithStash +internal class ShardingConsumerController : ReceiveActor, IWithStash, IWithTimers { + private const string ShutdownTimeoutTimerKey = nameof(ShutdownTimeoutTimerKey); + + private sealed class ShutdownTimeout + { + public static readonly ShutdownTimeout Instance = new (); + private ShutdownTimeout() { } + } + public ShardingConsumerController(Func consumerProps, ShardingConsumerController.Settings settings) { @@ -115,7 +123,17 @@ private void Active() Receive(t => t.ActorRef.Equals(_consumer), _ => { _log.Debug("Consumer terminated."); - Context.Stop(Self); + + // Short-circuit shutdown process, just shut down immediately if there's nothing to clean. + if (ProducerControllers.Count == 0 && ConsumerControllers.Count == 0) + { + _log.Debug("ShardingConsumerController terminated."); + Context.Stop(Self); + } + else + { + Become(ShuttingDown()); + } }); Receive(t => @@ -166,6 +184,70 @@ private void Active() }); } + // Shutdown state after `_consumer` actor is downed. + private Action ShuttingDown() + { + // start a 3-seconds shutdown timeout timer + Timers.StartSingleTimer(ShutdownTimeoutTimerKey, ShutdownTimeout.Instance, TimeSpan.FromSeconds(3), Self); + + _log.Debug("Shutting down child controllers"); + + // It's fine to shut down producers immediately since the `_consumer` user entity actor is already dead, + // no delivery is needed + foreach (var p in ProducerControllers.Keys) + Context.Stop(p); + + foreach (var c in ConsumerControllers.Values.Distinct()) + Context.Stop(c); + + return () => + { + Receive>(seqMsg => + { + var messageType = seqMsg.Message.Chunk.HasValue + ? $"Manifest: {seqMsg.Message.Chunk.Value.Manifest}, SerializerId: {seqMsg.Message.Chunk.Value.SerializerId}" + : seqMsg.Message.Message?.GetType().FullName ?? "Unknown type"; + _log.Warning("Message [{0}] from [{1}] is being ignored because ShardingConsumerController is shutting down.", messageType, seqMsg.ProducerId); + }); + + Receive(_ => + { + // We somehow could not terminate cleanly within 3 seconds, shutdown immediately + Context.Stop(Self); + }); + + Receive(t => + { + if (ProducerControllers.TryGetValue(t.ActorRef, out var producer)) + { + _log.Debug("ProducerController for producerId [{0}] terminated.", producer); + ProducerControllers = ProducerControllers.Remove(t.ActorRef); + } + else + { + var removeList = ConsumerControllers + .Where(kv => kv.Value.Equals(t.ActorRef)) + .Select(kv => kv.Key) + .ToArray(); + + if(removeList.Length > 0) + { + foreach (var key in removeList) + _log.Debug("ConsumerController for producerId [{0}] terminated.", key); + + ConsumerControllers = ConsumerControllers.RemoveRange(removeList); + } + } + + if (ProducerControllers.Count > 0 || ConsumerControllers.Count > 0) + return; + + _log.Debug("ShardingConsumerController terminated."); + Context.Stop(Self); + }); + }; + } + private ImmutableDictionary UpdatedProducerControllers(IActorRef producerController, string producer) { @@ -183,4 +265,5 @@ protected override void PreStart() } public IStash Stash { get; set; } = null!; + public ITimerScheduler Timers { get; set; } = null!; } From 4dc2030663d0a252a8b59514ba680a769c1bc095 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 20 Jun 2025 23:30:22 +0700 Subject: [PATCH 2/3] Ignore producers, they're not the consumer controller responsibility --- .../ShardingConsumerControllerImpl.cs | 33 +++++++------------ 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs index ff4d871f09b..b7f862bb879 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs @@ -191,11 +191,10 @@ private Action ShuttingDown() Timers.StartSingleTimer(ShutdownTimeoutTimerKey, ShutdownTimeout.Instance, TimeSpan.FromSeconds(3), Self); _log.Debug("Shutting down child controllers"); - - // It's fine to shut down producers immediately since the `_consumer` user entity actor is already dead, - // no delivery is needed + foreach (var p in ProducerControllers.Keys) - Context.Stop(p); + Context.Unwatch(p); + ProducerControllers = ImmutableDictionary.Empty; foreach (var c in ConsumerControllers.Values.Distinct()) Context.Stop(c); @@ -218,25 +217,17 @@ private Action ShuttingDown() Receive(t => { - if (ProducerControllers.TryGetValue(t.ActorRef, out var producer)) - { - _log.Debug("ProducerController for producerId [{0}] terminated.", producer); - ProducerControllers = ProducerControllers.Remove(t.ActorRef); - } - else - { - var removeList = ConsumerControllers - .Where(kv => kv.Value.Equals(t.ActorRef)) - .Select(kv => kv.Key) - .ToArray(); + var removeList = ConsumerControllers + .Where(kv => kv.Value.Equals(t.ActorRef)) + .Select(kv => kv.Key) + .ToArray(); - if(removeList.Length > 0) - { - foreach (var key in removeList) - _log.Debug("ConsumerController for producerId [{0}] terminated.", key); + if(removeList.Length > 0) + { + foreach (var key in removeList) + _log.Debug("ConsumerController for producerId [{0}] terminated.", key); - ConsumerControllers = ConsumerControllers.RemoveRange(removeList); - } + ConsumerControllers = ConsumerControllers.RemoveRange(removeList); } if (ProducerControllers.Count > 0 || ConsumerControllers.Count > 0) From e318742fe169b50716d7abbd25d0d5420f263028 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 20 Jun 2025 23:32:59 +0700 Subject: [PATCH 3/3] Add timeout warning message --- .../Delivery/Internal/ShardingConsumerControllerImpl.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs index b7f862bb879..57ab5ca612d 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs @@ -212,6 +212,7 @@ private Action ShuttingDown() Receive(_ => { // We somehow could not terminate cleanly within 3 seconds, shutdown immediately + _log.Warning("ShardingConsumerController cleanup timed out, force terminating."); Context.Stop(Self); });