Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,8 @@ private IActorRef CreateMediator()
dispatcher = Dispatchers.InternalDispatcherId;

return _system.SystemActorOf(
Props.Create(() => new DistributedPubSubMediator(_settings))
.WithDeploy(Deploy.Local)
.WithDispatcher(dispatcher),
name);
props: DistributedPubSubMediator.Props(_settings).WithDispatcher(dispatcher),
name: name);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Runtime.CompilerServices;
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe.Internal;
using Akka.Event;
using Akka.Pattern;
using Akka.Routing;
using Akka.Util;
using Akka.Util.Internal;
using Group = Akka.Cluster.Tools.PublishSubscribe.Internal.Group;
using Status = Akka.Cluster.Tools.PublishSubscribe.Internal.Status;

namespace Akka.Cluster.Tools.PublishSubscribe
Expand Down Expand Up @@ -99,8 +98,11 @@ namespace Akka.Cluster.Tools.PublishSubscribe
/// replies.
/// </para>
/// </summary>
public class DistributedPubSubMediator : ReceiveActor
public class DistributedPubSubMediator : ReceiveActor, IWithTimers
{
private const string GossipTimerKey = "GossipTimer";
private const string PruneTimerKey = "PruneTimer";

/// <summary>
/// TBD
/// </summary>
Expand All @@ -112,21 +114,17 @@ public static Props Props(DistributedPubSubSettings settings)
}

private readonly Cluster _cluster = Cluster.Get(Context.System);
private readonly string _role;
private readonly DistributedPubSubSettings _settings;
private readonly ICancelable _gossipCancelable;
private readonly ICancelable _pruneCancelable;
private readonly TimeSpan _pruneInterval;
private readonly PerGroupingBuffer _buffer;

private ISet<Address> _nodes = new HashSet<Address>();
private long deltaCount = 0L;
private ILoggingAdapter _log;
private IDictionary<Address, Bucket> _registry = new Dictionary<Address, Bucket>();
private long _deltaCount;
private readonly ILoggingAdapter _log;
private readonly Dictionary<Address, Bucket> _registry = new();

/// <summary>
/// TBD
/// </summary>
public ILoggingAdapter Log { get { return _log ??= Context.GetLogger(); } }
public ITimerScheduler Timers { get; set; }

/// <summary>
/// TBD
Expand Down Expand Up @@ -157,10 +155,10 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
if (!string.IsNullOrEmpty(_settings.Role) && !_cluster.SelfRoles.Contains(_settings.Role))
throw new ArgumentException($"The cluster member [{_cluster.SelfAddress}] doesn't have the role [{_settings.Role}]");

//Start periodic gossip to random nodes in cluster
_gossipCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(_settings.GossipInterval, _settings.GossipInterval, Self, GossipTick.Instance, Self);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM (should probably keep the comment though)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment was moved to the PreStart() override, where we start the periodic timer.

_log = Context.GetLogger();

_role = settings.Role;
_pruneInterval = new TimeSpan(_settings.RemovedTimeToLive.Ticks / 2);
_pruneCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(_pruneInterval, _pruneInterval, Self, Prune.Instance, Self);
_buffer = new PerGroupingBuffer();

