diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubPublishWithAckSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubPublishWithAckSpec.cs new file mode 100644 index 00000000000..800c633c8d2 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubPublishWithAckSpec.cs @@ -0,0 +1,147 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using Akka.Actor; +using Akka.Cluster.TestKit; +using Akka.Cluster.Tools.PublishSubscribe; +using Akka.Configuration; +using Akka.Event; +using Akka.MultiNode.TestAdapter; +using Akka.Remote.TestKit; +using FluentAssertions; +using FluentAssertions.Extensions; + +namespace Akka.Cluster.Tools.Tests.MultiNode.PublishSubscribe; + +public class DistributedPubSubPublishWithAckSpecSpecConfig : MultiNodeConfig +{ + public readonly RoleName First; + public readonly RoleName Second; + + public DistributedPubSubPublishWithAckSpecSpecConfig() + { + First = Role("first"); + Second = Role("second"); + + CommonConfig = ConfigurationFactory.ParseString( + """ + akka.loglevel = INFO + akka.actor.provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster" + akka.actor.serialize-messages = off + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-down-unreachable-after = 0s + akka.cluster.pub-sub.max-delta-elements = 500 + akka.cluster.pub-sub.buffered-messages.max-per-topic = 2 + akka.cluster.pub-sub.buffered-messages.timeout-check-interval = 200ms + akka.testconductor.query-timeout = 1m # we were having timeouts shutting down nodes with 5s default + """).WithFallback(DistributedPubSub.DefaultConfig()); + } +} + +public class DistributedPubSubPublishWithAckSpec : MultiNodeClusterSpec +{ + #region setup + private readonly RoleName _first; + private readonly RoleName _second; + + public DistributedPubSubPublishWithAckSpec() : this(new DistributedPubSubPublishWithAckSpecSpecConfig()) + { + } + + protected DistributedPubSubPublishWithAckSpec(DistributedPubSubPublishWithAckSpecSpecConfig config) : base(config, typeof(DistributedPubSubMediatorSpec)) + { + _first = config.First; + _second = config.Second; + } + + public IActorRef Mediator => DistributedPubSub.Get(Sys).Mediator; + + private void Join(RoleName from, RoleName to) + { + RunOn(() => + { + Cluster.Join(Node(to).Address); + _ = DistributedPubSub.Get(Sys).Mediator; + }, from); + EnterBarrier(from.Name + "-joined"); + } + + #endregion + + [MultiNodeFact] + public void DistributedPubSubPublishWithAckSpecs() + { + DistributedPubSubMediator_must_startup_2_nodes_cluster(); + PublishWithAck_must_buffer_message(); + Second_node_must_subscribe(); + PublishWithAck_must_deliver_buffered_messages(); + } + + public void DistributedPubSubMediator_must_startup_2_nodes_cluster() + { + Within(TimeSpan.FromSeconds(15), () => + { + Join(_first, _first); + Join(_second, _first); + EnterBarrier("after-1"); + }); + } + + public void PublishWithAck_must_buffer_message() + { + Within(15.Seconds(), () => + { + RunOn(() => + { + Mediator.Tell(new PublishWithAck("content", "hi-1!", 20.Seconds())); + Mediator.Tell(new PublishWithAck("content", "hi-2!", 20.Seconds())); + ExpectNoMsg(200.Milliseconds()); + }, _first); + + RunOn(() => + { + ExpectNoMsg(200.Milliseconds()); + }, _second); + + EnterBarrier("after-2"); + }); + } + + public void Second_node_must_subscribe() + { + RunOn(() => + { + Mediator.Tell(new Subscribe("content", TestActor)); + // SubscribeAck must arrive first + ExpectMsg(); + Sys.Log.Info("Second node subscribed"); + }, _second); + + EnterBarrier("after-3"); + } + + public void PublishWithAck_must_deliver_buffered_messages() + { + Within(TimeSpan.FromSeconds(15), () => + { + RunOn(() => + { + ExpectMsg().Message.Message.Should().Be("hi-1!"); + ExpectMsg().Message.Message.Should().Be("hi-2!"); + }, _first); + + RunOn(() => + { + ExpectMsg("hi-1!"); + ExpectMsg("hi-2!"); + }, _second); + + EnterBarrier("after-4"); + }); + } +} diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/PublishSubscribe/DistributedPubSubConfigSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/PublishSubscribe/DistributedPubSubConfigSpec.cs index 70a05a47273..9af89417c12 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/PublishSubscribe/DistributedPubSubConfigSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/PublishSubscribe/DistributedPubSubConfigSpec.cs @@ -11,6 +11,7 @@ using Akka.Configuration; using Akka.Routing; using Akka.TestKit; +using FluentAssertions.Extensions; using Xunit; namespace Akka.Cluster.Tools.Tests.PublishSubscribe @@ -40,6 +41,8 @@ public void DistributedPubSubSettings_must_have_default_config() distributedPubSubSettings.RemovedTimeToLive.TotalSeconds.ShouldBe(120); distributedPubSubSettings.SendToDeadLettersWhenNoSubscribers.ShouldBe(true); distributedPubSubSettings.MaxDeltaElements.ShouldBe(3000); + distributedPubSubSettings.MaxBufferedMessagePerTopic.ShouldBe(1000); + distributedPubSubSettings.BufferedMessageTimeoutCheckInterval.ShouldBe(500.Milliseconds()); var config = Sys.Settings.Config.GetConfig("akka.cluster.pub-sub"); Assert.False(config.IsNullOrEmpty()); diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/PublishSubscribe/DistributedPubSubPublishWithAckSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/PublishSubscribe/DistributedPubSubPublishWithAckSpec.cs new file mode 100644 index 00000000000..f94e241f15e --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/PublishSubscribe/DistributedPubSubPublishWithAckSpec.cs @@ -0,0 +1,128 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Cluster.Tools.PublishSubscribe; +using Akka.Configuration; +using Akka.TestKit; +using FluentAssertions.Extensions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Cluster.Tools.Tests.PublishSubscribe; + +[Collection(nameof(DistributedPubSubMediatorSpec))] +public class DistributedPubSubPublishWithAckSpec : AkkaSpec +{ + public DistributedPubSubPublishWithAckSpec(ITestOutputHelper output) : base(GetConfig(), output) + { + } + + private static Config GetConfig() + { + return ConfigurationFactory.ParseString( + """ + akka.actor.provider = cluster + akka.cluster.pub-sub.buffered-messages.max-per-topic = 2 + akka.cluster.pub-sub.buffered-messages.timeout-check-interval = 100ms + """); + } + + [Fact(DisplayName = "PublishWithAck message should be buffered")] + public async Task PublishWithAckBufferTest() + { + var mediator = DistributedPubSub.Get(Sys).Mediator; + mediator.Tell(new PublishWithAck("topic", "msg-1", 10.Seconds())); + mediator.Tell(new PublishWithAck("topic", "msg-2", 10.Seconds())); + + // Should not send NACKs + await ExpectNoMsgAsync(200.Milliseconds()); + + // Should not re-send messages if topic does not match + mediator.Tell(new Subscribe("topic2", TestActor)); + var subAck = await ExpectMsgAsync(); + subAck.Subscribe.Topic.ShouldBe("topic2"); + await ExpectNoMsgAsync(200.Milliseconds()); + + // Should not re-send messages if topic match + mediator.Tell(new Subscribe("topic", TestActor)); + subAck = await ExpectMsgAsync(); + subAck.Subscribe.Topic.ShouldBe("topic"); + + await ExpectMsgAllOfMatchingPredicatesAsync([ + PredicateInfo.Create(msg => msg is "msg-1"), + PredicateInfo.Create(msg => msg.Message is { Message: "msg-1", Topic: "topic" }), + PredicateInfo.Create(msg => msg is "msg-2"), + PredicateInfo.Create(msg => msg.Message is { Message: "msg-2", Topic: "topic" }) ]).ToListAsync(); + + // Should not send extra messages + await ExpectNoMsgAsync(200.Milliseconds()); + } + + [Fact(DisplayName = "PublishWithAck message buffer should NACK buffer overflowed messages")] + public async Task PublishWithAckBufferOverflowTest() + { + var mediator = DistributedPubSub.Get(Sys).Mediator; + mediator.Tell(new PublishWithAck("topic", "msg-1", 10.Seconds())); + mediator.Tell(new PublishWithAck("topic", "msg-2", 10.Seconds())); + + // Should dequeue and NACK msg-1, the oldest message in the buffer + mediator.Tell(new PublishWithAck("topic", "msg-3", 10.Seconds())); + var failed = await ExpectMsgAsync(); + failed.Message.Topic.ShouldBe("topic"); + failed.Message.Message.ShouldBe("msg-1"); + + // Should dequeue and NACK msg-2, the oldest message in the buffer + mediator.Tell(new PublishWithAck("topic", "msg-4", 10.Seconds())); + failed = await ExpectMsgAsync(); + failed.Message.Topic.ShouldBe("topic"); + failed.Message.Message.ShouldBe("msg-2"); + + // Should not re-send messages if topic match. + // msg-1 and msg-2 should never be re-sent + mediator.Tell(new Subscribe("topic", TestActor)); + var subAck = await ExpectMsgAsync(); + subAck.Subscribe.Topic.ShouldBe("topic"); + + await ExpectMsgAllOfMatchingPredicatesAsync([ + PredicateInfo.Create(msg => msg is "msg-3"), + PredicateInfo.Create(msg => msg.Message is { Message: "msg-3", Topic: "topic" }), + PredicateInfo.Create(msg => msg is "msg-4"), + PredicateInfo.Create(msg => msg.Message is { Message: "msg-4", Topic: "topic" }) ]).ToListAsync(); + + // Should not send extra messages + await ExpectNoMsgAsync(200.Milliseconds()); + } + + [Fact(DisplayName = "PublishWithAck message should fail when timed-out")] + public async Task PublishWithAckMessageTimeoutTest() + { + var mediator = DistributedPubSub.Get(Sys).Mediator; + mediator.Tell(new PublishWithAck("topic", "msg-1", 300.Milliseconds())); + mediator.Tell(new PublishWithAck("topic", "msg-2", 10.Milliseconds())); + + // msg-2 should time out first + var failed = await ExpectMsgAsync(); + failed.Message.Topic.ShouldBe("topic"); + failed.Message.Message.ShouldBe("msg-2"); + + // msg-1 should time out second + failed = await ExpectMsgAsync(); + failed.Message.Topic.ShouldBe("topic"); + failed.Message.Message.ShouldBe("msg-1"); + + // Buffer is empty, should not re-send anything + mediator.Tell(new Subscribe("topic", TestActor)); + var subAck = await ExpectMsgAsync(); + subAck.Subscribe.Topic.ShouldBe("topic"); + + // Should not send extra messages + await ExpectNoMsgAsync(200.Milliseconds()); + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedMessages.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedMessages.cs index 027e41f52e9..4154cecc804 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedMessages.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedMessages.cs @@ -415,6 +415,35 @@ public override string ToString() return $"Publish"; } } + + public sealed record PublishWithAck : IDistributedPubSubMessage, IWrappedMessage + { + public PublishWithAck(string topic, object message, TimeSpan timeout, bool sendOneMessageToEachGroup = false) + { + if(timeout.Ticks <= 0) + throw new ArgumentException("Timeout must be greater than zero", nameof(timeout)); + + Topic = topic; + Message = message; + Timeout = timeout; + SendOneMessageToEachGroup = sendOneMessageToEachGroup; + } + + public string Topic { get; } + public object Message { get; } + public TimeSpan Timeout { get; } + public bool SendOneMessageToEachGroup { get; } + } + + public enum PublishFailReason + { + Timeout, + MediatorShuttingDown + } + + public sealed record PublishFailed(PublishWithAck Message, PublishFailReason Reason): IDeadLetterSuppression; + + public sealed record PublishSucceeded(PublishWithAck Message): IDeadLetterSuppression; /// /// TBD diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs index 8bf982d6588..ccb07eefc00 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs @@ -14,6 +14,7 @@ using Akka.Cluster.Tools.PublishSubscribe.Internal; using Akka.Event; using Akka.Pattern; +using Akka.Remote; using Akka.Routing; using Akka.Util; using Status = Akka.Cluster.Tools.PublishSubscribe.Internal.Status; @@ -102,6 +103,7 @@ public class DistributedPubSubMediator : ReceiveActor, IWithTimers { private const string GossipTimerKey = "GossipTimer"; private const string PruneTimerKey = "PruneTimer"; + private const string PruneBufferTimerKey = "PruneBufferTimer"; /// /// TBD @@ -127,20 +129,20 @@ public static Props Props(DistributedPubSubSettings settings) private readonly string _topicPrefix; private readonly PubSubCache _cache; + private readonly int _maxBufferPerTopic; + private readonly TimeSpan _bufferedMessageTimeoutCheckInterval; + private readonly Dictionary> _bufferedMessages = new(); + private readonly List _newlyAddedKeys = new(); + public ITimerScheduler Timers { get; set; } /// /// TBD /// public IImmutableDictionary OwnVersions - { - get - { - return _registry - .Select(entry => new KeyValuePair(entry.Key, entry.Value.Version)) - .ToImmutableDictionary(kv => kv.Key, kv => kv.Value); - } - } + => _registry + .Select(entry => new KeyValuePair(entry.Key, entry.Value.Version)) + .ToImmutableDictionary(kv => kv.Key, kv => kv.Value); /// /// TBD @@ -161,6 +163,9 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) _log = Context.GetLogger(); _role = settings.Role; + _maxBufferPerTopic = settings.MaxBufferedMessagePerTopic; + _bufferedMessageTimeoutCheckInterval = settings.BufferedMessageTimeoutCheckInterval; + _pruneInterval = new TimeSpan(_settings.RemovedTimeToLive.Ticks / 2); _buffer = new PerGroupingBuffer(); @@ -195,7 +200,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) new Router(_settings.RoutingLogic, routees.ToArray()).Route( Internal.Utils.WrapIfNeeded(send.Message), Sender); else - IgnoreOrSendToDeadLetters(send); + IgnoreOrSendToDeadLetters(send, Sender); }); Receive(sendToAll => { @@ -211,6 +216,15 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) else PublishMessage(key, publish); }); + Receive(publish => + { + var encodedTopic = _cache.EncodeName(publish.Topic); + var key = _cache.MakeKey(Self.Path, encodedTopic); + if (publish.SendOneMessageToEachGroup) + PublishToEachGroup(key, publish); + else + PublishMessage(key, publish); + }); Receive(put => { if (put.Ref.Path.Address.HasGlobalScope) @@ -267,6 +281,12 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) Receive(subscribed => { subscribed.Subscriber.Tell(subscribed.Ack); + + if (_newlyAddedKeys.Count <= 0) + return; + + Self.Tell(new NewBucketKeysAdded(_newlyAddedKeys.ToArray())); + _newlyAddedKeys.Clear(); }); Receive(unsubscribe => { @@ -320,14 +340,54 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) if (!_registry.TryGetValue(bucket.Owner, out var myBucket)) myBucket = new Bucket(bucket.Owner); - if (bucket.Version > myBucket.Version) - _registry[bucket.Owner] = new Bucket(myBucket.Owner, bucket.Version, myBucket.Content.SetItems(bucket.Content)); + if (bucket.Version <= myBucket.Version) + continue; + + // Create a bucket diff, we're only interested in new items being added + var keys = new HashSet(bucket.Content.Keys); + keys.ExceptWith(new HashSet(myBucket.Content.Keys)); + if (keys.Count > 0) + { + // Send the diff to ourselves + Self.Tell(new NewBucketKeysAdded(keys.ToArray())); + } + + // Merge remote bucket with ours + var newBucket = new Bucket(myBucket.Owner, bucket.Version, myBucket.Content.SetItems(bucket.Content)); + _registry[bucket.Owner] = newBucket; } } } }); Receive(_ => HandleGossip()); Receive(_ => HandlePrune()); + Receive(subs => + { + foreach (var key in subs.Topics) + { + if (!_bufferedMessages.TryGetValue(key, out var buffer)) + continue; + + foreach (var bufferedMessage in buffer) + { + Self.Tell(bufferedMessage.Message, bufferedMessage.Sender); + } + + _bufferedMessages.Remove(key); + } + }); + Receive(_ => + { + foreach (var buffer in _bufferedMessages.Values) + { + var removed = buffer.Where(bufferedMessage => bufferedMessage.Deadline.IsOverdue).ToArray(); + foreach (var removedMsg in removed) + { + buffer.Remove(removedMsg); + IgnoreOrSendToDeadLetters(removedMsg.Message, removedMsg.Sender); + } + } + }); Receive(terminated => { var key = Internal.Utils.MakeKey(terminated.ActorRef); @@ -381,7 +441,6 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) if (member.Address == _cluster.SelfAddress) { Context.Stop(Self); - _cache.Clear(); } else if (IsMatchingRole(member, _role)) { @@ -417,6 +476,32 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) }); } + private void BufferMessageOrDeadLetter(PublishWithAck message, IActorRef sender) + { + var topic = message.Topic; + var encodedTopic = _cache.EncodeName(topic); + var key = _cache.MakeKey(Self.Path, encodedTopic); + + if (!_bufferedMessages.TryGetValue(key, out var buffer)) + { + buffer = []; + _bufferedMessages[key] = buffer; + } + + if(buffer.Count >= _maxBufferPerTopic) + { + _log.Warning("PublishWithAck buffer overflowed for topic [{0}]. New message inserted: {1}", topic, message); + while (buffer.Count >= _maxBufferPerTopic) + { + var removed = buffer[0]; + buffer.RemoveAt(0); + IgnoreOrSendToDeadLetters(removed.Message, removed.Sender); + } + } + + buffer.Add(new BufferedMessage(message, new Deadline(DateTime.UtcNow + message.Timeout), sender)); + } + private bool OtherHasNewerVersions(IImmutableDictionary versions) { return versions.Any(entry => @@ -438,11 +523,8 @@ private IEnumerable CollectDelta(IImmutableDictionary ver } var count = 0; - foreach (var entry in filledOtherVersions) + foreach (var (owner, v) in filledOtherVersions) { - var owner = entry.Key; - var v = entry.Value; - if (!_registry.TryGetValue(owner, out var bucket)) bucket = new Bucket(owner); @@ -493,20 +575,30 @@ private void HandleRegisterTopic(IActorRef actorRef) private void PutToRegistry(string key, IActorRef value) { var v = NextVersion(); - if (!_registry.TryGetValue(_cluster.SelfAddress, out var bucket)) - _registry.Add(_cluster.SelfAddress, - new Bucket(_cluster.SelfAddress, v, ImmutableDictionary.Empty.Add(key, new ValueHolder(v, value)))); - else - _registry[_cluster.SelfAddress] = new Bucket(bucket.Owner, v, bucket.Content.SetItem(key, new ValueHolder(v, value))); + var newBucket = _registry.TryGetValue(_cluster.SelfAddress, out var bucket) + ? new Bucket(bucket.Owner, v, bucket.Content.SetItem(key, new ValueHolder(v, value))) + : new Bucket(_cluster.SelfAddress, v, ImmutableDictionary.Empty.Add(key, new ValueHolder(v, value))); + + _registry[_cluster.SelfAddress] = newBucket; + + // NOTE: We can't process this immediately here because message publishing needs to be done + // AFTER we send `SubscribeAck` + _newlyAddedKeys.Add(key); } - private void IgnoreOrSendToDeadLetters(IWrappedMessage message) + private void IgnoreOrSendToDeadLetters(object message, IActorRef sender) { + if (message is PublishWithAck needAck) + { + sender.Tell(new PublishFailed(needAck, PublishFailReason.Timeout)); + } + if (_settings.SendToDeadLettersWhenNoSubscribers) { var topic = message switch { Publish publish => publish.Topic, + PublishWithAck publish => publish.Topic, Send send => $"Send:{send.Path}", _ => null }; @@ -514,7 +606,7 @@ private void IgnoreOrSendToDeadLetters(IWrappedMessage message) // Use the specialized DeadLetterWithNoSubscribers class to clearly indicate // that the message was not delivered because there were no subscribers, // not because the mediator itself is dead. - var deadLetter = new DeadLetterWithNoSubscribers(message, topic, Sender, Context.Self); + var deadLetter = new DeadLetterWithNoSubscribers(message, topic, sender, Context.Self); Context.System.DeadLetters.Tell(deadLetter); } } @@ -529,7 +621,16 @@ private void PublishMessage(string path, IWrappedMessage publish, bool allButSel counter++; } - if (counter == 0) IgnoreOrSendToDeadLetters(publish); + if (counter == 0) + { + if(publish is PublishWithAck needAck) + BufferMessageOrDeadLetter(needAck, Sender); + else + IgnoreOrSendToDeadLetters(publish, Sender); + } + else if(publish is PublishWithAck needAck) + Sender.Tell(new PublishSucceeded(needAck)); + return; IEnumerable Refs() @@ -545,7 +646,7 @@ IEnumerable Refs() } } - private void PublishToEachGroup(string path, Publish publish) + private void PublishToEachGroup(string path, IWrappedMessage publish) { var prefix = path + "/"; var lastKey = path + "0"; // '0' is the next char of '/' @@ -555,7 +656,10 @@ private void PublishToEachGroup(string path, Publish publish) if (groups.Count == 0) { - IgnoreOrSendToDeadLetters(publish); + if(publish is PublishWithAck needAck) + BufferMessageOrDeadLetter(needAck, Sender); + else + IgnoreOrSendToDeadLetters(publish, Sender); } else { @@ -565,6 +669,9 @@ private void PublishToEachGroup(string path, Publish publish) if (routees.Length != 0) new Router(_settings.RoutingLogic, routees).Route(wrappedMessage, Sender); } + + if(publish is PublishWithAck needAck) + Sender.Tell(new PublishSucceeded(needAck)); } } @@ -582,8 +689,8 @@ private IEnumerable> ExtractGroups(string prefix, s private void HandlePrune() { - var modifications = new Dictionary(); - foreach (var (owner, bucket) in _registry) + var modifications = new List(); + foreach (var bucket in _registry.Values) { var oldRemoved = bucket.Content .Where(kv => kv.Value.Ref.IsNobody() && (bucket.Version - kv.Value.Version) > _settings.RemovedTimeToLive.TotalMilliseconds) @@ -592,13 +699,13 @@ private void HandlePrune() if (oldRemoved.Length > 0) { - modifications.Add(owner, new Bucket(bucket.Owner, bucket.Version, bucket.Content.RemoveRange(oldRemoved))); + modifications.Add(new Bucket(bucket.Owner, bucket.Version, bucket.Content.RemoveRange(oldRemoved))); } } foreach (var entry in modifications) { - _registry[entry.Key] = entry.Value; + _registry[entry.Owner] = entry; } } @@ -634,6 +741,7 @@ protected override void PreStart() //Start periodic gossip to random nodes in cluster Timers.StartPeriodicTimer(GossipTimerKey, GossipTick.Instance, _settings.GossipInterval, _settings.GossipInterval, Self); Timers.StartPeriodicTimer(PruneTimerKey, Prune.Instance, _pruneInterval, _pruneInterval, Self); + Timers.StartPeriodicTimer(PruneBufferTimerKey, PruneBufferTick.Instance, _bufferedMessageTimeoutCheckInterval, _bufferedMessageTimeoutCheckInterval, Self); } /// @@ -643,6 +751,20 @@ protected override void PostStop() { Timers.CancelAll(); base.PostStop(); + + // We're shutting down, clear buffer + foreach (var list in _bufferedMessages.Values) + { + foreach (var bufferedMessage in list) + { + bufferedMessage.Sender.Tell(new PublishFailed( + (PublishWithAck)bufferedMessage.Message, + PublishFailReason.MediatorShuttingDown)); + } + } + + _bufferedMessages.Clear(); + _cache.Clear(); _cluster.Unsubscribe(Self); } diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubSettings.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubSettings.cs index d4438c483bb..c062b9462ba 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubSettings.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubSettings.cs @@ -15,7 +15,7 @@ namespace Akka.Cluster.Tools.PublishSubscribe /// /// TBD /// - public sealed class DistributedPubSubSettings : INoSerializationVerificationNeeded + public sealed record DistributedPubSubSettings : INoSerializationVerificationNeeded { /// /// Creates cluster publish/subscribe settings from the default configuration `akka.cluster.pub-sub`. @@ -45,70 +45,71 @@ public static DistributedPubSubSettings Create(Config config) if (config.IsNullOrEmpty()) throw ConfigurationException.NullOrEmptyConfig(); - RoutingLogic routingLogic = null; - var routingLogicName = config.GetString("routing-logic"); - switch (routingLogicName) + var routingLogic = config.GetString("routing-logic")?.ToLowerInvariant() switch { - case "random": - routingLogic = new RandomLogic(); - break; - case "round-robin": - routingLogic = new RoundRobinRoutingLogic(); - break; - case "broadcast": - routingLogic = new BroadcastRoutingLogic(); - break; - case "consistent-hashing": - throw new ArgumentException("Consistent hashing routing logic cannot be used by the pub-sub mediator"); - default: - throw new ArgumentException("Unknown routing logic is tried to be applied to the pub-sub mediator: " + - routingLogicName); - } + "random" => (RoutingLogic) new RandomLogic(), + "round-robin" => new RoundRobinRoutingLogic(), + "broadcast" => new BroadcastRoutingLogic(), + "consistent-hashing" => throw new ArgumentException("Consistent hashing routing logic cannot be used by the pub-sub mediator"), + var unknown => throw new ArgumentException($"Unknown routing logic is tried to be applied to the pub-sub mediator: {unknown}") + }; // TODO: This will fail if DistributedPubSub.DefaultConfig() is not inside the fallback chain. // TODO: "gossip-interval" key depends on Config.GetTimeSpan() to return a TimeSpan.Zero default. // TODO: "removed-time-to-live" key depends on Config.GetTimeSpan() to return a TimeSpan.Zero default. // TODO: "max-delta-elements" key depends on Config.GetInt() to return a 0 default. return new DistributedPubSubSettings( - config.GetString("role", null), + config.GetString("role", ""), routingLogic, config.GetTimeSpan("gossip-interval"), config.GetTimeSpan("removed-time-to-live"), config.GetInt("max-delta-elements"), - config.GetBoolean("send-to-dead-letters-when-no-subscribers")); + config.GetBoolean("send-to-dead-letters-when-no-subscribers"), + config.GetInt("buffered-messages.max-per-topic"), + config.GetTimeSpan("buffered-messages.timeout-check-interval")); } /// /// The mediator starts on members tagged with this role. Uses all if undefined. /// - public string Role { get; } + public string Role { get; private init; } /// /// The routing logic to use for . /// - public RoutingLogic RoutingLogic { get; } + public RoutingLogic RoutingLogic { get; private init; } /// /// How often the should send out gossip information /// - public TimeSpan GossipInterval { get; } + public TimeSpan GossipInterval { get; private init; } /// /// Removed entries are pruned after this duration. /// - public TimeSpan RemovedTimeToLive { get; } + public TimeSpan RemovedTimeToLive { get; private init; } /// /// Maximum number of elements to transfer in one message when synchronizing the registries. /// Next chunk will be transferred in next round of gossip. /// - public int MaxDeltaElements { get; } + public int MaxDeltaElements { get; private init; } /// /// When a message is published to a topic with no subscribers send it to the dead letters. /// - public bool SendToDeadLettersWhenNoSubscribers { get; } - + public bool SendToDeadLettersWhenNoSubscribers { get; private init; } + + /// + /// The maximum message buffer size for each topic + /// + public int MaxBufferedMessagePerTopic { get; private init; } + + /// + /// Determine the interval on which all buffered message will be checked for timeout condition + /// + public TimeSpan BufferedMessageTimeoutCheckInterval { get; private init; } + /// /// Creates a new instance of the . /// @@ -119,6 +120,7 @@ public static DistributedPubSubSettings Create(Config config) /// The maximum number of delta elements that can be propagated in a single gossip tick. /// When a message is published to a topic with no subscribers send it to the dead letters. /// Thrown if a user tries to use a with routingLogic. + [Obsolete("Use .ctor that supports WaitForSubscribers instead. Since 1.4.42")] public DistributedPubSubSettings( string role, RoutingLogic routingLogic, @@ -126,6 +128,39 @@ public DistributedPubSubSettings( TimeSpan removedTimeToLive, int maxDeltaElements, bool sendToDeadLettersWhenNoSubscribers) + : this( + role: role, + routingLogic: routingLogic, + gossipInterval: gossipInterval, + removedTimeToLive: removedTimeToLive, + maxDeltaElements: maxDeltaElements, + sendToDeadLettersWhenNoSubscribers: sendToDeadLettersWhenNoSubscribers, + maxBufferedMessagePerTopic: 0, + bufferedMessageTimeoutCheckInterval: TimeSpan.Zero) + { + } + + /// + /// Creates a new instance of the . + /// + /// The role that will host instances. + /// Optional. The routing logic used for distributing messages for topic groups. + /// The gossip interval for propagating topic/subscriber data to other mediators. + /// The amount of time it takes to prune a deactivated subscriber from the network. + /// The maximum number of delta elements that can be propagated in a single gossip tick. + /// When a message is published to a topic with no subscribers send it to the dead letters. + /// Maximum message buffer size for each topic + /// Buffered message timeout condition check interval + /// Thrown if a user tries to use a with routingLogic. + public DistributedPubSubSettings( + string role, + RoutingLogic routingLogic, + TimeSpan gossipInterval, + TimeSpan removedTimeToLive, + int maxDeltaElements, + bool sendToDeadLettersWhenNoSubscribers, + int maxBufferedMessagePerTopic, + TimeSpan bufferedMessageTimeoutCheckInterval) { if (routingLogic is ConsistentHashingRoutingLogic) { @@ -138,66 +173,38 @@ public DistributedPubSubSettings( RemovedTimeToLive = removedTimeToLive; MaxDeltaElements = maxDeltaElements; SendToDeadLettersWhenNoSubscribers = sendToDeadLettersWhenNoSubscribers; + MaxBufferedMessagePerTopic = maxBufferedMessagePerTopic; + BufferedMessageTimeoutCheckInterval = bufferedMessageTimeoutCheckInterval; } - /// - /// TBD - /// - /// TBD - /// TBD public DistributedPubSubSettings WithRole(string role) - { - return new DistributedPubSubSettings(role, RoutingLogic, GossipInterval, RemovedTimeToLive, MaxDeltaElements, SendToDeadLettersWhenNoSubscribers); - } + => this with { Role = role }; - /// - /// TBD - /// - /// TBD - /// TBD public DistributedPubSubSettings WithRoutingLogic(RoutingLogic routingLogic) { - return new DistributedPubSubSettings(Role, routingLogic, GossipInterval, RemovedTimeToLive, MaxDeltaElements, SendToDeadLettersWhenNoSubscribers); + if (routingLogic is ConsistentHashingRoutingLogic) + throw new ArgumentException("Consistent hashing routing logic cannot be used by the pub-sub mediator"); + + return this with { RoutingLogic = routingLogic }; } - /// - /// TBD - /// - /// TBD - /// TBD public DistributedPubSubSettings WithGossipInterval(TimeSpan gossipInterval) - { - return new DistributedPubSubSettings(Role, RoutingLogic, gossipInterval, RemovedTimeToLive, MaxDeltaElements, SendToDeadLettersWhenNoSubscribers); - } + => this with { GossipInterval = gossipInterval }; - /// - /// TBD - /// - /// TBD - /// TBD public DistributedPubSubSettings WithRemovedTimeToLive(TimeSpan removedTtl) - { - return new DistributedPubSubSettings(Role, RoutingLogic, GossipInterval, removedTtl, MaxDeltaElements, SendToDeadLettersWhenNoSubscribers); - } + => this with { RemovedTimeToLive = removedTtl }; - /// - /// TBD - /// - /// TBD - /// TBD public DistributedPubSubSettings WithMaxDeltaElements(int maxDeltaElements) - { - return new DistributedPubSubSettings(Role, RoutingLogic, GossipInterval, RemovedTimeToLive, maxDeltaElements, SendToDeadLettersWhenNoSubscribers); - } + => this with { MaxDeltaElements = maxDeltaElements }; - /// - /// TBD - /// - /// TBD - /// public DistributedPubSubSettings WithSendToDeadLettersWhenNoSubscribers(bool sendToDeadLetterWhenNoSubscribers) - { - return new DistributedPubSubSettings(Role, RoutingLogic, GossipInterval, RemovedTimeToLive, MaxDeltaElements, sendToDeadLetterWhenNoSubscribers); - } + => this with { SendToDeadLettersWhenNoSubscribers = sendToDeadLetterWhenNoSubscribers }; + + public DistributedPubSubSettings WithMaxBufferedMessagePerTopic(int maxBufferedMessagePerTopic) + => this with { MaxBufferedMessagePerTopic = maxBufferedMessagePerTopic }; + + public DistributedPubSubSettings WithBufferedMessageTimeoutCheckInterval(TimeSpan bufferedMessageTimeoutCheckInterval) + => this with { BufferedMessageTimeoutCheckInterval = bufferedMessageTimeoutCheckInterval }; + } } diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/TopicMessages.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/TopicMessages.cs index 481703f241c..7de8496244f 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/TopicMessages.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/TopicMessages.cs @@ -12,6 +12,7 @@ using Akka.Actor; using Akka.Annotations; using Akka.Event; +using Akka.Remote; using Akka.Routing; namespace Akka.Cluster.Tools.PublishSubscribe.Internal @@ -310,12 +311,31 @@ private DeltaCount() { } /// TBD /// [Serializable] - internal sealed class GossipTick + internal sealed class GossipTick: IDeadLetterSuppression { public static GossipTick Instance { get; } = new(); private GossipTick() { } } + /// + /// Internal event signalling that a new subscriber has been added to the registry + /// either locally using , , or from a . + /// + internal sealed record NewBucketKeysAdded(IReadOnlyList Topics): IDeadLetterSuppression; + + /// + /// Container for buffered or messages + /// + /// The original message being buffered + /// The deadline where this buffered message should be timed out + internal readonly record struct BufferedMessage(IWrappedMessage Message, Deadline Deadline, IActorRef Sender); + + internal sealed class PruneBufferTick: IDeadLetterSuppression + { + public static PruneBufferTick Instance { get; } = new(); + private PruneBufferTick() { } + } + /// /// TBD /// diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Serialization/DistributedPubSubMessageSerializer.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Serialization/DistributedPubSubMessageSerializer.cs index 9022f5e1b53..c37c81fae02 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Serialization/DistributedPubSubMessageSerializer.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Serialization/DistributedPubSubMessageSerializer.cs @@ -79,6 +79,8 @@ public override byte[] ToBinary(object obj) return SendToAllToProto(all); case Publish publish: return PublishToProto(publish); + case PublishWithAck: + throw new SerializationException("ClusterClient does not support PublishWithAck"); case SendToOneSubscriber subscriber: return SendToOneSubscriberToProto(subscriber); default: diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/reference.conf b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/reference.conf index 5e7cc6e8cdc..96b0a25364b 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/reference.conf @@ -32,6 +32,17 @@ akka.cluster.pub-sub { # When a message is published to a topic with no subscribers send it to the dead letters. send-to-dead-letters-when-no-subscribers = on + # messages only buffered if they're sent using the `PublishWithAck` envelope/wrapper + buffered-messages { + # Set the maximum message buffer size for each topic. + # Default: 1000 messages + max-per-topic = 1000 + + # The interval on which all buffered message will be checked for timeout condition. + # Default: 500 milliseconds + timeout-check-interval = 500ms + } + # The id of the dispatcher to use for DistributedPubSubMediator actors. # If not specified, the internal dispatcher is used. # If specified you need to define the settings of the actual dispatcher. diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt index d0124638e03..8c8ff8a9041 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt @@ -252,10 +252,14 @@ namespace Akka.Cluster.Tools.PublishSubscribe protected override void PreStart() { } public static Akka.Actor.Props Props(Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings settings) { } } - public sealed class DistributedPubSubSettings : Akka.Actor.INoSerializationVerificationNeeded + public sealed class DistributedPubSubSettings : Akka.Actor.INoSerializationVerificationNeeded, System.IEquatable { + [System.ObsoleteAttribute("Use .ctor that supports WaitForSubscribers instead. Since 1.4.42")] public DistributedPubSubSettings(string role, Akka.Routing.RoutingLogic routingLogic, System.TimeSpan gossipInterval, System.TimeSpan removedTimeToLive, int maxDeltaElements, bool sendToDeadLettersWhenNoSubscribers) { } + public DistributedPubSubSettings(string role, Akka.Routing.RoutingLogic routingLogic, System.TimeSpan gossipInterval, System.TimeSpan removedTimeToLive, int maxDeltaElements, bool sendToDeadLettersWhenNoSubscribers, int maxBufferedMessagePerTopic, System.TimeSpan bufferedMessageTimeoutCheckInterval) { } + public System.TimeSpan BufferedMessageTimeoutCheckInterval { get; } public System.TimeSpan GossipInterval { get; } + public int MaxBufferedMessagePerTopic { get; } public int MaxDeltaElements { get; } public System.TimeSpan RemovedTimeToLive { get; } public string Role { get; } @@ -263,7 +267,9 @@ namespace Akka.Cluster.Tools.PublishSubscribe public bool SendToDeadLettersWhenNoSubscribers { get; } public static Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings Create(Akka.Actor.ActorSystem system) { } public static Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings Create(Akka.Configuration.Config config) { } + public Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings WithBufferedMessageTimeoutCheckInterval(System.TimeSpan bufferedMessageTimeoutCheckInterval) { } public Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings WithGossipInterval(System.TimeSpan gossipInterval) { } + public Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings WithMaxBufferedMessagePerTopic(int maxBufferedMessagePerTopic) { } public Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings WithMaxDeltaElements(int maxDeltaElements) { } public Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings WithRemovedTimeToLive(System.TimeSpan removedTtl) { } public Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings WithRole(string role) { } @@ -286,6 +292,30 @@ namespace Akka.Cluster.Tools.PublishSubscribe public override int GetHashCode() { } public override string ToString() { } } + public enum PublishFailReason + { + Timeout = 0, + MediatorShuttingDown = 1, + } + public sealed class PublishFailed : Akka.Event.IDeadLetterSuppression, System.IEquatable + { + public PublishFailed(Akka.Cluster.Tools.PublishSubscribe.PublishWithAck Message, Akka.Cluster.Tools.PublishSubscribe.PublishFailReason Reason) { } + public Akka.Cluster.Tools.PublishSubscribe.PublishWithAck Message { get; set; } + public Akka.Cluster.Tools.PublishSubscribe.PublishFailReason Reason { get; set; } + } + public sealed class PublishSucceeded : Akka.Event.IDeadLetterSuppression, System.IEquatable + { + public PublishSucceeded(Akka.Cluster.Tools.PublishSubscribe.PublishWithAck Message) { } + public Akka.Cluster.Tools.PublishSubscribe.PublishWithAck Message { get; set; } + } + public sealed class PublishWithAck : Akka.Actor.IWrappedMessage, Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable + { + public PublishWithAck(string topic, object message, System.TimeSpan timeout, bool sendOneMessageToEachGroup = False) { } + public object Message { get; } + public bool SendOneMessageToEachGroup { get; } + public System.TimeSpan Timeout { get; } + public string Topic { get; } + } public sealed class Put : System.IEquatable { public Put(Akka.Actor.IActorRef @ref) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt index e8ed57f9b28..744cb971ab7 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt @@ -252,10 +252,14 @@ namespace Akka.Cluster.Tools.PublishSubscribe protected override void PreStart() { } public static Akka.Actor.Props Props(Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings settings) { } } - public sealed class DistributedPubSubSettings : Akka.Actor.INoSerializationVerificationNeeded + public sealed class DistributedPubSubSettings : Akka.Actor.INoSerializationVerificationNeeded, System.IEquatable { + [System.ObsoleteAttribute("Use .ctor that supports WaitForSubscribers instead. Since 1.4.42")] public DistributedPubSubSettings(string role, Akka.Routing.RoutingLogic routingLogic, System.TimeSpan gossipInterval, System.TimeSpan removedTimeToLive, int maxDeltaElements, bool sendToDeadLettersWhenNoSubscribers) { } + public DistributedPubSubSettings(string role, Akka.Routing.RoutingLogic routingLogic, System.TimeSpan gossipInterval, System.TimeSpan removedTimeToLive, int maxDeltaElements, bool sendToDeadLettersWhenNoSubscribers, int maxBufferedMessagePerTopic, System.TimeSpan bufferedMessageTimeoutCheckInterval) { } + public System.TimeSpan BufferedMessageTimeoutCheckInterval { get; } public System.TimeSpan GossipInterval { get; } + public int MaxBufferedMessagePerTopic { get; } public int MaxDeltaElements { get; } public System.TimeSpan RemovedTimeToLive { get; } public string Role { get; } @@ -263,7 +267,9 @@ namespace Akka.Cluster.Tools.PublishSubscribe public bool SendToDeadLettersWhenNoSubscribers { get; } public static Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings Create(Akka.Actor.ActorSystem system) { } public static Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings Create(Akka.Configuration.Config config) { } + public Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings WithBufferedMessageTimeoutCheckInterval(System.TimeSpan bufferedMessageTimeoutCheckInterval) { } public Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings WithGossipInterval(System.TimeSpan gossipInterval) { } + public Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings WithMaxBufferedMessagePerTopic(int maxBufferedMessagePerTopic) { } public Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings WithMaxDeltaElements(int maxDeltaElements) { } public Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings WithRemovedTimeToLive(System.TimeSpan removedTtl) { } public Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubSettings WithRole(string role) { } @@ -286,6 +292,30 @@ namespace Akka.Cluster.Tools.PublishSubscribe public override int GetHashCode() { } public override string ToString() { } } + public enum PublishFailReason + { + Timeout = 0, + MediatorShuttingDown = 1, + } + public sealed class PublishFailed : Akka.Event.IDeadLetterSuppression, System.IEquatable + { + public PublishFailed(Akka.Cluster.Tools.PublishSubscribe.PublishWithAck Message, Akka.Cluster.Tools.PublishSubscribe.PublishFailReason Reason) { } + public Akka.Cluster.Tools.PublishSubscribe.PublishWithAck Message { get; set; } + public Akka.Cluster.Tools.PublishSubscribe.PublishFailReason Reason { get; set; } + } + public sealed class PublishSucceeded : Akka.Event.IDeadLetterSuppression, System.IEquatable + { + public PublishSucceeded(Akka.Cluster.Tools.PublishSubscribe.PublishWithAck Message) { } + public Akka.Cluster.Tools.PublishSubscribe.PublishWithAck Message { get; set; } + } + public sealed class PublishWithAck : Akka.Actor.IWrappedMessage, Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable + { + public PublishWithAck(string topic, object message, System.TimeSpan timeout, bool sendOneMessageToEachGroup = False) { } + public object Message { get; } + public bool SendOneMessageToEachGroup { get; } + public System.TimeSpan Timeout { get; } + public string Topic { get; } + } public sealed class Put : System.IEquatable { public Put(Akka.Actor.IActorRef @ref) { }