Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
acae403
Add wait-for-subscribers feature
Arkatufus May 15, 2025
e72cbd4
Merge remote-tracking branch 'upstream/dev' into Add-wait-for-subscri…
Arkatufus May 15, 2025
8f05840
Fix missing code
Arkatufus May 15, 2025
0177811
Update API approval list
Arkatufus May 15, 2025
d30b8a2
Fix HOCON errors
Arkatufus May 16, 2025
001e056
Merge branch 'dev' into Add-wait-for-subscribers-feature
Arkatufus May 16, 2025
98a6999
Merge branch 'dev' into Add-wait-for-subscribers-feature
Arkatufus May 16, 2025
8083046
Merge branch 'dev' into Add-wait-for-subscribers-feature
Arkatufus May 16, 2025
c0af9fe
Trim features to fit
Arkatufus May 16, 2025
89ecf1d
Merge branch 'Add-wait-for-subscribers-feature' of github.com:Arkatuf…
Arkatufus May 16, 2025
b4b1c5e
Simplify logic
Arkatufus May 19, 2025
00761f5
Code cleanup
Arkatufus May 19, 2025
6cf2e00
Code cleanup
Arkatufus May 19, 2025
4c0f026
Update API Approval list
Arkatufus May 19, 2025
6792915
Fix code
Arkatufus May 19, 2025
cba4f30
Fix codes
Arkatufus May 19, 2025
13d05ca
Fix codes
Arkatufus May 19, 2025
6f26d41
Update code
Arkatufus May 19, 2025
ef05c31
Update API Approval list
Arkatufus May 19, 2025
1dbdd89
Fix PublishWithAck
Arkatufus May 19, 2025
8a36742
Fix cleanup code
Arkatufus May 19, 2025
71d4dd2
refactor DistributedPubSubSettings to record
Arkatufus May 19, 2025
ec979e2
Fix specs
Arkatufus May 19, 2025
8635d6c
Add unit tests
Arkatufus May 19, 2025
2021f83
Fix unit tests
Arkatufus May 19, 2025
4d2130b
Merge branch 'dev' into Add-wait-for-subscribers-feature
Arkatufus May 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// -----------------------------------------------------------------------
// <copyright file="DistributedPubSubPublishWithAckSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using Akka.Actor;
using Akka.Cluster.TestKit;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Configuration;
using Akka.Event;
using Akka.MultiNode.TestAdapter;
using Akka.Remote.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;

namespace Akka.Cluster.Tools.Tests.MultiNode.PublishSubscribe;

public class DistributedPubSubPublishWithAckSpecSpecConfig : MultiNodeConfig
{
public readonly RoleName First;
public readonly RoleName Second;

public DistributedPubSubPublishWithAckSpecSpecConfig()
{
First = Role("first");
Second = Role("second");

CommonConfig = ConfigurationFactory.ParseString(
"""
akka.loglevel = INFO
akka.actor.provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
akka.actor.serialize-messages = off
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.pub-sub.max-delta-elements = 500
akka.cluster.pub-sub.buffered-messages.max-per-topic = 2
akka.cluster.pub-sub.buffered-messages.timeout-check-interval = 200ms
akka.testconductor.query-timeout = 1m # we were having timeouts shutting down nodes with 5s default
""").WithFallback(DistributedPubSub.DefaultConfig());
}
}

