diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs
index 62830dd636b..c27ed5bebce 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs
@@ -105,10 +105,8 @@ private IActorRef CreateMediator()
dispatcher = Dispatchers.InternalDispatcherId;
return _system.SystemActorOf(
- Props.Create(() => new DistributedPubSubMediator(_settings))
- .WithDeploy(Deploy.Local)
- .WithDispatcher(dispatcher),
- name);
+ props: DistributedPubSubMediator.Props(_settings).WithDispatcher(dispatcher),
+ name: name);
}
}
}
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs
index def2a8237f4..7333424fd8d 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs
@@ -9,14 +9,13 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
+using System.Runtime.CompilerServices;
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe.Internal;
using Akka.Event;
using Akka.Pattern;
using Akka.Routing;
using Akka.Util;
-using Akka.Util.Internal;
-using Group = Akka.Cluster.Tools.PublishSubscribe.Internal.Group;
using Status = Akka.Cluster.Tools.PublishSubscribe.Internal.Status;
namespace Akka.Cluster.Tools.PublishSubscribe
@@ -99,8 +98,11 @@ namespace Akka.Cluster.Tools.PublishSubscribe
/// replies.
///
///
- public class DistributedPubSubMediator : ReceiveActor
+ public class DistributedPubSubMediator : ReceiveActor, IWithTimers
{
+ private const string GossipTimerKey = "GossipTimer";
+ private const string PruneTimerKey = "PruneTimer";
+
///
/// TBD
///
@@ -112,21 +114,17 @@ public static Props Props(DistributedPubSubSettings settings)
}
private readonly Cluster _cluster = Cluster.Get(Context.System);
+ private readonly string _role;
private readonly DistributedPubSubSettings _settings;
- private readonly ICancelable _gossipCancelable;
- private readonly ICancelable _pruneCancelable;
private readonly TimeSpan _pruneInterval;
private readonly PerGroupingBuffer _buffer;
private ISet
_nodes = new HashSet();
- private long deltaCount = 0L;
- private ILoggingAdapter _log;
- private IDictionary _registry = new Dictionary();
+ private long _deltaCount;
+ private readonly ILoggingAdapter _log;
+ private readonly Dictionary _registry = new();
- ///
- /// TBD
- ///
- public ILoggingAdapter Log { get { return _log ??= Context.GetLogger(); } }
+ public ITimerScheduler Timers { get; set; }
///
/// TBD
@@ -157,10 +155,10 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
if (!string.IsNullOrEmpty(_settings.Role) && !_cluster.SelfRoles.Contains(_settings.Role))
throw new ArgumentException($"The cluster member [{_cluster.SelfAddress}] doesn't have the role [{_settings.Role}]");
- //Start periodic gossip to random nodes in cluster
- _gossipCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(_settings.GossipInterval, _settings.GossipInterval, Self, GossipTick.Instance, Self);
+ _log = Context.GetLogger();
+
+ _role = settings.Role;
_pruneInterval = new TimeSpan(_settings.RemovedTimeToLive.Ticks / 2);
- _pruneCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(_pruneInterval, _pruneInterval, Self, Prune.Instance, Self);
_buffer = new PerGroupingBuffer();
Receive(send =>
@@ -199,7 +197,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
});
Receive(publish =>
{
- string path = Internal.Utils.MakeKey(Self.Path / Internal.Utils.EncodeName(publish.Topic));
+ var path = Internal.Utils.MakeKey(Self.Path / Internal.Utils.EncodeName(publish.Topic));
if (publish.SendOneMessageToEachGroup)
PublishToEachGroup(path, publish);
else
@@ -208,7 +206,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
Receive(put =>
{
if (put.Ref.Path.Address.HasGlobalScope)
- Log.Warning("Registered actor must be local: [{0}]", put.Ref);
+ _log.Warning("Registered actor must be local: [{0}]", put.Ref);
else
{
PutToRegistry(Internal.Utils.MakeKey(put.Ref), put.Ref);
@@ -217,14 +215,13 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
});
Receive(remove =>
{
- if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket))
- {
- if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && !valueHolder.Ref.IsNobody())
- {
- Context.Unwatch(valueHolder.Ref);
- PutToRegistry(remove.Path, null);
- }
- }
+ if (!_registry.TryGetValue(_cluster.SelfAddress, out var bucket))
+ return;
+ if (!bucket.Content.TryGetValue(remove.Path, out var valueHolder) || valueHolder.Ref.IsNobody())
+ return;
+
+ Context.Unwatch(valueHolder.Ref);
+ PutToRegistry(remove.Path, null);
});
Receive(subscribe =>
{
@@ -286,20 +283,20 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
{
// only accept status from known nodes, otherwise old cluster with same address may interact
// also accept from local for testing purposes
- if (_nodes.Contains(Sender.Path.Address) || Sender.Path.Address.HasLocalScope)
- {
- // gossip chat starts with a Status message, containing the bucket versions of the other node
- var delta = CollectDelta(status.Versions).ToImmutableList();
- if (delta.Count != 0)
- Sender.Tell(new Delta(delta));
-
- if (!status.IsReplyToStatus && OtherHasNewerVersions(status.Versions))
- Sender.Tell(new Status(versions: OwnVersions, isReplyToStatus: true)); // it will reply with Delta
- }
+ if (!_nodes.Contains(Sender.Path.Address) && !Sender.Path.Address.HasLocalScope)
+ return;
+
+ // gossip chat starts with a Status message, containing the bucket versions of the other node
+ var delta = CollectDelta(status.Versions).ToImmutableList();
+ if (delta.Count != 0)
+ Sender.Tell(new Delta(delta));
+
+ if (!status.IsReplyToStatus && OtherHasNewerVersions(status.Versions))
+ Sender.Tell(new Status(versions: OwnVersions, isReplyToStatus: true)); // it will reply with Delta
});
Receive(delta =>
{
- deltaCount += 1;
+ _deltaCount += 1;
// reply from Status message in the gossip chat
// the Delta contains potential updates (newer versions) from the other node
@@ -335,41 +332,41 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
Receive(state =>
{
var nodes = state.Members
- .Where(m => m.Status != MemberStatus.Joining && IsMatchingRole(m))
+ .Where(m => m.Status != MemberStatus.Joining && IsMatchingRole(m, _role))
.Select(m => m.Address);
_nodes = new HashSet(nodes);
});
Receive(up =>
{
- if (IsMatchingRole(up.Member)) _nodes.Add(up.Member.Address);
+ if (IsMatchingRole(up.Member, _role)) _nodes.Add(up.Member.Address);
});
Receive(weaklyUp =>
{
- if (IsMatchingRole(weaklyUp.Member)) _nodes.Add(weaklyUp.Member.Address);
+ if (IsMatchingRole(weaklyUp.Member, _role)) _nodes.Add(weaklyUp.Member.Address);
});
Receive(left =>
{
- if (IsMatchingRole(left.Member))
- {
- _nodes.Remove(left.Member.Address);
- _registry.Remove(left.Member.Address);
- }
+ if (!IsMatchingRole(left.Member, _role))
+ return;
+
+ _nodes.Remove(left.Member.Address);
+ _registry.Remove(left.Member.Address);
});
Receive(downed =>
{
- if (IsMatchingRole(downed.Member))
- {
- _nodes.Remove(downed.Member.Address);
- _registry.Remove(downed.Member.Address);
- }
+ if (!IsMatchingRole(downed.Member, _role))
+ return;
+
+ _nodes.Remove(downed.Member.Address);
+ _registry.Remove(downed.Member.Address);
});
Receive(removed =>
{
var member = removed.Member;
if (member.Address == _cluster.SelfAddress)
Context.Stop(Self);
- else if (IsMatchingRole(member))
+ else if (IsMatchingRole(member, _role))
{
_nodes.Remove(member.Address);
_registry.Remove(member.Address);
@@ -383,7 +380,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
});
Receive(_ =>
{
- Sender.Tell(deltaCount);
+ Sender.Tell(_deltaCount);
});
Receive(msg =>
{
@@ -458,20 +455,17 @@ private IEnumerable CollectDelta(IImmutableDictionary ver
private IEnumerable GetCurrentTopics()
{
var topicPrefix = Self.Path.ToStringWithoutAddress();
- foreach (var entry in _registry)
+ foreach (var (_, bucket) in _registry)
{
- var bucket = entry.Value;
- foreach (var kv in bucket.Content)
+ foreach (var (key, _) in bucket.Content)
{
- var key = kv.Key;
- var value = kv.Value;
- if (key.StartsWith(topicPrefix))
+ if (!key.StartsWith(topicPrefix))
+ continue;
+
+ var topic = key[(topicPrefix.Length + 1)..];
+ if (!topic.Contains('/'))
{
- var topic = key.Substring(topicPrefix.Length + 1);
- if (!topic.Contains('/'))
- {
- yield return Uri.EscapeDataString(topic);
- }
+ yield return topic;
}
}
}
@@ -501,13 +495,21 @@ private void IgnoreOrSendToDeadLetters(object message)
private void PublishMessage(string path, IWrappedMessage publish, bool allButSelf = false)
{
+ var counter = 0;
+ foreach (var r in Refs())
+ {
+ if (r == null) continue;
+ r.Forward(publish.Message);
+ counter++;
+ }
+
+ if (counter == 0) IgnoreOrSendToDeadLetters(publish);
+ return;
+
IEnumerable Refs()
{
- foreach (var entry in _registry)
+ foreach (var (address, bucket) in _registry)
{
- var address = entry.Key;
- var bucket = entry.Value;
-
if (!(allButSelf && address == _cluster.SelfAddress) && bucket.Content.TryGetValue(path, out var valueHolder))
{
if (valueHolder != null && !valueHolder.Ref.IsNobody())
@@ -515,16 +517,6 @@ IEnumerable Refs()
}
}
}
-
- var counter = 0;
- foreach (var r in Refs())
- {
- if (r == null) continue;
- r.Forward(publish.Message);
- counter++;
- }
-
- if (counter == 0) IgnoreOrSendToDeadLetters(publish);
}
private void PublishToEachGroup(string path, Publish publish)
@@ -552,12 +544,12 @@ private void PublishToEachGroup(string path, Publish publish)
private IEnumerable> ExtractGroups(string prefix, string lastKey)
{
- foreach (var bucket in _registry.Values)
+ foreach (var (_, bucket) in _registry)
{
//TODO: optimize into tree-aware key range [prefix, lastKey]
- foreach (var keyVal in bucket.Content.Where(kv => kv.Key.CompareTo(prefix) != -1 && kv.Key.CompareTo(lastKey) != 1))
+ foreach (var (key, value) in bucket.Content.Where(kv => kv.Key.CompareTo(prefix) != -1 && kv.Key.CompareTo(lastKey) != 1))
{
- yield return new KeyValuePair(keyVal.Key, keyVal.Value.Routee);
+ yield return new KeyValuePair(key, value.Routee);
}
}
}
@@ -565,16 +557,14 @@ private IEnumerable> ExtractGroups(string prefix, s
private void HandlePrune()
{
var modifications = new Dictionary();
- foreach (var entry in _registry)
+ foreach (var (owner, bucket) in _registry)
{
- var owner = entry.Key;
- var bucket = entry.Value;
-
var oldRemoved = bucket.Content
.Where(kv => kv.Value.Ref.IsNobody() && (bucket.Version - kv.Value.Version) > _settings.RemovedTimeToLive.TotalMilliseconds)
- .Select(kv => kv.Key);
+ .Select(kv => kv.Key)
+ .ToArray();
- if (oldRemoved.Any())
+ if (oldRemoved.Length > 0)
{
modifications.Add(owner, new Bucket(bucket.Owner, bucket.Version, bucket.Content.RemoveRange(oldRemoved)));
}
@@ -588,7 +578,7 @@ private void HandlePrune()
private void HandleGossip()
{
- var node = SelectRandomNode(_nodes.Except(new[] { _cluster.SelfAddress }).ToArray());
+ var node = SelectRandomNode(_nodes.Except([_cluster.SelfAddress]).ToArray());
if (node != null)
GossipTo(node);
}
@@ -599,7 +589,8 @@ private void GossipTo(Address address)
sel.Tell(new Status(versions: OwnVersions, isReplyToStatus: false));
}
- private Address SelectRandomNode(IList addresses)
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static Address SelectRandomNode(IList addresses)
{
if (addresses == null || addresses.Count == 0) return null;
return addresses[ThreadLocalRandom.Current.Next(addresses.Count)];
@@ -613,6 +604,10 @@ protected override void PreStart()
base.PreStart();
if (_cluster.IsTerminated) throw new IllegalStateException("Cluster node must not be terminated");
_cluster.Subscribe(Self, typeof(ClusterEvent.IMemberEvent));
+
+ //Start periodic gossip to random nodes in cluster
+ Timers.StartPeriodicTimer(GossipTimerKey, GossipTick.Instance, _settings.GossipInterval, _settings.GossipInterval, Self);
+ Timers.StartPeriodicTimer(PruneTimerKey, Prune.Instance, _pruneInterval, _pruneInterval, Self);
}
///
@@ -620,19 +615,19 @@ protected override void PreStart()
///
protected override void PostStop()
{
+ Timers.CancelAll();
base.PostStop();
_cluster.Unsubscribe(Self);
- _gossipCancelable.Cancel();
- _pruneCancelable.Cancel();
}
- private bool IsMatchingRole(Member member)
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static bool IsMatchingRole(Member member, string role)
{
- return string.IsNullOrEmpty(_settings.Role) || member.HasRole(_settings.Role);
+ return string.IsNullOrEmpty(role) || member.HasRole(role);
}
// the version is a timestamp because it is also used when pruning removed entries
- private long _version = 0L;
+ private long _version;
private long NextVersion()
{
var current = DateTime.UtcNow.Ticks / 10000;
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs
index 93800244b19..e1da86ba73b 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs
@@ -41,18 +41,15 @@ public override string ToString()
///
/// Base class for both topics and groups.
///
- internal abstract class TopicLike : ActorBase
+ internal abstract class TopicLike : ActorBase, IWithTimers
{
+ private const string PruneTimerKey = "PruneTimer";
+
///
/// TBD
///
protected readonly TimeSpan PruneInterval;
- ///
- /// TBD
- ///
- protected readonly ICancelable PruneCancelable;
-
///
/// TBD
///
@@ -85,16 +82,21 @@ protected TopicLike(TimeSpan emptyTimeToLive, bool sendToDeadLettersWhenNone)
EmptyTimeToLive = emptyTimeToLive;
SendToDeadLettersWhenNoSubscribers = sendToDeadLettersWhenNone;
PruneInterval = new TimeSpan(emptyTimeToLive.Ticks / 2);
- PruneCancelable =
- Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(PruneInterval, PruneInterval, Self,
- Prune.Instance, Self);
+ }
+
+ public ITimerScheduler Timers { get; set; }
+
+ protected override void PreStart()
+ {
+ base.PreStart();
+ Timers.StartPeriodicTimer(PruneTimerKey, Prune.Instance, PruneInterval, PruneInterval, Self);
}
///
protected override void PostStop()
{
base.PostStop();
- PruneCancelable.Cancel();
+ Timers.CancelAll();
}
///
@@ -102,7 +104,7 @@ protected override void PostStop()
///
/// The message we're going to process.
/// true if we handled it, false otherwise.
- protected bool DefaultReceive(object message)
+ private bool DefaultReceive(object message)
{
switch (message)
{
@@ -123,8 +125,8 @@ protected bool DefaultReceive(object message)
Remove(terminated.ActorRef);
return true;
- case Prune _:
- if (PruneDeadline != null && PruneDeadline.IsOverdue)
+ case Prune:
+ if (PruneDeadline is { IsOverdue: true })
{
PruneDeadline = null;
Context.Parent.Tell(NoMoreSubscribers.Instance);
@@ -132,7 +134,7 @@ protected bool DefaultReceive(object message)
return true;
- case TerminateRequest _:
+ case TerminateRequest:
if (Subscribers.Count == 0 && !Context.GetChildren().Any())
{
Context.Stop(Self);
@@ -144,7 +146,7 @@ protected bool DefaultReceive(object message)
return true;
- case Count _:
+ case Count:
Sender.Tell(Subscribers.Count);
return true;
@@ -210,7 +212,7 @@ protected override bool Business(object message)
{
switch (message)
{
- case Subscribe subscribe when subscribe.Group != null:
+ case Subscribe { Group: not null } subscribe:
var encodedGroup = Utils.EncodeName(subscribe.Group);
_buffer.BufferOr(Utils.MakeKey(Self.Path / encodedGroup), subscribe, Sender, () =>
{
@@ -227,7 +229,11 @@ protected override bool Business(object message)
PruneDeadline = null;
return true;
- case Unsubscribe unsubscribe when unsubscribe.Group != null:
+ case Subscribed:
+ Context.Parent.Forward(message);
+ return true;
+
+ case Unsubscribe { Group: not null } unsubscribe:
encodedGroup = Utils.EncodeName(unsubscribe.Group);
_buffer.BufferOr(Utils.MakeKey(Self.Path / encodedGroup), unsubscribe, Sender, () =>
{
@@ -243,21 +249,17 @@ protected override bool Business(object message)
});
return true;
- case Subscribed _:
- Context.Parent.Forward(message);
- return true;
-
- case Unsubscribed _:
+ case Unsubscribed:
Context.Parent.Forward(message);
return true;
- case NoMoreSubscribers _:
+ case Cluster:
var key = Utils.MakeKey(Sender);
_buffer.InitializeGrouping(key);
Sender.Tell(TerminateRequest.Instance);
return true;
- case NewSubscriberArrived _:
+ case NewSubscriberArrived:
key = Utils.MakeKey(Sender);
_buffer.ForwardMessages(key, Sender);
return true;
@@ -327,7 +329,7 @@ protected override bool Business(object message)
///
internal static class Utils
{
- private static System.Text.RegularExpressions.Regex _pathRegex = new("^/remote/.+(/user/.+)");
+ private static readonly System.Text.RegularExpressions.Regex PathRegex = new("^/remote/.+(/user/.+)");
///
///
@@ -375,7 +377,7 @@ public static string EncodeName(string name)
/// TBD
public static string MakeKey(ActorPath path)
{
- return _pathRegex.Replace(path.ToStringWithoutAddress(), "$1");
+ return PathRegex.Replace(path.ToStringWithoutAddress(), "$1");
}
}
}
diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt
index e4397fdab81..d0124638e03 100644
--- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt
+++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt
@@ -243,11 +243,11 @@ namespace Akka.Cluster.Tools.PublishSubscribe
public DistributedPubSubExtensionProvider() { }
public override Akka.Cluster.Tools.PublishSubscribe.DistributedPubSub CreateExtension(Akka.Actor.ExtendedActorSystem system) { }
}
- public class DistributedPubSubMediator : Akka.Actor.ReceiveActor
+ public class DistributedPubSubMediator : Akka.Actor.ReceiveActor, Akka.Actor.IWithTimers
{
public DistributedPubSubMediator(Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings settings) { }
- public Akka.Event.ILoggingAdapter Log { get; }
public System.Collections.Immutable.IImmutableDictionary OwnVersions { get; }
+ public Akka.Actor.ITimerScheduler Timers { get; set; }
protected override void PostStop() { }
protected override void PreStart() { }
public static Akka.Actor.Props Props(Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings settings) { }
diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt
index ad19deb0f48..e8ed57f9b28 100644
--- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt
+++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt
@@ -243,11 +243,11 @@ namespace Akka.Cluster.Tools.PublishSubscribe
public DistributedPubSubExtensionProvider() { }
public override Akka.Cluster.Tools.PublishSubscribe.DistributedPubSub CreateExtension(Akka.Actor.ExtendedActorSystem system) { }
}
- public class DistributedPubSubMediator : Akka.Actor.ReceiveActor
+ public class DistributedPubSubMediator : Akka.Actor.ReceiveActor, Akka.Actor.IWithTimers
{
public DistributedPubSubMediator(Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings settings) { }
- public Akka.Event.ILoggingAdapter Log { get; }
public System.Collections.Immutable.IImmutableDictionary OwnVersions { get; }
+ public Akka.Actor.ITimerScheduler Timers { get; set; }
protected override void PostStop() { }
protected override void PreStart() { }
public static Akka.Actor.Props Props(Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings settings) { }