Skip to content

Commit 4b6768b

Browse files
buddek and ktoso authored
Down replaced cluster members (#867)
* Make sure replaced cluster members are .down * Add integration test for ungraceful shutdown * Remove dead code from integ test * Remove more dead code * Update MembershipTests for changed replacement semantics * remove whitespace that made it in * fix typo in test * Fix issues in integration test * Fix formatting Co-authored-by: Konrad `ktoso` Malawski <[email protected]>
1 parent dcd618b commit 4b6768b

File tree

5 files changed

+146
-5
lines changed

5 files changed

+146
-5
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedActors

// Expected invocation: <node-name> <bind-port> [<join-host> <join-port>]
let arguments = Array(CommandLine.arguments.dropFirst())

guard arguments.count >= 2, let bindPort = Int(arguments[1]) else {
    fatalError("Node name and bind port must be provided")
}

let nodeName = arguments[0]

print("Binding to port \(bindPort)")

// Bring up a clustered system; the aggressive SWIM probe/ping timing makes
// failure detection fast enough for the driving integration-test script.
let system = ActorSystem(nodeName) { settings in
    settings.logging.logLevel = .info

    settings.cluster.enabled = true
    settings.cluster.bindPort = bindPort

    settings.cluster.swim.probeInterval = .milliseconds(300)
    settings.cluster.swim.pingTimeout = .milliseconds(100)
}

// Subscribe an actor that logs every cluster event; the test harness greps
// the process log for these "Event: ..." lines to observe membership changes.
let ref = try system.spawn(
    "streamWatcher",
    of: Cluster.Event.self,
    .receive { context, event in
        context.log.info("Event: \(event)")
        return .same
    }
)
system.cluster.events.subscribe(ref)

// When a host/port pair was supplied, join that existing cluster member.
if arguments.count >= 4, let joinPort = Int(arguments[3]) {
    let joinHost = arguments[2]
    print("Joining node \(joinHost):\(joinPort)")
    system.cluster.join(host: joinHost, port: joinPort)
}

// Keep the process alive long enough for the test script to drive the scenario.
Thread.sleep(.seconds(120))
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#!/bin/bash
2+
##===----------------------------------------------------------------------===##
3+
##
4+
## This source file is part of the Swift Distributed Actors open source project
5+
##
6+
## Copyright (c) 2022 Apple Inc. and the Swift Distributed Actors project authors
7+
## Licensed under Apache License v2.0
8+
##
9+
## See LICENSE.txt for license information
10+
## See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
11+
##
12+
## SPDX-License-Identifier: Apache-2.0
13+
##
14+
##===----------------------------------------------------------------------===##
15+
16+
set -e
17+
#set -x # verbose
18+
19+
declare -r my_path="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
20+
declare -r root_path="$my_path/.."
21+
22+
declare -r app_name='it_Clustered_swim_ungraceful_shutdown'
23+
24+
source ${my_path}/shared.sh
25+
26+
declare -r first_logs=/tmp/sact_first.out
27+
declare -r second_logs=/tmp/sact_second.out
28+
declare -r killed_logs=/tmp/sact_killed.out
29+
declare -r replacement_logs=/tmp/sact_replacement.out
30+
rm -f ${first_logs}
31+
rm -f ${second_logs}
32+
rm -f ${killed_logs}
33+
rm -f ${replacement_logs}
34+
35+
stdbuf -i0 -o0 -e0 swift run it_Clustered_swim_ungraceful_shutdown Frist 7337 > ${first_logs} 2>&1 &
36+
declare -r first_pid=$(echo $!)
37+
wait_log_exists ${first_logs} 'Binding to: ' 200 # since it might be compiling again...
38+
39+
stdbuf -i0 -o0 -e0 swift run it_Clustered_swim_ungraceful_shutdown Second 8228 127.0.0.1 7337 > ${second_logs} 2>&1 &
40+
declare -r second_pid=$(echo $!)
41+
wait_log_exists ${second_logs} 'Binding to: ' 200 # since it might be compiling again...
42+
43+
stdbuf -i0 -o0 -e0 swift run it_Clustered_swim_ungraceful_shutdown Killed 9119 127.0.0.1 7337 > ${killed_logs} 2>&1 &
44+
declare -r killed_pid=$(echo $!)
45+
wait_log_exists ${killed_logs} 'Binding to: ' 200 # since it might be compiling again...
46+
47+
echo "Waiting nodes to become .up..."
48+
wait_log_exists ${first_logs} 'Event: membershipChange(sact://[email protected]:8228 :: \[joining\] -> \[ up\])' 50
49+
wait_log_exists ${first_logs} 'Event: membershipChange(sact://[email protected]:9119 :: \[joining\] -> \[ up\])' 50
50+
echo 'Other two members seen .up, good...'
51+
52+
sleep 1
53+
54+
# SIGKILL the third member, causing ungraceful shutdown
55+
echo "Killing PID ${killed_pid}"
56+
kill -9 ${killed_pid}
57+
58+
# Immediately restart the third process
59+
stdbuf -i0 -o0 -e0 swift run it_Clustered_swim_ungraceful_shutdown Replacement 9119 127.0.0.1 7337 >> ${replacement_logs} 2>&1 &
60+
declare -r replacement_pid=$(echo $!)
61+
wait_log_exists ${replacement_logs} 'Binding to: ' 200 # just to be safe...
62+
63+
# The original third node should go .down while the replacement becomes .up
64+
wait_log_exists ${first_logs} 'Event: membershipChange(sact://[email protected]:9119 :: \[ up\] -> \[ down\])' 50
65+
echo 'Killed member .down, good...'
66+
wait_log_exists ${first_logs} 'Event: membershipChange(sact://[email protected]:9119 :: \[joining\] -> \[ up\])' 50
67+
echo 'Replacement member .up, good...'
68+
69+
# === cleanup ----------------------------------------------------------------------------------------------------------
70+
71+
kill -9 ${first_pid}
72+
kill -9 ${second_pid}
73+
kill -9 ${replacement_pid}
74+
75+
_killall ${app_name}

Package.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,13 @@ var targets: [PackageDescription.Target] = [
197197
],
198198
path: "IntegrationTests/tests_04_cluster/it_Clustered_swim_suspension_reachability"
199199
),
200+
.target(
201+
name: "it_Clustered_swim_ungraceful_shutdown",
202+
dependencies: [
203+
"DistributedActors",
204+
],
205+
path: "IntegrationTests/tests_04_cluster/it_Clustered_swim_ungraceful_shutdown"
206+
),
200207

