Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ final class DistributedDiningPhilosophers {
systemC.cluster.join(node: systemB.settings.node)

print("waiting for cluster to form...")
// TODO: implement this using "await join cluster" API [#948](https://github.com/apple/swift-distributed-actors/issues/948)
while !(try await self.isClusterFormed(systems)) {
let nanosInSecond: UInt64 = 1_000_000_000
try await Task.sleep(nanoseconds: 1 * nanosInSecond)
}
try await self.ensureCluster(systems, within: .seconds(10))

print("~~~~~~~ systems joined each other ~~~~~~~")

Expand Down Expand Up @@ -74,13 +70,17 @@ final class DistributedDiningPhilosophers {
try systemA.park(atMost: duration)
}

private func isClusterFormed(_ systems: [ClusterSystem]) async throws -> Bool {
for system in systems {
let upCount = try await system.cluster.membershipSnapshot.count(withStatus: .up)
if upCount != systems.count {
return false
private func ensureCluster(_ systems: [ClusterSystem], within: Duration) async throws {
let nodes = Set(systems.map(\.settings.uniqueBindNode))

try await withThrowingTaskGroup(of: Void.self) { group in
for system in systems {
group.addTask {
try await system.cluster.waitFor(nodes, .up, within: within)
}
}
// loop explicitly to propagagte any error that might have been thrown
for try await _ in group {}
}
return true
}
}
31 changes: 30 additions & 1 deletion Sources/DistributedActors/Cluster/Cluster+Membership.swift
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,36 @@ extension MembershipDiff: CustomDebugStringConvertible {
// MARK: Errors

extension Cluster {
public enum MembershipError: Error {
public enum MembershipError: Error, CustomStringConvertible {
case nonMemberLeaderSelected(Cluster.Membership, wannabeLeader: Cluster.Member)
case notFound(UniqueNode, in: Cluster.Membership)
case atLeastStatusRequirementNotMet(expectedAtLeast: Cluster.MemberStatus, found: Cluster.Member)
case statusRequirementNotMet(expected: Cluster.MemberStatus, found: Cluster.Member)
case awaitStatusTimedOut(Duration, Error?)

public var description: String {
switch self {
case .nonMemberLeaderSelected(let membership, let wannabeLeader):
return "[\(wannabeLeader)] selected leader but is not a member [\(membership)]"
case .notFound(let node, let membership):
return "[\(node)] is not a member [\(membership)]"
case .atLeastStatusRequirementNotMet(let expectedAtLeastStatus, let foundMember):
return "Expected \(reflecting: foundMember.uniqueNode) to be seen as at-least [\(expectedAtLeastStatus)] but was [\(foundMember.status)]"
case .statusRequirementNotMet(let expectedStatus, let foundMember):
return "Expected \(reflecting: foundMember.uniqueNode) to be seen as [\(expectedStatus)] but was [\(foundMember.status)]"
case .awaitStatusTimedOut(let duration, let lastError):
let lastErrorMessage: String
if let error = lastError {
lastErrorMessage = "Last error: \(error)"
} else {
lastErrorMessage = "Last error: <none>"
}

return """
No result within \(duration.prettyDescription).
\(lastErrorMessage)
"""
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good but Perhaps must be wrapped in "MembershipError(\(self), details: <the nice strings>")? Otherwise might be weird not seeing what type the error was?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

amended bce275d

}
}
112 changes: 112 additions & 0 deletions Sources/DistributedActors/Cluster/ClusterControl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,116 @@ public struct ClusterControl {
public func down(member: Cluster.Member) {
self.ref.tell(.command(.downCommandMember(member)))
}

/// Wait, within the given duration, until this actor system has joined the node's cluster.
///
/// - Parameters
/// - node: The node to be joined by this system.
/// - within: Duration to wait for.
///
/// - Returns `Cluster.Member` for the joined node.
public func joined(node: UniqueNode, within: Duration) async throws -> Cluster.Member {
try await self.waitFor(node, .up, within: within)
}

/// Wait, within the given duration, for this actor system to be a member of all the nodes' respective cluster and have the specified status.
///
/// - Parameters
/// - nodes: The nodes to be joined by this system.
/// - status: The expected member status.
/// - within: Duration to wait for.
public func waitFor(_ nodes: Set<UniqueNode>, _ status: Cluster.MemberStatus, within: Duration) async throws {
try await withThrowingTaskGroup(of: Void.self) { group in
for node in nodes {
group.addTask {
_ = try await self.waitFor(node, status, within: within)
}
}
// loop explicitly to propagagte any error that might have been thrown
for try await _ in group {}
}
}

/// Wait, within the given duration, for this actor system to be a member of all the nodes' respective cluster and have **at least** the specified status.
///
/// - Parameters
/// - nodes: The nodes to be joined by this system.
/// - status: The minimum expected member status.
/// - within: Duration to wait for.
public func waitFor(_ nodes: Set<UniqueNode>, atLeast atLeastStatus: Cluster.MemberStatus, within: Duration) async throws {
try await withThrowingTaskGroup(of: Void.self) { group in
for node in nodes {
group.addTask {
_ = try await self.waitFor(node, atLeast: atLeastStatus, within: within)
}
}
// loop explicitly to propagagte any error that might have been thrown
for try await _ in group {}
}
}

/// Wait, within the given duration, for this actor system to be a member of the node's cluster and have the specified status.
///
/// - Parameters
/// - node: The node to be joined by this system.
/// - status: The expected member status.
/// - within: Duration to wait for.
///
/// - Returns `Cluster.Member` for the joined node.
public func waitFor(_ node: UniqueNode, _ status: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member {
try await self.waitForMembershipEventually(within: within) { membership in
guard let foundMember = membership.uniqueMember(node) else {
throw Cluster.MembershipError.notFound(node, in: membership)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sorry to bother you but I thought of another case...

If a node was removed BEFORE, we'll have a tombstone for it inside the private var _associationTombstones: [UniqueNode: Association.Tombstone] protected by _associationsLock. If it is in there, we should return that the member was .removed

You can "make up" a member for a removed node by creating a Member( from the unique node and put .unreachable and .removed in there 👍

This could be in a separate PR 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

amended 5178fa4

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good!

if status != foundMember.status {
throw Cluster.MembershipError.statusRequirementNotMet(expected: status, found: foundMember)
}
return foundMember
}
}

/// Wait, within the given duration, for this actor system to be a member of the node's cluster and have **at least** the specified status.
///
/// - Parameters
/// - node: The node to be joined by this system.
/// - atLeastStatus: The minimum expected member status.
/// - within: Duration to wait for.
///
/// - Returns `Cluster.Member` for the joined node or `nil` if member is expected to be down or removed.
public func waitFor(_ node: UniqueNode, atLeast atLeastStatus: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member? {
try await self.waitForMembershipEventually(within: within) { membership in
guard let foundMember = membership.uniqueMember(node) else {
if atLeastStatus == .down || atLeastStatus == .removed {
// so we're seeing an already removed member, this can indeed happen and is okey
return nil
} else {
throw Cluster.MembershipError.notFound(node, in: membership)
}
}

if atLeastStatus <= foundMember.status {
throw Cluster.MembershipError.atLeastStatusRequirementNotMet(expectedAtLeast: atLeastStatus, found: foundMember)
}
return foundMember
}
}

private func waitForMembershipEventually<T>(within: Duration, interval: Duration = .milliseconds(100), _ block: (Cluster.Membership) async throws -> T) async throws -> T {
let deadline = ContinuousClock.Instant.fromNow(within)

var lastError: Error?
while deadline.hasTimeLeft() {
let membership = await self.membershipSnapshot
do {
let result = try await block(membership)
return result
} catch {
lastError = error
try await Task.sleep(nanoseconds: UInt64(interval.nanoseconds))
}
}

throw Cluster.MembershipError.awaitStatusTimedOut(within, lastError)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,11 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
fatalError("Must at least have 1 system present to use [\(#function)]")
}

try await self.testKit(onSystem).eventually(within: within, file: file, line: line) {
do {
// all members on onMember should have reached this status (e.g. up)
for node in nodes {
try await self.assertMemberStatus(on: onSystem, node: node, is: status, file: file, line: line)
}
} catch {
throw error
}
let testKit = self.testKit(onSystem)
do {
try await onSystem.cluster.waitFor(Set(nodes), status, within: within)
} catch {
throw testKit.error("\(error)", file: file, line: line)
}
}

Expand All @@ -183,15 +179,12 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
fatalError("Must at least have 1 system present to use [\(#function)]")
}

try await self.testKit(onSystem).eventually(within: within, file: file, line: line) {
do {
// all members on onMember should have reached this status (e.g. up)
for node in nodes {
_ = try await self.assertMemberStatus(on: onSystem, node: node, atLeast: status, file: file, line: line)
}
} catch {
throw error
}
let testKit = self.testKit(onSystem)
do {
// all members on onMember should have reached this status (e.g. up)
try await onSystem.cluster.waitFor(Set(nodes), atLeast: status, within: within)
} catch {
throw testKit.error("\(error)", file: file, line: line)
}
}
}
Expand Down Expand Up @@ -354,60 +347,64 @@ extension ClusteredActorSystemsXCTestCase {
}
}

/// Asserts the given member node has the expected `status`.
///
/// An error is thrown but NOT failing the test; use in pair with `testKit.eventually` to achieve the expected behavior.
/// Asserts the given member node has the expected `status` within the duration.
public func assertMemberStatus(
on system: ClusterSystem, node: UniqueNode,
is expectedStatus: Cluster.MemberStatus,
within: Duration,
file: StaticString = #file, line: UInt = #line
) async throws {
let testKit = self.testKit(system)
let membership = await system.cluster.membershipSnapshot
guard let foundMember = membership.uniqueMember(node) else {
throw testKit.error("Expected [\(system.cluster.uniqueNode)] to know about [\(node)] member", file: file, line: line)
}

if foundMember.status != expectedStatus {
throw testKit.error(
"""
Expected \(reflecting: foundMember.uniqueNode) on \(reflecting: system.cluster.uniqueNode) \
to be seen as: [\(expectedStatus)], but was [\(foundMember.status)]
""",
file: file,
line: line
)
do {
_ = try await system.cluster.waitFor(node, expectedStatus, within: within)
} catch let error as Cluster.MembershipError {
switch error {
case .notFound:
throw testKit.error("Expected [\(system.cluster.uniqueNode)] to know about [\(node)] member", file: file, line: line)
case .statusRequirementNotMet(_, let foundMember):
throw testKit.error(
"""
Expected \(reflecting: foundMember.uniqueNode) on \(reflecting: system.cluster.uniqueNode) \
to be seen as: [\(expectedStatus)], but was [\(foundMember.status)]
""",
file: file,
line: line
)
default:
throw testKit.error(error.description, file: file, line: line)
}
}
}

public func assertMemberStatus(
on system: ClusterSystem, node: UniqueNode,
atLeast expectedAtLeastStatus: Cluster.MemberStatus,
within: Duration,
file: StaticString = #file, line: UInt = #line
) async throws -> Cluster.MemberStatus? {
let testKit = self.testKit(system)
let membership = await system.cluster.membershipSnapshot
guard let foundMember = membership.uniqueMember(node) else {
if expectedAtLeastStatus == .down || expectedAtLeastStatus == .removed {
// so we're seeing an already removed member, this can indeed happen and is okey
return nil
} else {

do {
let foundMember = try await system.cluster.waitFor(node, atLeast: expectedAtLeastStatus, within: within)
return foundMember == nil ? nil : expectedAtLeastStatus
} catch let error as Cluster.MembershipError {
switch error {
case .notFound:
throw testKit.error("Expected [\(system.cluster.uniqueNode)] to know about [\(node)] member", file: file, line: line)
case .atLeastStatusRequirementNotMet(_, let foundMember):
throw testKit.error(
"""
Expected \(reflecting: foundMember.uniqueNode) on \(reflecting: system.cluster.uniqueNode) \
to be seen as at-least: [\(expectedAtLeastStatus)], but was [\(foundMember.status)]
""",
file: file,
line: line
)
default:
throw testKit.error(error.description, file: file, line: line)
}
}

if expectedAtLeastStatus <= foundMember.status {
throw testKit.error(
"""
Expected \(reflecting: foundMember.uniqueNode) on \(reflecting: system.cluster.uniqueNode) \
to be seen as at-least: [\(expectedAtLeastStatus)], but was [\(foundMember.status)]
""",
file: file,
line: line
)
}

return expectedAtLeastStatus
}

/// Assert based on the event stream of ``Cluster/Event`` that the given `node` was downed or removed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,19 +372,18 @@ final class ClusterAssociationTests: ClusteredActorSystemsXCTestCase {
let secondProbe = self.testKit(second).makeTestProbe(expecting: Cluster.Membership.self)

// we we down first on first, it should become down there:
try await self.testKit(first).eventually(within: .seconds(3)) {
try self.testKit(first).eventually(within: .seconds(3)) {
first.cluster.ref.tell(.query(.currentMembership(firstProbe.ref)))
let firstMembership = try firstProbe.expectMessage()

guard let selfMember = firstMembership.uniqueMember(first.cluster.uniqueNode) else {
throw self.testKit(second).error("No self member in membership! Wanted: \(first.cluster.uniqueNode)", line: #line - 1)
}

try await self.assertMemberStatus(on: first, node: first.cluster.uniqueNode, is: .down)
guard selfMember.status == .down else {
throw self.testKit(first).error("Wanted self member to be DOWN, but was: \(selfMember)", line: #line - 1)
}
}
try await self.assertMemberStatus(on: first, node: first.cluster.uniqueNode, is: .down, within: .seconds(3))

// and the second node should also notice
try self.testKit(second).eventually(within: .seconds(3)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,17 @@ final class ClusterLeaderActionsClusteredTests: ClusteredActorSystemsXCTestCase
try assertAssociated(second, withAtLeast: third.cluster.uniqueNode)
try assertAssociated(first, withAtLeast: third.cluster.uniqueNode)

try await self.testKit(first).eventually(within: .seconds(10)) {
try await self.assertMemberStatus(on: first, node: first.cluster.uniqueNode, is: .up)
try await self.assertMemberStatus(on: first, node: second.cluster.uniqueNode, is: .up)
try await self.assertMemberStatus(on: first, node: third.cluster.uniqueNode, is: .up)
}
try await self.assertMemberStatus(on: first, node: first.cluster.uniqueNode, is: .up, within: .seconds(10))
try await self.assertMemberStatus(on: first, node: second.cluster.uniqueNode, is: .up, within: .seconds(10))
try await self.assertMemberStatus(on: first, node: third.cluster.uniqueNode, is: .up, within: .seconds(10))

try await self.testKit(second).eventually(within: .seconds(10)) {
try await self.assertMemberStatus(on: second, node: first.cluster.uniqueNode, is: .up)
try await self.assertMemberStatus(on: second, node: second.cluster.uniqueNode, is: .up)
try await self.assertMemberStatus(on: second, node: third.cluster.uniqueNode, is: .up)
}
try await self.assertMemberStatus(on: second, node: first.cluster.uniqueNode, is: .up, within: .seconds(10))
try await self.assertMemberStatus(on: second, node: second.cluster.uniqueNode, is: .up, within: .seconds(10))
try await self.assertMemberStatus(on: second, node: third.cluster.uniqueNode, is: .up, within: .seconds(10))

try await self.testKit(third).eventually(within: .seconds(10)) {
try await self.assertMemberStatus(on: third, node: first.cluster.uniqueNode, is: .up)
try await self.assertMemberStatus(on: third, node: second.cluster.uniqueNode, is: .up)
try await self.assertMemberStatus(on: third, node: third.cluster.uniqueNode, is: .up)
}
try await self.assertMemberStatus(on: third, node: first.cluster.uniqueNode, is: .up, within: .seconds(10))
try await self.assertMemberStatus(on: third, node: second.cluster.uniqueNode, is: .up, within: .seconds(10))
try await self.assertMemberStatus(on: third, node: third.cluster.uniqueNode, is: .up, within: .seconds(10))
}

func test_joining_to_up_earlyYetStillLettingAllNodesKnowAboutLatestMembershipStatus() async throws {
Expand Down
Loading