diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/BugFix7196Specs.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/BugFix7196Specs.cs new file mode 100644 index 00000000000..caea0cbe1c5 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/BugFix7196Specs.cs @@ -0,0 +1,164 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +#nullable enable +using System; +using System.Collections.Immutable; +using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Cluster.Tools.Singleton; +using Akka.Configuration; +using Akka.TestKit; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Cluster.Tools.Tests.Singleton; + +/// +/// Reproduction for https://github.com/akkadotnet/akka.net/issues/7196 - clearly, what we did +/// +public class BugFix7196Specs : AkkaSpec +{ + private readonly ActorSystem _hostNodeV1; + private readonly ActorSystem _hostNode2V1; + private readonly ActorSystem _hostNodeV2; + + private static Config OriginalNodeConfig() => """ + + akka.loglevel = INFO + akka.actor.provider = "cluster" + akka.cluster.roles = [non-singleton] + akka.cluster.singleton.min-number-of-hand-over-retries = 5 + akka.cluster.app-version = "1.0.0" + akka.remote { + dot-netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + } + """; + + private static Config V2NodeConfig(ActorSystem originalSys) => ConfigurationFactory.ParseString( + "akka.cluster.app-version = \"1.0.2\"").WithFallback(originalSys.Settings.Config); + + public BugFix7196Specs(ITestOutputHelper output) : base(OriginalNodeConfig(), output) + { + _hostNodeV1 = ActorSystem.Create(Sys.Name, + ConfigurationFactory.ParseString("akka.cluster.roles = [singleton]").WithFallback(Sys.Settings.Config)); + InitializeLogger(_hostNodeV1); + _hostNode2V1 = ActorSystem.Create(Sys.Name, + ConfigurationFactory.ParseString("akka.cluster.roles = [singleton]").WithFallback(Sys.Settings.Config)); + InitializeLogger(_hostNode2V1); + _hostNodeV2 = ActorSystem.Create(Sys.Name, + ConfigurationFactory.ParseString("akka.cluster.roles = [singleton]").WithFallback(V2NodeConfig(Sys))); + InitializeLogger(_hostNodeV2); + } + + [Fact(DisplayName = + "Singletons should not move to higher AppVersion nodes until after older incarnation is downed")] + public async Task Bugfix7196Spec() + { + await JoinAsync(Sys, Sys); // have to do a self join first + await JoinAsync(_hostNodeV1, Sys); + await JoinAsync(_hostNode2V1, Sys); + + var proxy = Sys.ActorOf( + ClusterSingletonProxy.Props("user/echo", + ClusterSingletonProxySettings.Create(Sys).WithRole("singleton")), "proxy3"); + + // confirm that singleton is on _hostNodeV1 + await AssertSingletonHostedOn(proxy, _hostNodeV1); + + // have _hostNodeV2 join the cluster + await JoinAsync(_hostNodeV2, Sys); + + // confirm that singleton is STILL on _hostNodeV1 + await AssertSingletonHostedOn(proxy, _hostNodeV1); + + // now, down the original node + Cluster.Get(Sys).Leave(Cluster.Get(_hostNodeV1).SelfAddress); + + // validate that _hostNodeV1 is no longer in the cluster + await WithinAsync(TimeSpan.FromSeconds(5), () => + { + return AwaitAssertAsync(() => + { + Cluster.Get(Sys).State.Members.Select(x => x.UniqueAddress).Should() + .NotContain(Cluster.Get(_hostNodeV1).SelfUniqueAddress); + }); + }); + + // validate that the singleton has moved to _hostNodeV2 + await AssertSingletonHostedOn(proxy, _hostNodeV2); + + /* + * NOTE: an important detail here: _hostNode2V1 is actually OLDER than _hostNodeV2, + * but when selecting a new "oldest" node after the previous one dies Akka.Cluster.Tools.Singleton + * will always prefer to move onto the new version of the software. + */ + } + + private async Task AssertSingletonHostedOn(IActorRef proxy, ActorSystem targetNode) + { + await WithinAsync(TimeSpan.FromSeconds(5), () => + { + return AwaitAssertAsync(() => + { + var probe = CreateTestProbe(Sys); + proxy.Tell("hello", probe.Ref); + probe.ExpectMsg(TimeSpan.FromSeconds(1)) + .Should() + .Be(Cluster.Get(targetNode).SelfUniqueAddress); + }); + }); + } + + public async Task JoinAsync(ActorSystem from, ActorSystem to) + { + if (Cluster.Get(from).SelfRoles.Contains("singleton")) + { + from.ActorOf(ClusterSingletonManager.Props(Props.Create(() => new Singleton()), + PoisonPill.Instance, + ClusterSingletonManagerSettings.Create(from).WithRole("singleton")), "echo"); + } + + + await WithinAsync(TimeSpan.FromSeconds(45), () => + { + AwaitAssert(() => + { + Cluster.Get(from).Join(Cluster.Get(to).SelfAddress); + Cluster.Get(from).State.Members.Select(x => x.UniqueAddress).Should() + .Contain(Cluster.Get(from).SelfUniqueAddress); + Cluster.Get(from) + .State.Members.Select(x => x.Status) + .ToImmutableHashSet() + .Should() + .Equal(ImmutableHashSet.Empty.Add(MemberStatus.Up)); + }); + return Task.CompletedTask; + }); + } + + public class Singleton : ReceiveActor + { + public Singleton() + { + ReceiveAny(_ => { Sender.Tell(Cluster.Get(Context.System).SelfUniqueAddress); }); + } + } + + protected override void AfterAll() + { + Shutdown(_hostNodeV1); + Shutdown(_hostNode2V1); + Shutdown(_hostNodeV2); + base.AfterAll(); + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs index 22162fabef9..ec55095ecbd 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs @@ -42,6 +42,7 @@ public void ClusterSingletonManagerSettings_must_have_default_config() var config = Sys.Settings.Config.GetConfig("akka.cluster.singleton"); Assert.False(config.IsNullOrEmpty()); config.GetInt("min-number-of-hand-over-retries", 0).ShouldBe(15); + clusterSingletonManagerSettings.ConsiderAppVersion.ShouldBeTrue(); } [Fact] @@ -54,6 +55,7 @@ public void ClusterSingletonProxySettings_must_have_default_config() clusterSingletonProxySettings.Role.ShouldBe(null); clusterSingletonProxySettings.SingletonIdentificationInterval.TotalSeconds.ShouldBe(1); clusterSingletonProxySettings.BufferSize.ShouldBe(1000); + clusterSingletonProxySettings.ConsiderAppVersion.ShouldBeTrue(); } } } diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs index 3499bd0f158..fc17dcbb504 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs @@ -29,7 +29,6 @@ public ClusterSingletonRestart2Spec() : base(@" akka.loglevel = INFO akka.actor.provider = ""cluster"" akka.cluster.roles = [singleton] - akka.cluster.auto-down-unreachable-after = 2s akka.cluster.singleton.min-number-of-hand-over-retries = 5 akka.remote { dot-netty.tcp { diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart3Spec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart3Spec.cs index 04f3d2011ff..c04c2562301 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart3Spec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart3Spec.cs @@ -29,7 +29,6 @@ public ClusterSingletonRestart3Spec(ITestOutputHelper output) : base(@" akka.loglevel = DEBUG akka.actor.provider = ""cluster"" akka.cluster.app-version = ""1.0.0"" - akka.cluster.auto-down-unreachable-after = 2s akka.cluster.singleton.min-number-of-hand-over-retries = 5 akka.cluster.singleton.consider-app-version = true akka.remote { diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestartSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestartSpec.cs index e2f5c5123d6..115afe1f5b7 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestartSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestartSpec.cs @@ -28,7 +28,6 @@ public class ClusterSingletonRestartSpec : AkkaSpec public ClusterSingletonRestartSpec() : base(@" akka.loglevel = INFO akka.actor.provider = ""cluster"" - akka.cluster.auto-down-unreachable-after = 2s akka.remote { dot-netty.tcp { hostname = ""127.0.0.1"" diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/MemberAgeOrderingSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/MemberAgeOrderingSpec.cs index 5a3f603b868..321f8c63ae5 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/MemberAgeOrderingSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/MemberAgeOrderingSpec.cs @@ -21,7 +21,24 @@ public class MemberAgeOrderingSpec [Fact(DisplayName = "MemberAgeOrdering should sort based on UpNumber")] public void SortByUpNumberTest() { - var members = new SortedSet(MemberAgeOrdering.DescendingWithAppVersion) + var members = new SortedSet(MemberAgeOrdering.OldestToYoungest) + { + Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3), + Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1), + Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9), + }; + + var seq = members.ToList(); + seq.Count.Should().Be(3); + seq[0].Should().Be(Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1)); + seq[1].Should().Be(Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3)); + seq[2].Should().Be(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9)); + } + + [Fact(DisplayName = "MemberAgeOrdering should sort based on UpNumber and AppVersion")] + public void SortByUpNumberAndAppVersionTest() + { + var members = new SortedSet(MemberAgeOrdering.OldestToYoungestWithAppVersion) { Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3), Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1), @@ -38,7 +55,7 @@ public void SortByUpNumberTest() [Fact(DisplayName = "MemberAgeOrdering should sort based on Address if UpNumber is the same")] public void SortByAddressTest() { - var members = new SortedSet(MemberAgeOrdering.DescendingWithAppVersion) + var members = new SortedSet(MemberAgeOrdering.OldestToYoungestWithAppVersion) { Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 1), Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1), @@ -55,7 +72,7 @@ public void SortByAddressTest() [Fact(DisplayName = "MemberAgeOrdering should prefer AppVersion over UpNumber")] public void SortByAppVersionTest() { - var members = new SortedSet(MemberAgeOrdering.DescendingWithAppVersion) + var members = new SortedSet(MemberAgeOrdering.OldestToYoungestWithAppVersion) { Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3, appVersion: AppVersion.Create("1.0.0")), Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1, appVersion: AppVersion.Create("1.0.0")), diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldestChangedBufferStateSpecs.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldestChangedBufferStateSpecs.cs new file mode 100644 index 00000000000..9aed70d618c --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldestChangedBufferStateSpecs.cs @@ -0,0 +1,180 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +#nullable enable +using System.Collections.Immutable; +using System.Linq; +using Akka.Actor; +using Akka.Cluster.Tools.Singleton; +using Akka.Util; +using FluentAssertions; +using Xunit; + +namespace Akka.Cluster.Tools.Tests.Singleton; + +public class OldestChangedBufferStateSpecs +{ + [Fact] + public void OldestChangedBuffer_should_initially_only_consider_nodes_with_matching_role() + { + // Arrange + var targetRole = "target-role"; + var targetRoles = ImmutableHashSet.Create("role1", "role2", targetRole); + var winningAddress = Address.Parse("akka://sys@darkstar:1112"); + var nonTargetRoles = ImmutableHashSet.Create("role1", "role2"); + var initialMembersByAge = ImmutableSortedSet.Empty + .Add(Create(winningAddress, roles:targetRoles, upNumber: 3)) + .Add(Create(Address.Parse("akka://sys@darkstar:1113"), roles:nonTargetRoles, upNumber: 1)) + .Add(Create(Address.Parse("akka://sys@darkstar:1111"), roles:targetRoles, upNumber: 9)) + .WithComparer(MemberAgeOrdering.OldestToYoungestWithAppVersion); + + // Act + var state = new OldestChangedBufferState(initialMembersByAge, targetRole); + + // Assert + var oldest = state.CurrentOldest; + oldest.Should().NotBeNull(); + oldest!.Address.Should().Be(winningAddress); + } + + [Fact] + public void OldestChangedBuffer_should_not_change_leader_when_higher_AppVersion_added() + { + // Arrange + var winningAddress = Address.Parse("akka://sys@darkstar:1112"); + var appVersion1 = AppVersion.Create("1.0.0"); + var appVersion2 = AppVersion.Create("1.0.2"); + var initialMembersByAge = ImmutableSortedSet.Empty + .Add(Create(winningAddress, upNumber: 3, appVersion: appVersion1)) + .Add(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9, appVersion: appVersion1)) + .WithComparer(MemberAgeOrdering.OldestToYoungestWithAppVersion); + + // Act + var state = new OldestChangedBufferState(initialMembersByAge, string.Empty); + var oldest = state.CurrentOldest; + + // higher upNumber - should not affect leader + var newMemberSameVersion = Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 10, appVersion: appVersion1); + + // higher upNumber AND version - should not affect leader + var newMemberNewVersion = Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 11, appVersion: appVersion2); + + // Act + var (state1, oldestChanged1) = state.AddMember(newMemberSameVersion); + var (state2, oldestChanged2) = state1.AddMember(newMemberNewVersion); + + // Assert + oldest.Should().NotBeNull(); + oldest!.Address.Should().Be(winningAddress); + + state1.CurrentOldest.Should().Be(oldest); + oldestChanged1.Should().BeFalse(); + + state2.CurrentOldest.Should().Be(oldest); + oldestChanged2.Should().BeFalse(); + + // the members by age system is going to chose the appVersion over the upNumber + state2.MembersByAge.FirstOrDefault().Should().NotBe(oldest); + state2.MembersByAge.FirstOrDefault()!.AppVersion.Should().Be(appVersion2); + } + + [Fact] + public void OldestChangedBuffer_should_change_Oldest_when_previous_Oldest_removed() + { + // Arrange + var winningAddress = Address.Parse("akka://sys@darkstar:1112"); + var appVersion1 = AppVersion.Create("1.0.0"); + var appVersion2 = AppVersion.Create("1.0.2"); + + var originalOldest = Create(winningAddress, upNumber: 3, appVersion: appVersion1); + + var initialMembersByAge = ImmutableSortedSet.Empty + .Add(originalOldest) + .Add(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9, appVersion: appVersion1)) + .WithComparer(MemberAgeOrdering.OldestToYoungestWithAppVersion); + + // Act + var state = new OldestChangedBufferState(initialMembersByAge, string.Empty); + var oldest = state.CurrentOldest; + + // higher upNumber, same version - won't affect leader + var newMemberSameVersion = Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 4, appVersion: appVersion1); + + // lower upNumber AND version - won't affect the leader until it gets removed + var newMemberHigherVersion = Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 11, appVersion: appVersion2); + + // Act + var (state1, oldestChanged1) = state.AddMember(newMemberSameVersion); + var (state2, oldestChanged2) = state1.AddMember(newMemberHigherVersion); + var (state3, oldestChanged3) = state2.RemoveMember(originalOldest); + + // Assert + oldest.Should().NotBeNull(); + oldest!.Address.Should().Be(winningAddress); + + state1.CurrentOldest.Should().Be(originalOldest); + oldestChanged1.Should().BeFalse(); + + state2.CurrentOldest.Should().Be(originalOldest); + oldestChanged2.Should().BeFalse(); + + state3.CurrentOldest.Should().Be(newMemberHigherVersion); + oldestChanged3.Should().BeTrue(); + } + + [Fact] + public void OldestChangedBuffer_should_not_change_Oldest_when_nonOldest_node_removed() + { + // Arrange + var winningAddress = Address.Parse("akka://sys@darkstar:1112"); + var appVersion1 = AppVersion.Create("1.0.0"); + var appVersion2 = AppVersion.Create("1.0.2"); + var initialMembersByAge = ImmutableSortedSet.Empty + .Add(Create(winningAddress, upNumber: 3, appVersion: appVersion1)) + .Add(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9, appVersion: appVersion1)) + .WithComparer(MemberAgeOrdering.OldestToYoungestWithAppVersion); + + // Act + var state = new OldestChangedBufferState(initialMembersByAge, string.Empty); + var oldest = state.CurrentOldest; + + // higher upNumber - should not affect leader + var newMemberSameVersion = Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 10, appVersion: appVersion1); + + // higher upNumber AND version - should not affect leader + var newMemberNewVersion = Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 11, appVersion: appVersion2); + + // Act + var (state1, oldestChanged1) = state.AddMember(newMemberSameVersion); + var (state2, oldestChanged2) = state1.AddMember(newMemberNewVersion); + var (state3, oldestChanged3) = state2.RemoveMember(newMemberSameVersion); + + // Assert + oldest.Should().NotBeNull(); + oldest!.Address.Should().Be(winningAddress); + + state1.CurrentOldest.Should().Be(oldest); + oldestChanged1.Should().BeFalse(); + + state2.CurrentOldest.Should().Be(oldest); + oldestChanged2.Should().BeFalse(); + + state3.CurrentOldest.Should().Be(oldest); + oldestChanged3.Should().BeFalse(); + } + + public static Member Create( + Address address, + MemberStatus status = MemberStatus.Up, + ImmutableHashSet? roles = null, + int uid = 0, + int upNumber = 0, + AppVersion? appVersion = null) + { + return Member.Create(new UniqueAddress(address, uid), upNumber, status, roles ?? ImmutableHashSet.Empty, appVersion ?? AppVersion.Zero); + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs index 721752e04a3..a165e5fbbb3 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs @@ -4,7 +4,7 @@ // Copyright (C) 2013-2023 .NET Foundation // //----------------------------------------------------------------------- - +#nullable enable using System; using System.Collections.Generic; using System.Collections.Immutable; @@ -398,9 +398,9 @@ public ReleaseLeaseResult(bool released) [Serializable] internal sealed class AcquireLeaseFailure : IDeadLetterSuppression, INoSerializationVerificationNeeded { - public Exception Failure { get; } + public Exception? Failure { get; } - public AcquireLeaseFailure(Exception failure) + public AcquireLeaseFailure(Exception? failure) { Failure = failure; } @@ -409,9 +409,9 @@ public AcquireLeaseFailure(Exception failure) [Serializable] internal sealed class ReleaseLeaseFailure : IDeadLetterSuppression, INoSerializationVerificationNeeded { - public Exception Failure { get; } + public Exception? Failure { get; } - public ReleaseLeaseFailure(Exception failure) + public ReleaseLeaseFailure(Exception? failure) { Failure = failure; } @@ -420,9 +420,9 @@ public ReleaseLeaseFailure(Exception failure) [Serializable] internal sealed class LeaseLost : IDeadLetterSuppression, INoSerializationVerificationNeeded { - public Exception Reason { get; } + public Exception? Reason { get; } - public LeaseLost(Exception reason) + public LeaseLost(Exception? reason) { Reason = reason; } @@ -622,7 +622,7 @@ public static Props Props(Props singletonProps, object terminationMessage, Clust private bool _selfExited; // started when self member is Up - private IActorRef _oldestChangedBuffer; + private IActorRef? _oldestChangedBuffer; // keep track of previously removed members private ImmutableDictionary _removed = ImmutableDictionary.Empty; private readonly TimeSpan _removalMargin; @@ -630,24 +630,14 @@ public static Props Props(Props singletonProps, object terminationMessage, Clust private readonly int _maxTakeOverRetries; private readonly Cluster _cluster = Cluster.Get(Context.System); private readonly UniqueAddress _selfUniqueAddress; - private ILoggingAdapter _log; private readonly CoordinatedShutdown _coordShutdown = CoordinatedShutdown.Get(Context.System); private readonly TaskCompletionSource _memberExitingProgress = new(); - private readonly string singletonLeaseName; - private readonly Lease lease; - private readonly TimeSpan leaseRetryInterval = TimeSpan.FromSeconds(5); // won't be used - - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD + private readonly string _singletonLeaseName; + private readonly Lease? _lease; + private readonly TimeSpan _leaseRetryInterval = TimeSpan.FromSeconds(5); // won't be used + public ClusterSingletonManager(Props singletonProps, object terminationMessage, ClusterSingletonManagerSettings settings) { var role = settings.Role; @@ -657,13 +647,13 @@ public ClusterSingletonManager(Props singletonProps, object terminationMessage, _singletonProps = singletonProps; _terminationMessage = terminationMessage; _settings = settings; - singletonLeaseName = $"{Context.System.Name}-singleton-{Self.Path}"; + _singletonLeaseName = $"{Context.System.Name}-singleton-{Self.Path}"; if (settings.LeaseSettings != null) { - lease = LeaseProvider.Get(Context.System) - .GetLease(singletonLeaseName, settings.LeaseSettings.LeaseImplementation, _cluster.SelfAddress.HostPort()); - leaseRetryInterval = settings.LeaseSettings.LeaseRetryInterval; + _lease = LeaseProvider.Get(Context.System) + .GetLease(_singletonLeaseName, settings.LeaseSettings.LeaseImplementation, _cluster.SelfAddress.HostPort()); + _leaseRetryInterval = settings.LeaseSettings.LeaseRetryInterval; } _removalMargin = (settings.RemovalMargin <= TimeSpan.Zero) ? _cluster.DowningProvider.DownRemovalMargin : settings.RemovalMargin; @@ -707,7 +697,7 @@ private void SetupCoordinatedShutdown() }); } - private ILoggingAdapter Log { get { return _log ??= Context.GetLogger(); } } + private ILoggingAdapter Log { get; } = Context.GetLogger(); /// protected override void PreStart() @@ -716,7 +706,7 @@ protected override void PreStart() throw new ActorInitializationException("Cluster node must not be terminated"); // subscribe to cluster changes, re-subscribe when restart - _cluster.Subscribe(Self, ClusterEvent.InitialStateAsEvents, typeof(ClusterEvent.MemberRemoved), typeof(ClusterEvent.MemberDowned)); + _cluster.Subscribe(Self, InitialStateAsEvents, typeof(MemberRemoved), typeof(MemberDowned)); SetTimer(CleanupTimer, Cleanup.Instance, TimeSpan.FromMinutes(1.0), repeat: true); @@ -769,7 +759,7 @@ private void GetNextOldestChanged() private State TryAcquireLease() { var self = Self; - lease.Acquire(reason => + _lease.Acquire(reason => { self.Tell(new LeaseLost(reason)); }).ContinueWith(r => @@ -786,7 +776,7 @@ private State TryAcquireLease() private State TryGotoOldest() { // check if lease - if (lease == null) + if (_lease == null) return GoToOldest(); else { @@ -809,7 +799,7 @@ private State HandleOldestChanged( Log.Info("{0} observed OldestChanged: [{1} -> {2}]", StateName, _cluster.SelfAddress, oldest?.Address); switch (oldest) { - case UniqueAddress a when a.Equals(_cluster.SelfUniqueAddress): + case { } a when a.Equals(_cluster.SelfUniqueAddress): // already oldest return Stay(); case UniqueAddress a when !_selfExited && _removed.ContainsKey(a): @@ -1115,7 +1105,7 @@ private void InitializeFSM() } else { - SetTimer(LeaseRetryTimer, LeaseRetry.Instance, leaseRetryInterval, repeat: false); + SetTimer(LeaseRetryTimer, LeaseRetry.Instance, _leaseRetryInterval, repeat: false); return Stay().Using(new AcquiringLeaseData(false, null)); } } @@ -1131,7 +1121,7 @@ private void InitializeFSM() else if (e.FsmEvent is AcquireLeaseFailure alf) { Log.Error(alf.Failure, "Failed to get lease (will be retried)"); - SetTimer(LeaseRetryTimer, LeaseRetry.Instance, leaseRetryInterval, repeat: false); + SetTimer(LeaseRetryTimer, LeaseRetry.Instance, _leaseRetryInterval, repeat: false); return Stay().Using(new AcquiringLeaseData(false, null)); } @@ -1459,9 +1449,9 @@ private void InitializeFSM() if (StateData is AcquiringLeaseData ald && ald.LeaseRequestInProgress) { Log.Info("Releasing lease as leaving AcquiringLease going to [{0}]", to); - if (lease != null) + if (_lease != null) { - lease.Release().ContinueWith(r => + _lease.Release().ContinueWith(r => { if (r.IsCanceled || r.IsFaulted) return (object)new ReleaseLeaseFailure(r.Exception); @@ -1471,10 +1461,10 @@ private void InitializeFSM() } } - if (from == ClusterSingletonState.Oldest && lease != null) + if (from == ClusterSingletonState.Oldest && _lease != null) { Log.Info("Releasing lease as leaving Oldest"); - lease.Release().ContinueWith(r => new ReleaseLeaseResult(r.Result)).PipeTo(Self); + _lease.Release().ContinueWith(r => new ReleaseLeaseResult(r.Result)).PipeTo(Self); } if (to is ClusterSingletonState.Younger or ClusterSingletonState.Oldest) GetNextOldestChanged(); diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs index b624426555b..bafd0765c09 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +#nullable enable using System; using System.Collections.Generic; using System.Collections.Immutable; @@ -40,15 +41,15 @@ namespace Akka.Cluster.Tools.Singleton public sealed class ClusterSingletonProxy : ReceiveActor { /// - /// TBD + /// Message used to tell the ClusterSingletonProxy to locate the currently active singleton. /// internal sealed class TryToIdentifySingleton : INoSerializationVerificationNeeded { - /// - /// TBD - /// public static TryToIdentifySingleton Instance { get; } = new(); - private TryToIdentifySingleton() { } + + private TryToIdentifySingleton() + { + } } /// @@ -57,7 +58,8 @@ private TryToIdentifySingleton() { } /// TBD public static Config DefaultConfig() { - return ConfigurationFactory.FromResource("Akka.Cluster.Tools.Singleton.reference.conf"); + return ConfigurationFactory.FromResource( + "Akka.Cluster.Tools.Singleton.reference.conf"); } /// @@ -68,7 +70,7 @@ public static Config DefaultConfig() /// which ends with the name you defined in `actorOf` when creating the . /// /// Cluster singleton proxy settings. - /// TBD + /// The props for the singleton proxy public static Props Props(string singletonManagerPath, ClusterSingletonProxySettings settings) { return Actor.Props.Create(() => new ClusterSingletonProxy(singletonManagerPath, settings)) @@ -83,16 +85,11 @@ public static Props Props(string singletonManagerPath, ClusterSingletonProxySett private readonly string[] _singletonPath; private int _identityCounter = 0; private string _identityId; - private IActorRef _singleton = null; - private ICancelable _identityTimer = null; - private ImmutableSortedSet _membersByAge; + private IActorRef? _singleton; + private ICancelable? _identityTimer; + private OldestChangedBufferState _state; private ILoggingAdapter _log; - /// - /// TBD - /// - /// TBD - /// TBD public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxySettings settings) { _settings = settings; @@ -100,9 +97,10 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS _identityId = CreateIdentifyId(_identityCounter); _memberAgeComparer = settings.ConsiderAppVersion - ? MemberAgeOrdering.DescendingWithAppVersion - : MemberAgeOrdering.Descending; - _membersByAge = ImmutableSortedSet.Empty.WithComparer(_memberAgeComparer); + ? MemberAgeOrdering.OldestToYoungestWithAppVersion + : MemberAgeOrdering.OldestToYoungest; + _state = new OldestChangedBufferState(ImmutableSortedSet.Empty.WithComparer(_memberAgeComparer), + settings.Role); Receive(s => HandleInitial(s)); Receive(m => Add(m.Member)); @@ -119,63 +117,58 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS /* do nothing */ }); Receive(identity => + { + if (identity.Subject != null) { - if (identity.Subject != null) - { - // if the new singleton is defined, deliver all buffered messages - var subject = identity.Subject; - Log.Info("Singleton identified at [{0}]", subject.Path); - _singleton = subject; - Context.Watch(subject); - CancelTimer(); - SendBuffered(); - } - }); + // if the new singleton is defined, deliver all buffered messages + var subject = identity.Subject; + Log.Info("Singleton identified at [{0}]", subject.Path); + _singleton = subject; + Context.Watch(subject); + CancelTimer(); + SendBuffered(); + } + }); Receive(_ => - { - var oldest = _membersByAge.FirstOrDefault(); - if (oldest != null && _identityTimer != null) - { - var singletonAddress = new RootActorPath(oldest.Address) / _singletonPath; - Log.Debug("Trying to identify singleton at [{0}]", singletonAddress); - Context.ActorSelection(singletonAddress).Tell(new Identify(_identityId)); - } - }); + { + var oldest = _state.CurrentOldest; + if (oldest != null && _identityTimer != null) + { + var singletonAddress = new RootActorPath(oldest.Address) / _singletonPath; + Log.Debug("Trying to identify singleton at [{0}]", singletonAddress); + Context.ActorSelection(singletonAddress).Tell(new Identify(_identityId)); + } + }); Receive(terminated => + { + if (Equals(_singleton, terminated.ActorRef)) { - if (Equals(_singleton, terminated.ActorRef)) - { - // buffering mode, identification of new will start when old node is removed - _singleton = null; - } - }); + // buffering mode, identification of new will start when old node is removed + _singleton = null; + } + }); ReceiveAny(msg => + { + if (_singleton != null) { - if (_singleton != null) - { - if (Log.IsDebugEnabled) - Log.Debug("Forwarding message of type [{0}] to current singleton instance at [{1}]", msg.GetType(), _singleton.Path); - _singleton.Forward(msg); - } - else - Buffer(msg); - }); + if (Log.IsDebugEnabled) + Log.Debug("Forwarding message of type [{0}] to current singleton instance at [{1}]", + msg.GetType(), _singleton.Path); + _singleton.Forward(msg); + } + else + Buffer(msg); + }); } private ILoggingAdapter Log => _log ??= Context.GetLogger(); - /// - /// TBD - /// protected override void PreStart() { CancelTimer(); _cluster.Subscribe(Self, typeof(ClusterEvent.IMemberEvent)); } - /// - /// TBD - /// protected override void PostStop() { CancelTimer(); @@ -200,10 +193,19 @@ private bool MatchingRole(Member member) private void HandleInitial(ClusterEvent.CurrentClusterState state) { - TrackChanges(() => - _membersByAge = state.Members - .Where(m => m.Status == MemberStatus.Up && MatchingRole(m)) - .ToImmutableSortedSet(_memberAgeComparer)); + var membersByAge = state.Members + .Where(m => m.Status == MemberStatus.Up && MatchingRole(m)) + .ToImmutableSortedSet(_memberAgeComparer); + + _state = _state with { MembersByAge = membersByAge}; + + // compute the initial oldest + var (newState, _) = _state.ComputeNextOldest(); + _state = newState; + + // if the oldest is defined, start the identification process + if (_state.CurrentOldest != null) + IdentifySingleton(); } // Discard old singleton ActorRef and send a periodic message to self to identify the singleton. @@ -221,31 +223,27 @@ private void IdentifySingleton() message: TryToIdentifySingleton.Instance, sender: Self); } - - private void TrackChanges(Action block) - { - var before = _membersByAge.FirstOrDefault(); - block(); - var after = _membersByAge.FirstOrDefault(); - - // if the head has changed, I need to find the new singleton - if (!Equals(before, after)) IdentifySingleton(); - } - + private void Add(Member member) { if (MatchingRole(member)) - TrackChanges(() => - { - _membersByAge = _membersByAge.Remove(member); //replace - _membersByAge = _membersByAge.Add(member); - }); + { + var (newState, oldestChanged) = _state.AddMember(member); + _state = newState; + if (oldestChanged) + IdentifySingleton(); + } } private void Remove(Member member) { if (MatchingRole(member)) - TrackChanges(() => _membersByAge = _membersByAge.Remove(member)); + { + var (newState, oldestChanged) = _state.RemoveMember(member); + _state = newState; + if (oldestChanged) + IdentifySingleton(); + } } private string CreateIdentifyId(int i) @@ -256,7 +254,8 @@ private string CreateIdentifyId(int i) private void Buffer(object message) { if (_settings.BufferSize == 0) - Log.Debug("Singleton not available and buffering is disabled, dropping message [{0}]", message.GetType()); + Log.Debug("Singleton not available and buffering is disabled, dropping message [{0}]", + message.GetType()); else if (_buffer.Count == _settings.BufferSize) { var first = _buffer.Dequeue(); @@ -276,8 +275,8 @@ private void SendBuffered() while (_buffer.Count != 0) { var pair = _buffer.Dequeue(); - _singleton.Tell(pair.Key, pair.Value); + _singleton?.Tell(pair.Key, pair.Value); } } } -} +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/MemberAgeOrdering.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/MemberAgeOrdering.cs index 420ca1b68c3..1be2fcf2822 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/MemberAgeOrdering.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/MemberAgeOrdering.cs @@ -4,13 +4,13 @@ // Copyright (C) 2013-2023 .NET Foundation // //----------------------------------------------------------------------- - +#nullable enable using System.Collections.Generic; namespace Akka.Cluster.Tools.Singleton { /// - /// TBD + /// Responsible for sorting members based on their age, with the option to consider the app version. /// internal sealed class MemberAgeOrdering : IComparer { @@ -24,8 +24,19 @@ private MemberAgeOrdering(bool ascending, bool considerAppVersion) } /// - public int Compare(Member x, Member y) + public int Compare(Member? x, Member? y) { + switch (x) + { + // add null checks here + case null when y is null: + return 0; + case null: + return _ascending ? -1 : 1; + } + + if (y is null) return _ascending ? 1 : -1; + if (_considerAppVersion) { // prefer nodes with the highest app version, even if they're younger @@ -39,19 +50,9 @@ public int Compare(Member x, Member y) ? (_ascending ? 1 : -1) : (_ascending ? -1 : 1); } - - /// - /// TBD - /// - public static readonly MemberAgeOrdering Ascending = new(true, false); - - public static readonly MemberAgeOrdering AscendingWithAppVersion = new(true, true); - - /// - /// TBD - /// - public static readonly MemberAgeOrdering Descending = new(false, false); - public static readonly MemberAgeOrdering DescendingWithAppVersion = new(false, true); + public static readonly MemberAgeOrdering OldestToYoungest = new(false, false); + + public static readonly MemberAgeOrdering OldestToYoungestWithAppVersion = new(false, true); } } diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs index 7af3a3aeca4..2c2fa8aa524 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs @@ -4,14 +4,13 @@ // Copyright (C) 2013-2023 .NET Foundation // //----------------------------------------------------------------------- - +#nullable enable using System; using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; using System.Threading.Tasks; using Akka.Actor; -using Akka.Util.Internal; namespace Akka.Cluster.Tools.Singleton { @@ -34,15 +33,12 @@ internal sealed class OldestChangedBuffer : UntypedActor [Serializable] public sealed class GetNext { - /// - /// TBD - /// public static GetNext Instance { get; } = new(); private GetNext() { } } /// - /// TBD + /// The first event for determining the oldest member. /// [Serializable] public sealed class InitialOldestState @@ -53,15 +49,10 @@ public sealed class InitialOldestState public List Oldest { get; } /// - /// TBD + /// When true, it's safe to be oldest immediately (no older nodes are in process of leaving) /// public bool SafeToBeOldest { get; } - - /// - /// TBD - /// - /// TBD - /// TBD + public InitialOldestState(List oldest, bool safeToBeOldest) { Oldest = oldest; @@ -70,21 +61,18 @@ public InitialOldestState(List oldest, bool safeToBeOldest) } /// - /// TBD + /// The "oldest" singleton in the cluster has changed, therefore we're going to move to the next oldest singleton. /// [Serializable] public sealed class OldestChanged { /// - /// TBD - /// - public UniqueAddress Oldest { get; } - - /// - /// TBD + /// Can be null if this is the last node in the cluster, + /// in which case this event is moot anyway. /// - /// TBD - public OldestChanged(UniqueAddress oldest) + public UniqueAddress? Oldest { get; } + + public OldestChanged(UniqueAddress? oldest) { Oldest = oldest; } @@ -94,6 +82,8 @@ public OldestChanged(UniqueAddress oldest) private readonly MemberAgeOrdering _memberAgeComparer; private readonly CoordinatedShutdown _coordShutdown = CoordinatedShutdown.Get(Context.System); + + public OldestChangedBufferState State { get; set; } /// /// Creates a new instance of the . @@ -102,11 +92,12 @@ public OldestChanged(UniqueAddress oldest) /// Should cluster AppVersion be considered when sorting member age public OldestChangedBuffer(string role, bool considerAppVersion) { - _role = role; _memberAgeComparer = considerAppVersion - ? MemberAgeOrdering.DescendingWithAppVersion - : MemberAgeOrdering.Descending; - _membersByAge = ImmutableSortedSet.Empty.WithComparer(_memberAgeComparer); + ? MemberAgeOrdering.OldestToYoungestWithAppVersion + : MemberAgeOrdering.OldestToYoungest; + var membersByAge = ImmutableSortedSet.Empty.WithComparer(_memberAgeComparer); + + State = new OldestChangedBufferState(membersByAge, role); SetupCoordinatedShutdown(); } @@ -135,45 +126,35 @@ private void SetupCoordinatedShutdown() } }); } - - private readonly string _role; - private ImmutableSortedSet _membersByAge; + private ImmutableQueue _changes = ImmutableQueue.Empty; private readonly Cluster _cluster = Cluster.Get(Context.System); - - private void TrackChanges(Action block) - { - var before = _membersByAge.FirstOrDefault(); - block(); - var after = _membersByAge.FirstOrDefault(); - - // todo: fix neq comparison - if (!Equals(before, after)) - _changes = _changes.Enqueue(new OldestChanged(after?.UniqueAddress)); - } - - private bool MatchingRole(Member member) - { - return string.IsNullOrEmpty(_role) || member.HasRole(_role); - } - - private void HandleInitial(ClusterEvent.CurrentClusterState state) + + private void HandleInitial(ClusterEvent.CurrentClusterState clusterState) { // all members except Joining and WeaklyUp - _membersByAge = state.Members - .Where(m => m.UpNumber != int.MaxValue && MatchingRole(m)) + var newMembersByAge = clusterState.Members + .Where(m => m.UpNumber != int.MaxValue && State.MatchingRole(m)) .ToImmutableSortedSet(_memberAgeComparer); + + State = State with { MembersByAge = newMembersByAge }; + + // compute the initial oldest - doesn't matter if it's not "safe" or not. That's our parent's problem. + // The subsequent `MemberRemoved` emits that will be emitted if an older node is leaving or downed + // will clean that up inside our state. + var (newState, oldestChanged) = State.ComputeNextOldest(); + State = newState; // If there is some removal in progress of an older node it's not safe to immediately become oldest, // removal of younger nodes doesn't matter. Note that it can also be started via restart after // ClusterSingletonManagerIsStuck. - var selfUpNumber = state.Members + var selfUpNumber = clusterState.Members .Where(m => m.UniqueAddress == _cluster.SelfUniqueAddress) .Select(m => (int?)m.UpNumber) .FirstOrDefault() ?? int.MaxValue; - var oldest = _membersByAge.TakeWhile(m => m.UpNumber <= selfUpNumber).ToList(); + var oldest = State.MembersByAge.TakeWhile(m => m.UpNumber <= selfUpNumber).ToList(); var safeToBeOldest = !oldest.Any(m => m.Status is MemberStatus.Down or MemberStatus.Exiting or MemberStatus.Leaving); var initial = new InitialOldestState(oldest.Select(m => m.UniqueAddress).ToList(), safeToBeOldest); _changes = _changes.Enqueue(initial); @@ -181,19 +162,28 @@ private void HandleInitial(ClusterEvent.CurrentClusterState state) private void Add(Member member) { - if (MatchingRole(member)) - TrackChanges(() => + if (State.MatchingRole(member)) + { + var (newState, oldestChanged) = State.AddMember(member); + State = newState; + if (oldestChanged) { - // replace, it's possible that the upNumber is changed - _membersByAge = _membersByAge.Remove(member); - _membersByAge = _membersByAge.Add(member); - }); + _changes = _changes.Enqueue(new OldestChanged(State.CurrentOldest?.UniqueAddress)); + } + } } private void Remove(Member member) { - if (MatchingRole(member)) - TrackChanges(() => _membersByAge = _membersByAge.Remove(member)); + if (State.MatchingRole(member)) + { + var (newState, oldestChanged) = State.RemoveMember(member); + State = newState; + if (oldestChanged) + { + _changes = _changes.Enqueue(new OldestChanged(State.CurrentOldest?.UniqueAddress)); + } + } } private void SendFirstChange() @@ -221,21 +211,33 @@ protected override void PostStop() /// protected override void OnReceive(object message) { - if (message is ClusterEvent.CurrentClusterState state) HandleInitial(state); - else if (message is ClusterEvent.MemberUp up) Add(up.Member); - else if (message is ClusterEvent.MemberRemoved removed) Remove(removed.Member); - else if (message is ClusterEvent.MemberExited exited && exited.Member.UniqueAddress != _cluster.SelfUniqueAddress) - Remove(exited.Member); - else if (message is SelfExiting) + switch (message) { - Remove(_cluster.ReadView.Self); - Sender.Tell(Done.Instance); // reply to ask - } - else if (message is GetNext && _changes.IsEmpty) Context.BecomeStacked(OnDeliverNext); - else if (message is GetNext) SendFirstChange(); - else - { - Unhandled(message); + case ClusterEvent.CurrentClusterState state: + HandleInitial(state); + break; + case ClusterEvent.MemberUp up: + Add(up.Member); + break; + case ClusterEvent.MemberRemoved removed: + Remove(removed.Member); + break; + case ClusterEvent.MemberExited exited when exited.Member.UniqueAddress != _cluster.SelfUniqueAddress: + Remove(exited.Member); + break; + case SelfExiting: + Remove(_cluster.ReadView.Self); + Sender.Tell(Done.Instance); // reply to ask + break; + case GetNext when _changes.IsEmpty: + Context.BecomeStacked(OnDeliverNext); + break; + case GetNext: + SendFirstChange(); + break; + default: + Unhandled(message); + break; } } @@ -245,36 +247,33 @@ protected override void OnReceive(object message) /// The message to handle. private void OnDeliverNext(object message) { - if (message is ClusterEvent.CurrentClusterState state) - { - HandleInitial(state); - SendFirstChange(); - Context.UnbecomeStacked(); - } - else if (message is ClusterEvent.MemberUp up) - { - Add(up.Member); - DeliverChanges(); - } - else if (message is ClusterEvent.MemberRemoved removed) - { - Remove(removed.Member); - DeliverChanges(); - } - else if (message is ClusterEvent.MemberExited exited && exited.Member.UniqueAddress != _cluster.SelfUniqueAddress) - { - Remove(exited.Member); - DeliverChanges(); - } - else if (message is SelfExiting) - { - Remove(_cluster.ReadView.Self); - DeliverChanges(); - Sender.Tell(Done.Instance); // reply to ask - } - else + switch (message) { - Unhandled(message); + case ClusterEvent.CurrentClusterState state: + HandleInitial(state); + SendFirstChange(); + Context.UnbecomeStacked(); + break; + case ClusterEvent.MemberUp up: + Add(up.Member); + DeliverChanges(); + break; + case ClusterEvent.MemberRemoved removed: + Remove(removed.Member); + DeliverChanges(); + break; + case ClusterEvent.MemberExited exited when exited.Member.UniqueAddress != _cluster.SelfUniqueAddress: + Remove(exited.Member); + DeliverChanges(); + break; + case SelfExiting: + Remove(_cluster.ReadView.Self); + DeliverChanges(); + Sender.Tell(Done.Instance); // reply to ask + break; + default: + Unhandled(message); + break; } } diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBufferState.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBufferState.cs new file mode 100644 index 00000000000..b043cc70ddc --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBufferState.cs @@ -0,0 +1,74 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +#nullable enable +using System.Collections.Immutable; +using System.Linq; + +namespace Akka.Cluster.Tools.Singleton; + +/// +/// Immutable data object that represents the state of the oldest changed buffer. +/// +internal sealed record OldestChangedBufferState +{ + public OldestChangedBufferState(ImmutableSortedSet initialMembersByAge, string? role) + { + MembersByAge = initialMembersByAge; + Role = role; + CurrentOldest = MembersByAge.FirstOrDefault(this.MatchingRole); + } + + public string? Role { get; init; } + + public Member? CurrentOldest { get; init; } + + public ImmutableSortedSet MembersByAge { get; init; } +} + +internal static class OldestChangedBufferStateExtensions +{ + internal static bool MatchingRole(this OldestChangedBufferState state, Member member) + { + return string.IsNullOrEmpty(state.Role) || member.HasRole(state.Role); + } + + public static (OldestChangedBufferState newState, bool oldestChanged) AddMember(this OldestChangedBufferState state, Member member) + { + if (MatchingRole(state, member)) + { + // remove then add node to replace it, as it's possible that the upNumber is changed + return ComputeNextOldest(state with { MembersByAge = state.MembersByAge.Remove(member).Add(member) }); + } + + return (state, false); + } + + public static (OldestChangedBufferState newState, bool oldestChanged) RemoveMember(this OldestChangedBufferState state, Member member) + { + if (MatchingRole(state, member)) + { + return ComputeNextOldest(state with { MembersByAge = state.MembersByAge.Remove(member) }); + } + + return (state, false); + } + + public static (OldestChangedBufferState newState, bool oldestChanged) ComputeNextOldest(this OldestChangedBufferState state) + { + // if the current oldest has not been removed, then it remains the oldest + if(state.CurrentOldest is not null && state.MembersByAge.Contains(state.CurrentOldest)) + { + return (state, false); + } + + // compute the next oldest + var nextOldest = state.MembersByAge.FirstOrDefault(); + var oldestChanged = !Equals(nextOldest, state.CurrentOldest); + return (state with { CurrentOldest = nextOldest }, oldestChanged); + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/reference.conf b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/reference.conf index ea374bbffb5..c40d4af5002 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/reference.conf @@ -46,7 +46,7 @@ akka.cluster.singleton { # Should akka.cluster.app-version be considered when the cluster singleton instance is being moved to another node # When set to false, singleton instance will always be created on oldest member # When set to true, singleton instance will be created on the oldest node with the highest member app-version number - consider-app-version = false + consider-app-version = true } # //#singleton-config diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt index e8b13f02aac..fe1cc7ddc5d 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt @@ -397,6 +397,7 @@ namespace Akka.Cluster.Tools.Singleton public ClusterSingletonProvider() { } public override Akka.Cluster.Tools.Singleton.ClusterSingleton CreateExtension(Akka.Actor.ExtendedActorSystem system) { } } + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class ClusterSingletonProxy : Akka.Actor.ReceiveActor { public ClusterSingletonProxy(string singletonManagerPath, Akka.Cluster.Tools.Singleton.ClusterSingletonProxySettings settings) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt index a25e701e6aa..d8c50cbc86e 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt @@ -397,6 +397,7 @@ namespace Akka.Cluster.Tools.Singleton public ClusterSingletonProvider() { } public override Akka.Cluster.Tools.Singleton.ClusterSingleton CreateExtension(Akka.Actor.ExtendedActorSystem system) { } } + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class ClusterSingletonProxy : Akka.Actor.ReceiveActor { public ClusterSingletonProxy(string singletonManagerPath, Akka.Cluster.Tools.Singleton.ClusterSingletonProxySettings settings) { }