201208
// ==== ----------------------------------------------------------------------------------------------------------------
202209
// MARK: Performance / Benchmarks

Sources/DistributedActors/Cluster/Cluster+Membership.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,11 @@ extension Cluster.Membership {
286286

287287
if let previousMember = self.member(change.node.node) {
288288
// we are joining "over" an existing incarnation of a node; causing the existing node to become .down immediately
289-
_ = self.removeCompletely(previousMember.uniqueNode) // the replacement event will handle the down notifications
289+
if previousMember.status < .down {
290+
_ = self.mark(previousMember.uniqueNode, as: .down)
291+
} else {
292+
_ = self.removeCompletely(previousMember.uniqueNode) // the replacement event will handle the down notifications
293+
}
290294
self._members[change.node] = change.member
291295

292296
// emit a replacement membership change, this will cause down cluster events for previous member

Tests/DistributedActorsTests/MembershipTests.swift

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ final class MembershipTests: XCTestCase {
133133
Cluster.MembershipChange(member: self.memberB, toStatus: .down)
134134
)
135135

136-
membership.members(atLeast: .joining).count.shouldEqual(2)
136+
membership.members(atLeast: .joining).count.shouldEqual(3)
137+
membership.members(atLeast: .down).count.shouldEqual(1)
137138
let memberNode = membership.uniqueMember(change.member.uniqueNode)
138139
memberNode?.status.shouldEqual(Cluster.MemberStatus.up)
139140
}
@@ -349,7 +350,7 @@ final class MembershipTests: XCTestCase {
349350
// ==== ----------------------------------------------------------------------------------------------------------------
350351
// MARK: Replacements
351352

352-
func test_join_overAnExistingMode_replacement() {
353+
func test_join_overAnExistingNode_replacement() {
353354
var membership = self.initialMembership
354355
let secondReplacement = Cluster.Member(node: UniqueNode(node: self.nodeB.node, nid: .random()), status: .joining)
355356
let change = membership.join(secondReplacement.uniqueNode)!
@@ -359,9 +360,9 @@ final class MembershipTests: XCTestCase {
359360
var secondDown = self.memberB
360361
secondDown.status = .down
361362

362-
members.count.shouldEqual(3)
363+
members.count.shouldEqual(4)
363364
members.shouldContain(secondReplacement)
364-
members.shouldNotContain(self.memberB) // was replaced
365+
members.shouldContain(secondDown) // replaced node should be .down
365366
}
366367

367368
func test_mark_replacement() throws {

0 commit comments

Comments
 (0)