diff --git a/Samples/Sources/SampleDiningPhilosophers/DistributedDiningPhilosophers.swift b/Samples/Sources/SampleDiningPhilosophers/DistributedDiningPhilosophers.swift index 52fb0a72d..71fa91590 100644 --- a/Samples/Sources/SampleDiningPhilosophers/DistributedDiningPhilosophers.swift +++ b/Samples/Sources/SampleDiningPhilosophers/DistributedDiningPhilosophers.swift @@ -40,11 +40,7 @@ final class DistributedDiningPhilosophers { systemC.cluster.join(node: systemB.settings.node) print("waiting for cluster to form...") - // TODO: implement this using "await join cluster" API [#948](https://github.com/apple/swift-distributed-actors/issues/948) - while !(try await self.isClusterFormed(systems)) { - let nanosInSecond: UInt64 = 1_000_000_000 - try await Task.sleep(nanoseconds: 1 * nanosInSecond) - } + try await self.ensureCluster(systems, within: .seconds(10)) print("~~~~~~~ systems joined each other ~~~~~~~") @@ -74,13 +70,17 @@ final class DistributedDiningPhilosophers { try systemA.park(atMost: duration) } - private func isClusterFormed(_ systems: [ClusterSystem]) async throws -> Bool { - for system in systems { - let upCount = try await system.cluster.membershipSnapshot.count(withStatus: .up) - if upCount != systems.count { - return false + private func ensureCluster(_ systems: [ClusterSystem], within: Duration) async throws { + let nodes = Set(systems.map(\.settings.uniqueBindNode)) + + try await withThrowingTaskGroup(of: Void.self) { group in + for system in systems { + group.addTask { + try await system.cluster.waitFor(nodes, .up, within: within) + } } + // loop explicitly to propagagte any error that might have been thrown + for try await _ in group {} } - return true } } diff --git a/Sources/DistributedActors/Cluster/Cluster+Membership.swift b/Sources/DistributedActors/Cluster/Cluster+Membership.swift index 5d49fe4cc..d775aa7d8 100644 --- a/Sources/DistributedActors/Cluster/Cluster+Membership.swift +++ b/Sources/DistributedActors/Cluster/Cluster+Membership.swift @@ -663,7 +663,37 @@ extension MembershipDiff: CustomDebugStringConvertible { // MARK: Errors extension Cluster { - public enum MembershipError: Error { + public enum MembershipError: Error, CustomPrettyStringConvertible { case nonMemberLeaderSelected(Cluster.Membership, wannabeLeader: Cluster.Member) + case notFound(UniqueNode, in: Cluster.Membership) + case atLeastStatusRequirementNotMet(expectedAtLeast: Cluster.MemberStatus, found: Cluster.Member) + case statusRequirementNotMet(expected: Cluster.MemberStatus, found: Cluster.Member) + case awaitStatusTimedOut(Duration, Error?) + + public var prettyDescription: String { + "\(Self.self)(\(self), details: \(self.details))" + } + + private var details: String { + switch self { + case .nonMemberLeaderSelected(let membership, let wannabeLeader): + return "[\(wannabeLeader)] selected leader but is not a member [\(membership)]" + case .notFound(let node, let membership): + return "[\(node)] is not a member [\(membership)]" + case .atLeastStatusRequirementNotMet(let expectedAtLeastStatus, let foundMember): + return "Expected \(reflecting: foundMember.uniqueNode) to be seen as at-least [\(expectedAtLeastStatus)] but was [\(foundMember.status)]" + case .statusRequirementNotMet(let expectedStatus, let foundMember): + return "Expected \(reflecting: foundMember.uniqueNode) to be seen as [\(expectedStatus)] but was [\(foundMember.status)]" + case .awaitStatusTimedOut(let duration, let lastError): + let lastErrorMessage: String + if let error = lastError { + lastErrorMessage = "Last error: \(error)" + } else { + lastErrorMessage = "Last error: " + } + + return "No result within \(duration.prettyDescription). \(lastErrorMessage)" + } + } } } diff --git a/Sources/DistributedActors/Cluster/ClusterControl.swift b/Sources/DistributedActors/Cluster/ClusterControl.swift index d8640fcf3..baac0a8ac 100644 --- a/Sources/DistributedActors/Cluster/ClusterControl.swift +++ b/Sources/DistributedActors/Cluster/ClusterControl.swift @@ -71,10 +71,12 @@ public struct ClusterControl { } } + private let cluster: ClusterShell? internal let ref: ClusterShell.Ref - init(_ settings: ClusterSystemSettings, clusterRef: ClusterShell.Ref, eventStream: EventStream) { + init(_ settings: ClusterSystemSettings, cluster: ClusterShell?, clusterRef: ClusterShell.Ref, eventStream: EventStream) { self.settings = settings + self.cluster = cluster self.ref = clusterRef self.events = eventStream @@ -155,4 +157,135 @@ public struct ClusterControl { public func down(member: Cluster.Member) { self.ref.tell(.command(.downCommandMember(member))) } + + /// Wait, within the given duration, until this actor system has joined the node's cluster. + /// + /// - Parameters + /// - node: The node to be joined by this system. + /// - within: Duration to wait for. + /// + /// - Returns `Cluster.Member` for the joined node. + public func joined(node: UniqueNode, within: Duration) async throws -> Cluster.Member { + try await self.waitFor(node, .up, within: within) + } + + /// Wait, within the given duration, for this actor system to be a member of all the nodes' respective cluster and have the specified status. + /// + /// - Parameters + /// - nodes: The nodes to be joined by this system. + /// - status: The expected member status. + /// - within: Duration to wait for. + public func waitFor(_ nodes: Set, _ status: Cluster.MemberStatus, within: Duration) async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + for node in nodes { + group.addTask { + _ = try await self.waitFor(node, status, within: within) + } + } + // loop explicitly to propagagte any error that might have been thrown + for try await _ in group {} + } + } + + /// Wait, within the given duration, for this actor system to be a member of all the nodes' respective cluster and have **at least** the specified status. + /// + /// - Parameters + /// - nodes: The nodes to be joined by this system. + /// - status: The minimum expected member status. + /// - within: Duration to wait for. + public func waitFor(_ nodes: Set, atLeast atLeastStatus: Cluster.MemberStatus, within: Duration) async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + for node in nodes { + group.addTask { + _ = try await self.waitFor(node, atLeast: atLeastStatus, within: within) + } + } + // loop explicitly to propagagte any error that might have been thrown + for try await _ in group {} + } + } + + /// Wait, within the given duration, for this actor system to be a member of the node's cluster and have the specified status. + /// + /// - Parameters + /// - node: The node to be joined by this system. + /// - status: The expected member status. + /// - within: Duration to wait for. + /// + /// - Returns `Cluster.Member` for the joined node with the expected status. + /// If the expected status is `.down` or `.removed`, and the node is already known to have been removed from the cluster + /// a synthesized `Cluster/MemberStatus/removed` (and `.unreachable`) member is returned. + public func waitFor(_ node: UniqueNode, _ status: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member { + try await self.waitForMembershipEventually(within: within) { membership in + if status == .down || status == .removed { + if let cluster = self.cluster, let tombstone = cluster.getExistingAssociationTombstone(with: node) { + return Cluster.Member(node: node, status: .removed).asUnreachable + } + } + + guard let foundMember = membership.uniqueMember(node) else { + if status == .down || status == .removed { + // so we're seeing an already removed member, this can indeed happen and is okey + return Cluster.Member(node: node, status: .removed).asUnreachable + } + throw Cluster.MembershipError.notFound(node, in: membership) + } + + if status != foundMember.status { + throw Cluster.MembershipError.statusRequirementNotMet(expected: status, found: foundMember) + } + return foundMember + } + } + + /// Wait, within the given duration, for this actor system to be a member of the node's cluster and have **at least** the specified status. + /// + /// - Parameters + /// - node: The node to be joined by this system. + /// - atLeastStatus: The minimum expected member status. + /// - within: Duration to wait for. + /// + /// - Returns `Cluster.Member` for the joined node with the minimum expected status. + /// If the expected status is at least `.down` or `.removed`, and either a tombstone exists for the node or the associated + /// membership is not found, the `Cluster.Member` returned would have `.removed` status and *unreachable*. + public func waitFor(_ node: UniqueNode, atLeast atLeastStatus: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member { + try await self.waitForMembershipEventually(within: within) { membership in + if atLeastStatus == .down || atLeastStatus == .removed { + if let cluster = self.cluster, let tombstone = cluster.getExistingAssociationTombstone(with: node) { + return Cluster.Member(node: node, status: .removed).asUnreachable + } + } + + guard let foundMember = membership.uniqueMember(node) else { + if atLeastStatus == .down || atLeastStatus == .removed { + // so we're seeing an already removed member, this can indeed happen and is okey + return Cluster.Member(node: node, status: .removed).asUnreachable + } + throw Cluster.MembershipError.notFound(node, in: membership) + } + + if atLeastStatus <= foundMember.status { + throw Cluster.MembershipError.atLeastStatusRequirementNotMet(expectedAtLeast: atLeastStatus, found: foundMember) + } + return foundMember + } + } + + private func waitForMembershipEventually(within: Duration, interval: Duration = .milliseconds(100), _ block: (Cluster.Membership) async throws -> T) async throws -> T { + let deadline = ContinuousClock.Instant.fromNow(within) + + var lastError: Error? + while deadline.hasTimeLeft() { + let membership = await self.membershipSnapshot + do { + let result = try await block(membership) + return result + } catch { + lastError = error + try await Task.sleep(nanoseconds: UInt64(interval.nanoseconds)) + } + } + + throw Cluster.MembershipError.awaitStatusTimedOut(within, lastError) + } } diff --git a/Sources/DistributedActors/Cluster/ClusterShell.swift b/Sources/DistributedActors/Cluster/ClusterShell.swift index 6c7e95ab8..5aa42236e 100644 --- a/Sources/DistributedActors/Cluster/ClusterShell.swift +++ b/Sources/DistributedActors/Cluster/ClusterShell.swift @@ -64,6 +64,12 @@ internal class ClusterShell { } } + internal func getExistingAssociationTombstone(with node: UniqueNode) -> Association.Tombstone? { + self._associationsLock.withLock { + self._associationTombstones[node] + } + } + /// Get an existing association or ensure that a new one shall be stored and joining kicked off if the target node was not known yet. /// Safe to concurrently access by privileged internals directly internal func getEnsureAssociation(with node: UniqueNode, file: String = #file, line: UInt = #line) -> StoredAssociationState { diff --git a/Sources/DistributedActors/ClusterSystem.swift b/Sources/DistributedActors/ClusterSystem.swift index f1c640f85..7be681fbe 100644 --- a/Sources/DistributedActors/ClusterSystem.swift +++ b/Sources/DistributedActors/ClusterSystem.swift @@ -314,7 +314,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable { ) _ = self._clusterStore.storeIfNilThenLoad(Box(nil)) - _ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, clusterRef: self.deadLetters.adapted(), eventStream: clusterEvents))) + _ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, cluster: nil, clusterRef: self.deadLetters.adapted(), eventStream: clusterEvents))) } // node watcher MUST be prepared before receptionist (or any other actor) because it (and all actors) need it if we're running clustered @@ -331,7 +331,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable { customBehavior: ClusterEventStream.Shell.behavior ) let clusterRef = try! cluster.start(system: self, clusterEvents: clusterEvents) // only spawns when cluster is initialized - _ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, clusterRef: clusterRef, eventStream: clusterEvents))) + _ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, cluster: cluster, clusterRef: clusterRef, eventStream: clusterEvents))) self._associationTombstoneCleanupTask = eventLoopGroup.next().scheduleRepeatedTask( initialDelay: settings.associationTombstoneCleanupInterval.toNIO, diff --git a/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift b/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift index 20241ceeb..fafa85c93 100644 --- a/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift +++ b/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift @@ -163,15 +163,13 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase { fatalError("Must at least have 1 system present to use [\(#function)]") } - try await self.testKit(onSystem).eventually(within: within, file: file, line: line) { - do { - // all members on onMember should have reached this status (e.g. up) - for node in nodes { - try await self.assertMemberStatus(on: onSystem, node: node, is: status, file: file, line: line) - } - } catch { - throw error - } + let testKit = self.testKit(onSystem) + do { + try await onSystem.cluster.waitFor(Set(nodes), status, within: within) + } catch let error as Cluster.MembershipError { + throw testKit.error("\(error.prettyDescription)", file: file, line: line) + } catch { + throw testKit.error("\(error)", file: file, line: line) } } @@ -183,15 +181,14 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase { fatalError("Must at least have 1 system present to use [\(#function)]") } - try await self.testKit(onSystem).eventually(within: within, file: file, line: line) { - do { - // all members on onMember should have reached this status (e.g. up) - for node in nodes { - _ = try await self.assertMemberStatus(on: onSystem, node: node, atLeast: status, file: file, line: line) - } - } catch { - throw error - } + let testKit = self.testKit(onSystem) + do { + // all members on onMember should have reached this status (e.g. up) + try await onSystem.cluster.waitFor(Set(nodes), atLeast: status, within: within) + } catch let error as Cluster.MembershipError { + throw testKit.error("\(error.prettyDescription)", file: file, line: line) + } catch { + throw testKit.error("\(error)", file: file, line: line) } } } @@ -354,60 +351,63 @@ extension ClusteredActorSystemsXCTestCase { } } - /// Asserts the given member node has the expected `status`. - /// - /// An error is thrown but NOT failing the test; use in pair with `testKit.eventually` to achieve the expected behavior. + /// Asserts the given member node has the expected `status` within the duration. public func assertMemberStatus( on system: ClusterSystem, node: UniqueNode, is expectedStatus: Cluster.MemberStatus, + within: Duration, file: StaticString = #file, line: UInt = #line ) async throws { let testKit = self.testKit(system) - let membership = await system.cluster.membershipSnapshot - guard let foundMember = membership.uniqueMember(node) else { - throw testKit.error("Expected [\(system.cluster.uniqueNode)] to know about [\(node)] member", file: file, line: line) - } - if foundMember.status != expectedStatus { - throw testKit.error( - """ - Expected \(reflecting: foundMember.uniqueNode) on \(reflecting: system.cluster.uniqueNode) \ - to be seen as: [\(expectedStatus)], but was [\(foundMember.status)] - """, - file: file, - line: line - ) + do { + _ = try await system.cluster.waitFor(node, expectedStatus, within: within) + } catch let error as Cluster.MembershipError { + switch error { + case .notFound: + throw testKit.error("Expected [\(system.cluster.uniqueNode)] to know about [\(node)] member", file: file, line: line) + case .statusRequirementNotMet(_, let foundMember): + throw testKit.error( + """ + Expected \(reflecting: foundMember.uniqueNode) on \(reflecting: system.cluster.uniqueNode) \ + to be seen as: [\(expectedStatus)], but was [\(foundMember.status)] + """, + file: file, + line: line + ) + default: + throw testKit.error(error.prettyDescription, file: file, line: line) + } } } public func assertMemberStatus( on system: ClusterSystem, node: UniqueNode, atLeast expectedAtLeastStatus: Cluster.MemberStatus, + within: Duration, file: StaticString = #file, line: UInt = #line - ) async throws -> Cluster.MemberStatus? { + ) async throws { let testKit = self.testKit(system) - let membership = await system.cluster.membershipSnapshot - guard let foundMember = membership.uniqueMember(node) else { - if expectedAtLeastStatus == .down || expectedAtLeastStatus == .removed { - // so we're seeing an already removed member, this can indeed happen and is okey - return nil - } else { + + do { + _ = try await system.cluster.waitFor(node, atLeast: expectedAtLeastStatus, within: within) + } catch let error as Cluster.MembershipError { + switch error { + case .notFound: throw testKit.error("Expected [\(system.cluster.uniqueNode)] to know about [\(node)] member", file: file, line: line) + case .atLeastStatusRequirementNotMet(_, let foundMember): + throw testKit.error( + """ + Expected \(reflecting: foundMember.uniqueNode) on \(reflecting: system.cluster.uniqueNode) \ + to be seen as at-least: [\(expectedAtLeastStatus)], but was [\(foundMember.status)] + """, + file: file, + line: line + ) + default: + throw testKit.error(error.prettyDescription, file: file, line: line) } } - - if expectedAtLeastStatus <= foundMember.status { - throw testKit.error( - """ - Expected \(reflecting: foundMember.uniqueNode) on \(reflecting: system.cluster.uniqueNode) \ - to be seen as at-least: [\(expectedAtLeastStatus)], but was [\(foundMember.status)] - """, - file: file, - line: line - ) - } - - return expectedAtLeastStatus } /// Assert based on the event stream of ``Cluster/Event`` that the given `node` was downed or removed. diff --git a/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift index efee56162..86d8b0307 100644 --- a/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift @@ -372,19 +372,18 @@ final class ClusterAssociationTests: ClusteredActorSystemsXCTestCase { let secondProbe = self.testKit(second).makeTestProbe(expecting: Cluster.Membership.self) // we we down first on first, it should become down there: - try await self.testKit(first).eventually(within: .seconds(3)) { + try self.testKit(first).eventually(within: .seconds(3)) { first.cluster.ref.tell(.query(.currentMembership(firstProbe.ref))) let firstMembership = try firstProbe.expectMessage() guard let selfMember = firstMembership.uniqueMember(first.cluster.uniqueNode) else { throw self.testKit(second).error("No self member in membership! Wanted: \(first.cluster.uniqueNode)", line: #line - 1) } - - try await self.assertMemberStatus(on: first, node: first.cluster.uniqueNode, is: .down) guard selfMember.status == .down else { throw self.testKit(first).error("Wanted self member to be DOWN, but was: \(selfMember)", line: #line - 1) } } + try await self.assertMemberStatus(on: first, node: first.cluster.uniqueNode, is: .down, within: .seconds(3)) // and the second node should also notice try self.testKit(second).eventually(within: .seconds(3)) { diff --git a/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsClusteredTests.swift index 8568bc26e..d52f1f778 100644 --- a/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsClusteredTests.swift @@ -79,23 +79,17 @@ final class ClusterLeaderActionsClusteredTests: ClusteredActorSystemsXCTestCase try assertAssociated(second, withAtLeast: third.cluster.uniqueNode) try assertAssociated(first, withAtLeast: third.cluster.uniqueNode) - try await self.testKit(first).eventually(within: .seconds(10)) { - try await self.assertMemberStatus(on: first, node: first.cluster.uniqueNode, is: .up) - try await self.assertMemberStatus(on: first, node: second.cluster.uniqueNode, is: .up) - try await self.assertMemberStatus(on: first, node: third.cluster.uniqueNode, is: .up) - } + try await self.assertMemberStatus(on: first, node: first.cluster.uniqueNode, is: .up, within: .seconds(10)) + try await self.assertMemberStatus(on: first, node: second.cluster.uniqueNode, is: .up, within: .seconds(10)) + try await self.assertMemberStatus(on: first, node: third.cluster.uniqueNode, is: .up, within: .seconds(10)) - try await self.testKit(second).eventually(within: .seconds(10)) { - try await self.assertMemberStatus(on: second, node: first.cluster.uniqueNode, is: .up) - try await self.assertMemberStatus(on: second, node: second.cluster.uniqueNode, is: .up) - try await self.assertMemberStatus(on: second, node: third.cluster.uniqueNode, is: .up) - } + try await self.assertMemberStatus(on: second, node: first.cluster.uniqueNode, is: .up, within: .seconds(10)) + try await self.assertMemberStatus(on: second, node: second.cluster.uniqueNode, is: .up, within: .seconds(10)) + try await self.assertMemberStatus(on: second, node: third.cluster.uniqueNode, is: .up, within: .seconds(10)) - try await self.testKit(third).eventually(within: .seconds(10)) { - try await self.assertMemberStatus(on: third, node: first.cluster.uniqueNode, is: .up) - try await self.assertMemberStatus(on: third, node: second.cluster.uniqueNode, is: .up) - try await self.assertMemberStatus(on: third, node: third.cluster.uniqueNode, is: .up) - } + try await self.assertMemberStatus(on: third, node: first.cluster.uniqueNode, is: .up, within: .seconds(10)) + try await self.assertMemberStatus(on: third, node: second.cluster.uniqueNode, is: .up, within: .seconds(10)) + try await self.assertMemberStatus(on: third, node: third.cluster.uniqueNode, is: .up, within: .seconds(10)) } func test_joining_to_up_earlyYetStillLettingAllNodesKnowAboutLatestMembershipStatus() async throws { diff --git a/Tests/DistributedActorsTests/Cluster/MembershipGossipClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/MembershipGossipClusteredTests.swift index a5048cf44..23306f306 100644 --- a/Tests/DistributedActorsTests/Cluster/MembershipGossipClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/MembershipGossipClusteredTests.swift @@ -59,11 +59,9 @@ final class MembershipGossipClusteredTests: ClusteredActorSystemsXCTestCase { try assertAssociated(second, withAtLeast: third.cluster.uniqueNode) try assertAssociated(first, withAtLeast: third.cluster.uniqueNode) - try await self.testKit(second).eventually(within: .seconds(10)) { - try await self.assertMemberStatus(on: second, node: first.cluster.uniqueNode, is: .up) - try await self.assertMemberStatus(on: second, node: second.cluster.uniqueNode, is: .up) - try await self.assertMemberStatus(on: second, node: third.cluster.uniqueNode, is: .up) - } + try await self.assertMemberStatus(on: second, node: first.cluster.uniqueNode, is: .up, within: .seconds(10)) + try await self.assertMemberStatus(on: second, node: second.cluster.uniqueNode, is: .up, within: .seconds(10)) + try await self.assertMemberStatus(on: second, node: third.cluster.uniqueNode, is: .up, within: .seconds(10)) let firstEvents = testKit(first).spawnEventStreamTestProbe(subscribedTo: first.cluster.events) let secondEvents = testKit(second).spawnEventStreamTestProbe(subscribedTo: second.cluster.events)