Skip to content

Commit a620780

Browse files
ktoso and yim-lee
authored
=cluster,handshake #724 immediately initialize membership (#778)
* =cluster,handshake #724 immediately initialize membership, without a "loop" through the shell, as this may cause a race condition between a node extending a handshake to this node before the "loop through self, adding the myself Member" has a chance to run; this manifested in tests as handshakes being rejected with "no local member", which is nonsense — there always is a locally known member after all. * Update Sources/DistributedActors/Cluster/ClusterShellState.swift Co-authored-by: Yim Lee <[email protected]> Co-authored-by: Yim Lee <[email protected]>
1 parent 3448a3c commit a620780

File tree

8 files changed

+62
-67
lines changed

8 files changed

+62
-67
lines changed

Instruments/GenActorInstruments/Sources/ActorInstrumentsPackageDefinition/ActorInstrumentsPackageDefinition.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import SwiftyInstrumentsPackageDefinition
1919

2020
// package
2121
fileprivate let packageID = "com.apple.actors.ActorInstruments"
22-
fileprivate let packageVersion: String = "0.5.0" // TODO: match with project version
22+
fileprivate let packageVersion: String = "0.6.0" // TODO: match with project version
2323
fileprivate let packageTitle = "Actors"
2424

2525
// schema

Sources/DistributedActors/Cluster/ClusterShell+LeaderActions.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import NIO
2222
extension ClusterShellState {
2323
/// If, and only if, the current node is a leader it performs a set of tasks, such as moving nodes to `.up` etc.
2424
func collectLeaderActions() -> [LeaderAction] {
25-
guard self.membership.isLeader(self.localNode) else {
25+
guard self.membership.isLeader(self.selfNode) else {
2626
return [] // since we are not the leader, we perform no tasks
2727
}
2828

Sources/DistributedActors/Cluster/ClusterShell.swift

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -437,23 +437,22 @@ extension ClusterShell {
437437
}
438438
)
439439

440-
let state = ClusterShellState(
440+
var state = ClusterShellState(
441441
settings: clusterSettings,
442442
channel: chan,
443443
events: self.clusterEvents,
444444
gossiperControl: gossiperControl,
445445
log: context.log
446446
)
447447

448-
// loop through "self" cluster shell, which in result causes notifying all subscribers about cluster membership change
449-
var firstGossip = Cluster.MembershipGossip(ownerNode: state.localNode)
450-
_ = firstGossip.membership.join(state.localNode) // change will be put into effect by receiving the "self gossip"
451-
firstGossip.incrementOwnerVersion()
452-
context.system.cluster.updateMembershipSnapshot(state.membership)
453-
454-
gossiperControl.update(payload: firstGossip) // ????
455-
context.myself.tell(.gossipFromGossiper(firstGossip))
456-
// TODO: are we ok if we received another gossip first, not our own initial? should be just fine IMHO
448+
// immediately join the owner node (us), as we always should be part of the membership
449+
// this immediate join is important in case we immediately get a handshake from other nodes,
450+
// and will need to reply to them with our `Member`.
451+
if let change = state.latestGossip.membership.join(state.selfNode) {
452+
// always update the snapshot before emitting events
453+
context.system.cluster.updateMembershipSnapshot(state.membership)
454+
self.clusterEvents.publish(.membershipChange(change))
455+
}
457456

458457
return self.ready(state: state)
459458
}
@@ -539,7 +538,7 @@ extension ClusterShell {
539538

540539
/// Allows processing in one spot, all membership changes which we may have emitted in other places, due to joining, downing etc.
541540
func receiveChangeMembershipRequest(_ context: ActorContext<Message>, event: Cluster.Event) -> Behavior<Message> {
542-
self.tracelog(context, .receive(from: state.localNode.node), message: event)
541+
self.tracelog(context, .receive(from: state.selfNode.node), message: event)
543542
var state = state
544543

545544
// 1) IMPORTANT: We MUST apply and act on the incoming event FIRST, before handling the other events.
@@ -646,7 +645,7 @@ extension ClusterShell {
646645
guard change.status < .down else {
647646
return
648647
}
649-
guard change.member.uniqueNode != state.localNode else {
648+
guard change.member.uniqueNode != state.selfNode else {
650649
return
651650
}
652651
// TODO: make it cleaner? though we decided to go with manual peer management as the ClusterShell owns it, hm
@@ -672,7 +671,7 @@ extension ClusterShell {
672671
internal func beginHandshake(_ context: ActorContext<Message>, _ state: ClusterShellState, with remoteNode: Node) -> Behavior<Message> {
673672
var state = state
674673

675-
guard remoteNode != state.localNode.node else {
674+
guard remoteNode != state.selfNode.node else {
676675
state.log.debug("Ignoring attempt to handshake with myself; Could have been issued as confused attempt to handshake as induced by discovery via gossip?")
677676
return .same
678677
}
@@ -761,28 +760,14 @@ extension ClusterShell {
761760
_ state: ClusterShellState,
762761
_ offer: Wire.HandshakeOffer
763762
) -> Wire.HandshakeReject? {
764-
guard let member = state.localMember else {
765-
// no local member? this is bad
766-
state.log.warning(
767-
"""
768-
Received handshake while no local Cluster.Member available, this may indicate that we were removed form the cluster.
769-
Rejecting handshake
770-
""")
771-
return .init(
772-
version: state.settings.protocolVersion,
773-
targetNode: state.localNode,
774-
originNode: offer.originNode,
775-
reason: "Node cannot be part of cluster, no member available.",
776-
whenHandshakeReplySent: nil
777-
)
778-
}
763+
let member = state.selfMember
779764

780765
if member.status.isAtLeast(.leaving) {
781766
state.log.notice("Received handshake while already [\(member.status)]")
782767

783768
return .init(
784769
version: state.settings.protocolVersion,
785-
targetNode: state.localNode,
770+
targetNode: state.selfNode,
786771
originNode: offer.originNode,
787772
reason: "Node already leaving cluster.",
788773
whenHandshakeReplySent: nil
@@ -1129,7 +1114,7 @@ extension ClusterShell {
11291114
}
11301115

11311116
private func onRestInPeace(_ context: ActorContext<Message>, _ state: ClusterShellState, intendedNode: UniqueNode, fromNode: UniqueNode) -> Behavior<Message> {
1132-
let myselfNode = state.localNode
1117+
let myselfNode = state.selfNode
11331118

11341119
guard myselfNode == myselfNode else {
11351120
state.log.warning(
@@ -1196,12 +1181,12 @@ extension ClusterShell {
11961181
// FIXME: also close all associations (!!!)
11971182
switch $0 {
11981183
case .success:
1199-
context.log.info("Unbound server socket [\(addrDesc)], node: \(reflecting: state.localNode)")
1184+
context.log.info("Unbound server socket [\(addrDesc)], node: \(reflecting: state.selfNode)")
12001185
self.serializationPool.shutdown()
12011186
signalOnceUnbound.offerOnce(())
12021187
return .stop
12031188
case .failure(let err):
1204-
context.log.warning("Failed while unbinding server socket [\(addrDesc)], node: \(reflecting: state.localNode). Error: \(err)")
1189+
context.log.warning("Failed while unbinding server socket [\(addrDesc)], node: \(reflecting: state.selfNode). Error: \(err)")
12051190
self.serializationPool.shutdown()
12061191
signalOnceUnbound.offerOnce(())
12071192
throw err
@@ -1259,7 +1244,7 @@ extension ClusterShell {
12591244
// whenever we down a node we must ensure to confirm it to swim, so it won't keep monitoring it forever needlessly
12601245
self._swimRef?.tell(.local(SWIM.LocalMessage.confirmDead(memberToDown.uniqueNode)))
12611246

1262-
if memberToDown.uniqueNode == state.localNode {
1247+
if memberToDown.uniqueNode == state.selfNode {
12631248
// ==== ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
12641249
// Down(self node); ensuring SWIM knows about this and should likely initiate graceful shutdown
12651250
context.log.warning(

Sources/DistributedActors/Cluster/ClusterShellState.swift

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ internal protocol ReadOnlyClusterState {
2828
var handshakeBackoff: BackoffStrategy { get }
2929

3030
/// Unique address of the current node.
31-
var localNode: UniqueNode { get }
32-
var localMember: Cluster.Member? { get } // TODO: enforce that we always have the localMember
31+
var selfNode: UniqueNode { get }
32+
var selfMember: Cluster.Member { get }
33+
3334
var settings: ClusterSettings { get }
3435
}
3536

@@ -45,9 +46,18 @@ internal struct ClusterShellState: ReadOnlyClusterState {
4546

4647
let channel: Channel
4748

48-
let localNode: UniqueNode
49-
var localMember: Cluster.Member? {
50-
self.membership.uniqueMember(self.localNode)
49+
let selfNode: UniqueNode
50+
var selfMember: Cluster.Member {
51+
if let member = self.membership.uniqueMember(self.selfNode) {
52+
return member
53+
} else {
54+
fatalError("""
55+
ClusterShellState.selfMember was nil! This should be impossible by construction, because a node ALWAYS knows about itself.
56+
Please report a bug on the distributed-actors issue tracker. Details:
57+
Membership: \(self.membership)
58+
Settings: \(self.settings)
59+
""")
60+
}
5161
}
5262

5363
let eventLoopGroup: EventLoopGroup
@@ -58,7 +68,7 @@ internal struct ClusterShellState: ReadOnlyClusterState {
5868

5969
let allocator: ByteBufferAllocator
6070

61-
internal var _handshakes: [Node: HandshakeStateMachine.State] = [:]
71+
var _handshakes: [Node: HandshakeStateMachine.State] = [:]
6272

6373
let gossiperControl: GossiperControl<Cluster.MembershipGossip, Cluster.MembershipGossip>
6474

@@ -109,7 +119,7 @@ internal struct ClusterShellState: ReadOnlyClusterState {
109119
self.allocator = settings.allocator
110120
self.eventLoopGroup = settings.eventLoopGroup ?? settings.makeDefaultEventLoopGroup()
111121

112-
self.localNode = settings.uniqueBindNode
122+
self.selfNode = settings.uniqueBindNode
113123
self._latestGossip = Cluster.MembershipGossip(ownerNode: settings.uniqueBindNode)
114124

115125
self.events = events
@@ -145,7 +155,7 @@ extension ClusterShellState {
145155

146156
let initiated = HandshakeStateMachine.InitiatedState(
147157
settings: self.settings,
148-
localNode: self.localNode,
158+
localNode: self.selfNode,
149159
connectTo: remoteNode
150160
)
151161
let handshakeState = HandshakeStateMachine.State.initiated(initiated)
@@ -452,7 +462,7 @@ extension ClusterShellState {
452462
return .init(applied: changeWasApplied)
453463
}
454464

455-
self.log.trace("Membership updated on [\(self.localNode)] by \(event): \(pretty: self.membership)")
465+
self.log.trace("Membership updated on [\(self.selfNode)] by \(event): \(pretty: self.membership)")
456466
return .init(applied: changeWasApplied)
457467
}
458468

Sources/DistributedActors/Cluster/HandshakeStateMachine.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ internal struct HandshakeStateMachine {
147147

148148
let offer: Wire.HandshakeOffer
149149
var boundAddress: UniqueNode {
150-
self.state.localNode
150+
self.state.selfNode
151151
}
152152

153153
var protocolVersion: DistributedActors.Version {

Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsTests.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,15 @@ final class ClusterLeaderActionsTests: XCTestCase {
3333
var stateC: ClusterShellState!
3434

3535
var nodeA: UniqueNode {
36-
self.stateA.localNode
36+
self.stateA.selfNode
3737
}
3838

3939
var nodeB: UniqueNode {
40-
self.stateB.localNode
40+
self.stateB.selfNode
4141
}
4242

4343
var nodeC: UniqueNode {
44-
self.stateC.localNode
44+
self.stateC.selfNode
4545
}
4646

4747
override func setUp() {

Tests/DistributedActorsTests/Cluster/RemotingHandshakeStateMachineTests.swift

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ final class RemoteHandshakeStateMachineTests: XCTestCase {
2929

3030
func test_handshake_happyPath() throws {
3131
let serverKernel = ClusterShellState.makeTestMock(side: .server)
32-
let serverAddress = serverKernel.localNode
32+
let serverAddress = serverKernel.selfNode
3333

3434
let clientKernel = ClusterShellState.makeTestMock(side: .client) { settings in
3535
settings.node.port = 2222
3636
}
3737

3838
// client
39-
let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.localNode, connectTo: serverAddress.node)
39+
let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.selfNode, connectTo: serverAddress.node)
4040
let offer = clientInitiated.makeOffer()
4141

4242
// server
@@ -55,26 +55,26 @@ final class RemoteHandshakeStateMachineTests: XCTestCase {
5555

5656
// then
5757

58-
serverCompleted.localNode.shouldEqual(serverKernel.localNode)
59-
serverCompleted.remoteNode.shouldEqual(clientKernel.localNode)
58+
serverCompleted.localNode.shouldEqual(serverKernel.selfNode)
59+
serverCompleted.remoteNode.shouldEqual(clientKernel.selfNode)
6060

61-
clientCompleted.remoteNode.shouldEqual(serverKernel.localNode)
62-
clientCompleted.localNode.shouldEqual(clientKernel.localNode)
61+
clientCompleted.remoteNode.shouldEqual(serverKernel.selfNode)
62+
clientCompleted.localNode.shouldEqual(clientKernel.selfNode)
6363
}
6464

6565
// ==== ------------------------------------------------------------------------------------------------------------
6666
// MARK: Version negotiation
6767

6868
func test_negotiate_server_shouldAcceptClient_newerPatch() throws {
6969
let serverKernel = ClusterShellState.makeTestMock(side: .server)
70-
let serverAddress = serverKernel.localNode
70+
let serverAddress = serverKernel.selfNode
7171

7272
let clientKernel = ClusterShellState.makeTestMock(side: .client) { settings in
7373
settings.node.port = 2222
7474
settings._protocolVersion.patch += 1
7575
}
7676

77-
let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.localNode, connectTo: serverAddress.node)
77+
let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.selfNode, connectTo: serverAddress.node)
7878
let offer = clientInitiated.makeOffer()
7979

8080
// server
@@ -92,14 +92,14 @@ final class RemoteHandshakeStateMachineTests: XCTestCase {
9292

9393
func test_negotiate_server_shouldRejectClient_newerMajor() throws {
9494
let serverKernel = ClusterShellState.makeTestMock(side: .server)
95-
let serverAddress = serverKernel.localNode
95+
let serverAddress = serverKernel.selfNode
9696

9797
let clientKernel = ClusterShellState.makeTestMock(side: .client) { settings in
9898
settings.node.port = 2222
9999
settings._protocolVersion.major += 1
100100
}
101101

102-
let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.localNode, connectTo: serverAddress.node)
102+
let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.selfNode, connectTo: serverAddress.node)
103103
let offer = clientInitiated.makeOffer()
104104

105105
// server
@@ -123,14 +123,14 @@ final class RemoteHandshakeStateMachineTests: XCTestCase {
123123

124124
func test_onTimeout_shouldReturnNewHandshakeOffersMultipleTimes() throws {
125125
let serverKernel = ClusterShellState.makeTestMock(side: .server)
126-
let serverAddress = serverKernel.localNode
126+
let serverAddress = serverKernel.selfNode
127127

128128
let clientKernel = ClusterShellState.makeTestMock(side: .client) { settings in
129129
settings.node.port = 8228
130130
}
131131

132132
// client
133-
var clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.localNode, connectTo: serverAddress.node)
133+
var clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.selfNode, connectTo: serverAddress.node)
134134

135135
guard case .scheduleRetryHandshake = clientInitiated.onHandshakeTimeout() else {
136136
throw shouldNotHappen("Expected retry attempt after handshake timeout")

scripts/docs/generate_api.sh

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,13 @@ if [[ "$(uname -s)" == "Linux" ]]; then
5656
cd "$source_kitten_source_path" && swift build -c release && cd "$root_path"
5757
fi
5858
# generate
59-
# for module in "${modules[@]}"; do
60-
## if [[ ! -f "$root_path/.build/sourcekitten/$module.json" ]]; then
61-
# # always generate, otherwise we miss things when we're iterating on adding docs.
62-
# echo "Generating $root_path/.build/sourcekitten/$module.json ..."
63-
# "$source_kitten_path/sourcekitten" doc --spm-module "$module" > "$root_path/.build/sourcekitten/$module.json"
64-
## fi
65-
# done
59+
for module in "${modules[@]}"; do
60+
# if [[ ! -f "$root_path/.build/sourcekitten/$module.json" ]]; then
61+
# always generate, otherwise we miss things when we're iterating on adding docs.
62+
echo "Generating $root_path/.build/sourcekitten/$module.json ..."
63+
"$source_kitten_path/sourcekitten" doc --spm-module "$module" > "$root_path/.build/sourcekitten/$module.json"
64+
# fi
65+
done
6666
fi
6767

6868
# prep index

0 commit comments

Comments
 (0)