From 1c0503598c4502307ddd41ee9a0b9faae83c411d Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 15 May 2025 04:14:59 +0700 Subject: [PATCH 1/3] Optimize DistributedPubSub memory allocation --- .../DistributedPubSubMediator.cs | 41 ++++-- .../PublishSubscribe/Internal/Topics.cs | 120 ++++++++++++++---- 2 files changed, 123 insertions(+), 38 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs index 7333424fd8d..afc9d10a4db 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs @@ -124,6 +124,9 @@ public static Props Props(DistributedPubSubSettings settings) private readonly ILoggingAdapter _log; private readonly Dictionary _registry = new(); + private readonly IActorRef _self; + private readonly Lazy _topicPrefix; + public ITimerScheduler Timers { get; set; } /// @@ -161,6 +164,9 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) _pruneInterval = new TimeSpan(_settings.RemovedTimeToLive.Ticks / 2); _buffer = new PerGroupingBuffer(); + _self = Self; + _topicPrefix = new Lazy(() => _self.Path.ToStringWithoutAddress()); + Receive(send => { var routees = new List(); @@ -193,15 +199,17 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) }); Receive(sendToAll => { + // TODO: Investigate this, this code looks very sketchy PublishMessage(sendToAll.Path, sendToAll, sendToAll.ExcludeSelf); }); Receive(publish => { - var path = Internal.Utils.MakeKey(Self.Path / Internal.Utils.EncodeName(publish.Topic)); + var encodedTopic = Internal.Utils.EncodeName(publish.Topic); + var key = Internal.Utils.MakeKey(Self.Path, encodedTopic); if (publish.SendOneMessageToEachGroup) - PublishToEachGroup(path, publish); + PublishToEachGroup(key, publish); else - PublishMessage(path, publish); + PublishMessage(key, publish); }); Receive(put => { @@ -228,7 +236,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) // each topic is managed by a child actor with the same name as the topic var encodedTopic = Internal.Utils.EncodeName(subscribe.Topic); - _buffer.BufferOr(Internal.Utils.MakeKey(Self.Path / encodedTopic), subscribe, Sender, () => + _buffer.BufferOr(Internal.Utils.MakeKey(Self.Path, encodedTopic), subscribe, Sender, () => { var child = Context.Child(encodedTopic); if (!child.IsNobody()) @@ -264,7 +272,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) { var encodedTopic = Internal.Utils.EncodeName(unsubscribe.Topic); - _buffer.BufferOr(Internal.Utils.MakeKey(Self.Path / encodedTopic), unsubscribe, Sender, () => + _buffer.BufferOr(Internal.Utils.MakeKey(Self.Path, encodedTopic), unsubscribe, Sender, () => { var child = Context.Child(encodedTopic); if (!child.IsNobody()) @@ -272,6 +280,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) else { // no such topic here + Internal.Utils.TryRemoveTopic(unsubscribe.Topic); } }); }); @@ -324,8 +333,13 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) var key = Internal.Utils.MakeKey(terminated.ActorRef); if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket)) + { if (bucket.Content.TryGetValue(key, out var holder) && terminated.ActorRef.Equals(holder.Ref)) + { PutToRegistry(key, null); // remove + Internal.Utils.TryRemoveKey(key, _topicPrefix.Value); + } + } _buffer.RecreateAndForwardMessagesIfNeeded(key, () => NewTopicActor(terminated.ActorRef.Path.Name)); }); @@ -365,7 +379,10 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) { var member = removed.Member; if (member.Address == _cluster.SelfAddress) + { Context.Stop(Self); + Internal.Utils.Clear(); + } else if (IsMatchingRole(member, _role)) { _nodes.Remove(member.Address); @@ -385,7 +402,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) Receive(msg => { var encTopic = Internal.Utils.EncodeName(msg.Topic); - _buffer.BufferOr(Internal.Utils.MakeKey(Self.Path / encTopic), msg, Sender, () => + _buffer.BufferOr(Internal.Utils.MakeKey(Self.Path, encTopic), msg, Sender, () => { var child = Context.Child(encTopic); if (!child.IsNobody()) @@ -454,19 +471,15 @@ private IEnumerable CollectDelta(IImmutableDictionary ver private IEnumerable GetCurrentTopics() { - var topicPrefix = Self.Path.ToStringWithoutAddress(); + var topicPrefix = _topicPrefix.Value; foreach (var (_, bucket) in _registry) { foreach (var (key, _) in bucket.Content) { - if (!key.StartsWith(topicPrefix)) + var encodedTopic = Internal.Utils.KeyToEncodedTopic(key, topicPrefix); + if (key is null) continue; - - var topic = key[(topicPrefix.Length + 1)..]; - if (!topic.Contains('/')) - { - yield return topic; - } + yield return encodedTopic; } } } diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs index e1da86ba73b..c6da927e102 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs @@ -214,7 +214,7 @@ protected override bool Business(object message) { case Subscribe { Group: not null } subscribe: var encodedGroup = Utils.EncodeName(subscribe.Group); - _buffer.BufferOr(Utils.MakeKey(Self.Path / encodedGroup), subscribe, Sender, () => + _buffer.BufferOr(Utils.MakeKey(Self.Path, encodedGroup), subscribe, Sender, () => { var child = Context.Child(encodedGroup); if (!child.IsNobody()) @@ -235,7 +235,7 @@ protected override bool Business(object message) case Unsubscribe { Group: not null } unsubscribe: encodedGroup = Utils.EncodeName(unsubscribe.Group); - _buffer.BufferOr(Utils.MakeKey(Self.Path / encodedGroup), unsubscribe, Sender, () => + _buffer.BufferOr(Utils.MakeKey(Self.Path, encodedGroup), unsubscribe, Sender, () => { var child = Context.Child(encodedGroup); if (!child.IsNobody()) @@ -245,6 +245,7 @@ protected override bool Business(object message) else { // no such group here + Utils.TryRemoveTopic(unsubscribe.Group); } }); return true; @@ -308,17 +309,19 @@ public Group(TimeSpan emptyTimeToLive, RoutingLogic routingLogic, bool sendToDea /// protected override bool Business(object message) { - if (message is SendToOneSubscriber send) + switch (message) { - if (Subscribers.Count != 0) - { - var routees = Subscribers.Select(sub => (Routee)new ActorRefRoutee(sub)).ToArray(); + case SendToOneSubscriber when Subscribers.Count == 0: + return true; + + case SendToOneSubscriber send: + var routees = Subscribers.Select(Routee (sub) => new ActorRefRoutee(sub)).ToArray(); new Router(_routingLogic, routees).Route(Utils.WrapIfNeeded(send.Message), Sender); - } + return true; + + default: + return false; } - else return false; - - return true; } } @@ -329,8 +332,15 @@ protected override bool Business(object message) /// internal static class Utils { + private record MakeKeyInfo(ActorPath Path, string Topic); + private static readonly System.Text.RegularExpressions.Regex PathRegex = new("^/remote/.+(/user/.+)"); + private static readonly Dictionary TopicToEncodedMap = new(); + private static readonly Dictionary EncodedToTopicMap = new(); + private static readonly Dictionary MakeKeyMap = new(); + private static readonly Dictionary MakeKeyReverseMap = new(); + /// /// /// Mediator uses to send messages to multiple destinations, Router in general @@ -350,14 +360,10 @@ public static object WrapIfNeeded(object message) return message is RouterEnvelope ? new MediatorRouterEnvelope(message) : message; } - /// - /// TBD - /// - /// TBD - /// TBD public static string MakeKey(IActorRef actorRef) { - return MakeKey(actorRef.Path); + //return MakeKey(actorRef.Path); + return PathRegex.Replace(actorRef.Path.ToStringWithoutAddress(), "$1"); } /// @@ -367,17 +373,83 @@ public static string MakeKey(IActorRef actorRef) /// TBD public static string EncodeName(string name) { - return name == null ? null : Uri.EscapeDataString(name); + if (string.IsNullOrWhiteSpace(name)) + return null; + + if (TopicToEncodedMap.TryGetValue(name, out var encoded)) + return encoded; + + encoded = Uri.EscapeDataString(name); + TopicToEncodedMap[name] = encoded; + EncodedToTopicMap[encoded] = name; + return encoded; } - /// - /// TBD - /// - /// TBD - /// TBD - public static string MakeKey(ActorPath path) + public static string MakeKey(ActorPath path, string topic) + { + var info = new MakeKeyInfo(path, topic); + if(MakeKeyMap.TryGetValue(info, out var key)) + return key; + + key = PathRegex.Replace((path / topic).ToStringWithoutAddress(), "$1"); + MakeKeyMap[info] = key; + MakeKeyReverseMap[key] = info; + return key; + } + + [MethodImpl(MethodImplOptions.NoInlining)] + public static string? KeyToEncodedTopic(string key, string topicPrefix) + { + if (!key.StartsWith(topicPrefix)) + return null; + + var topic = key[(topicPrefix.Length + 1)..]; + return !topic.Contains('/') ? topic : null; + } + + public static void TryRemoveEncodedTopic(string encodedTopic) + { + if (!EncodedToTopicMap.TryGetValue(encodedTopic, out var topic)) + return; + + EncodedToTopicMap.Remove(encodedTopic); + TopicToEncodedMap.Remove(topic); + } + + public static void TryRemoveTopic(string topic) + { + if(!TopicToEncodedMap.TryGetValue(topic, out var encodedTopic)) + return; + + TopicToEncodedMap.Remove(topic); + EncodedToTopicMap.Remove(encodedTopic); + } + + public static void TryRemoveKey(string key, string topicPrefix) + { + if (MakeKeyReverseMap.TryGetValue(key, out var keyInfo)) + { + MakeKeyReverseMap.Remove(key); + MakeKeyMap.Remove(keyInfo); + } + + var encodedTopic = KeyToEncodedTopic(key, topicPrefix); + if (encodedTopic == null) + return; + + if (!EncodedToTopicMap.TryGetValue(encodedTopic, out var topic)) + return; + + TopicToEncodedMap.Remove(topic); + EncodedToTopicMap.Remove(encodedTopic); + } + + public static void Clear() { - return PathRegex.Replace(path.ToStringWithoutAddress(), "$1"); + TopicToEncodedMap.Clear(); + EncodedToTopicMap.Clear(); + MakeKeyMap.Clear(); + MakeKeyReverseMap.Clear(); } } } From caeef361682cdc9b3c6ff9196025171c9cbbae59 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 15 May 2025 22:44:35 +0700 Subject: [PATCH 2/3] Move cache dictionary to non-static class instance --- .../DistributedPubSubMediator.cs | 42 +++--- .../PublishSubscribe/Internal/Topics.cs | 135 ++++++++++-------- 2 files changed, 93 insertions(+), 84 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs index afc9d10a4db..a996cbbbe62 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs @@ -124,8 +124,8 @@ public static Props Props(DistributedPubSubSettings settings) private readonly ILoggingAdapter _log; private readonly Dictionary _registry = new(); - private readonly IActorRef _self; - private readonly Lazy _topicPrefix; + private readonly string _topicPrefix; + private readonly PubSubCache _cache; public ITimerScheduler Timers { get; set; } @@ -164,8 +164,8 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) _pruneInterval = new TimeSpan(_settings.RemovedTimeToLive.Ticks / 2); _buffer = new PerGroupingBuffer(); - _self = Self; - _topicPrefix = new Lazy(() => _self.Path.ToStringWithoutAddress()); + _topicPrefix = Self.Path.ToStringWithoutAddress(); + _cache = new PubSubCache(); Receive(send => { @@ -204,8 +204,8 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) }); Receive(publish => { - var encodedTopic = Internal.Utils.EncodeName(publish.Topic); - var key = Internal.Utils.MakeKey(Self.Path, encodedTopic); + var encodedTopic = _cache.EncodeName(publish.Topic); + var key = _cache.MakeKey(Self.Path, encodedTopic); if (publish.SendOneMessageToEachGroup) PublishToEachGroup(key, publish); else @@ -217,7 +217,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) _log.Warning("Registered actor must be local: [{0}]", put.Ref); else { - PutToRegistry(Internal.Utils.MakeKey(put.Ref), put.Ref); + PutToRegistry(PubSubCache.MakeKey(put.Ref), put.Ref); Context.Watch(put.Ref); } }); @@ -234,9 +234,9 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) Receive(subscribe => { // each topic is managed by a child actor with the same name as the topic - var encodedTopic = Internal.Utils.EncodeName(subscribe.Topic); + var encodedTopic = _cache.EncodeName(subscribe.Topic); - _buffer.BufferOr(Internal.Utils.MakeKey(Self.Path, encodedTopic), subscribe, Sender, () => + _buffer.BufferOr(_cache.MakeKey(Self.Path, encodedTopic), subscribe, Sender, () => { var child = Context.Child(encodedTopic); if (!child.IsNobody()) @@ -251,13 +251,13 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) }); Receive(_ => { - var key = Internal.Utils.MakeKey(Sender); + var key = PubSubCache.MakeKey(Sender); _buffer.InitializeGrouping(key); Sender.Tell(TerminateRequest.Instance); }); Receive(_ => { - var key = Internal.Utils.MakeKey(Sender); + var key = PubSubCache.MakeKey(Sender); _buffer.ForwardMessages(key, Sender); }); Receive(_ => @@ -270,9 +270,9 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) }); Receive(unsubscribe => { - var encodedTopic = Internal.Utils.EncodeName(unsubscribe.Topic); + var encodedTopic = _cache.EncodeName(unsubscribe.Topic); - _buffer.BufferOr(Internal.Utils.MakeKey(Self.Path, encodedTopic), unsubscribe, Sender, () => + _buffer.BufferOr(_cache.MakeKey(Self.Path, encodedTopic), unsubscribe, Sender, () => { var child = Context.Child(encodedTopic); if (!child.IsNobody()) @@ -280,7 +280,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) else { // no such topic here - Internal.Utils.TryRemoveTopic(unsubscribe.Topic); + _cache.TryRemoveTopic(unsubscribe.Topic); } }); }); @@ -330,14 +330,14 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) Receive(_ => HandlePrune()); Receive(terminated => { - var key = Internal.Utils.MakeKey(terminated.ActorRef); + var key = PubSubCache.MakeKey(terminated.ActorRef); if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket)) { if (bucket.Content.TryGetValue(key, out var holder) && terminated.ActorRef.Equals(holder.Ref)) { PutToRegistry(key, null); // remove - Internal.Utils.TryRemoveKey(key, _topicPrefix.Value); + _cache.TryRemoveKey(key, _topicPrefix); } } @@ -381,7 +381,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) if (member.Address == _cluster.SelfAddress) { Context.Stop(Self); - Internal.Utils.Clear(); + _cache.Clear(); } else if (IsMatchingRole(member, _role)) { @@ -401,8 +401,8 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) }); Receive(msg => { - var encTopic = Internal.Utils.EncodeName(msg.Topic); - _buffer.BufferOr(Internal.Utils.MakeKey(Self.Path, encTopic), msg, Sender, () => + var encTopic = _cache.EncodeName(msg.Topic); + _buffer.BufferOr(_cache.MakeKey(Self.Path, encTopic), msg, Sender, () => { var child = Context.Child(encTopic); if (!child.IsNobody()) @@ -471,7 +471,7 @@ private IEnumerable CollectDelta(IImmutableDictionary ver private IEnumerable GetCurrentTopics() { - var topicPrefix = _topicPrefix.Value; + var topicPrefix = _topicPrefix; foreach (var (_, bucket) in _registry) { foreach (var (key, _) in bucket.Content) @@ -486,7 +486,7 @@ private IEnumerable GetCurrentTopics() private void HandleRegisterTopic(IActorRef actorRef) { - PutToRegistry(Internal.Utils.MakeKey(actorRef), actorRef); + PutToRegistry(PubSubCache.MakeKey(actorRef), actorRef); Context.Watch(actorRef); } diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs index c6da927e102..26d695dd31f 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs @@ -193,6 +193,8 @@ internal class Topic : TopicLike { private readonly RoutingLogic _routingLogic; private readonly PerGroupingBuffer _buffer; + private readonly PubSubCache _cache; + private readonly string _topicPrefix; /// /// Creates a new topic actor @@ -205,6 +207,9 @@ public Topic(TimeSpan emptyTimeToLive, RoutingLogic routingLogic, bool sendToDea { _routingLogic = routingLogic; _buffer = new PerGroupingBuffer(); + + _topicPrefix = Self.Path.ToStringWithoutAddress(); + _cache = new PubSubCache(); } /// @@ -213,8 +218,8 @@ protected override bool Business(object message) switch (message) { case Subscribe { Group: not null } subscribe: - var encodedGroup = Utils.EncodeName(subscribe.Group); - _buffer.BufferOr(Utils.MakeKey(Self.Path, encodedGroup), subscribe, Sender, () => + var encodedGroup = _cache.EncodeName(subscribe.Group); + _buffer.BufferOr(_cache.MakeKey(Self.Path, encodedGroup), subscribe, Sender, () => { var child = Context.Child(encodedGroup); if (!child.IsNobody()) @@ -234,8 +239,8 @@ protected override bool Business(object message) return true; case Unsubscribe { Group: not null } unsubscribe: - encodedGroup = Utils.EncodeName(unsubscribe.Group); - _buffer.BufferOr(Utils.MakeKey(Self.Path, encodedGroup), unsubscribe, Sender, () => + encodedGroup = _cache.EncodeName(unsubscribe.Group); + _buffer.BufferOr(_cache.MakeKey(Self.Path, encodedGroup), unsubscribe, Sender, () => { var child = Context.Child(encodedGroup); if (!child.IsNobody()) @@ -245,7 +250,7 @@ protected override bool Business(object message) else { // no such group here - Utils.TryRemoveTopic(unsubscribe.Group); + _cache.TryRemoveTopic(unsubscribe.Group); } }); return true; @@ -255,20 +260,21 @@ protected override bool Business(object message) return true; case Cluster: - var key = Utils.MakeKey(Sender); + var key = PubSubCache.MakeKey(Sender); _buffer.InitializeGrouping(key); Sender.Tell(TerminateRequest.Instance); return true; case NewSubscriberArrived: - key = Utils.MakeKey(Sender); + key = PubSubCache.MakeKey(Sender); _buffer.ForwardMessages(key, Sender); return true; case Terminated terminated: - key = Utils.MakeKey(terminated.ActorRef); + key = PubSubCache.MakeKey(terminated.ActorRef); _buffer.RecreateAndForwardMessagesIfNeeded(key, () => NewGroupActor(terminated.ActorRef.Path.Name)); Remove(terminated.ActorRef); + _cache.TryRemoveKey(key, _topicPrefix); return true; } @@ -325,22 +331,8 @@ protected override bool Business(object message) } } - /// - /// INTERNAL API - /// - /// Used for generating Uri-safe topic and group names. - /// internal static class Utils { - private record MakeKeyInfo(ActorPath Path, string Topic); - - private static readonly System.Text.RegularExpressions.Regex PathRegex = new("^/remote/.+(/user/.+)"); - - private static readonly Dictionary TopicToEncodedMap = new(); - private static readonly Dictionary EncodedToTopicMap = new(); - private static readonly Dictionary MakeKeyMap = new(); - private static readonly Dictionary MakeKeyReverseMap = new(); - /// /// /// Mediator uses to send messages to multiple destinations, Router in general @@ -354,15 +346,42 @@ private record MakeKeyInfo(ActorPath Path, string Topic); /// /// /// TBD - /// TBD + [MethodImpl(MethodImplOptions.NoInlining)] public static object WrapIfNeeded(object message) { return message is RouterEnvelope ? new MediatorRouterEnvelope(message) : message; } + + [MethodImpl(MethodImplOptions.NoInlining)] + public static string? KeyToEncodedTopic(string key, string topicPrefix) + { + if (!key.StartsWith(topicPrefix)) + return null; + + var topic = key[(topicPrefix.Length + 1)..]; + return !topic.Contains('/') ? topic : null; + } + } + + /// + /// INTERNAL API + /// + /// Used for generating Uri-safe topic and group names. + /// + internal sealed class PubSubCache + { + private readonly record struct MakeKeyInfo(ActorPath Path, string Topic); + + private static readonly System.Text.RegularExpressions.Regex PathRegex = new("^/remote/.+(/user/.+)"); + private readonly Dictionary _topicToEncodedMap = new(); + private readonly Dictionary _encodedToTopicMap = new(); + private readonly Dictionary _makeKeyMap = new(); + private readonly Dictionary _makeKeyReverseMap = new(); + + [MethodImpl(MethodImplOptions.NoInlining)] public static string MakeKey(IActorRef actorRef) { - //return MakeKey(actorRef.Path); return PathRegex.Replace(actorRef.Path.ToStringWithoutAddress(), "$1"); } @@ -371,85 +390,75 @@ public static string MakeKey(IActorRef actorRef) /// /// TBD /// TBD - public static string EncodeName(string name) + public string EncodeName(string name) { if (string.IsNullOrWhiteSpace(name)) return null; - if (TopicToEncodedMap.TryGetValue(name, out var encoded)) + if (_topicToEncodedMap.TryGetValue(name, out var encoded)) return encoded; encoded = Uri.EscapeDataString(name); - TopicToEncodedMap[name] = encoded; - EncodedToTopicMap[encoded] = name; + _topicToEncodedMap[name] = encoded; + _encodedToTopicMap[encoded] = name; return encoded; } - public static string MakeKey(ActorPath path, string topic) + public string MakeKey(ActorPath path, string topic) { var info = new MakeKeyInfo(path, topic); - if(MakeKeyMap.TryGetValue(info, out var key)) + if(_makeKeyMap.TryGetValue(info, out var key)) return key; key = PathRegex.Replace((path / topic).ToStringWithoutAddress(), "$1"); - MakeKeyMap[info] = key; - MakeKeyReverseMap[key] = info; + _makeKeyMap[info] = key; + _makeKeyReverseMap[key] = info; return key; } - [MethodImpl(MethodImplOptions.NoInlining)] - public static string? KeyToEncodedTopic(string key, string topicPrefix) - { - if (!key.StartsWith(topicPrefix)) - return null; - - var topic = key[(topicPrefix.Length + 1)..]; - return !topic.Contains('/') ? topic : null; - } - - public static void TryRemoveEncodedTopic(string encodedTopic) + public void TryRemoveEncodedTopic(string encodedTopic) { - if (!EncodedToTopicMap.TryGetValue(encodedTopic, out var topic)) + if (!_encodedToTopicMap.TryGetValue(encodedTopic, out var topic)) return; - EncodedToTopicMap.Remove(encodedTopic); - TopicToEncodedMap.Remove(topic); + _topicToEncodedMap.Remove(topic); + _encodedToTopicMap.Remove(encodedTopic); } - public static void TryRemoveTopic(string topic) + public void TryRemoveTopic(string topic) { - if(!TopicToEncodedMap.TryGetValue(topic, out var encodedTopic)) + if(!_topicToEncodedMap.TryGetValue(topic, out var encodedTopic)) return; - TopicToEncodedMap.Remove(topic); - EncodedToTopicMap.Remove(encodedTopic); + _encodedToTopicMap.Remove(encodedTopic); + _topicToEncodedMap.Remove(topic); } - public static void TryRemoveKey(string key, string topicPrefix) + public void TryRemoveKey(string key, string topicPrefix) { - if (MakeKeyReverseMap.TryGetValue(key, out var keyInfo)) + if (_makeKeyReverseMap.TryGetValue(key, out var keyInfo)) { - MakeKeyReverseMap.Remove(key); - MakeKeyMap.Remove(keyInfo); + _makeKeyMap.Remove(keyInfo); + _makeKeyReverseMap.Remove(key); } - var encodedTopic = KeyToEncodedTopic(key, topicPrefix); + var encodedTopic = Utils.KeyToEncodedTopic(key, topicPrefix); if (encodedTopic == null) return; - if (!EncodedToTopicMap.TryGetValue(encodedTopic, out var topic)) + if (!_encodedToTopicMap.TryGetValue(encodedTopic, out var topic)) return; - TopicToEncodedMap.Remove(topic); - EncodedToTopicMap.Remove(encodedTopic); + _topicToEncodedMap.Remove(topic); + _encodedToTopicMap.Remove(encodedTopic); } - public static void Clear() + public void Clear() { - TopicToEncodedMap.Clear(); - EncodedToTopicMap.Clear(); - MakeKeyMap.Clear(); - MakeKeyReverseMap.Clear(); + _topicToEncodedMap.Clear(); + _encodedToTopicMap.Clear(); + _makeKeyMap.Clear(); + _makeKeyReverseMap.Clear(); } } } From f7190847f94437f6acd2ddb4eba39f7c2dc3cfaf Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 15 May 2025 23:29:58 +0700 Subject: [PATCH 3/3] refactor old methods back into `Utils` static class and inline them --- .../DistributedPubSubMediator.cs | 10 +- .../PublishSubscribe/Internal/Topics.cs | 148 +++++++++--------- 2 files changed, 83 insertions(+), 75 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs index a996cbbbe62..3b30d79ba56 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs @@ -217,7 +217,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) _log.Warning("Registered actor must be local: [{0}]", put.Ref); else { - PutToRegistry(PubSubCache.MakeKey(put.Ref), put.Ref); + PutToRegistry(Internal.Utils.MakeKey(put.Ref), put.Ref); Context.Watch(put.Ref); } }); @@ -251,13 +251,13 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) }); Receive(_ => { - var key = PubSubCache.MakeKey(Sender); + var key = Internal.Utils.MakeKey(Sender); _buffer.InitializeGrouping(key); Sender.Tell(TerminateRequest.Instance); }); Receive(_ => { - var key = PubSubCache.MakeKey(Sender); + var key = Internal.Utils.MakeKey(Sender); _buffer.ForwardMessages(key, Sender); }); Receive(_ => @@ -330,7 +330,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) Receive(_ => HandlePrune()); Receive(terminated => { - var key = PubSubCache.MakeKey(terminated.ActorRef); + var key = Internal.Utils.MakeKey(terminated.ActorRef); if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket)) { @@ -486,7 +486,7 @@ private IEnumerable GetCurrentTopics() private void HandleRegisterTopic(IActorRef actorRef) { - PutToRegistry(PubSubCache.MakeKey(actorRef), actorRef); + PutToRegistry(Internal.Utils.MakeKey(actorRef), actorRef); Context.Watch(actorRef); } diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs index 26d695dd31f..49984efdfea 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs @@ -260,18 +260,18 @@ protected override bool Business(object message) return true; case Cluster: - var key = PubSubCache.MakeKey(Sender); + var key = Utils.MakeKey(Sender); _buffer.InitializeGrouping(key); Sender.Tell(TerminateRequest.Instance); return true; case NewSubscriberArrived: - key = PubSubCache.MakeKey(Sender); + key = Utils.MakeKey(Sender); _buffer.ForwardMessages(key, Sender); return true; case Terminated terminated: - key = PubSubCache.MakeKey(terminated.ActorRef); + key = Utils.MakeKey(terminated.ActorRef); _buffer.RecreateAndForwardMessagesIfNeeded(key, () => NewGroupActor(terminated.ActorRef.Path.Name)); Remove(terminated.ActorRef); _cache.TryRemoveKey(key, _topicPrefix); @@ -331,8 +331,17 @@ protected override bool Business(object message) } } + /// + /// INTERNAL API + /// + /// Used for generating Uri-safe topic and group names. + /// internal static class Utils { + public readonly record struct MakeKeyInfo(ActorPath Path, string Topic); + + private static readonly System.Text.RegularExpressions.Regex PathRegex = new("^/remote/.+(/user/.+)"); + /// /// /// Mediator uses to send messages to multiple destinations, Router in general @@ -361,23 +370,8 @@ public static object WrapIfNeeded(object message) var topic = key[(topicPrefix.Length + 1)..]; return !topic.Contains('/') ? topic : null; } - } - - /// - /// INTERNAL API - /// - /// Used for generating Uri-safe topic and group names. - /// - internal sealed class PubSubCache - { - private readonly record struct MakeKeyInfo(ActorPath Path, string Topic); - - private static readonly System.Text.RegularExpressions.Regex PathRegex = new("^/remote/.+(/user/.+)"); - private readonly Dictionary _topicToEncodedMap = new(); - private readonly Dictionary _encodedToTopicMap = new(); - private readonly Dictionary _makeKeyMap = new(); - private readonly Dictionary _makeKeyReverseMap = new(); + #region Key related methods [MethodImpl(MethodImplOptions.NoInlining)] public static string MakeKey(IActorRef actorRef) @@ -385,80 +379,94 @@ public static string MakeKey(IActorRef actorRef) return PathRegex.Replace(actorRef.Path.ToStringWithoutAddress(), "$1"); } - /// - /// TBD - /// - /// TBD - /// TBD - public string EncodeName(string name) - { - if (string.IsNullOrWhiteSpace(name)) - return null; - - if (_topicToEncodedMap.TryGetValue(name, out var encoded)) - return encoded; - - encoded = Uri.EscapeDataString(name); - _topicToEncodedMap[name] = encoded; - _encodedToTopicMap[encoded] = name; - return encoded; - } - - public string MakeKey(ActorPath path, string topic) + [MethodImpl(MethodImplOptions.NoInlining)] + public static string MakeKey(this PubSubCache cache, ActorPath path, string topic) { var info = new MakeKeyInfo(path, topic); - if(_makeKeyMap.TryGetValue(info, out var key)) + if(cache.MakeKeyMap.TryGetValue(info, out var key)) return key; key = PathRegex.Replace((path / topic).ToStringWithoutAddress(), "$1"); - _makeKeyMap[info] = key; - _makeKeyReverseMap[key] = info; + cache.MakeKeyMap[info] = key; + cache.MakeKeyReverseMap[key] = info; return key; } - public void TryRemoveEncodedTopic(string encodedTopic) + [MethodImpl(MethodImplOptions.NoInlining)] + public static void TryRemoveKey(this PubSubCache cache, string key, string topicPrefix) { - if (!_encodedToTopicMap.TryGetValue(encodedTopic, out var topic)) + if (cache.MakeKeyReverseMap.TryGetValue(key, out var keyInfo)) + { + cache.MakeKeyMap.Remove(keyInfo); + cache.MakeKeyReverseMap.Remove(key); + } + + var encodedTopic = Utils.KeyToEncodedTopic(key, topicPrefix); + if (encodedTopic == null) + return; + + if (!cache.EncodedToTopicMap.TryGetValue(encodedTopic, out var topic)) return; - _topicToEncodedMap.Remove(topic); - _encodedToTopicMap.Remove(encodedTopic); + cache.TopicToEncodedMap.Remove(topic); + cache.EncodedToTopicMap.Remove(encodedTopic); } - - public void TryRemoveTopic(string topic) + + #endregion + + #region Topic/group name related methods + + [MethodImpl(MethodImplOptions.NoInlining)] + public static string EncodeName(this PubSubCache cache, string name) { - if(!_topicToEncodedMap.TryGetValue(topic, out var encodedTopic)) - return; + if (string.IsNullOrWhiteSpace(name)) + return null; - _encodedToTopicMap.Remove(encodedTopic); - _topicToEncodedMap.Remove(topic); + if (cache.TopicToEncodedMap.TryGetValue(name, out var encoded)) + return encoded; + + encoded = Uri.EscapeDataString(name); + cache.TopicToEncodedMap[name] = encoded; + cache.EncodedToTopicMap[encoded] = name; + return encoded; } - public void TryRemoveKey(string key, string topicPrefix) + [MethodImpl(MethodImplOptions.NoInlining)] + public static void TryRemoveEncodedTopic(this PubSubCache cache, string encodedTopic) { - if (_makeKeyReverseMap.TryGetValue(key, out var keyInfo)) - { - _makeKeyMap.Remove(keyInfo); - _makeKeyReverseMap.Remove(key); - } - - var encodedTopic = Utils.KeyToEncodedTopic(key, topicPrefix); - if (encodedTopic == null) + if (!cache.EncodedToTopicMap.TryGetValue(encodedTopic, out var topic)) return; + + cache.TopicToEncodedMap.Remove(topic); + cache.EncodedToTopicMap.Remove(encodedTopic); + } - if (!_encodedToTopicMap.TryGetValue(encodedTopic, out var topic)) + [MethodImpl(MethodImplOptions.NoInlining)] + public static void TryRemoveTopic(this PubSubCache cache, string topic) + { + if(!cache.TopicToEncodedMap.TryGetValue(topic, out var encodedTopic)) return; - _topicToEncodedMap.Remove(topic); - _encodedToTopicMap.Remove(encodedTopic); + cache.EncodedToTopicMap.Remove(encodedTopic); + cache.TopicToEncodedMap.Remove(topic); } - - public void Clear() + + #endregion + + public static void Clear(this PubSubCache cache) { - _topicToEncodedMap.Clear(); - _encodedToTopicMap.Clear(); - _makeKeyMap.Clear(); - _makeKeyReverseMap.Clear(); + cache.TopicToEncodedMap.Clear(); + cache.EncodedToTopicMap.Clear(); + cache.MakeKeyMap.Clear(); + cache.MakeKeyReverseMap.Clear(); } } + + internal sealed class PubSubCache + { + public readonly Dictionary TopicToEncodedMap = new(); + public readonly Dictionary EncodedToTopicMap = new(); + public readonly Dictionary MakeKeyMap = new(); + public readonly Dictionary MakeKeyReverseMap = new(); + } }