Skip to content

Commit 6f6bdc4

Browse files
yim-lee and ktoso authored
Add API to 'await joining cluster' (#972)
* Add API to 'await joining cluster' Resolves #948 * joined to return member * Rename ensureNodes to waitFor * Fix task group logic * Rename awaitMemberStatus to waitFor * Fix samples * Add API docs * Update tests to use new APIs * Update tests to use new APIs * Fix up Membership description * Check tombstones * Minor rewording * formatting Co-authored-by: Konrad `ktoso` Malawski <[email protected]> Co-authored-by: Konrad `ktoso` Malawski <[email protected]>
1 parent a8fb579 commit 6f6bdc4

File tree

9 files changed

+252
-92
lines changed

9 files changed

+252
-92
lines changed

Samples/Sources/SampleDiningPhilosophers/DistributedDiningPhilosophers.swift

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,7 @@ final class DistributedDiningPhilosophers {
4040
systemC.cluster.join(node: systemB.settings.node)
4141

4242
print("waiting for cluster to form...")
43-
// TODO: implement this using "await join cluster" API [#948](https://github.com/apple/swift-distributed-actors/issues/948)
44-
while !(try await self.isClusterFormed(systems)) {
45-
let nanosInSecond: UInt64 = 1_000_000_000
46-
try await Task.sleep(nanoseconds: 1 * nanosInSecond)
47-
}
43+
try await self.ensureCluster(systems, within: .seconds(10))
4844

4945
print("~~~~~~~ systems joined each other ~~~~~~~")
5046

@@ -74,13 +70,17 @@ final class DistributedDiningPhilosophers {
7470
try systemA.park(atMost: duration)
7571
}
7672

77-
private func isClusterFormed(_ systems: [ClusterSystem]) async throws -> Bool {
78-
for system in systems {
79-
let upCount = try await system.cluster.membershipSnapshot.count(withStatus: .up)
80-
if upCount != systems.count {
81-
return false
73+
private func ensureCluster(_ systems: [ClusterSystem], within: Duration) async throws {
74+
let nodes = Set(systems.map(\.settings.uniqueBindNode))
75+
76+
try await withThrowingTaskGroup(of: Void.self) { group in
77+
for system in systems {
78+
group.addTask {
79+
try await system.cluster.waitFor(nodes, .up, within: within)
80+
}
8281
}
82+
// loop explicitly to propagate any error that might have been thrown
83+
for try await _ in group {}
8384
}
84-
return true
8585
}
8686
}

Sources/DistributedActors/Cluster/Cluster+Membership.swift

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -663,7 +663,37 @@ extension MembershipDiff: CustomDebugStringConvertible {
663663
// MARK: Errors
664664

665665
extension Cluster {
666-
public enum MembershipError: Error {
666+
/// Errors describing why a membership lookup or a wait on a member's status failed.
public enum MembershipError: Error, CustomPrettyStringConvertible {
    /// The node picked as leader is not part of the given membership.
    case nonMemberLeaderSelected(Cluster.Membership, wannabeLeader: Cluster.Member)
    /// The node is not present in the given membership.
    case notFound(UniqueNode, in: Cluster.Membership)
    /// The member was found, but its status did not satisfy the "at least" requirement.
    case atLeastStatusRequirementNotMet(expectedAtLeast: Cluster.MemberStatus, found: Cluster.Member)
    /// The member was found, but its status was not the exact expected one.
    case statusRequirementNotMet(expected: Cluster.MemberStatus, found: Cluster.Member)
    /// Waiting gave up after the given duration; carries the last error observed while waiting, if any.
    case awaitStatusTimedOut(Duration, Error?)

    /// Type name, the case itself, and a human readable explanation, in a single string.
    public var prettyDescription: String {
        let explanation = self.details
        return "\(Self.self)(\(self), details: \(explanation))"
    }

    /// Human readable explanation for each case.
    private var details: String {
        switch self {
        case let .nonMemberLeaderSelected(membership, wannabeLeader):
            return "[\(wannabeLeader)] selected leader but is not a member [\(membership)]"

        case let .notFound(node, membership):
            return "[\(node)] is not a member [\(membership)]"

        case let .atLeastStatusRequirementNotMet(expectedAtLeastStatus, foundMember):
            return "Expected \(reflecting: foundMember.uniqueNode) to be seen as at-least [\(expectedAtLeastStatus)] but was [\(foundMember.status)]"

        case let .statusRequirementNotMet(expectedStatus, foundMember):
            return "Expected \(reflecting: foundMember.uniqueNode) to be seen as [\(expectedStatus)] but was [\(foundMember.status)]"

        case let .awaitStatusTimedOut(duration, lastError):
            // Always mention the last error slot, even when nothing was recorded.
            let lastErrorMessage = lastError.map { "Last error: \($0)" } ?? "Last error: <none>"
            return "No result within \(duration.prettyDescription). \(lastErrorMessage)"
        }
    }
}
669699
}

Sources/DistributedActors/Cluster/ClusterControl.swift

Lines changed: 134 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,12 @@ public struct ClusterControl {
7171
}
7272
}
7373

74+
private let cluster: ClusterShell?
7475
internal let ref: ClusterShell.Ref
7576

76-
init(_ settings: ClusterSystemSettings, clusterRef: ClusterShell.Ref, eventStream: EventStream<Cluster.Event>) {
77+
init(_ settings: ClusterSystemSettings, cluster: ClusterShell?, clusterRef: ClusterShell.Ref, eventStream: EventStream<Cluster.Event>) {
7778
self.settings = settings
79+
self.cluster = cluster
7880
self.ref = clusterRef
7981
self.events = eventStream
8082

@@ -155,4 +157,135 @@ public struct ClusterControl {
155157
public func down(member: Cluster.Member) {
    // Hand the "down this member" command to the cluster shell reference; nothing is returned to the caller.
    self.ref.tell(
        .command(.downCommandMember(member))
    )
}
160+
161+
/// Wait, within the given duration, until `node` is seen as `.up` by this actor system.
///
/// This is a convenience for `waitFor(node, .up, within: within)`.
///
/// - Parameters:
///   - node: The node to be joined by this system.
///   - within: Duration to wait for.
///
/// - Returns: The `Cluster.Member` for the joined node.
/// - Throws: Whatever the underlying `waitFor(_:_:within:)` throws, e.g. when the duration elapses first.
public func joined(node: UniqueNode, within: Duration) async throws -> Cluster.Member {
    let upMember = try await self.waitFor(node, .up, within: within)
    return upMember
}
171+
172+
/// Wait, within the given duration, until every node in `nodes` is seen with exactly the specified status.
///
/// One child task per node calls the single-node `waitFor(_:_:within:)`; each of them gets the full `within` duration.
///
/// - Parameters:
///   - nodes: The nodes to be joined by this system.
///   - status: The expected member status.
///   - within: Duration to wait for.
///
/// - Throws: The first error surfaced by any of the per-node waits.
public func waitFor(_ nodes: Set<UniqueNode>, _ status: Cluster.MemberStatus, within: Duration) async throws {
    try await withThrowingTaskGroup(of: Void.self) { waiters in
        for expectedNode in nodes {
            waiters.addTask {
                _ = try await self.waitFor(expectedNode, status, within: within)
            }
        }

        // Drain the group explicitly so that an error thrown by any child task is propagated to the caller.
        while let _ = try await waiters.next() {}
    }
}
189+
190+
/// Wait, within the given duration, until every node in `nodes` is seen with **at least** the specified status.
///
/// One child task per node calls the single-node `waitFor(_:atLeast:within:)`; each of them gets the full `within` duration.
///
/// - Parameters:
///   - nodes: The nodes to be joined by this system.
///   - atLeastStatus: The minimum expected member status.
///   - within: Duration to wait for.
///
/// - Throws: The first error surfaced by any of the per-node waits.
public func waitFor(_ nodes: Set<UniqueNode>, atLeast atLeastStatus: Cluster.MemberStatus, within: Duration) async throws {
    try await withThrowingTaskGroup(of: Void.self) { waiters in
        for expectedNode in nodes {
            waiters.addTask {
                _ = try await self.waitFor(expectedNode, atLeast: atLeastStatus, within: within)
            }
        }

        // Drain the group explicitly so that an error thrown by any child task is propagated to the caller.
        while let _ = try await waiters.next() {}
    }
}
207+
208+
/// Wait, within the given duration, until `node` is seen by this system with exactly the specified status.
///
/// The membership snapshot is re-checked repeatedly until the check passes or the duration elapses.
///
/// - Parameters:
///   - node: The node to be joined by this system.
///   - status: The expected member status.
///   - within: Duration to wait for.
///
/// - Returns: The `Cluster.Member` for the node with the expected status.
///   If the expected status is `.down` or `.removed`, and either an association tombstone exists for the node
///   or the node is no longer part of the membership, a synthesized `.removed` member marked unreachable is returned.
/// - Throws: `Cluster.MembershipError.awaitStatusTimedOut` if the requirement was not met in time.
public func waitFor(_ node: UniqueNode, _ status: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member {
    try await self.waitForMembershipEventually(within: within) { membership in
        let removedIsAcceptable = status == .down || status == .removed

        // A tombstone means the node is already gone; only consulted when that outcome satisfies the caller.
        if removedIsAcceptable, let cluster = self.cluster, cluster.getExistingAssociationTombstone(with: node) != nil {
            return Cluster.Member(node: node, status: .removed).asUnreachable
        }

        guard let foundMember = membership.uniqueMember(node) else {
            guard removedIsAcceptable else {
                throw Cluster.MembershipError.notFound(node, in: membership)
            }
            // The member is already removed from the membership, which is fine when waiting for down/removed.
            return Cluster.Member(node: node, status: .removed).asUnreachable
        }

        guard status == foundMember.status else {
            throw Cluster.MembershipError.statusRequirementNotMet(expected: status, found: foundMember)
        }
        return foundMember
    }
}
240+
241+
/// Wait, within the given duration, until `node` is seen by this system with **at least** the specified status.
///
/// The membership snapshot is re-checked repeatedly until the check passes or the duration elapses.
///
/// - Parameters:
///   - node: The node to be joined by this system.
///   - atLeastStatus: The minimum expected member status.
///   - within: Duration to wait for.
///
/// - Returns: The `Cluster.Member` for the node once its status is at least `atLeastStatus`.
///   If the minimum status is `.down` or `.removed`, and either an association tombstone exists for the node
///   or the node is no longer part of the membership, a synthesized `.removed` member marked unreachable is returned.
/// - Throws: `Cluster.MembershipError.awaitStatusTimedOut` if the requirement was not met in time.
public func waitFor(_ node: UniqueNode, atLeast atLeastStatus: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member {
    try await self.waitForMembershipEventually(within: within) { membership in
        let removedIsAcceptable = atLeastStatus == .down || atLeastStatus == .removed

        // A tombstone means the node is already gone; only consulted when that outcome satisfies the caller.
        if removedIsAcceptable, let cluster = self.cluster, cluster.getExistingAssociationTombstone(with: node) != nil {
            return Cluster.Member(node: node, status: .removed).asUnreachable
        }

        guard let foundMember = membership.uniqueMember(node) else {
            if removedIsAcceptable {
                // The member is already removed from the membership, which is fine when waiting for down/removed.
                return Cluster.Member(node: node, status: .removed).asUnreachable
            }
            throw Cluster.MembershipError.notFound(node, in: membership)
        }

        // Fail only while the member is still *below* the requested minimum; a member that already
        // reached or passed `atLeastStatus` satisfies the requirement.
        // NOTE(review): relies on `Cluster.MemberStatus`'s `Comparable` ordering following the member
        // lifecycle (earlier statuses compare as smaller) -- defined outside this file, please confirm.
        if foundMember.status < atLeastStatus {
            throw Cluster.MembershipError.atLeastStatusRequirementNotMet(expectedAtLeast: atLeastStatus, found: foundMember)
        }
        return foundMember
    }
}
273+
274+
/// Repeatedly evaluate `block` against the current membership snapshot until it succeeds or time runs out.
///
/// - Parameters:
///   - within: Total time budget for the attempts.
///   - interval: Pause between a failed attempt and the next one.
///   - block: Check to run against a fresh membership snapshot; throwing means "not yet", returning ends the wait.
///
/// - Returns: The first value successfully returned by `block`.
/// - Throws: `Cluster.MembershipError.awaitStatusTimedOut`, carrying the last error thrown by `block` (if any),
///   once the deadline has no time left; also rethrows any error from sleeping between attempts.
private func waitForMembershipEventually<T>(within: Duration, interval: Duration = .milliseconds(100), _ block: (Cluster.Membership) async throws -> T) async throws -> T {
    let deadline = ContinuousClock.Instant.fromNow(within)
    var mostRecentFailure: Error?

    while deadline.hasTimeLeft() {
        let snapshot = await self.membershipSnapshot
        do {
            return try await block(snapshot)
        } catch {
            // Remember why this attempt failed so that a timeout can report it.
            mostRecentFailure = error
        }

        // Only reached after a failed attempt: back off before checking again.
        try await Task.sleep(nanoseconds: UInt64(interval.nanoseconds))
    }

    throw Cluster.MembershipError.awaitStatusTimedOut(within, mostRecentFailure)
}
158291
}

Sources/DistributedActors/Cluster/ClusterShell.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ internal class ClusterShell {
6464
}
6565
}
6666

67+
/// Look up the association tombstone stored for `node`, if there is one.
///
/// The tombstone table is read while holding `_associationsLock`.
///
/// - Parameter node: The node whose tombstone should be looked up.
/// - Returns: The stored tombstone, or `nil` when none is recorded for `node`.
internal func getExistingAssociationTombstone(with node: UniqueNode) -> Association.Tombstone? {
    return self._associationsLock.withLock {
        return self._associationTombstones[node]
    }
}
72+
6773
/// Get an existing association or ensure that a new one shall be stored and joining kicked off if the target node was not known yet.
6874
/// Safe to concurrently access by privileged internals directly
6975
internal func getEnsureAssociation(with node: UniqueNode, file: String = #file, line: UInt = #line) -> StoredAssociationState {

Sources/DistributedActors/ClusterSystem.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
314314
)
315315

316316
_ = self._clusterStore.storeIfNilThenLoad(Box(nil))
317-
_ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, clusterRef: self.deadLetters.adapted(), eventStream: clusterEvents)))
317+
_ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, cluster: nil, clusterRef: self.deadLetters.adapted(), eventStream: clusterEvents)))
318318
}
319319

320320
// node watcher MUST be prepared before receptionist (or any other actor) because it (and all actors) need it if we're running clustered
@@ -331,7 +331,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
331331
customBehavior: ClusterEventStream.Shell.behavior
332332
)
333333
let clusterRef = try! cluster.start(system: self, clusterEvents: clusterEvents) // only spawns when cluster is initialized
334-
_ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, clusterRef: clusterRef, eventStream: clusterEvents)))
334+
_ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, cluster: cluster, clusterRef: clusterRef, eventStream: clusterEvents)))
335335

336336
self._associationTombstoneCleanupTask = eventLoopGroup.next().scheduleRepeatedTask(
337337
initialDelay: settings.associationTombstoneCleanupInterval.toNIO,

0 commit comments

Comments
 (0)