diff --git a/MultiNodeTests/DistributedActorsMultiNodeTests/MultiNode+ReceptionistTests.swift b/MultiNodeTests/DistributedActorsMultiNodeTests/MultiNode+ReceptionistTests.swift new file mode 100644 index 000000000..349bc874b --- /dev/null +++ b/MultiNodeTests/DistributedActorsMultiNodeTests/MultiNode+ReceptionistTests.swift @@ -0,0 +1,85 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DistributedActors +import MultiNodeTestKit + +public final class MultiNodeReceptionistTests: MultiNodeTestSuite { + public init() {} + + public enum Nodes: String, MultiNodeNodes { + case first + case second + case third + case fourth + } + + public static func configureMultiNodeTest(settings: inout MultiNodeTestSettings) { + settings.dumpNodeLogs = .always + + settings.logCapture.excludeGrep = [ + "SWIMActor.swift", "SWIMInstance.swift", + "Gossiper+Shell.swift", + ] + + settings.installPrettyLogger = true + } + + public static func configureActorSystem(settings: inout ClusterSystemSettings) { + settings.logging.logLevel = .debug + + settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) + } + + public let test_receptionist_checkIn = MultiNodeTest(MultiNodeReceptionistTests.self) { multiNode in + // *All* nodes spawn an echo actor + let localEcho = await DistributedEcho(greeting: "Hi from \(multiNode.system.name), ", actorSystem: multiNode.system) + + try await multiNode.checkPoint("Spawned actors") // ------------------------------------------------------------ + + let expectedCount = Nodes.allCases.count + var discovered: Set = [] + for try await actor in await multiNode.system.receptionist.listing(of: .init(DistributedEcho.self)) { + multiNode.log.notice("Discovered \(actor.id) from \(actor.id.uniqueNode)") + discovered.insert(actor) + + if discovered.count == expectedCount { + break + } + } + + try await multiNode.checkPoint("All members found \(expectedCount) actors") // --------------------------------- + } + + distributed actor DistributedEcho { + typealias ActorSystem = ClusterSystem + + @ActorID.Metadata(\.receptionID) + var receptionID: String + + private let greeting: String + + init(greeting: String, actorSystem: ActorSystem) async { + self.actorSystem = actorSystem + self.greeting = greeting + self.receptionID = "*" + + await actorSystem.receptionist.checkIn(self) + } + + distributed func echo(name: String) -> String { + "echo: \(self.greeting)\(name)! (from node: \(self.id.uniqueNode), id: \(self.id.detailedDescription))" + } + } +} diff --git a/Sources/DistributedActors/ActorMetadata.swift b/Sources/DistributedActors/ActorMetadata.swift index ed9f591a8..887ce8cb7 100644 --- a/Sources/DistributedActors/ActorMetadata.swift +++ b/Sources/DistributedActors/ActorMetadata.swift @@ -51,7 +51,7 @@ extension ActorMetadataKeys { } extension ActorID { - internal var isWellKnown: Bool { + public var isWellKnown: Bool { self.metadata.wellKnown != nil } } diff --git a/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift b/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift index ec133fe6b..3ed38df4f 100644 --- a/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift +++ b/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift @@ -295,6 +295,22 @@ extension OpLogDistributedReceptionist: LifecycleWatch { } } + public nonisolated func checkIn( + _ guest: Guest + ) async where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem { + guard let keyID: String = guest.id.metadata.receptionID else { + fatalError(""" + Attempted to \(#function) distributed actor without `@ActorID.Metadata(\\.receptionID)` set on ActorID! + Please set the metadata during actor initialization. + """) + } + let key = DistributedReception.Key(Guest.self, id: keyID) + + await self.whenLocal { myself in + await myself._checkIn(guest, with: key) + } + } + // 'local' implementation of checkIn private func _checkIn( _ guest: Guest, @@ -355,7 +371,16 @@ extension OpLogDistributedReceptionist: LifecycleWatch { ) async -> DistributedReception.GuestListing where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem { - return DistributedReception.GuestListing(receptionist: self, key: key, file: file, line: line) + DistributedReception.GuestListing(receptionist: self, key: key, file: file, line: line) + } + + public nonisolated func listing( + of _: Guest.Type, + file: String = #fileID, line: UInt = #line + ) async -> DistributedReception.GuestListing + where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem + { + DistributedReception.GuestListing(receptionist: self, key: Key(), file: file, line: line) } // 'local' impl for 'listing' @@ -374,9 +399,9 @@ extension OpLogDistributedReceptionist: LifecycleWatch { // as new ones come in, they will be reported to this subscription later on for alreadyRegisteredAtSubscriptionTime in self.storage.registrations(forKey: subscription.key) ?? [] { if subscription.tryOffer(registration: alreadyRegisteredAtSubscriptionTime) { - self.log.notice("OFFERED \(alreadyRegisteredAtSubscriptionTime.actorID) TO \(subscription)") + self.log.debug("Offered \(alreadyRegisteredAtSubscriptionTime.actorID) to subscription \(subscription)") } else { - self.log.notice("DROPPED \(alreadyRegisteredAtSubscriptionTime.actorID) TO \(subscription)") + self.log.warning("Dropped \(alreadyRegisteredAtSubscriptionTime.actorID) on subscription \(subscription)") } } } diff --git a/Sources/DistributedActors/Receptionist/DistributedReception.swift b/Sources/DistributedActors/Receptionist/DistributedReception.swift index 521caa96a..f3229d9ec 100644 --- a/Sources/DistributedActors/Receptionist/DistributedReception.swift +++ b/Sources/DistributedActors/Receptionist/DistributedReception.swift @@ -20,6 +20,19 @@ import Distributed /// Namespace for public messages related to the DistributedReceptionist. public enum DistributedReception {} +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: ActorID metadata for easy checking-in + +extension ActorMetadataKeys { + public var receptionID: Key { "$receptionID" } +} + +extension ActorID { + public var hasReceptionID: Bool { + self.metadata.receptionID != nil + } +} + // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: DistributedReception Key diff --git a/Sources/DistributedActors/Receptionist/DistributedReceptionist.swift b/Sources/DistributedActors/Receptionist/DistributedReceptionist.swift index 53abff33e..47a46174f 100644 --- a/Sources/DistributedActors/Receptionist/DistributedReceptionist.swift +++ b/Sources/DistributedActors/Receptionist/DistributedReceptionist.swift @@ -39,6 +39,11 @@ public protocol DistributedReceptionist: DistributedActor { // TODO(distributed): should gain a "retain (or not)" version, the receptionist can keep alive actors, but sometimes we don't want that, it depends ) async where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem + func checkIn( + _ guest: Guest + // TODO(distributed): should gain a "retain (or not)" version, the receptionist can keep alive actors, but sometimes we don't want that, it depends + ) async where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem + /// Returns a "listing" asynchronous sequence which will emit actor references, /// for every distributed actor that the receptionist discovers for the specific key. /// @@ -429,22 +434,18 @@ internal final class AnyDistributedReceptionListingSubscription: Hashable, @unch // the seen vector was unaffected by the merge, which means that the // incoming registration version was already seen, and thus we don't need to emit it again return false - case .happenedAfter: + case .happenedAfter, .concurrent: // the incoming registration has not yet been seen before, // which means that we should emit the actor to the stream. self.onNext(registration.actorID) return true - case .concurrent: - fatalError(""" - It should not be possible for a version vector to be concurrent with a PAST version of itself before the merge - Previously: \(oldSeenRegistrations) - Current: \(self.seenActorRegistrations) - """) case .happenedBefore: fatalError(""" It should not be possible for a *merged* version vector to be in the PAST as compared with itself before the merge Previously: \(oldSeenRegistrations) Current: \(self.seenActorRegistrations) + Registration: \(registration) + Self: \(self) """) } } diff --git a/Sources/DistributedActors/_OrderedSet+Extensions.swift b/Sources/DistributedActors/_OrderedSet+Extensions.swift index 7b4f7e04a..6aeb766c1 100644 --- a/Sources/DistributedActors/_OrderedSet+Extensions.swift +++ b/Sources/DistributedActors/_OrderedSet+Extensions.swift @@ -20,7 +20,7 @@ extension OrderedSet { internal func _filter( _ isIncluded: (Element) throws -> Bool ) rethrows -> Self { - var set = try self.filter(isIncluded) + let set = try self.filter(isIncluded) return OrderedSet(uncheckedUniqueElements: set) } } diff --git a/Sources/MultiNodeTestKitRunner/MultiNode+TestSuites.swift b/Sources/MultiNodeTestKitRunner/MultiNode+TestSuites.swift index 1a6b915be..e60a62259 100644 --- a/Sources/MultiNodeTestKitRunner/MultiNode+TestSuites.swift +++ b/Sources/MultiNodeTestKitRunner/MultiNode+TestSuites.swift @@ -18,4 +18,5 @@ import MultiNodeTestKit let MultiNodeTestSuites: [any MultiNodeTestSuite.Type] = [ MultiNodeConductorTests.self, MultiNodeClusterSingletonTests.self, + MultiNodeReceptionistTests.self, ] diff --git a/Sources/MultiNodeTestKitRunner/boot+MultiNodeTestKitRunner+Test.swift b/Sources/MultiNodeTestKitRunner/boot+MultiNodeTestKitRunner+Test.swift index 803ab95a3..b3ef813c2 100644 --- a/Sources/MultiNodeTestKitRunner/boot+MultiNodeTestKitRunner+Test.swift +++ b/Sources/MultiNodeTestKitRunner/boot+MultiNodeTestKitRunner+Test.swift @@ -176,7 +176,7 @@ extension MultiNodeTestKitRunnerBoot { try grepper.result.wait() } - let testResult = try interpretNodeTestOutput( + let testResult = try await interpretNodeTestOutput( result, nodeName: nodeName, multiNodeTest: multiNodeTest, diff --git a/Sources/MultiNodeTestKitRunner/boot+MultiNodeTestKitRunner.swift b/Sources/MultiNodeTestKitRunner/boot+MultiNodeTestKitRunner.swift index 34c70ff22..c6c49a599 100644 --- a/Sources/MultiNodeTestKitRunner/boot+MultiNodeTestKitRunner.swift +++ b/Sources/MultiNodeTestKitRunner/boot+MultiNodeTestKitRunner.swift @@ -122,6 +122,7 @@ struct MultiNodeTestKitRunnerBoot { .1 } + @MainActor // Main actor only because we want failures to be printed one after another, and not interleaved. func interpretNodeTestOutput( _ result: Result, nodeName: String, @@ -139,28 +140,36 @@ struct MultiNodeTestKitRunnerBoot { } let expectedFailure = expectedFailureRegex != nil + do { + var detectedReason: InterpretedRunResult? + if !expectedFailure { + switch result { + case .failure(let error as MultiNodeProgramError): + var reason: String = "MultiNode test failed, output was dumped." + for line in error.completeOutput { + log("[\(nodeName)](\(multiNodeTest.testName)) \(line)") - if !expectedFailure { - switch result { - case .failure(let error as MultiNodeProgramError): - var reason: String = "MultiNode test failed, output was dumped." - for line in error.completeOutput { - log("[\(nodeName)](\(multiNodeTest.testName)) \(line)") - - if line.contains("Fatal error: ") { - reason = line + if line.contains("Fatal error: "), detectedReason == nil { + detectedReason = .outputError(line) + } else if case .outputError(let reasonLines) = detectedReason { + // keep accumulating lines into the reason, after the "Fatal error:" line. + detectedReason = .outputError("\(reasonLines)\n\(line)") + } } - } - return .outputError(reason) - case .success(let logs): - if settings.dumpNodeLogs == .always { - for line in logs { - log("[\(nodeName)](\(multiNodeTest.testName)) \(line)") + case .success(let logs): + if settings.dumpNodeLogs == .always { + for line in logs { + log("[\(nodeName)](\(multiNodeTest.testName)) \(line)") + } } + return .passedAsExpected + default: + break } - return .passedAsExpected - default: - break + } + + if let detectedReason { + return detectedReason } }