public class DistributedPubSubPublishWithAckSpec : MultiNodeClusterSpec
{
#region setup
private readonly RoleName _first;
private readonly RoleName _second;

public DistributedPubSubPublishWithAckSpec() : this(new DistributedPubSubPublishWithAckSpecSpecConfig())
{
}

protected DistributedPubSubPublishWithAckSpec(DistributedPubSubPublishWithAckSpecSpecConfig config) : base(config, typeof(DistributedPubSubMediatorSpec))
{
_first = config.First;
_second = config.Second;
}

public IActorRef Mediator => DistributedPubSub.Get(Sys).Mediator;

private void Join(RoleName from, RoleName to)
{
RunOn(() =>
{
Cluster.Join(Node(to).Address);
_ = DistributedPubSub.Get(Sys).Mediator;
}, from);
EnterBarrier(from.Name + "-joined");
}

#endregion

[MultiNodeFact]
public void DistributedPubSubPublishWithAckSpecs()
{
DistributedPubSubMediator_must_startup_2_nodes_cluster();
PublishWithAck_must_buffer_message();
Second_node_must_subscribe();
PublishWithAck_must_deliver_buffered_messages();
}

public void DistributedPubSubMediator_must_startup_2_nodes_cluster()
{
Within(TimeSpan.FromSeconds(15), () =>
{
Join(_first, _first);
Join(_second, _first);
EnterBarrier("after-1");
});
}

public void PublishWithAck_must_buffer_message()
{
Within(15.Seconds(), () =>
{
RunOn(() =>
{
Mediator.Tell(new PublishWithAck("content", "hi-1!", 20.Seconds()));
Mediator.Tell(new PublishWithAck("content", "hi-2!", 20.Seconds()));
ExpectNoMsg(200.Milliseconds());
}, _first);

RunOn(() =>
{
ExpectNoMsg(200.Milliseconds());
}, _second);

EnterBarrier("after-2");
});
}

public void Second_node_must_subscribe()
{
RunOn(() =>
{
Mediator.Tell(new Subscribe("content", TestActor));
// SubscribeAck must arrive first
ExpectMsg<SubscribeAck>();
Sys.Log.Info("Second node subscribed");
}, _second);

EnterBarrier("after-3");
}

public void PublishWithAck_must_deliver_buffered_messages()
{
Within(TimeSpan.FromSeconds(15), () =>
{
RunOn(() =>
{
ExpectMsg<PublishSucceeded>().Message.Message.Should().Be("hi-1!");
ExpectMsg<PublishSucceeded>().Message.Message.Should().Be("hi-2!");
}, _first);

RunOn(() =>
{
ExpectMsg("hi-1!");
ExpectMsg("hi-2!");
}, _second);

EnterBarrier("after-4");
});
}
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Akka.Configuration;
using Akka.Routing;
using Akka.TestKit;
using FluentAssertions.Extensions;
using Xunit;

namespace Akka.Cluster.Tools.Tests.PublishSubscribe
Expand Down Expand Up @@ -40,6 +41,8 @@ public void DistributedPubSubSettings_must_have_default_config()
distributedPubSubSettings.RemovedTimeToLive.TotalSeconds.ShouldBe(120);
distributedPubSubSettings.SendToDeadLettersWhenNoSubscribers.ShouldBe(true);
distributedPubSubSettings.MaxDeltaElements.ShouldBe(3000);
distributedPubSubSettings.MaxBufferedMessagePerTopic.ShouldBe(1000);
distributedPubSubSettings.BufferedMessageTimeoutCheckInterval.ShouldBe(500.Milliseconds());

var config = Sys.Settings.Config.GetConfig("akka.cluster.pub-sub");
Assert.False(config.IsNullOrEmpty());
Expand Down
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// -----------------------------------------------------------------------
// <copyright file="DistributedPubSubPublishWithAckSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Configuration;
using Akka.TestKit;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tools.Tests.PublishSubscribe;

[Collection(nameof(DistributedPubSubMediatorSpec))]
public class DistributedPubSubPublishWithAckSpec : AkkaSpec
{
public DistributedPubSubPublishWithAckSpec(ITestOutputHelper output) : base(GetConfig(), output)
{
}

private static Config GetConfig()
{
return ConfigurationFactory.ParseString(
"""
akka.actor.provider = cluster
akka.cluster.pub-sub.buffered-messages.max-per-topic = 2
akka.cluster.pub-sub.buffered-messages.timeout-check-interval = 100ms
""");
}

[Fact(DisplayName = "PublishWithAck message should be buffered")]
public async Task PublishWithAckBufferTest()
{
var mediator = DistributedPubSub.Get(Sys).Mediator;
mediator.Tell(new PublishWithAck("topic", "msg-1", 10.Seconds()));
mediator.Tell(new PublishWithAck("topic", "msg-2", 10.Seconds()));

// Should not send NACKs
await ExpectNoMsgAsync(200.Milliseconds());

// Should not re-send messages if topic does not match
mediator.Tell(new Subscribe("topic2", TestActor));
var subAck = await ExpectMsgAsync<SubscribeAck>();
subAck.Subscribe.Topic.ShouldBe("topic2");
await ExpectNoMsgAsync(200.Milliseconds());

// Should not re-send messages if topic match
mediator.Tell(new Subscribe("topic", TestActor));
subAck = await ExpectMsgAsync<SubscribeAck>();
subAck.Subscribe.Topic.ShouldBe("topic");

await ExpectMsgAllOfMatchingPredicatesAsync([
PredicateInfo.Create<string>(msg => msg is "msg-1"),
PredicateInfo.Create<PublishSucceeded>(msg => msg.Message is { Message: "msg-1", Topic: "topic" }),
PredicateInfo.Create<string>(msg => msg is "msg-2"),
PredicateInfo.Create<PublishSucceeded>(msg => msg.Message is { Message: "msg-2", Topic: "topic" }) ]).ToListAsync();

// Should not send extra messages
await ExpectNoMsgAsync(200.Milliseconds());
}

[Fact(DisplayName = "PublishWithAck message buffer should NACK buffer overflowed messages")]
public async Task PublishWithAckBufferOverflowTest()
{
var mediator = DistributedPubSub.Get(Sys).Mediator;
mediator.Tell(new PublishWithAck("topic", "msg-1", 10.Seconds()));
mediator.Tell(new PublishWithAck("topic", "msg-2", 10.Seconds()));

// Should dequeue and NACK msg-1, the oldest message in the buffer
mediator.Tell(new PublishWithAck("topic", "msg-3", 10.Seconds()));
var failed = await ExpectMsgAsync<PublishFailed>();
failed.Message.Topic.ShouldBe("topic");
failed.Message.Message.ShouldBe("msg-1");

// Should dequeue and NACK msg-2, the oldest message in the buffer
mediator.Tell(new PublishWithAck("topic", "msg-4", 10.Seconds()));
failed = await ExpectMsgAsync<PublishFailed>();
failed.Message.Topic.ShouldBe("topic");
failed.Message.Message.ShouldBe("msg-2");

// Should not re-send messages if topic match.
// msg-1 and msg-2 should never be re-sent
mediator.Tell(new Subscribe("topic", TestActor));
var subAck = await ExpectMsgAsync<SubscribeAck>();
subAck.Subscribe.Topic.ShouldBe("topic");

await ExpectMsgAllOfMatchingPredicatesAsync([
PredicateInfo.Create<string>(msg => msg is "msg-3"),
PredicateInfo.Create<PublishSucceeded>(msg => msg.Message is { Message: "msg-3", Topic: "topic" }),
PredicateInfo.Create<string>(msg => msg is "msg-4"),
PredicateInfo.Create<PublishSucceeded>(msg => msg.Message is { Message: "msg-4", Topic: "topic" }) ]).ToListAsync();

// Should not send extra messages
await ExpectNoMsgAsync(200.Milliseconds());
}

[Fact(DisplayName = "PublishWithAck message should fail when timed-out")]
public async Task PublishWithAckMessageTimeoutTest()
{
var mediator = DistributedPubSub.Get(Sys).Mediator;
mediator.Tell(new PublishWithAck("topic", "msg-1", 300.Milliseconds()));
mediator.Tell(new PublishWithAck("topic", "msg-2", 10.Milliseconds()));

// msg-2 should time out first
var failed = await ExpectMsgAsync<PublishFailed>();
failed.Message.Topic.ShouldBe("topic");
failed.Message.Message.ShouldBe("msg-2");

// msg-1 should time out second
failed = await ExpectMsgAsync<PublishFailed>();
failed.Message.Topic.ShouldBe("topic");
failed.Message.Message.ShouldBe("msg-1");

// Buffer is empty, should not re-send anything
mediator.Tell(new Subscribe("topic", TestActor));
var subAck = await ExpectMsgAsync<SubscribeAck>();
subAck.Subscribe.Topic.ShouldBe("topic");

// Should not send extra messages
await ExpectNoMsgAsync(200.Milliseconds());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,35 @@ public override string ToString()
return $"Publish<topic:{Topic}, sendOneToEachGroup:{SendOneMessageToEachGroup}, message:{Message}>";
}
}

public sealed record PublishWithAck : IDistributedPubSubMessage, IWrappedMessage
{
public PublishWithAck(string topic, object message, TimeSpan timeout, bool sendOneMessageToEachGroup = false)
{
if(timeout.Ticks <= 0)
throw new ArgumentException("Timeout must be greater than zero", nameof(timeout));

Topic = topic;
Message = message;
Timeout = timeout;
SendOneMessageToEachGroup = sendOneMessageToEachGroup;
}

public string Topic { get; }
public object Message { get; }
public TimeSpan Timeout { get; }
public bool SendOneMessageToEachGroup { get; }
}

public enum PublishFailReason
{
Timeout,
MediatorShuttingDown
}

public sealed record PublishFailed(PublishWithAck Message, PublishFailReason Reason): IDeadLetterSuppression;

public sealed record PublishSucceeded(PublishWithAck Message): IDeadLetterSuppression;

/// <summary>
/// TBD
Expand Down
Loading
Loading