From 7a3ac22e7c3e4046775a48911dc1e82427772f1b Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 22 May 2024 19:59:58 -0500 Subject: [PATCH 01/13] WIP Akka.Cluster.Singleton fixes --- .../Singleton/OldChangedBufferSpecs.cs | 57 +++++++++ .../Singleton/MemberAgeOrdering.cs | 14 +-- .../Singleton/OldestChangedBuffer.cs | 110 +++++++++--------- 3 files changed, 119 insertions(+), 62 deletions(-) create mode 100644 src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs new file mode 100644 index 00000000000..95eb24891c7 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs @@ -0,0 +1,57 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- +#nullable enable +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Cluster.Tools.Singleton; +using Akka.Configuration; +using Akka.TestKit; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Cluster.Tools.Tests.Singleton; + +/// +/// Reproduction for https://github.com/akkadotnet/akka.net/issues/7196 - clearly, what we did +/// +public class OldChangedBufferSpecs : AkkaSpec +{ + public IActorRef CreateOldestChangedBuffer(string role, bool considerAppVersion) + { + return Sys.ActorOf(Props.Create(() => new OldestChangedBuffer(role, considerAppVersion))); + } + + private readonly ActorSystem _otherNodeV1; + private readonly ActorSystem _nonHostingNode; + private ActorSystem? _otherNodeV2; + + public OldChangedBufferSpecs(ITestOutputHelper output) : base(""" + + akka.loglevel = INFO + akka.actor.provider = "cluster" + akka.cluster.roles = [singleton] + akka.cluster.auto-down-unreachable-after = 2s + akka.cluster.singleton.min-number-of-hand-over-retries = 5 + akka.remote { + dot-netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + } + """, output) + { + _otherNodeV1 = ActorSystem.Create(Sys.Name, Sys.Settings.Config); + _nonHostingNode = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString("akka.cluster.roles = [other]") + .WithFallback(Sys.Settings.Config)); + } + + [Fact(DisplayName = "Singletons should not move to higher AppVersion nodes until after older incarnation is downed")] + public async Task Bugfix7196Spec() + { + + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/MemberAgeOrdering.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/MemberAgeOrdering.cs index 420ca1b68c3..8a36d2b18dd 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/MemberAgeOrdering.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/MemberAgeOrdering.cs @@ -4,13 +4,13 @@ // Copyright (C) 2013-2023 .NET Foundation // //----------------------------------------------------------------------- - +#nullable enable using System.Collections.Generic; namespace Akka.Cluster.Tools.Singleton { /// - /// TBD + /// Responsible for sorting members based on their age, with the option to consider the app version. /// internal sealed class MemberAgeOrdering : IComparer { @@ -39,17 +39,11 @@ public int Compare(Member x, Member y) ? (_ascending ? 1 : -1) : (_ascending ? -1 : 1); } - - /// - /// TBD - /// + public static readonly MemberAgeOrdering Ascending = new(true, false); public static readonly MemberAgeOrdering AscendingWithAppVersion = new(true, true); - - /// - /// TBD - /// + public static readonly MemberAgeOrdering Descending = new(false, false); public static readonly MemberAgeOrdering DescendingWithAppVersion = new(false, true); diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs index 7af3a3aeca4..72b77f31461 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs @@ -4,7 +4,7 @@ // Copyright (C) 2013-2023 .NET Foundation // //----------------------------------------------------------------------- - +#nullable enable using System; using System.Collections.Generic; using System.Collections.Immutable; @@ -70,20 +70,17 @@ public InitialOldestState(List oldest, bool safeToBeOldest) } /// - /// TBD + /// The "oldest" singleton in the cluster has changed, therefore we're going to move to the next oldest singleton. /// [Serializable] public sealed class OldestChanged { /// - /// TBD - /// - public UniqueAddress Oldest { get; } - - /// - /// TBD + /// Can be null if this is the last node in the cluster, + /// in which case this event is moot anyway. /// - /// TBD + public UniqueAddress? Oldest { get; } + public OldestChanged(UniqueAddress oldest) { Oldest = oldest; @@ -221,21 +218,33 @@ protected override void PostStop() /// protected override void OnReceive(object message) { - if (message is ClusterEvent.CurrentClusterState state) HandleInitial(state); - else if (message is ClusterEvent.MemberUp up) Add(up.Member); - else if (message is ClusterEvent.MemberRemoved removed) Remove(removed.Member); - else if (message is ClusterEvent.MemberExited exited && exited.Member.UniqueAddress != _cluster.SelfUniqueAddress) - Remove(exited.Member); - else if (message is SelfExiting) - { - Remove(_cluster.ReadView.Self); - Sender.Tell(Done.Instance); // reply to ask - } - else if (message is GetNext && _changes.IsEmpty) Context.BecomeStacked(OnDeliverNext); - else if (message is GetNext) SendFirstChange(); - else + switch (message) { - Unhandled(message); + case ClusterEvent.CurrentClusterState state: + HandleInitial(state); + break; + case ClusterEvent.MemberUp up: + Add(up.Member); + break; + case ClusterEvent.MemberRemoved removed: + Remove(removed.Member); + break; + case ClusterEvent.MemberExited exited when exited.Member.UniqueAddress != _cluster.SelfUniqueAddress: + Remove(exited.Member); + break; + case SelfExiting: + Remove(_cluster.ReadView.Self); + Sender.Tell(Done.Instance); // reply to ask + break; + case GetNext when _changes.IsEmpty: + Context.BecomeStacked(OnDeliverNext); + break; + case GetNext: + SendFirstChange(); + break; + default: + Unhandled(message); + break; } } @@ -245,36 +254,33 @@ protected override void OnReceive(object message) /// The message to handle. private void OnDeliverNext(object message) { - if (message is ClusterEvent.CurrentClusterState state) - { - HandleInitial(state); - SendFirstChange(); - Context.UnbecomeStacked(); - } - else if (message is ClusterEvent.MemberUp up) - { - Add(up.Member); - DeliverChanges(); - } - else if (message is ClusterEvent.MemberRemoved removed) - { - Remove(removed.Member); - DeliverChanges(); - } - else if (message is ClusterEvent.MemberExited exited && exited.Member.UniqueAddress != _cluster.SelfUniqueAddress) - { - Remove(exited.Member); - DeliverChanges(); - } - else if (message is SelfExiting) - { - Remove(_cluster.ReadView.Self); - DeliverChanges(); - Sender.Tell(Done.Instance); // reply to ask - } - else + switch (message) { - Unhandled(message); + case ClusterEvent.CurrentClusterState state: + HandleInitial(state); + SendFirstChange(); + Context.UnbecomeStacked(); + break; + case ClusterEvent.MemberUp up: + Add(up.Member); + DeliverChanges(); + break; + case ClusterEvent.MemberRemoved removed: + Remove(removed.Member); + DeliverChanges(); + break; + case ClusterEvent.MemberExited exited when exited.Member.UniqueAddress != _cluster.SelfUniqueAddress: + Remove(exited.Member); + DeliverChanges(); + break; + case SelfExiting: + Remove(_cluster.ReadView.Self); + DeliverChanges(); + Sender.Tell(Done.Instance); // reply to ask + break; + default: + Unhandled(message); + break; } } From 8099b9372efcc4b6daa225ba7e7542d19496c88e Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 23 May 2024 09:57:54 -0500 Subject: [PATCH 02/13] designing `OldestChangedBufferState` --- .../Singleton/OldChangedBufferSpecs.cs | 144 ++++++++++++++---- .../Singleton/OldestChangedBufferState.cs | 74 +++++++++ 2 files changed, 185 insertions(+), 33 deletions(-) create mode 100644 src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBufferState.cs diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs index 95eb24891c7..bddf559de29 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs @@ -5,53 +5,131 @@ // // ----------------------------------------------------------------------- #nullable enable -using System.Threading.Tasks; +using System.Collections.Immutable; +using System.Linq; using Akka.Actor; using Akka.Cluster.Tools.Singleton; -using Akka.Configuration; -using Akka.TestKit; +using Akka.Util; +using FluentAssertions; using Xunit; -using Xunit.Abstractions; namespace Akka.Cluster.Tools.Tests.Singleton; /// /// Reproduction for https://github.com/akkadotnet/akka.net/issues/7196 - clearly, what we did /// -public class OldChangedBufferSpecs : AkkaSpec +// public class OldChangedBufferSpecs : AkkaSpec +// { +// public IActorRef CreateOldestChangedBuffer(string role, bool considerAppVersion) +// { +// return Sys.ActorOf(Props.Create(() => new OldestChangedBuffer(role, considerAppVersion))); +// } +// +// private readonly ActorSystem _otherNodeV1; +// private readonly ActorSystem _nonHostingNode; +// private ActorSystem? _otherNodeV2; +// +// public OldChangedBufferSpecs(ITestOutputHelper output) : base(""" +// +// akka.loglevel = INFO +// akka.actor.provider = "cluster" +// akka.cluster.roles = [singleton] +// akka.cluster.auto-down-unreachable-after = 2s +// akka.cluster.singleton.min-number-of-hand-over-retries = 5 +// akka.remote { +// dot-netty.tcp { +// hostname = "127.0.0.1" +// port = 0 +// } +// } +// """, output) +// { +// _otherNodeV1 = ActorSystem.Create(Sys.Name, Sys.Settings.Config); +// _nonHostingNode = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString("akka.cluster.roles = [other]") +// .WithFallback(Sys.Settings.Config)); +// } +// +// [Fact(DisplayName = "Singletons should not move to higher AppVersion nodes until after older incarnation is downed")] +// public async Task Bugfix7196Spec() +// { +// +// } +// } + +public class OldestChangedBufferStateSpecs { - public IActorRef CreateOldestChangedBuffer(string role, bool considerAppVersion) - { - return Sys.ActorOf(Props.Create(() => new OldestChangedBuffer(role, considerAppVersion))); - } - - private readonly ActorSystem _otherNodeV1; - private readonly ActorSystem _nonHostingNode; - private ActorSystem? _otherNodeV2; - - public OldChangedBufferSpecs(ITestOutputHelper output) : base(""" - - akka.loglevel = INFO - akka.actor.provider = "cluster" - akka.cluster.roles = [singleton] - akka.cluster.auto-down-unreachable-after = 2s - akka.cluster.singleton.min-number-of-hand-over-retries = 5 - akka.remote { - dot-netty.tcp { - hostname = "127.0.0.1" - port = 0 - } - } - """, output) + [Fact] + public void OldestChangedBuffer_should_initially_only_consider_nodes_with_matching_role() { - _otherNodeV1 = ActorSystem.Create(Sys.Name, Sys.Settings.Config); - _nonHostingNode = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString("akka.cluster.roles = [other]") - .WithFallback(Sys.Settings.Config)); + // Arrange + var targetRole = "target-role"; + var targetRoles = ImmutableHashSet.Create("role1", "role2", targetRole); + var winningAddress = Address.Parse("akka://sys@darkstar:1112"); + var nonTargetRoles = ImmutableHashSet.Create("role1", "role2"); + var initialMembersByAge = ImmutableSortedSet.Empty + .Add(Create(winningAddress, roles:targetRoles, upNumber: 3)) + .Add(Create(Address.Parse("akka://sys@darkstar:1113"), roles:nonTargetRoles, upNumber: 1)) + .Add(Create(Address.Parse("akka://sys@darkstar:1111"), roles:targetRoles, upNumber: 9)) + .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); + + // Act + var state = new OldestChangedBufferState(initialMembersByAge, targetRole); + + // Assert + var oldest = state.CurrentOldest; + oldest.Should().NotBeNull(); + oldest!.Address.Should().Be(winningAddress); } - [Fact(DisplayName = "Singletons should not move to higher AppVersion nodes until after older incarnation is downed")] - public async Task Bugfix7196Spec() + [Fact] + public void OldestChangedBuffer_should_not_change_leader_when_higher_AppVersion_added() { + // Arrange + var winningAddress = Address.Parse("akka://sys@darkstar:1112"); + var appVersion1 = AppVersion.Create("1.0.0"); + var appVersion2 = AppVersion.Create("1.0.2"); + var initialMembersByAge = ImmutableSortedSet.Empty + .Add(Create(winningAddress, upNumber: 3, appVersion: appVersion1)) + .Add(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9, appVersion: appVersion1)) + .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); + + // Act + var state = new OldestChangedBufferState(initialMembersByAge, string.Empty); + var oldest = state.CurrentOldest; + + // higher upNumber - should not affect leader + var newMemberSameVersion = Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 10, appVersion: appVersion1); + // higher upNumber AND version - should not affect leader + var newMemberNewVersion = Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 11, appVersion: appVersion2); + + // Act + var (state1, oldestChanged1) = state.AddMember(newMemberSameVersion); + var (state2, oldestChanged2) = state1.AddMember(newMemberNewVersion); + + // Assert + oldest.Should().NotBeNull(); + oldest!.Address.Should().Be(winningAddress); + + state1.CurrentOldest.Should().Be(oldest); + oldestChanged1.Should().BeFalse(); + + state2.CurrentOldest.Should().Be(oldest); + oldestChanged2.Should().BeFalse(); + + // the members by age system is going to chose the appVersion over the upNumber + state2.MembersByAge.FirstOrDefault().Should().NotBe(oldest); + state2.MembersByAge.FirstOrDefault()!.AppVersion.Should().Be(appVersion2); + } + + public static Member Create( + Address address, + MemberStatus status = MemberStatus.Up, + ImmutableHashSet? roles = null, + int uid = 0, + int upNumber = 0, + AppVersion? appVersion = null) + { + return Member.Create(new UniqueAddress(address, uid), upNumber, status, roles ?? ImmutableHashSet.Empty, appVersion ?? AppVersion.Zero); } } \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBufferState.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBufferState.cs new file mode 100644 index 00000000000..b043cc70ddc --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBufferState.cs @@ -0,0 +1,74 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +#nullable enable +using System.Collections.Immutable; +using System.Linq; + +namespace Akka.Cluster.Tools.Singleton; + +/// +/// Immutable data object that represents the state of the oldest changed buffer. +/// +internal sealed record OldestChangedBufferState +{ + public OldestChangedBufferState(ImmutableSortedSet initialMembersByAge, string? role) + { + MembersByAge = initialMembersByAge; + Role = role; + CurrentOldest = MembersByAge.FirstOrDefault(this.MatchingRole); + } + + public string? Role { get; init; } + + public Member? CurrentOldest { get; init; } + + public ImmutableSortedSet MembersByAge { get; init; } +} + +internal static class OldestChangedBufferStateExtensions +{ + internal static bool MatchingRole(this OldestChangedBufferState state, Member member) + { + return string.IsNullOrEmpty(state.Role) || member.HasRole(state.Role); + } + + public static (OldestChangedBufferState newState, bool oldestChanged) AddMember(this OldestChangedBufferState state, Member member) + { + if (MatchingRole(state, member)) + { + // remove then add node to replace it, as it's possible that the upNumber is changed + return ComputeNextOldest(state with { MembersByAge = state.MembersByAge.Remove(member).Add(member) }); + } + + return (state, false); + } + + public static (OldestChangedBufferState newState, bool oldestChanged) RemoveMember(this OldestChangedBufferState state, Member member) + { + if (MatchingRole(state, member)) + { + return ComputeNextOldest(state with { MembersByAge = state.MembersByAge.Remove(member) }); + } + + return (state, false); + } + + public static (OldestChangedBufferState newState, bool oldestChanged) ComputeNextOldest(this OldestChangedBufferState state) + { + // if the current oldest has not been removed, then it remains the oldest + if(state.CurrentOldest is not null && state.MembersByAge.Contains(state.CurrentOldest)) + { + return (state, false); + } + + // compute the next oldest + var nextOldest = state.MembersByAge.FirstOrDefault(); + var oldestChanged = !Equals(nextOldest, state.CurrentOldest); + return (state with { CurrentOldest = nextOldest }, oldestChanged); + } +} \ No newline at end of file From 2b6a21d9f47b97e6f0b562d423c3c15fd71cea8b Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 23 May 2024 10:31:36 -0500 Subject: [PATCH 03/13] finished specs for `OldestChangedBufferState` --- .../Singleton/OldChangedBufferSpecs.cs | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs index bddf559de29..51d5d102763 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs @@ -122,6 +122,91 @@ public void OldestChangedBuffer_should_not_change_leader_when_higher_AppVersion_ state2.MembersByAge.FirstOrDefault()!.AppVersion.Should().Be(appVersion2); } + [Fact] + public void OldestChangedBuffer_should_change_Oldest_when_previous_Oldest_removed() + { + // Arrange + var winningAddress = Address.Parse("akka://sys@darkstar:1112"); + var appVersion1 = AppVersion.Create("1.0.0"); + var appVersion2 = AppVersion.Create("1.0.2"); + + var originalOldest = Create(winningAddress, upNumber: 3, appVersion: appVersion1); + + var initialMembersByAge = ImmutableSortedSet.Empty + .Add(originalOldest) + .Add(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9, appVersion: appVersion1)) + .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); + + // Act + var state = new OldestChangedBufferState(initialMembersByAge, string.Empty); + var oldest = state.CurrentOldest; + + // higher upNumber, same version - won't affect leader + var newMemberSameVersion = Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 4, appVersion: appVersion1); + + // lower upNumber AND version - won't affect the leader until it gets removed + var newMemberHigherVersion = Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 11, appVersion: appVersion2); + + // Act + var (state1, oldestChanged1) = state.AddMember(newMemberSameVersion); + var (state2, oldestChanged2) = state1.AddMember(newMemberHigherVersion); + var (state3, oldestChanged3) = state2.RemoveMember(originalOldest); + + // Assert + oldest.Should().NotBeNull(); + oldest!.Address.Should().Be(winningAddress); + + state1.CurrentOldest.Should().Be(originalOldest); + oldestChanged1.Should().BeFalse(); + + state2.CurrentOldest.Should().Be(originalOldest); + oldestChanged2.Should().BeFalse(); + + state3.CurrentOldest.Should().Be(newMemberHigherVersion); + oldestChanged3.Should().BeTrue(); + } + + [Fact] + public void OldestChangedBuffer_should_not_change_Oldest_when_nonOldest_node_removed() + { + // Arrange + var winningAddress = Address.Parse("akka://sys@darkstar:1112"); + var appVersion1 = AppVersion.Create("1.0.0"); + var appVersion2 = AppVersion.Create("1.0.2"); + var initialMembersByAge = ImmutableSortedSet.Empty + .Add(Create(winningAddress, upNumber: 3, appVersion: appVersion1)) + .Add(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9, appVersion: appVersion1)) + .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); + + // Act + var state = new OldestChangedBufferState(initialMembersByAge, string.Empty); + var oldest = state.CurrentOldest; + + // higher upNumber - should not affect leader + var newMemberSameVersion = Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 10, appVersion: appVersion1); + + // higher upNumber AND version - should not affect leader + var newMemberNewVersion = Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 11, appVersion: appVersion2); + + // Act + var (state1, oldestChanged1) = state.AddMember(newMemberSameVersion); + var (state2, oldestChanged2) = state1.AddMember(newMemberNewVersion); + var (state3, oldestChanged3) = state2.RemoveMember(newMemberSameVersion); + + // Assert + oldest.Should().NotBeNull(); + oldest!.Address.Should().Be(winningAddress); + + state1.CurrentOldest.Should().Be(oldest); + oldestChanged1.Should().BeFalse(); + + state2.CurrentOldest.Should().Be(oldest); + oldestChanged2.Should().BeFalse(); + + state3.CurrentOldest.Should().Be(oldest); + oldestChanged3.Should().BeFalse(); + } + public static Member Create( Address address, MemberStatus status = MemberStatus.Up, From 01d8e57e21b5b4075598515ccc75df36356e0039 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 23 May 2024 10:47:36 -0500 Subject: [PATCH 04/13] integrated `OldestChangedBufferState` into `OldestChangeBuffer` --- .../Singleton/OldestChangedBuffer.cs | 89 +++++++++---------- 1 file changed, 41 insertions(+), 48 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs index 72b77f31461..03a12a517d3 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs @@ -11,7 +11,6 @@ using System.Linq; using System.Threading.Tasks; using Akka.Actor; -using Akka.Util.Internal; namespace Akka.Cluster.Tools.Singleton { @@ -34,15 +33,12 @@ internal sealed class OldestChangedBuffer : UntypedActor [Serializable] public sealed class GetNext { - /// - /// TBD - /// public static GetNext Instance { get; } = new(); private GetNext() { } } /// - /// TBD + /// The first event for determining the oldest member. /// [Serializable] public sealed class InitialOldestState @@ -53,15 +49,10 @@ public sealed class InitialOldestState public List Oldest { get; } /// - /// TBD + /// When true, it's safe to be oldest immediately (no older nodes are in process of leaving) /// public bool SafeToBeOldest { get; } - - /// - /// TBD - /// - /// TBD - /// TBD + public InitialOldestState(List oldest, bool safeToBeOldest) { Oldest = oldest; @@ -81,7 +72,7 @@ public sealed class OldestChanged /// public UniqueAddress? Oldest { get; } - public OldestChanged(UniqueAddress oldest) + public OldestChanged(UniqueAddress? oldest) { Oldest = oldest; } @@ -91,6 +82,8 @@ public OldestChanged(UniqueAddress oldest) private readonly MemberAgeOrdering _memberAgeComparer; private readonly CoordinatedShutdown _coordShutdown = CoordinatedShutdown.Get(Context.System); + + public OldestChangedBufferState State { get; set; } /// /// Creates a new instance of the . @@ -99,11 +92,12 @@ public OldestChanged(UniqueAddress oldest) /// Should cluster AppVersion be considered when sorting member age public OldestChangedBuffer(string role, bool considerAppVersion) { - _role = role; _memberAgeComparer = considerAppVersion ? MemberAgeOrdering.DescendingWithAppVersion : MemberAgeOrdering.Descending; - _membersByAge = ImmutableSortedSet.Empty.WithComparer(_memberAgeComparer); + var membersByAge = ImmutableSortedSet.Empty.WithComparer(_memberAgeComparer); + + State = new OldestChangedBufferState(membersByAge, role); SetupCoordinatedShutdown(); } @@ -132,45 +126,35 @@ private void SetupCoordinatedShutdown() } }); } - - private readonly string _role; - private ImmutableSortedSet _membersByAge; + private ImmutableQueue _changes = ImmutableQueue.Empty; private readonly Cluster _cluster = Cluster.Get(Context.System); - - private void TrackChanges(Action block) - { - var before = _membersByAge.FirstOrDefault(); - block(); - var after = _membersByAge.FirstOrDefault(); - - // todo: fix neq comparison - if (!Equals(before, after)) - _changes = _changes.Enqueue(new OldestChanged(after?.UniqueAddress)); - } - - private bool MatchingRole(Member member) - { - return string.IsNullOrEmpty(_role) || member.HasRole(_role); - } - - private void HandleInitial(ClusterEvent.CurrentClusterState state) + + private void HandleInitial(ClusterEvent.CurrentClusterState clusterState) { // all members except Joining and WeaklyUp - _membersByAge = state.Members - .Where(m => m.UpNumber != int.MaxValue && MatchingRole(m)) + var newMembersByAge = clusterState.Members + .Where(m => m.UpNumber != int.MaxValue && State.MatchingRole(m)) .ToImmutableSortedSet(_memberAgeComparer); + + State = State with { MembersByAge = newMembersByAge }; + + // compute the initial oldest - doesn't matter if it's not "safe" or not. That's our parent's problem. + // The subsequent `MemberRemoved` emits that will be emitted if an older node is leaving or downed + // will clean that up inside our state. + var (newState, oldestChanged) = State.ComputeNextOldest(); + State = newState; // If there is some removal in progress of an older node it's not safe to immediately become oldest, // removal of younger nodes doesn't matter. Note that it can also be started via restart after // ClusterSingletonManagerIsStuck. - var selfUpNumber = state.Members + var selfUpNumber = clusterState.Members .Where(m => m.UniqueAddress == _cluster.SelfUniqueAddress) .Select(m => (int?)m.UpNumber) .FirstOrDefault() ?? int.MaxValue; - var oldest = _membersByAge.TakeWhile(m => m.UpNumber <= selfUpNumber).ToList(); + var oldest = State.MembersByAge.TakeWhile(m => m.UpNumber <= selfUpNumber).ToList(); var safeToBeOldest = !oldest.Any(m => m.Status is MemberStatus.Down or MemberStatus.Exiting or MemberStatus.Leaving); var initial = new InitialOldestState(oldest.Select(m => m.UniqueAddress).ToList(), safeToBeOldest); _changes = _changes.Enqueue(initial); @@ -178,19 +162,28 @@ private void HandleInitial(ClusterEvent.CurrentClusterState state) private void Add(Member member) { - if (MatchingRole(member)) - TrackChanges(() => + if (State.MatchingRole(member)) + { + var (newState, oldestChanged) = State.AddMember(member); + State = newState; + if (oldestChanged) { - // replace, it's possible that the upNumber is changed - _membersByAge = _membersByAge.Remove(member); - _membersByAge = _membersByAge.Add(member); - }); + _changes = _changes.Enqueue(new OldestChanged(State.CurrentOldest?.UniqueAddress)); + } + } } private void Remove(Member member) { - if (MatchingRole(member)) - TrackChanges(() => _membersByAge = _membersByAge.Remove(member)); + if (State.MatchingRole(member)) + { + var (newState, oldestChanged) = State.RemoveMember(member); + State = newState; + if (oldestChanged) + { + _changes = _changes.Enqueue(new OldestChanged(State.CurrentOldest?.UniqueAddress)); + } + } } private void SendFirstChange() From 620e533c6190e37b2140f013edf520158eb5769b Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 23 May 2024 11:10:14 -0500 Subject: [PATCH 05/13] Updated `ClusterSingletonProxy` to also use `OldestChangedBufferState` --- .../Singleton/ClusterSingletonProxy.cs | 161 +++++++++--------- 1 file changed, 80 insertions(+), 81 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs index b624426555b..73937857dd3 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +#nullable enable using System; using System.Collections.Generic; using System.Collections.Immutable; @@ -40,15 +41,15 @@ namespace Akka.Cluster.Tools.Singleton public sealed class ClusterSingletonProxy : ReceiveActor { /// - /// TBD + /// Message used to tell the ClusterSingletonProxy to locate the currently active singleton. /// internal sealed class TryToIdentifySingleton : INoSerializationVerificationNeeded { - /// - /// TBD - /// public static TryToIdentifySingleton Instance { get; } = new(); - private TryToIdentifySingleton() { } + + private TryToIdentifySingleton() + { + } } /// @@ -57,7 +58,8 @@ private TryToIdentifySingleton() { } /// TBD public static Config DefaultConfig() { - return ConfigurationFactory.FromResource("Akka.Cluster.Tools.Singleton.reference.conf"); + return ConfigurationFactory.FromResource( + "Akka.Cluster.Tools.Singleton.reference.conf"); } /// @@ -68,7 +70,7 @@ public static Config DefaultConfig() /// which ends with the name you defined in `actorOf` when creating the . /// /// Cluster singleton proxy settings. - /// TBD + /// The props for the singleton proxy public static Props Props(string singletonManagerPath, ClusterSingletonProxySettings settings) { return Actor.Props.Create(() => new ClusterSingletonProxy(singletonManagerPath, settings)) @@ -83,16 +85,11 @@ public static Props Props(string singletonManagerPath, ClusterSingletonProxySett private readonly string[] _singletonPath; private int _identityCounter = 0; private string _identityId; - private IActorRef _singleton = null; - private ICancelable _identityTimer = null; - private ImmutableSortedSet _membersByAge; + private IActorRef? _singleton; + private ICancelable? _identityTimer; + private OldestChangedBufferState _state; private ILoggingAdapter _log; - /// - /// TBD - /// - /// TBD - /// TBD public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxySettings settings) { _settings = settings; @@ -102,7 +99,8 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS _memberAgeComparer = settings.ConsiderAppVersion ? MemberAgeOrdering.DescendingWithAppVersion : MemberAgeOrdering.Descending; - _membersByAge = ImmutableSortedSet.Empty.WithComparer(_memberAgeComparer); + _state = new OldestChangedBufferState(ImmutableSortedSet.Empty.WithComparer(_memberAgeComparer), + settings.Role); Receive(s => HandleInitial(s)); Receive(m => Add(m.Member)); @@ -119,63 +117,58 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS /* do nothing */ }); Receive(identity => + { + if (identity.Subject != null) { - if (identity.Subject != null) - { - // if the new singleton is defined, deliver all buffered messages - var subject = identity.Subject; - Log.Info("Singleton identified at [{0}]", subject.Path); - _singleton = subject; - Context.Watch(subject); - CancelTimer(); - SendBuffered(); - } - }); + // if the new singleton is defined, deliver all buffered messages + var subject = identity.Subject; + Log.Info("Singleton identified at [{0}]", subject.Path); + _singleton = subject; + Context.Watch(subject); + CancelTimer(); + SendBuffered(); + } + }); Receive(_ => - { - var oldest = _membersByAge.FirstOrDefault(); - if (oldest != null && _identityTimer != null) - { - var singletonAddress = new RootActorPath(oldest.Address) / _singletonPath; - Log.Debug("Trying to identify singleton at [{0}]", singletonAddress); - Context.ActorSelection(singletonAddress).Tell(new Identify(_identityId)); - } - }); + { + var oldest = _state.CurrentOldest; + if (oldest != null && _identityTimer != null) + { + var singletonAddress = new RootActorPath(oldest.Address) / _singletonPath; + Log.Debug("Trying to identify singleton at [{0}]", singletonAddress); + Context.ActorSelection(singletonAddress).Tell(new Identify(_identityId)); + } + }); Receive(terminated => + { + if (Equals(_singleton, terminated.ActorRef)) { - if (Equals(_singleton, terminated.ActorRef)) - { - // buffering mode, identification of new will start when old node is removed - _singleton = null; - } - }); + // buffering mode, identification of new will start when old node is removed + _singleton = null; + } + }); ReceiveAny(msg => + { + if (_singleton != null) { - if (_singleton != null) - { - if (Log.IsDebugEnabled) - Log.Debug("Forwarding message of type [{0}] to current singleton instance at [{1}]", msg.GetType(), _singleton.Path); - _singleton.Forward(msg); - } - else - Buffer(msg); - }); + if (Log.IsDebugEnabled) + Log.Debug("Forwarding message of type [{0}] to current singleton instance at [{1}]", + msg.GetType(), _singleton.Path); + _singleton.Forward(msg); + } + else + Buffer(msg); + }); } private ILoggingAdapter Log => _log ??= Context.GetLogger(); - /// - /// TBD - /// protected override void PreStart() { CancelTimer(); _cluster.Subscribe(Self, typeof(ClusterEvent.IMemberEvent)); } - /// - /// TBD - /// protected override void PostStop() { CancelTimer(); @@ -200,10 +193,19 @@ private bool MatchingRole(Member member) private void HandleInitial(ClusterEvent.CurrentClusterState state) { - TrackChanges(() => - _membersByAge = state.Members - .Where(m => m.Status == MemberStatus.Up && MatchingRole(m)) - .ToImmutableSortedSet(_memberAgeComparer)); + var membersByAge = state.Members + .Where(m => m.Status == MemberStatus.Up && MatchingRole(m)) + .ToImmutableSortedSet(_memberAgeComparer); + + _state = _state with { MembersByAge = membersByAge}; + + // compute the initial oldest + var (newState, _) = _state.ComputeNextOldest(); + _state = newState; + + // if the oldest is defined, start the identification process + if (_state.CurrentOldest != null) + IdentifySingleton(); } // Discard old singleton ActorRef and send a periodic message to self to identify the singleton. @@ -221,31 +223,27 @@ private void IdentifySingleton() message: TryToIdentifySingleton.Instance, sender: Self); } - - private void TrackChanges(Action block) - { - var before = _membersByAge.FirstOrDefault(); - block(); - var after = _membersByAge.FirstOrDefault(); - - // if the head has changed, I need to find the new singleton - if (!Equals(before, after)) IdentifySingleton(); - } - + private void Add(Member member) { if (MatchingRole(member)) - TrackChanges(() => - { - _membersByAge = _membersByAge.Remove(member); //replace - _membersByAge = _membersByAge.Add(member); - }); + { + var (newState, oldestChanged) = _state.AddMember(member); + _state = newState; + if (oldestChanged) + IdentifySingleton(); + } } private void Remove(Member member) { if (MatchingRole(member)) - TrackChanges(() => _membersByAge = _membersByAge.Remove(member)); + { + var (newState, oldestChanged) = _state.RemoveMember(member); + _state = newState; + if (oldestChanged) + IdentifySingleton(); + } } private string CreateIdentifyId(int i) @@ -256,7 +254,8 @@ private string CreateIdentifyId(int i) private void Buffer(object message) { if (_settings.BufferSize == 0) - Log.Debug("Singleton not available and buffering is disabled, dropping message [{0}]", message.GetType()); + Log.Debug("Singleton not available and buffering is disabled, dropping message [{0}]", + message.GetType()); else if (_buffer.Count == _settings.BufferSize) { var first = _buffer.Dequeue(); @@ -276,8 +275,8 @@ private void SendBuffered() while (_buffer.Count != 0) { var pair = _buffer.Dequeue(); - _singleton.Tell(pair.Key, pair.Value); + _singleton?.Tell(pair.Key, pair.Value); } } } -} +} \ No newline at end of file From 96c3c9ba032d71b2cb16a4b0095ab47ce687f193 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 23 May 2024 12:05:25 -0500 Subject: [PATCH 06/13] added end to end repro for #7196 close #7196 --- .../Singleton/OldChangedBufferSpecs.cs | 303 +++++++----------- .../OldestChangedBufferStateSpecs.cs | 180 +++++++++++ 2 files changed, 297 insertions(+), 186 deletions(-) create mode 100644 src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldestChangedBufferStateSpecs.cs diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs index 51d5d102763..5c9ad601161 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs @@ -4,217 +4,148 @@ // Copyright (C) 2013-2024 .NET Foundation // // ----------------------------------------------------------------------- + #nullable enable +using System; using System.Collections.Immutable; using System.Linq; +using System.Threading.Tasks; using Akka.Actor; using Akka.Cluster.Tools.Singleton; -using Akka.Util; +using Akka.Configuration; +using Akka.TestKit; using FluentAssertions; using Xunit; +using Xunit.Abstractions; namespace Akka.Cluster.Tools.Tests.Singleton; /// /// Reproduction for https://github.com/akkadotnet/akka.net/issues/7196 - clearly, what we did /// -// public class OldChangedBufferSpecs : AkkaSpec -// { -// public IActorRef CreateOldestChangedBuffer(string role, bool considerAppVersion) -// { -// return Sys.ActorOf(Props.Create(() => new OldestChangedBuffer(role, considerAppVersion))); -// } -// -// private readonly ActorSystem _otherNodeV1; -// private readonly ActorSystem _nonHostingNode; -// private ActorSystem? _otherNodeV2; -// -// public OldChangedBufferSpecs(ITestOutputHelper output) : base(""" -// -// akka.loglevel = INFO -// akka.actor.provider = "cluster" -// akka.cluster.roles = [singleton] -// akka.cluster.auto-down-unreachable-after = 2s -// akka.cluster.singleton.min-number-of-hand-over-retries = 5 -// akka.remote { -// dot-netty.tcp { -// hostname = "127.0.0.1" -// port = 0 -// } -// } -// """, output) -// { -// _otherNodeV1 = ActorSystem.Create(Sys.Name, Sys.Settings.Config); -// _nonHostingNode = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString("akka.cluster.roles = [other]") -// .WithFallback(Sys.Settings.Config)); -// } -// -// [Fact(DisplayName = "Singletons should not move to higher AppVersion nodes until after older incarnation is downed")] -// public async Task Bugfix7196Spec() -// { -// -// } -// } - -public class OldestChangedBufferStateSpecs +public class OldChangedBufferSpecs : AkkaSpec { - [Fact] - public void OldestChangedBuffer_should_initially_only_consider_nodes_with_matching_role() + private readonly ActorSystem _hostNodeV1; + private readonly ActorSystem _otherNodeV2; + + private static Config OriginalNodeConfig() => """ + + akka.loglevel = INFO + akka.actor.provider = "cluster" + akka.cluster.roles = [non-singleton] + akka.cluster.auto-down-unreachable-after = 2s + akka.cluster.singleton.min-number-of-hand-over-retries = 5 + akka.cluster.app-version = "1.0.0" + akka.remote { + dot-netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + } + """; + + private static Config V2NodeConfig(ActorSystem originalSys) => ConfigurationFactory.ParseString( + "akka.cluster.app-version = \"1.0.2\"").WithFallback(originalSys.Settings.Config); + + public OldChangedBufferSpecs(ITestOutputHelper output) : base(OriginalNodeConfig(), output) { - // Arrange - var targetRole = "target-role"; - var targetRoles = ImmutableHashSet.Create("role1", "role2", targetRole); - var winningAddress = Address.Parse("akka://sys@darkstar:1112"); - var nonTargetRoles = ImmutableHashSet.Create("role1", "role2"); - var initialMembersByAge = ImmutableSortedSet.Empty - .Add(Create(winningAddress, roles:targetRoles, upNumber: 3)) - .Add(Create(Address.Parse("akka://sys@darkstar:1113"), roles:nonTargetRoles, upNumber: 1)) - .Add(Create(Address.Parse("akka://sys@darkstar:1111"), roles:targetRoles, upNumber: 9)) - .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); - - // Act - var state = new OldestChangedBufferState(initialMembersByAge, targetRole); - - // Assert - var oldest = state.CurrentOldest; - oldest.Should().NotBeNull(); - oldest!.Address.Should().Be(winningAddress); + _hostNodeV1 = ActorSystem.Create(Sys.Name, + ConfigurationFactory.ParseString("akka.cluster.roles = [singleton]").WithFallback(Sys.Settings.Config)); + _otherNodeV2 = ActorSystem.Create(Sys.Name, + ConfigurationFactory.ParseString("akka.cluster.roles = [singleton]").WithFallback(V2NodeConfig(Sys))); } - [Fact] - public void OldestChangedBuffer_should_not_change_leader_when_higher_AppVersion_added() + [Fact(DisplayName = + "Singletons should not move to higher AppVersion nodes until after older incarnation is downed")] + public async Task Bugfix7196Spec() { - // Arrange - var winningAddress = Address.Parse("akka://sys@darkstar:1112"); - var appVersion1 = AppVersion.Create("1.0.0"); - var appVersion2 = AppVersion.Create("1.0.2"); - var initialMembersByAge = ImmutableSortedSet.Empty - .Add(Create(winningAddress, upNumber: 3, appVersion: appVersion1)) - .Add(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9, appVersion: appVersion1)) - .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); - - // Act - var state = new OldestChangedBufferState(initialMembersByAge, string.Empty); - var oldest = state.CurrentOldest; - - // higher upNumber - should not affect leader - var newMemberSameVersion = Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 10, appVersion: appVersion1); - - // higher upNumber AND version - should not affect leader - var newMemberNewVersion = Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 11, appVersion: appVersion2); - - // Act - var (state1, oldestChanged1) = state.AddMember(newMemberSameVersion); - var (state2, oldestChanged2) = state1.AddMember(newMemberNewVersion); - - // Assert - oldest.Should().NotBeNull(); - oldest!.Address.Should().Be(winningAddress); - - state1.CurrentOldest.Should().Be(oldest); - oldestChanged1.Should().BeFalse(); - - state2.CurrentOldest.Should().Be(oldest); - oldestChanged2.Should().BeFalse(); + await JoinAsync(Sys, Sys); // have to do a self join first + await JoinAsync(_hostNodeV1, Sys); - // the members by age system is going to chose the appVersion over the upNumber - state2.MembersByAge.FirstOrDefault().Should().NotBe(oldest); - state2.MembersByAge.FirstOrDefault()!.AppVersion.Should().Be(appVersion2); + var proxy = Sys.ActorOf( + ClusterSingletonProxy.Props("user/echo", + ClusterSingletonProxySettings.Create(Sys).WithRole("singleton")), "proxy3"); + + // confirm that singleton is on _hostNodeV1 + await AssertSingletonHostedOn(proxy, _hostNodeV1); + + // have _otherNodeV2 join the cluster + await JoinAsync(_otherNodeV2, Sys); + + // confirm that singleton is STILL on _hostNodeV1 + await AssertSingletonHostedOn(proxy, _hostNodeV1); + + // now, down the original node + Cluster.Get(Sys).Leave(Cluster.Get(_hostNodeV1).SelfAddress); + + // validate that _hostNodeV1 is no longer in the cluster + await WithinAsync(TimeSpan.FromSeconds(5), () => + { + return AwaitAssertAsync(() => + { + Cluster.Get(Sys).State.Members.Select(x => x.UniqueAddress).Should() + .NotContain(Cluster.Get(_hostNodeV1).SelfUniqueAddress); + }); + }); + + // validate that the singleton has moved to _otherNodeV2 + await AssertSingletonHostedOn(proxy, _otherNodeV2); } - - [Fact] - public void OldestChangedBuffer_should_change_Oldest_when_previous_Oldest_removed() - { - // Arrange - var winningAddress = Address.Parse("akka://sys@darkstar:1112"); - var appVersion1 = AppVersion.Create("1.0.0"); - var appVersion2 = AppVersion.Create("1.0.2"); - var originalOldest = Create(winningAddress, upNumber: 3, appVersion: appVersion1); - - var initialMembersByAge = ImmutableSortedSet.Empty - .Add(originalOldest) - .Add(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9, appVersion: appVersion1)) - .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); - - // Act - var state = new OldestChangedBufferState(initialMembersByAge, string.Empty); - var oldest = state.CurrentOldest; - - // higher upNumber, same version - won't affect leader - var newMemberSameVersion = Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 4, appVersion: appVersion1); - - // lower upNumber AND version - won't affect the leader until it gets removed - var newMemberHigherVersion = Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 11, appVersion: appVersion2); - - // Act - var (state1, oldestChanged1) = state.AddMember(newMemberSameVersion); - var (state2, oldestChanged2) = state1.AddMember(newMemberHigherVersion); - var (state3, oldestChanged3) = state2.RemoveMember(originalOldest); - - // Assert - oldest.Should().NotBeNull(); - oldest!.Address.Should().Be(winningAddress); - - state1.CurrentOldest.Should().Be(originalOldest); - oldestChanged1.Should().BeFalse(); - - state2.CurrentOldest.Should().Be(originalOldest); - oldestChanged2.Should().BeFalse(); - - state3.CurrentOldest.Should().Be(newMemberHigherVersion); - oldestChanged3.Should().BeTrue(); + private async Task AssertSingletonHostedOn(IActorRef proxy, ActorSystem targetNode) + { + await WithinAsync(TimeSpan.FromSeconds(5), () => + { + return AwaitAssertAsync(() => + { + var probe = CreateTestProbe(Sys); + proxy.Tell("hello", probe.Ref); + probe.ExpectMsg(TimeSpan.FromSeconds(1)) + .Should() + .Be(Cluster.Get(targetNode).SelfUniqueAddress); + }); + }); } - - [Fact] - public void OldestChangedBuffer_should_not_change_Oldest_when_nonOldest_node_removed() + + public async Task JoinAsync(ActorSystem from, ActorSystem to) { - // Arrange - var winningAddress = Address.Parse("akka://sys@darkstar:1112"); - var appVersion1 = AppVersion.Create("1.0.0"); - var appVersion2 = AppVersion.Create("1.0.2"); - var initialMembersByAge = ImmutableSortedSet.Empty - .Add(Create(winningAddress, upNumber: 3, appVersion: appVersion1)) - .Add(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9, appVersion: appVersion1)) - .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); - - // Act - var state = new OldestChangedBufferState(initialMembersByAge, string.Empty); - var oldest = state.CurrentOldest; - - // higher upNumber - should not affect leader - var newMemberSameVersion = Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 10, appVersion: appVersion1); - - // higher upNumber AND version - should not affect leader - var newMemberNewVersion = Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 11, appVersion: appVersion2); - - // Act - var (state1, oldestChanged1) = state.AddMember(newMemberSameVersion); - var (state2, oldestChanged2) = state1.AddMember(newMemberNewVersion); - var (state3, oldestChanged3) = state2.RemoveMember(newMemberSameVersion); - - // Assert - oldest.Should().NotBeNull(); - oldest!.Address.Should().Be(winningAddress); - - state1.CurrentOldest.Should().Be(oldest); - oldestChanged1.Should().BeFalse(); - - state2.CurrentOldest.Should().Be(oldest); - oldestChanged2.Should().BeFalse(); + if (Cluster.Get(from).SelfRoles.Contains("singleton")) + { + from.ActorOf(ClusterSingletonManager.Props(Props.Create(() => new Singleton()), + PoisonPill.Instance, + ClusterSingletonManagerSettings.Create(from).WithRole("singleton")), "echo"); + } + - state3.CurrentOldest.Should().Be(oldest); - oldestChanged3.Should().BeFalse(); + await WithinAsync(TimeSpan.FromSeconds(45), () => + { + AwaitAssert(() => + { + Cluster.Get(from).Join(Cluster.Get(to).SelfAddress); + Cluster.Get(from).State.Members.Select(x => x.UniqueAddress).Should() + .Contain(Cluster.Get(from).SelfUniqueAddress); + Cluster.Get(from) + .State.Members.Select(x => x.Status) + .ToImmutableHashSet() + .Should() + .Equal(ImmutableHashSet.Empty.Add(MemberStatus.Up)); + }); + return Task.CompletedTask; + }); } - - public static Member Create( - Address address, - MemberStatus status = MemberStatus.Up, - ImmutableHashSet? roles = null, - int uid = 0, - int upNumber = 0, - AppVersion? appVersion = null) + + public class Singleton : ReceiveActor + { + public Singleton() + { + ReceiveAny(_ => { Sender.Tell(Cluster.Get(Context.System).SelfUniqueAddress); }); + } + } + + protected override void AfterAll() { - return Member.Create(new UniqueAddress(address, uid), upNumber, status, roles ?? ImmutableHashSet.Empty, appVersion ?? AppVersion.Zero); + Shutdown(_hostNodeV1); + Shutdown(_otherNodeV2); + base.AfterAll(); } } \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldestChangedBufferStateSpecs.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldestChangedBufferStateSpecs.cs new file mode 100644 index 00000000000..e5d0b4fe767 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldestChangedBufferStateSpecs.cs @@ -0,0 +1,180 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +#nullable enable +using System.Collections.Immutable; +using System.Linq; +using Akka.Actor; +using Akka.Cluster.Tools.Singleton; +using Akka.Util; +using FluentAssertions; +using Xunit; + +namespace Akka.Cluster.Tools.Tests.Singleton; + +public class OldestChangedBufferStateSpecs +{ + [Fact] + public void OldestChangedBuffer_should_initially_only_consider_nodes_with_matching_role() + { + // Arrange + var targetRole = "target-role"; + var targetRoles = ImmutableHashSet.Create("role1", "role2", targetRole); + var winningAddress = Address.Parse("akka://sys@darkstar:1112"); + var nonTargetRoles = ImmutableHashSet.Create("role1", "role2"); + var initialMembersByAge = ImmutableSortedSet.Empty + .Add(Create(winningAddress, roles:targetRoles, upNumber: 3)) + .Add(Create(Address.Parse("akka://sys@darkstar:1113"), roles:nonTargetRoles, upNumber: 1)) + .Add(Create(Address.Parse("akka://sys@darkstar:1111"), roles:targetRoles, upNumber: 9)) + .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); + + // Act + var state = new OldestChangedBufferState(initialMembersByAge, targetRole); + + // Assert + var oldest = state.CurrentOldest; + oldest.Should().NotBeNull(); + oldest!.Address.Should().Be(winningAddress); + } + + [Fact] + public void OldestChangedBuffer_should_not_change_leader_when_higher_AppVersion_added() + { + // Arrange + var winningAddress = Address.Parse("akka://sys@darkstar:1112"); + var appVersion1 = AppVersion.Create("1.0.0"); + var appVersion2 = AppVersion.Create("1.0.2"); + var initialMembersByAge = ImmutableSortedSet.Empty + .Add(Create(winningAddress, upNumber: 3, appVersion: appVersion1)) + .Add(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9, appVersion: appVersion1)) + .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); + + // Act + var state = new OldestChangedBufferState(initialMembersByAge, string.Empty); + var oldest = state.CurrentOldest; + + // higher upNumber - should not affect leader + var newMemberSameVersion = Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 10, appVersion: appVersion1); + + // higher upNumber AND version - should not affect leader + var newMemberNewVersion = Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 11, appVersion: appVersion2); + + // Act + var (state1, oldestChanged1) = state.AddMember(newMemberSameVersion); + var (state2, oldestChanged2) = state1.AddMember(newMemberNewVersion); + + // Assert + oldest.Should().NotBeNull(); + oldest!.Address.Should().Be(winningAddress); + + state1.CurrentOldest.Should().Be(oldest); + oldestChanged1.Should().BeFalse(); + + state2.CurrentOldest.Should().Be(oldest); + oldestChanged2.Should().BeFalse(); + + // the members by age system is going to chose the appVersion over the upNumber + state2.MembersByAge.FirstOrDefault().Should().NotBe(oldest); + state2.MembersByAge.FirstOrDefault()!.AppVersion.Should().Be(appVersion2); + } + + [Fact] + public void OldestChangedBuffer_should_change_Oldest_when_previous_Oldest_removed() + { + // Arrange + var winningAddress = Address.Parse("akka://sys@darkstar:1112"); + var appVersion1 = AppVersion.Create("1.0.0"); + var appVersion2 = AppVersion.Create("1.0.2"); + + var originalOldest = Create(winningAddress, upNumber: 3, appVersion: appVersion1); + + var initialMembersByAge = ImmutableSortedSet.Empty + .Add(originalOldest) + .Add(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9, appVersion: appVersion1)) + .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); + + // Act + var state = new OldestChangedBufferState(initialMembersByAge, string.Empty); + var oldest = state.CurrentOldest; + + // higher upNumber, same version - won't affect leader + var newMemberSameVersion = Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 4, appVersion: appVersion1); + + // lower upNumber AND version - won't affect the leader until it gets removed + var newMemberHigherVersion = Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 11, appVersion: appVersion2); + + // Act + var (state1, oldestChanged1) = state.AddMember(newMemberSameVersion); + var (state2, oldestChanged2) = state1.AddMember(newMemberHigherVersion); + var (state3, oldestChanged3) = state2.RemoveMember(originalOldest); + + // Assert + oldest.Should().NotBeNull(); + oldest!.Address.Should().Be(winningAddress); + + state1.CurrentOldest.Should().Be(originalOldest); + oldestChanged1.Should().BeFalse(); + + state2.CurrentOldest.Should().Be(originalOldest); + oldestChanged2.Should().BeFalse(); + + state3.CurrentOldest.Should().Be(newMemberHigherVersion); + oldestChanged3.Should().BeTrue(); + } + + [Fact] + public void OldestChangedBuffer_should_not_change_Oldest_when_nonOldest_node_removed() + { + // Arrange + var winningAddress = Address.Parse("akka://sys@darkstar:1112"); + var appVersion1 = AppVersion.Create("1.0.0"); + var appVersion2 = AppVersion.Create("1.0.2"); + var initialMembersByAge = ImmutableSortedSet.Empty + .Add(Create(winningAddress, upNumber: 3, appVersion: appVersion1)) + .Add(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9, appVersion: appVersion1)) + .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); + + // Act + var state = new OldestChangedBufferState(initialMembersByAge, string.Empty); + var oldest = state.CurrentOldest; + + // higher upNumber - should not affect leader + var newMemberSameVersion = Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 10, appVersion: appVersion1); + + // higher upNumber AND version - should not affect leader + var newMemberNewVersion = Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 11, appVersion: appVersion2); + + // Act + var (state1, oldestChanged1) = state.AddMember(newMemberSameVersion); + var (state2, oldestChanged2) = state1.AddMember(newMemberNewVersion); + var (state3, oldestChanged3) = state2.RemoveMember(newMemberSameVersion); + + // Assert + oldest.Should().NotBeNull(); + oldest!.Address.Should().Be(winningAddress); + + state1.CurrentOldest.Should().Be(oldest); + oldestChanged1.Should().BeFalse(); + + state2.CurrentOldest.Should().Be(oldest); + oldestChanged2.Should().BeFalse(); + + state3.CurrentOldest.Should().Be(oldest); + oldestChanged3.Should().BeFalse(); + } + + public static Member Create( + Address address, + MemberStatus status = MemberStatus.Up, + ImmutableHashSet? roles = null, + int uid = 0, + int upNumber = 0, + AppVersion? appVersion = null) + { + return Member.Create(new UniqueAddress(address, uid), upNumber, status, roles ?? ImmutableHashSet.Empty, appVersion ?? AppVersion.Zero); + } +} \ No newline at end of file From 7948cf4a76e0cd6e24eddad24a5624575f9d5e86 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 23 May 2024 12:07:50 -0500 Subject: [PATCH 07/13] added API approvals --- .../verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt | 1 + .../verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt index e8b13f02aac..fe1cc7ddc5d 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt @@ -397,6 +397,7 @@ namespace Akka.Cluster.Tools.Singleton public ClusterSingletonProvider() { } public override Akka.Cluster.Tools.Singleton.ClusterSingleton CreateExtension(Akka.Actor.ExtendedActorSystem system) { } } + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class ClusterSingletonProxy : Akka.Actor.ReceiveActor { public ClusterSingletonProxy(string singletonManagerPath, Akka.Cluster.Tools.Singleton.ClusterSingletonProxySettings settings) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt index a25e701e6aa..d8c50cbc86e 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt @@ -397,6 +397,7 @@ namespace Akka.Cluster.Tools.Singleton public ClusterSingletonProvider() { } public override Akka.Cluster.Tools.Singleton.ClusterSingleton CreateExtension(Akka.Actor.ExtendedActorSystem system) { } } + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class ClusterSingletonProxy : Akka.Actor.ReceiveActor { public ClusterSingletonProxy(string singletonManagerPath, Akka.Cluster.Tools.Singleton.ClusterSingletonProxySettings settings) { } From abb04f12c1b1b7adac6ff7662b980dbfee733d21 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 23 May 2024 12:49:21 -0500 Subject: [PATCH 08/13] adding better logging to spec --- .../Singleton/OldChangedBufferSpecs.cs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs index 5c9ad601161..8d65fc0f330 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs @@ -26,14 +26,13 @@ namespace Akka.Cluster.Tools.Tests.Singleton; public class OldChangedBufferSpecs : AkkaSpec { private readonly ActorSystem _hostNodeV1; - private readonly ActorSystem _otherNodeV2; + private readonly ActorSystem _hostNodeV2; private static Config OriginalNodeConfig() => """ akka.loglevel = INFO akka.actor.provider = "cluster" akka.cluster.roles = [non-singleton] - akka.cluster.auto-down-unreachable-after = 2s akka.cluster.singleton.min-number-of-hand-over-retries = 5 akka.cluster.app-version = "1.0.0" akka.remote { @@ -51,8 +50,10 @@ public OldChangedBufferSpecs(ITestOutputHelper output) : base(OriginalNodeConfig { _hostNodeV1 = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString("akka.cluster.roles = [singleton]").WithFallback(Sys.Settings.Config)); - _otherNodeV2 = ActorSystem.Create(Sys.Name, + InitializeLogger(_hostNodeV1); + _hostNodeV2 = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString("akka.cluster.roles = [singleton]").WithFallback(V2NodeConfig(Sys))); + InitializeLogger(_hostNodeV2); } [Fact(DisplayName = @@ -69,8 +70,8 @@ public async Task Bugfix7196Spec() // confirm that singleton is on _hostNodeV1 await AssertSingletonHostedOn(proxy, _hostNodeV1); - // have _otherNodeV2 join the cluster - await JoinAsync(_otherNodeV2, Sys); + // have _hostNodeV2 join the cluster + await JoinAsync(_hostNodeV2, Sys); // confirm that singleton is STILL on _hostNodeV1 await AssertSingletonHostedOn(proxy, _hostNodeV1); @@ -88,8 +89,8 @@ await WithinAsync(TimeSpan.FromSeconds(5), () => }); }); - // validate that the singleton has moved to _otherNodeV2 - await AssertSingletonHostedOn(proxy, _otherNodeV2); + // validate that the singleton has moved to _hostNodeV2 + await AssertSingletonHostedOn(proxy, _hostNodeV2); } private async Task AssertSingletonHostedOn(IActorRef proxy, ActorSystem targetNode) @@ -145,7 +146,7 @@ public Singleton() protected override void AfterAll() { Shutdown(_hostNodeV1); - Shutdown(_otherNodeV2); + Shutdown(_hostNodeV2); base.AfterAll(); } } \ No newline at end of file From 4a5a2f5fc83e547561fdb38f9cdccbb933c03123 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 23 May 2024 13:06:07 -0500 Subject: [PATCH 09/13] disable auto-downing on restart specs --- .../Singleton/ClusterSingletonRestart2Spec.cs | 2 +- .../Singleton/ClusterSingletonRestart3Spec.cs | 2 +- .../Singleton/ClusterSingletonRestartSpec.cs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs index 3499bd0f158..4ab2a8cd34b 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs @@ -29,7 +29,7 @@ public ClusterSingletonRestart2Spec() : base(@" akka.loglevel = INFO akka.actor.provider = ""cluster"" akka.cluster.roles = [singleton] - akka.cluster.auto-down-unreachable-after = 2s + #akka.cluster.auto-down-unreachable-after = 2s akka.cluster.singleton.min-number-of-hand-over-retries = 5 akka.remote { dot-netty.tcp { diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart3Spec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart3Spec.cs index 04f3d2011ff..0adbdaf5a02 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart3Spec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart3Spec.cs @@ -29,7 +29,7 @@ public ClusterSingletonRestart3Spec(ITestOutputHelper output) : base(@" akka.loglevel = DEBUG akka.actor.provider = ""cluster"" akka.cluster.app-version = ""1.0.0"" - akka.cluster.auto-down-unreachable-after = 2s + #akka.cluster.auto-down-unreachable-after = 2s akka.cluster.singleton.min-number-of-hand-over-retries = 5 akka.cluster.singleton.consider-app-version = true akka.remote { diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestartSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestartSpec.cs index e2f5c5123d6..76c6ac98717 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestartSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestartSpec.cs @@ -28,7 +28,7 @@ public class ClusterSingletonRestartSpec : AkkaSpec public ClusterSingletonRestartSpec() : base(@" akka.loglevel = INFO akka.actor.provider = ""cluster"" - akka.cluster.auto-down-unreachable-after = 2s + #akka.cluster.auto-down-unreachable-after = 2s akka.remote { dot-netty.tcp { hostname = ""127.0.0.1"" From 32f1a6a1e078249c84fb56e8263c0fcab47a1130 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 23 May 2024 14:56:45 -0500 Subject: [PATCH 10/13] broadened specs --- ...angedBufferSpecs.cs => BugFix7196Specs.cs} | 20 ++++++++++++---- .../Singleton/ClusterSingletonRestart2Spec.cs | 1 - .../Singleton/ClusterSingletonRestart3Spec.cs | 1 - .../Singleton/ClusterSingletonRestartSpec.cs | 1 - .../Singleton/MemberAgeOrderingSpec.cs | 23 ++++++++++++++++--- .../OldestChangedBufferStateSpecs.cs | 8 +++---- .../Singleton/ClusterSingletonProxy.cs | 4 ++-- .../Singleton/MemberAgeOrdering.cs | 21 +++++++++++------ .../Singleton/OldestChangedBuffer.cs | 4 ++-- 9 files changed, 58 insertions(+), 25 deletions(-) rename src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/{OldChangedBufferSpecs.cs => BugFix7196Specs.cs} (87%) diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/BugFix7196Specs.cs similarity index 87% rename from src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs rename to src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/BugFix7196Specs.cs index 8d65fc0f330..caea0cbe1c5 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldChangedBufferSpecs.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/BugFix7196Specs.cs @@ -1,5 +1,5 @@ // ----------------------------------------------------------------------- -// +// // Copyright (C) 2009-2024 Lightbend Inc. // Copyright (C) 2013-2024 .NET Foundation // @@ -23,9 +23,10 @@ namespace Akka.Cluster.Tools.Tests.Singleton; /// /// Reproduction for https://github.com/akkadotnet/akka.net/issues/7196 - clearly, what we did /// -public class OldChangedBufferSpecs : AkkaSpec +public class BugFix7196Specs : AkkaSpec { private readonly ActorSystem _hostNodeV1; + private readonly ActorSystem _hostNode2V1; private readonly ActorSystem _hostNodeV2; private static Config OriginalNodeConfig() => """ @@ -46,11 +47,14 @@ private static Config OriginalNodeConfig() => """ private static Config V2NodeConfig(ActorSystem originalSys) => ConfigurationFactory.ParseString( "akka.cluster.app-version = \"1.0.2\"").WithFallback(originalSys.Settings.Config); - public OldChangedBufferSpecs(ITestOutputHelper output) : base(OriginalNodeConfig(), output) + public BugFix7196Specs(ITestOutputHelper output) : base(OriginalNodeConfig(), output) { _hostNodeV1 = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString("akka.cluster.roles = [singleton]").WithFallback(Sys.Settings.Config)); InitializeLogger(_hostNodeV1); + _hostNode2V1 = ActorSystem.Create(Sys.Name, + ConfigurationFactory.ParseString("akka.cluster.roles = [singleton]").WithFallback(Sys.Settings.Config)); + InitializeLogger(_hostNode2V1); _hostNodeV2 = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString("akka.cluster.roles = [singleton]").WithFallback(V2NodeConfig(Sys))); InitializeLogger(_hostNodeV2); @@ -62,6 +66,7 @@ public async Task Bugfix7196Spec() { await JoinAsync(Sys, Sys); // have to do a self join first await JoinAsync(_hostNodeV1, Sys); + await JoinAsync(_hostNode2V1, Sys); var proxy = Sys.ActorOf( ClusterSingletonProxy.Props("user/echo", @@ -91,6 +96,12 @@ await WithinAsync(TimeSpan.FromSeconds(5), () => // validate that the singleton has moved to _hostNodeV2 await AssertSingletonHostedOn(proxy, _hostNodeV2); + + /* + * NOTE: an important detail here: _hostNode2V1 is actually OLDER than _hostNodeV2, + * but when selecting a new "oldest" node after the previous one dies Akka.Cluster.Tools.Singleton + * will always prefer to move onto the new version of the software. + */ } private async Task AssertSingletonHostedOn(IActorRef proxy, ActorSystem targetNode) @@ -146,7 +157,8 @@ public Singleton() protected override void AfterAll() { Shutdown(_hostNodeV1); - Shutdown(_hostNodeV2); + Shutdown(_hostNode2V1); + Shutdown(_hostNodeV2); base.AfterAll(); } } \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs index 4ab2a8cd34b..fc17dcbb504 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs @@ -29,7 +29,6 @@ public ClusterSingletonRestart2Spec() : base(@" akka.loglevel = INFO akka.actor.provider = ""cluster"" akka.cluster.roles = [singleton] - #akka.cluster.auto-down-unreachable-after = 2s akka.cluster.singleton.min-number-of-hand-over-retries = 5 akka.remote { dot-netty.tcp { diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart3Spec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart3Spec.cs index 0adbdaf5a02..c04c2562301 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart3Spec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart3Spec.cs @@ -29,7 +29,6 @@ public ClusterSingletonRestart3Spec(ITestOutputHelper output) : base(@" akka.loglevel = DEBUG akka.actor.provider = ""cluster"" akka.cluster.app-version = ""1.0.0"" - #akka.cluster.auto-down-unreachable-after = 2s akka.cluster.singleton.min-number-of-hand-over-retries = 5 akka.cluster.singleton.consider-app-version = true akka.remote { diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestartSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestartSpec.cs index 76c6ac98717..115afe1f5b7 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestartSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestartSpec.cs @@ -28,7 +28,6 @@ public class ClusterSingletonRestartSpec : AkkaSpec public ClusterSingletonRestartSpec() : base(@" akka.loglevel = INFO akka.actor.provider = ""cluster"" - #akka.cluster.auto-down-unreachable-after = 2s akka.remote { dot-netty.tcp { hostname = ""127.0.0.1"" diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/MemberAgeOrderingSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/MemberAgeOrderingSpec.cs index 5a3f603b868..321f8c63ae5 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/MemberAgeOrderingSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/MemberAgeOrderingSpec.cs @@ -21,7 +21,24 @@ public class MemberAgeOrderingSpec [Fact(DisplayName = "MemberAgeOrdering should sort based on UpNumber")] public void SortByUpNumberTest() { - var members = new SortedSet(MemberAgeOrdering.DescendingWithAppVersion) + var members = new SortedSet(MemberAgeOrdering.OldestToYoungest) + { + Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3), + Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1), + Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9), + }; + + var seq = members.ToList(); + seq.Count.Should().Be(3); + seq[0].Should().Be(Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1)); + seq[1].Should().Be(Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3)); + seq[2].Should().Be(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9)); + } + + [Fact(DisplayName = "MemberAgeOrdering should sort based on UpNumber and AppVersion")] + public void SortByUpNumberAndAppVersionTest() + { + var members = new SortedSet(MemberAgeOrdering.OldestToYoungestWithAppVersion) { Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3), Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1), @@ -38,7 +55,7 @@ public void SortByUpNumberTest() [Fact(DisplayName = "MemberAgeOrdering should sort based on Address if UpNumber is the same")] public void SortByAddressTest() { - var members = new SortedSet(MemberAgeOrdering.DescendingWithAppVersion) + var members = new SortedSet(MemberAgeOrdering.OldestToYoungestWithAppVersion) { Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 1), Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1), @@ -55,7 +72,7 @@ public void SortByAddressTest() [Fact(DisplayName = "MemberAgeOrdering should prefer AppVersion over UpNumber")] public void SortByAppVersionTest() { - var members = new SortedSet(MemberAgeOrdering.DescendingWithAppVersion) + var members = new SortedSet(MemberAgeOrdering.OldestToYoungestWithAppVersion) { Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3, appVersion: AppVersion.Create("1.0.0")), Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1, appVersion: AppVersion.Create("1.0.0")), diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldestChangedBufferStateSpecs.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldestChangedBufferStateSpecs.cs index e5d0b4fe767..9aed70d618c 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldestChangedBufferStateSpecs.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/OldestChangedBufferStateSpecs.cs @@ -30,7 +30,7 @@ public void OldestChangedBuffer_should_initially_only_consider_nodes_with_matchi .Add(Create(winningAddress, roles:targetRoles, upNumber: 3)) .Add(Create(Address.Parse("akka://sys@darkstar:1113"), roles:nonTargetRoles, upNumber: 1)) .Add(Create(Address.Parse("akka://sys@darkstar:1111"), roles:targetRoles, upNumber: 9)) - .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); + .WithComparer(MemberAgeOrdering.OldestToYoungestWithAppVersion); // Act var state = new OldestChangedBufferState(initialMembersByAge, targetRole); @@ -51,7 +51,7 @@ public void OldestChangedBuffer_should_not_change_leader_when_higher_AppVersion_ var initialMembersByAge = ImmutableSortedSet.Empty .Add(Create(winningAddress, upNumber: 3, appVersion: appVersion1)) .Add(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9, appVersion: appVersion1)) - .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); + .WithComparer(MemberAgeOrdering.OldestToYoungestWithAppVersion); // Act var state = new OldestChangedBufferState(initialMembersByAge, string.Empty); @@ -95,7 +95,7 @@ public void OldestChangedBuffer_should_change_Oldest_when_previous_Oldest_remove var initialMembersByAge = ImmutableSortedSet.Empty .Add(originalOldest) .Add(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9, appVersion: appVersion1)) - .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); + .WithComparer(MemberAgeOrdering.OldestToYoungestWithAppVersion); // Act var state = new OldestChangedBufferState(initialMembersByAge, string.Empty); @@ -136,7 +136,7 @@ public void OldestChangedBuffer_should_not_change_Oldest_when_nonOldest_node_rem var initialMembersByAge = ImmutableSortedSet.Empty .Add(Create(winningAddress, upNumber: 3, appVersion: appVersion1)) .Add(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9, appVersion: appVersion1)) - .WithComparer(MemberAgeOrdering.DescendingWithAppVersion); + .WithComparer(MemberAgeOrdering.OldestToYoungestWithAppVersion); // Act var state = new OldestChangedBufferState(initialMembersByAge, string.Empty); diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs index 73937857dd3..bafd0765c09 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs @@ -97,8 +97,8 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS _identityId = CreateIdentifyId(_identityCounter); _memberAgeComparer = settings.ConsiderAppVersion - ? MemberAgeOrdering.DescendingWithAppVersion - : MemberAgeOrdering.Descending; + ? MemberAgeOrdering.OldestToYoungestWithAppVersion + : MemberAgeOrdering.OldestToYoungest; _state = new OldestChangedBufferState(ImmutableSortedSet.Empty.WithComparer(_memberAgeComparer), settings.Role); diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/MemberAgeOrdering.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/MemberAgeOrdering.cs index 8a36d2b18dd..1be2fcf2822 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/MemberAgeOrdering.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/MemberAgeOrdering.cs @@ -24,8 +24,19 @@ private MemberAgeOrdering(bool ascending, bool considerAppVersion) } /// - public int Compare(Member x, Member y) + public int Compare(Member? x, Member? y) { + switch (x) + { + // add null checks here + case null when y is null: + return 0; + case null: + return _ascending ? -1 : 1; + } + + if (y is null) return _ascending ? 1 : -1; + if (_considerAppVersion) { // prefer nodes with the highest app version, even if they're younger @@ -40,12 +51,8 @@ public int Compare(Member x, Member y) : (_ascending ? -1 : 1); } - public static readonly MemberAgeOrdering Ascending = new(true, false); - - public static readonly MemberAgeOrdering AscendingWithAppVersion = new(true, true); - - public static readonly MemberAgeOrdering Descending = new(false, false); + public static readonly MemberAgeOrdering OldestToYoungest = new(false, false); - public static readonly MemberAgeOrdering DescendingWithAppVersion = new(false, true); + public static readonly MemberAgeOrdering OldestToYoungestWithAppVersion = new(false, true); } } diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs index 03a12a517d3..2c2fa8aa524 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs @@ -93,8 +93,8 @@ public OldestChanged(UniqueAddress? oldest) public OldestChangedBuffer(string role, bool considerAppVersion) { _memberAgeComparer = considerAppVersion - ? MemberAgeOrdering.DescendingWithAppVersion - : MemberAgeOrdering.Descending; + ? MemberAgeOrdering.OldestToYoungestWithAppVersion + : MemberAgeOrdering.OldestToYoungest; var membersByAge = ImmutableSortedSet.Empty.WithComparer(_memberAgeComparer); State = new OldestChangedBufferState(membersByAge, role); From 2d4f032bbc1fb04692dea44bb61a5a9090def0ae Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 23 May 2024 15:06:40 -0500 Subject: [PATCH 11/13] ensure that we test for `ConsiderAppVersion` in config specs --- .../Singleton/ClusterSingletonConfigSpec.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs index 22162fabef9..eef88a6d71b 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs @@ -42,6 +42,7 @@ public void ClusterSingletonManagerSettings_must_have_default_config() var config = Sys.Settings.Config.GetConfig("akka.cluster.singleton"); Assert.False(config.IsNullOrEmpty()); config.GetInt("min-number-of-hand-over-retries", 0).ShouldBe(15); + clusterSingletonManagerSettings.ConsiderAppVersion.ShouldBeFalse(); } [Fact] @@ -54,6 +55,7 @@ public void ClusterSingletonProxySettings_must_have_default_config() clusterSingletonProxySettings.Role.ShouldBe(null); clusterSingletonProxySettings.SingletonIdentificationInterval.TotalSeconds.ShouldBe(1); clusterSingletonProxySettings.BufferSize.ShouldBe(1000); + clusterSingletonProxySettings.ConsiderAppVersion.ShouldBeFalse(); } } } From cc3f4fe120f36b2d1ae7bfa4f7ce9b6c3eaa7acf Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 23 May 2024 15:07:36 -0500 Subject: [PATCH 12/13] enable `AppVersion` ordering by default --- .../Singleton/ClusterSingletonConfigSpec.cs | 4 ++-- .../cluster/Akka.Cluster.Tools/Singleton/reference.conf | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs index eef88a6d71b..ec55095ecbd 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs @@ -42,7 +42,7 @@ public void ClusterSingletonManagerSettings_must_have_default_config() var config = Sys.Settings.Config.GetConfig("akka.cluster.singleton"); Assert.False(config.IsNullOrEmpty()); config.GetInt("min-number-of-hand-over-retries", 0).ShouldBe(15); - clusterSingletonManagerSettings.ConsiderAppVersion.ShouldBeFalse(); + clusterSingletonManagerSettings.ConsiderAppVersion.ShouldBeTrue(); } [Fact] @@ -55,7 +55,7 @@ public void ClusterSingletonProxySettings_must_have_default_config() clusterSingletonProxySettings.Role.ShouldBe(null); clusterSingletonProxySettings.SingletonIdentificationInterval.TotalSeconds.ShouldBe(1); clusterSingletonProxySettings.BufferSize.ShouldBe(1000); - clusterSingletonProxySettings.ConsiderAppVersion.ShouldBeFalse(); + clusterSingletonProxySettings.ConsiderAppVersion.ShouldBeTrue(); } } } diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/reference.conf b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/reference.conf index ea374bbffb5..c40d4af5002 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/reference.conf @@ -46,7 +46,7 @@ akka.cluster.singleton { # Should akka.cluster.app-version be considered when the cluster singleton instance is being moved to another node # When set to false, singleton instance will always be created on oldest member # When set to true, singleton instance will be created on the oldest node with the highest member app-version number - consider-app-version = false + consider-app-version = true } # //#singleton-config From 91094a251d8163677b65701bbe29ffbb40f1e129 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 23 May 2024 15:48:46 -0500 Subject: [PATCH 13/13] nullability fixes --- .../Singleton/ClusterSingletonManager.cs | 64 ++++++++----------- 1 file changed, 27 insertions(+), 37 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs index 721752e04a3..a165e5fbbb3 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs @@ -4,7 +4,7 @@ // Copyright (C) 2013-2023 .NET Foundation // //----------------------------------------------------------------------- - +#nullable enable using System; using System.Collections.Generic; using System.Collections.Immutable; @@ -398,9 +398,9 @@ public ReleaseLeaseResult(bool released) [Serializable] internal sealed class AcquireLeaseFailure : IDeadLetterSuppression, INoSerializationVerificationNeeded { - public Exception Failure { get; } + public Exception? Failure { get; } - public AcquireLeaseFailure(Exception failure) + public AcquireLeaseFailure(Exception? failure) { Failure = failure; } @@ -409,9 +409,9 @@ public AcquireLeaseFailure(Exception failure) [Serializable] internal sealed class ReleaseLeaseFailure : IDeadLetterSuppression, INoSerializationVerificationNeeded { - public Exception Failure { get; } + public Exception? Failure { get; } - public ReleaseLeaseFailure(Exception failure) + public ReleaseLeaseFailure(Exception? failure) { Failure = failure; } @@ -420,9 +420,9 @@ public ReleaseLeaseFailure(Exception failure) [Serializable] internal sealed class LeaseLost : IDeadLetterSuppression, INoSerializationVerificationNeeded { - public Exception Reason { get; } + public Exception? Reason { get; } - public LeaseLost(Exception reason) + public LeaseLost(Exception? reason) { Reason = reason; } @@ -622,7 +622,7 @@ public static Props Props(Props singletonProps, object terminationMessage, Clust private bool _selfExited; // started when self member is Up - private IActorRef _oldestChangedBuffer; + private IActorRef? _oldestChangedBuffer; // keep track of previously removed members private ImmutableDictionary _removed = ImmutableDictionary.Empty; private readonly TimeSpan _removalMargin; @@ -630,24 +630,14 @@ public static Props Props(Props singletonProps, object terminationMessage, Clust private readonly int _maxTakeOverRetries; private readonly Cluster _cluster = Cluster.Get(Context.System); private readonly UniqueAddress _selfUniqueAddress; - private ILoggingAdapter _log; private readonly CoordinatedShutdown _coordShutdown = CoordinatedShutdown.Get(Context.System); private readonly TaskCompletionSource _memberExitingProgress = new(); - private readonly string singletonLeaseName; - private readonly Lease lease; - private readonly TimeSpan leaseRetryInterval = TimeSpan.FromSeconds(5); // won't be used - - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD + private readonly string _singletonLeaseName; + private readonly Lease? _lease; + private readonly TimeSpan _leaseRetryInterval = TimeSpan.FromSeconds(5); // won't be used + public ClusterSingletonManager(Props singletonProps, object terminationMessage, ClusterSingletonManagerSettings settings) { var role = settings.Role; @@ -657,13 +647,13 @@ public ClusterSingletonManager(Props singletonProps, object terminationMessage, _singletonProps = singletonProps; _terminationMessage = terminationMessage; _settings = settings; - singletonLeaseName = $"{Context.System.Name}-singleton-{Self.Path}"; + _singletonLeaseName = $"{Context.System.Name}-singleton-{Self.Path}"; if (settings.LeaseSettings != null) { - lease = LeaseProvider.Get(Context.System) - .GetLease(singletonLeaseName, settings.LeaseSettings.LeaseImplementation, _cluster.SelfAddress.HostPort()); - leaseRetryInterval = settings.LeaseSettings.LeaseRetryInterval; + _lease = LeaseProvider.Get(Context.System) + .GetLease(_singletonLeaseName, settings.LeaseSettings.LeaseImplementation, _cluster.SelfAddress.HostPort()); + _leaseRetryInterval = settings.LeaseSettings.LeaseRetryInterval; } _removalMargin = (settings.RemovalMargin <= TimeSpan.Zero) ? _cluster.DowningProvider.DownRemovalMargin : settings.RemovalMargin; @@ -707,7 +697,7 @@ private void SetupCoordinatedShutdown() }); } - private ILoggingAdapter Log { get { return _log ??= Context.GetLogger(); } } + private ILoggingAdapter Log { get; } = Context.GetLogger(); /// protected override void PreStart() @@ -716,7 +706,7 @@ protected override void PreStart() throw new ActorInitializationException("Cluster node must not be terminated"); // subscribe to cluster changes, re-subscribe when restart - _cluster.Subscribe(Self, ClusterEvent.InitialStateAsEvents, typeof(ClusterEvent.MemberRemoved), typeof(ClusterEvent.MemberDowned)); + _cluster.Subscribe(Self, InitialStateAsEvents, typeof(MemberRemoved), typeof(MemberDowned)); SetTimer(CleanupTimer, Cleanup.Instance, TimeSpan.FromMinutes(1.0), repeat: true); @@ -769,7 +759,7 @@ private void GetNextOldestChanged() private State TryAcquireLease() { var self = Self; - lease.Acquire(reason => + _lease.Acquire(reason => { self.Tell(new LeaseLost(reason)); }).ContinueWith(r => @@ -786,7 +776,7 @@ private State TryAcquireLease() private State TryGotoOldest() { // check if lease - if (lease == null) + if (_lease == null) return GoToOldest(); else { @@ -809,7 +799,7 @@ private State HandleOldestChanged( Log.Info("{0} observed OldestChanged: [{1} -> {2}]", StateName, _cluster.SelfAddress, oldest?.Address); switch (oldest) { - case UniqueAddress a when a.Equals(_cluster.SelfUniqueAddress): + case { } a when a.Equals(_cluster.SelfUniqueAddress): // already oldest return Stay(); case UniqueAddress a when !_selfExited && _removed.ContainsKey(a): @@ -1115,7 +1105,7 @@ private void InitializeFSM() } else { - SetTimer(LeaseRetryTimer, LeaseRetry.Instance, leaseRetryInterval, repeat: false); + SetTimer(LeaseRetryTimer, LeaseRetry.Instance, _leaseRetryInterval, repeat: false); return Stay().Using(new AcquiringLeaseData(false, null)); } } @@ -1131,7 +1121,7 @@ private void InitializeFSM() else if (e.FsmEvent is AcquireLeaseFailure alf) { Log.Error(alf.Failure, "Failed to get lease (will be retried)"); - SetTimer(LeaseRetryTimer, LeaseRetry.Instance, leaseRetryInterval, repeat: false); + SetTimer(LeaseRetryTimer, LeaseRetry.Instance, _leaseRetryInterval, repeat: false); return Stay().Using(new AcquiringLeaseData(false, null)); } @@ -1459,9 +1449,9 @@ private void InitializeFSM() if (StateData is AcquiringLeaseData ald && ald.LeaseRequestInProgress) { Log.Info("Releasing lease as leaving AcquiringLease going to [{0}]", to); - if (lease != null) + if (_lease != null) { - lease.Release().ContinueWith(r => + _lease.Release().ContinueWith(r => { if (r.IsCanceled || r.IsFaulted) return (object)new ReleaseLeaseFailure(r.Exception); @@ -1471,10 +1461,10 @@ private void InitializeFSM() } } - if (from == ClusterSingletonState.Oldest && lease != null) + if (from == ClusterSingletonState.Oldest && _lease != null) { Log.Info("Releasing lease as leaving Oldest"); - lease.Release().ContinueWith(r => new ReleaseLeaseResult(r.Result)).PipeTo(Self); + _lease.Release().ContinueWith(r => new ReleaseLeaseResult(r.Result)).PipeTo(Self); } if (to is ClusterSingletonState.Younger or ClusterSingletonState.Oldest) GetNextOldestChanged();