diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs index 7333424fd8d..3b30d79ba56 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs @@ -124,6 +124,9 @@ public static Props Props(DistributedPubSubSettings settings) private readonly ILoggingAdapter _log; private readonly Dictionary _registry = new(); + private readonly string _topicPrefix; + private readonly PubSubCache _cache; + public ITimerScheduler Timers { get; set; } /// @@ -161,6 +164,9 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) _pruneInterval = new TimeSpan(_settings.RemovedTimeToLive.Ticks / 2); _buffer = new PerGroupingBuffer(); + _topicPrefix = Self.Path.ToStringWithoutAddress(); + _cache = new PubSubCache(); + Receive(send => { var routees = new List(); @@ -193,15 +199,17 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) }); Receive(sendToAll => { + // TODO: Investigate this, this code looks very sketchy PublishMessage(sendToAll.Path, sendToAll, sendToAll.ExcludeSelf); }); Receive(publish => { - var path = Internal.Utils.MakeKey(Self.Path / Internal.Utils.EncodeName(publish.Topic)); + var encodedTopic = _cache.EncodeName(publish.Topic); + var key = _cache.MakeKey(Self.Path, encodedTopic); if (publish.SendOneMessageToEachGroup) - PublishToEachGroup(path, publish); + PublishToEachGroup(key, publish); else - PublishMessage(path, publish); + PublishMessage(key, publish); }); Receive(put => { @@ -226,9 +234,9 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) Receive(subscribe => { // each topic is managed by a child actor with the same name as the topic - var encodedTopic = Internal.Utils.EncodeName(subscribe.Topic); + var encodedTopic = _cache.EncodeName(subscribe.Topic); - _buffer.BufferOr(Internal.Utils.MakeKey(Self.Path / encodedTopic), subscribe, Sender, () => + _buffer.BufferOr(_cache.MakeKey(Self.Path, encodedTopic), subscribe, Sender, () => { var child = Context.Child(encodedTopic); if (!child.IsNobody()) @@ -262,9 +270,9 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) }); Receive(unsubscribe => { - var encodedTopic = Internal.Utils.EncodeName(unsubscribe.Topic); + var encodedTopic = _cache.EncodeName(unsubscribe.Topic); - _buffer.BufferOr(Internal.Utils.MakeKey(Self.Path / encodedTopic), unsubscribe, Sender, () => + _buffer.BufferOr(_cache.MakeKey(Self.Path, encodedTopic), unsubscribe, Sender, () => { var child = Context.Child(encodedTopic); if (!child.IsNobody()) @@ -272,6 +280,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) else { // no such topic here + _cache.TryRemoveTopic(unsubscribe.Topic); } }); }); @@ -324,8 +333,13 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) var key = Internal.Utils.MakeKey(terminated.ActorRef); if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket)) + { if (bucket.Content.TryGetValue(key, out var holder) && terminated.ActorRef.Equals(holder.Ref)) + { PutToRegistry(key, null); // remove + _cache.TryRemoveKey(key, _topicPrefix); + } + } _buffer.RecreateAndForwardMessagesIfNeeded(key, () => NewTopicActor(terminated.ActorRef.Path.Name)); }); @@ -365,7 +379,10 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) { var member = removed.Member; if (member.Address == _cluster.SelfAddress) + { Context.Stop(Self); + _cache.Clear(); + } else if (IsMatchingRole(member, _role)) { _nodes.Remove(member.Address); @@ -384,8 +401,8 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) }); Receive(msg => { - var encTopic = Internal.Utils.EncodeName(msg.Topic); - _buffer.BufferOr(Internal.Utils.MakeKey(Self.Path / encTopic), msg, Sender, () => + var encTopic = _cache.EncodeName(msg.Topic); + _buffer.BufferOr(_cache.MakeKey(Self.Path, encTopic), msg, Sender, () => { var child = Context.Child(encTopic); if (!child.IsNobody()) @@ -454,19 +471,15 @@ private IEnumerable CollectDelta(IImmutableDictionary ver private IEnumerable GetCurrentTopics() { - var topicPrefix = Self.Path.ToStringWithoutAddress(); + var topicPrefix = _topicPrefix; foreach (var (_, bucket) in _registry) { foreach (var (key, _) in bucket.Content) { - if (!key.StartsWith(topicPrefix)) + var encodedTopic = Internal.Utils.KeyToEncodedTopic(key, topicPrefix); + if (key is null) continue; - - var topic = key[(topicPrefix.Length + 1)..]; - if (!topic.Contains('/')) - { - yield return topic; - } + yield return encodedTopic; } } } diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs index e1da86ba73b..49984efdfea 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs @@ -193,6 +193,8 @@ internal class Topic : TopicLike { private readonly RoutingLogic _routingLogic; private readonly PerGroupingBuffer _buffer; + private readonly PubSubCache _cache; + private readonly string _topicPrefix; /// /// Creates a new topic actor @@ -205,6 +207,9 @@ public Topic(TimeSpan emptyTimeToLive, RoutingLogic routingLogic, bool sendToDea { _routingLogic = routingLogic; _buffer = new PerGroupingBuffer(); + + _topicPrefix = Self.Path.ToStringWithoutAddress(); + _cache = new PubSubCache(); } /// @@ -213,8 +218,8 @@ protected override bool Business(object message) switch (message) { case Subscribe { Group: not null } subscribe: - var encodedGroup = Utils.EncodeName(subscribe.Group); - _buffer.BufferOr(Utils.MakeKey(Self.Path / encodedGroup), subscribe, Sender, () => + var encodedGroup = _cache.EncodeName(subscribe.Group); + _buffer.BufferOr(_cache.MakeKey(Self.Path, encodedGroup), subscribe, Sender, () => { var child = Context.Child(encodedGroup); if (!child.IsNobody()) @@ -234,8 +239,8 @@ protected override bool Business(object message) return true; case Unsubscribe { Group: not null } unsubscribe: - encodedGroup = Utils.EncodeName(unsubscribe.Group); - _buffer.BufferOr(Utils.MakeKey(Self.Path / encodedGroup), unsubscribe, Sender, () => + encodedGroup = _cache.EncodeName(unsubscribe.Group); + _buffer.BufferOr(_cache.MakeKey(Self.Path, encodedGroup), unsubscribe, Sender, () => { var child = Context.Child(encodedGroup); if (!child.IsNobody()) @@ -245,6 +250,7 @@ protected override bool Business(object message) else { // no such group here + _cache.TryRemoveTopic(unsubscribe.Group); } }); return true; @@ -268,6 +274,7 @@ protected override bool Business(object message) key = Utils.MakeKey(terminated.ActorRef); _buffer.RecreateAndForwardMessagesIfNeeded(key, () => NewGroupActor(terminated.ActorRef.Path.Name)); Remove(terminated.ActorRef); + _cache.TryRemoveKey(key, _topicPrefix); return true; } @@ -308,17 +315,19 @@ public Group(TimeSpan emptyTimeToLive, RoutingLogic routingLogic, bool sendToDea /// protected override bool Business(object message) { - if (message is SendToOneSubscriber send) + switch (message) { - if (Subscribers.Count != 0) - { - var routees = Subscribers.Select(sub => (Routee)new ActorRefRoutee(sub)).ToArray(); + case SendToOneSubscriber when Subscribers.Count == 0: + return true; + + case SendToOneSubscriber send: + var routees = Subscribers.Select(Routee (sub) => new ActorRefRoutee(sub)).ToArray(); new Router(_routingLogic, routees).Route(Utils.WrapIfNeeded(send.Message), Sender); - } + return true; + + default: + return false; } - else return false; - - return true; } } @@ -329,6 +338,8 @@ protected override bool Business(object message) /// internal static class Utils { + public readonly record struct MakeKeyInfo(ActorPath Path, string Topic); + private static readonly System.Text.RegularExpressions.Regex PathRegex = new("^/remote/.+(/user/.+)"); /// @@ -344,40 +355,118 @@ internal static class Utils /// /// /// TBD - /// TBD + [MethodImpl(MethodImplOptions.NoInlining)] public static object WrapIfNeeded(object message) { return message is RouterEnvelope ? new MediatorRouterEnvelope(message) : message; } + + [MethodImpl(MethodImplOptions.NoInlining)] + public static string? KeyToEncodedTopic(string key, string topicPrefix) + { + if (!key.StartsWith(topicPrefix)) + return null; + + var topic = key[(topicPrefix.Length + 1)..]; + return !topic.Contains('/') ? topic : null; + } - /// - /// TBD - /// - /// TBD - /// TBD + #region Key related methods + + [MethodImpl(MethodImplOptions.NoInlining)] public static string MakeKey(IActorRef actorRef) { - return MakeKey(actorRef.Path); + return PathRegex.Replace(actorRef.Path.ToStringWithoutAddress(), "$1"); } - /// - /// TBD - /// - /// TBD - /// TBD - public static string EncodeName(string name) + [MethodImpl(MethodImplOptions.NoInlining)] + public static string MakeKey(this PubSubCache cache, ActorPath path, string topic) { - return name == null ? null : Uri.EscapeDataString(name); + var info = new MakeKeyInfo(path, topic); + if(cache.MakeKeyMap.TryGetValue(info, out var key)) + return key; + + key = PathRegex.Replace((path / topic).ToStringWithoutAddress(), "$1"); + cache.MakeKeyMap[info] = key; + cache.MakeKeyReverseMap[key] = info; + return key; } - /// - /// TBD - /// - /// TBD - /// TBD - public static string MakeKey(ActorPath path) + [MethodImpl(MethodImplOptions.NoInlining)] + public static void TryRemoveKey(this PubSubCache cache, string key, string topicPrefix) { - return PathRegex.Replace(path.ToStringWithoutAddress(), "$1"); + if (cache.MakeKeyReverseMap.TryGetValue(key, out var keyInfo)) + { + cache.MakeKeyMap.Remove(keyInfo); + cache.MakeKeyReverseMap.Remove(key); + } + + var encodedTopic = Utils.KeyToEncodedTopic(key, topicPrefix); + if (encodedTopic == null) + return; + + if (!cache.EncodedToTopicMap.TryGetValue(encodedTopic, out var topic)) + return; + + cache.TopicToEncodedMap.Remove(topic); + cache.EncodedToTopicMap.Remove(encodedTopic); + } + + #endregion + + #region Topic/group name related methods + + [MethodImpl(MethodImplOptions.NoInlining)] + public static string EncodeName(this PubSubCache cache, string name) + { + if (string.IsNullOrWhiteSpace(name)) + return null; + + if (cache.TopicToEncodedMap.TryGetValue(name, out var encoded)) + return encoded; + + encoded = Uri.EscapeDataString(name); + cache.TopicToEncodedMap[name] = encoded; + cache.EncodedToTopicMap[encoded] = name; + return encoded; + } + + [MethodImpl(MethodImplOptions.NoInlining)] + public static void TryRemoveEncodedTopic(this PubSubCache cache, string encodedTopic) + { + if (!cache.EncodedToTopicMap.TryGetValue(encodedTopic, out var topic)) + return; + + cache.TopicToEncodedMap.Remove(topic); + cache.EncodedToTopicMap.Remove(encodedTopic); } + + [MethodImpl(MethodImplOptions.NoInlining)] + public static void TryRemoveTopic(this PubSubCache cache, string topic) + { + if(!cache.TopicToEncodedMap.TryGetValue(topic, out var encodedTopic)) + return; + + cache.EncodedToTopicMap.Remove(encodedTopic); + cache.TopicToEncodedMap.Remove(topic); + } + + #endregion + + public static void Clear(this PubSubCache cache) + { + cache.TopicToEncodedMap.Clear(); + cache.EncodedToTopicMap.Clear(); + cache.MakeKeyMap.Clear(); + cache.MakeKeyReverseMap.Clear(); + } + } + + internal sealed class PubSubCache + { + public readonly Dictionary TopicToEncodedMap = new(); + public readonly Dictionary EncodedToTopicMap = new(); + public readonly Dictionary MakeKeyMap = new(); + public readonly Dictionary MakeKeyReverseMap = new(); } }