Receive<Send>(send =>
Expand Down Expand Up @@ -199,7 +197,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
});
Receive<Publish>(publish =>
{
string path = Internal.Utils.MakeKey(Self.Path / Internal.Utils.EncodeName(publish.Topic));
var path = Internal.Utils.MakeKey(Self.Path / Internal.Utils.EncodeName(publish.Topic));
if (publish.SendOneMessageToEachGroup)
PublishToEachGroup(path, publish);
else
Expand All @@ -208,7 +206,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
Receive<Put>(put =>
{
if (put.Ref.Path.Address.HasGlobalScope)
Log.Warning("Registered actor must be local: [{0}]", put.Ref);
_log.Warning("Registered actor must be local: [{0}]", put.Ref);
else
{
PutToRegistry(Internal.Utils.MakeKey(put.Ref), put.Ref);
Expand All @@ -217,14 +215,13 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
});
Receive<Remove>(remove =>
{
if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket))
{
if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && !valueHolder.Ref.IsNobody())
{
Context.Unwatch(valueHolder.Ref);
PutToRegistry(remove.Path, null);
}
}
if (!_registry.TryGetValue(_cluster.SelfAddress, out var bucket))
return;
if (!bucket.Content.TryGetValue(remove.Path, out var valueHolder) || valueHolder.Ref.IsNobody())
return;

Context.Unwatch(valueHolder.Ref);
PutToRegistry(remove.Path, null);
});
Receive<Subscribe>(subscribe =>
{
Expand Down Expand Up @@ -286,20 +283,20 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
{
// only accept status from known nodes, otherwise old cluster with same address may interact
// also accept from local for testing purposes
if (_nodes.Contains(Sender.Path.Address) || Sender.Path.Address.HasLocalScope)
{
// gossip chat starts with a Status message, containing the bucket versions of the other node
var delta = CollectDelta(status.Versions).ToImmutableList();
if (delta.Count != 0)
Sender.Tell(new Delta(delta));

if (!status.IsReplyToStatus && OtherHasNewerVersions(status.Versions))
Sender.Tell(new Status(versions: OwnVersions, isReplyToStatus: true)); // it will reply with Delta
}
if (!_nodes.Contains(Sender.Path.Address) && !Sender.Path.Address.HasLocalScope)
return;

// gossip chat starts with a Status message, containing the bucket versions of the other node
var delta = CollectDelta(status.Versions).ToImmutableList();
if (delta.Count != 0)
Sender.Tell(new Delta(delta));

if (!status.IsReplyToStatus && OtherHasNewerVersions(status.Versions))
Sender.Tell(new Status(versions: OwnVersions, isReplyToStatus: true)); // it will reply with Delta
});
Receive<Delta>(delta =>
{
deltaCount += 1;
_deltaCount += 1;

// reply from Status message in the gossip chat
// the Delta contains potential updates (newer versions) from the other node
Expand Down Expand Up @@ -335,41 +332,41 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
Receive<ClusterEvent.CurrentClusterState>(state =>
{
var nodes = state.Members
.Where(m => m.Status != MemberStatus.Joining && IsMatchingRole(m))
.Where(m => m.Status != MemberStatus.Joining && IsMatchingRole(m, _role))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this a bug before?

@Arkatufus Arkatufus May 14, 2025

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it wasn't a bug. Previously, we accessed the _settings.Role property inside the IsMatchingRole method; now it is a static method, so the role is passed in as a parameter.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it

.Select(m => m.Address);

_nodes = new HashSet<Address>(nodes);
});
Receive<ClusterEvent.MemberUp>(up =>
{
if (IsMatchingRole(up.Member)) _nodes.Add(up.Member.Address);
if (IsMatchingRole(up.Member, _role)) _nodes.Add(up.Member.Address);
});
Receive<ClusterEvent.MemberWeaklyUp>(weaklyUp =>
{
if (IsMatchingRole(weaklyUp.Member)) _nodes.Add(weaklyUp.Member.Address);
if (IsMatchingRole(weaklyUp.Member, _role)) _nodes.Add(weaklyUp.Member.Address);
});
Receive<ClusterEvent.MemberLeft>(left =>
{
if (IsMatchingRole(left.Member))
{
_nodes.Remove(left.Member.Address);
_registry.Remove(left.Member.Address);
}
if (!IsMatchingRole(left.Member, _role))
return;

_nodes.Remove(left.Member.Address);
_registry.Remove(left.Member.Address);
});
Receive<ClusterEvent.MemberDowned>(downed =>
{
if (IsMatchingRole(downed.Member))
{
_nodes.Remove(downed.Member.Address);
_registry.Remove(downed.Member.Address);
}
if (!IsMatchingRole(downed.Member, _role))
return;

_nodes.Remove(downed.Member.Address);
_registry.Remove(downed.Member.Address);
});
Receive<ClusterEvent.MemberRemoved>(removed =>
{
var member = removed.Member;
if (member.Address == _cluster.SelfAddress)
Context.Stop(Self);
else if (IsMatchingRole(member))
else if (IsMatchingRole(member, _role))
{
_nodes.Remove(member.Address);
_registry.Remove(member.Address);
Expand All @@ -383,7 +380,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
});
Receive<DeltaCount>(_ =>
{
Sender.Tell(deltaCount);
Sender.Tell(_deltaCount);
});
Receive<CountSubscribers>(msg =>
{
Expand Down Expand Up @@ -458,20 +455,17 @@ private IEnumerable<Bucket> CollectDelta(IImmutableDictionary<Address, long> ver
private IEnumerable<string> GetCurrentTopics()
{
var topicPrefix = Self.Path.ToStringWithoutAddress();
foreach (var entry in _registry)
foreach (var (_, bucket) in _registry)
{
var bucket = entry.Value;
foreach (var kv in bucket.Content)
foreach (var (key, _) in bucket.Content)
{
var key = kv.Key;
var value = kv.Value;
if (key.StartsWith(topicPrefix))
if (!key.StartsWith(topicPrefix))
continue;

var topic = key[(topicPrefix.Length + 1)..];
if (!topic.Contains('/'))
{
var topic = key.Substring(topicPrefix.Length + 1);
if (!topic.Contains('/'))
{
yield return Uri.EscapeDataString(topic);
}
yield return topic;
}
}
}
Expand Down Expand Up @@ -501,30 +495,28 @@ private void IgnoreOrSendToDeadLetters(object message)

private void PublishMessage(string path, IWrappedMessage publish, bool allButSelf = false)
{
var counter = 0;
foreach (var r in Refs())
{
if (r == null) continue;
r.Forward(publish.Message);
counter++;
}

if (counter == 0) IgnoreOrSendToDeadLetters(publish);
return;

IEnumerable<IActorRef> Refs()
{
foreach (var entry in _registry)
foreach (var (address, bucket) in _registry)
{
var address = entry.Key;
var bucket = entry.Value;

if (!(allButSelf && address == _cluster.SelfAddress) && bucket.Content.TryGetValue(path, out var valueHolder))
{
if (valueHolder != null && !valueHolder.Ref.IsNobody())
yield return valueHolder.Ref;
}
}
}

var counter = 0;
foreach (var r in Refs())
{
if (r == null) continue;
r.Forward(publish.Message);
counter++;
}

if (counter == 0) IgnoreOrSendToDeadLetters(publish);
}

private void PublishToEachGroup(string path, Publish publish)
Expand Down Expand Up @@ -552,29 +544,27 @@ private void PublishToEachGroup(string path, Publish publish)

private IEnumerable<KeyValuePair<string, Routee>> ExtractGroups(string prefix, string lastKey)
{
foreach (var bucket in _registry.Values)
foreach (var (_, bucket) in _registry)
{
//TODO: optimize into tree-aware key range [prefix, lastKey]
foreach (var keyVal in bucket.Content.Where(kv => kv.Key.CompareTo(prefix) != -1 && kv.Key.CompareTo(lastKey) != 1))
foreach (var (key, value) in bucket.Content.Where(kv => kv.Key.CompareTo(prefix) != -1 && kv.Key.CompareTo(lastKey) != 1))
{
yield return new KeyValuePair<string, Routee>(keyVal.Key, keyVal.Value.Routee);
yield return new KeyValuePair<string, Routee>(key, value.Routee);
}
}
}

private void HandlePrune()
{
var modifications = new Dictionary<Address, Bucket>();
foreach (var entry in _registry)
foreach (var (owner, bucket) in _registry)
{
var owner = entry.Key;
var bucket = entry.Value;

var oldRemoved = bucket.Content
.Where(kv => kv.Value.Ref.IsNobody() && (bucket.Version - kv.Value.Version) > _settings.RemovedTimeToLive.TotalMilliseconds)
.Select(kv => kv.Key);
.Select(kv => kv.Key)
.ToArray();

if (oldRemoved.Any())
if (oldRemoved.Length > 0)
{
modifications.Add(owner, new Bucket(bucket.Owner, bucket.Version, bucket.Content.RemoveRange(oldRemoved)));
}
Expand All @@ -588,7 +578,7 @@ private void HandlePrune()

private void HandleGossip()
{
var node = SelectRandomNode(_nodes.Except(new[] { _cluster.SelfAddress }).ToArray());
var node = SelectRandomNode(_nodes.Except([_cluster.SelfAddress]).ToArray());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might be able to make this work without allocating a list - have SelectRandomNode take an IEnumerable and just apply the random value to .Skip instead of the indexer

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That'll also save you the trouble of fully evaluating the list

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll give that a try

if (node != null)
GossipTo(node);
}
Expand All @@ -599,7 +589,8 @@ private void GossipTo(Address address)
sel.Tell(new Status(versions: OwnVersions, isReplyToStatus: false));
}

private Address SelectRandomNode(IList<Address> addresses)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static Address SelectRandomNode(IList<Address> addresses)
{
if (addresses == null || addresses.Count == 0) return null;
return addresses[ThreadLocalRandom.Current.Next(addresses.Count)];
Expand All @@ -613,26 +604,30 @@ protected override void PreStart()
base.PreStart();
if (_cluster.IsTerminated) throw new IllegalStateException("Cluster node must not be terminated");
_cluster.Subscribe(Self, typeof(ClusterEvent.IMemberEvent));

//Start periodic gossip to random nodes in cluster
Timers.StartPeriodicTimer(GossipTimerKey, GossipTick.Instance, _settings.GossipInterval, _settings.GossipInterval, Self);
Timers.StartPeriodicTimer(PruneTimerKey, Prune.Instance, _pruneInterval, _pruneInterval, Self);
}

/// <summary>
/// TBD
/// </summary>
protected override void PostStop()
{
Timers.CancelAll();
base.PostStop();
_cluster.Unsubscribe(Self);
_gossipCancelable.Cancel();
_pruneCancelable.Cancel();
}

private bool IsMatchingRole(Member member)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static bool IsMatchingRole(Member member, string role)
{
return string.IsNullOrEmpty(_settings.Role) || member.HasRole(_settings.Role);
return string.IsNullOrEmpty(role) || member.HasRole(role);
}

// the version is a timestamp because it is also used when pruning removed entries
private long _version = 0L;
private long _version;
private long NextVersion()
{
var current = DateTime.UtcNow.Ticks / 10000;
Expand Down
Loading
Loading