@@ -0,0 +1,85 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedActors
import MultiNodeTestKit

public final class MultiNodeReceptionistTests: MultiNodeTestSuite {
    public init() {}

    public enum Nodes: String, MultiNodeNodes {
        case first
        case second
        case third
        case fourth
    }

    public static func configureMultiNodeTest(settings: inout MultiNodeTestSettings) {
        settings.dumpNodeLogs = .always

        settings.logCapture.excludeGrep = [
            "SWIMActor.swift", "SWIMInstance.swift",
            "Gossiper+Shell.swift",
        ]

        settings.installPrettyLogger = true
    }

    public static func configureActorSystem(settings: inout ClusterSystemSettings) {
        settings.logging.logLevel = .debug

        settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3)
    }

    public let test_receptionist_checkIn = MultiNodeTest(MultiNodeReceptionistTests.self) { multiNode in
        // *All* nodes spawn an echo actor
        let localEcho = await DistributedEcho(greeting: "Hi from \(multiNode.system.name), ", actorSystem: multiNode.system)

        try await multiNode.checkPoint("Spawned actors") // ------------------------------------------------------------

        let expectedCount = Nodes.allCases.count
        var discovered: Set<DistributedEcho> = []
        for try await actor in await multiNode.system.receptionist.listing(of: .init(DistributedEcho.self)) {
            multiNode.log.notice("Discovered \(actor.id) from \(actor.id.uniqueNode)")
            discovered.insert(actor)

            if discovered.count == expectedCount {
                break
            }
        }

        try await multiNode.checkPoint("All members found \(expectedCount) actors") // ---------------------------------
    }

    distributed actor DistributedEcho {
        typealias ActorSystem = ClusterSystem

        @ActorID.Metadata(\.receptionID)
        var receptionID: String

        private let greeting: String

        init(greeting: String, actorSystem: ActorSystem) async {
            self.actorSystem = actorSystem
            self.greeting = greeting
            self.receptionID = "*"

            await actorSystem.receptionist.checkIn(self)
        }

        distributed func echo(name: String) -> String {
            "echo: \(self.greeting)\(name)! (from node: \(self.id.uniqueNode), id: \(self.id.detailedDescription))"
        }
    }
}
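Once a node's listing loop has collected the expected actors, the discovered references can be invoked like any other distributed actor. A minimal follow-up sketch, assuming the `discovered` set and `multiNode` context from the test above; the extra logging is purely illustrative:

```swift
// Hypothetical next step for the test above: call every discovered echo actor once.
// Assumes `discovered: Set<DistributedEcho>` was filled by the listing loop.
for echo in discovered {
    // Cross-node calls on distributed actors are implicitly async and throwing.
    let reply = try await echo.echo(name: multiNode.system.name)
    multiNode.log.notice("Echo reply: \(reply)")
}
```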
2 changes: 1 addition & 1 deletion Sources/DistributedActors/ActorMetadata.swift
@@ -51,7 +51,7 @@ extension ActorMetadataKeys {
}

extension ActorID {
internal var isWellKnown: Bool {
public var isWellKnown: Bool {
self.metadata.wellKnown != nil
}
}
@@ -295,6 +295,22 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
}
}

public nonisolated func checkIn<Guest>(
_ guest: Guest
) async where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem {
guard let keyID: String = guest.id.metadata.receptionID else {
fatalError("""
Attempted to \(#function) distributed actor without `@ActorID.Metadata(\\.receptionID)` set on ActorID!
Please set the metadata during actor initialization.
""")
}
let key = DistributedReception.Key(Guest.self, id: keyID)

await self.whenLocal { myself in
[Review comment — Member] This reads very nicely 👍
[Review reply — Member, Author] I recently realized we don't always have to do the __ trickery by the way :)

await myself._checkIn(guest, with: key)
}
}
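As a usage sketch, the new entry point can be contrasted with checking in under an explicitly constructed key. The explicit-key `checkIn(_:with:)` spelling below is assumed to match the pre-existing receptionist API; the wrapper function and the `id` parameter are illustrative only:

```swift
// Sketch only: the two ways a guest can now reach the receptionist.
func checkInBothWays<Guest>(
    _ guest: Guest,
    id: String,
    system: ClusterSystem
) async where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem {
    // Assumed pre-existing API: the caller picks the reception id at the call site.
    await system.receptionist.checkIn(guest, with: DistributedReception.Key(Guest.self, id: id))

    // New API from this patch: the id is read from the guest's
    // `@ActorID.Metadata(\.receptionID)` metadata; if it was never set during
    // init, this crashes with the fatalError shown above.
    await system.receptionist.checkIn(guest)
}
```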

// 'local' implementation of checkIn
private func _checkIn<Guest>(
_ guest: Guest,
@@ -355,7 +371,16 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
) async -> DistributedReception.GuestListing<Guest>
where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem
{
return DistributedReception.GuestListing<Guest>(receptionist: self, key: key, file: file, line: line)
DistributedReception.GuestListing<Guest>(receptionist: self, key: key, file: file, line: line)
}

public nonisolated func listing<Guest>(
of _: Guest.Type,
file: String = #fileID, line: UInt = #line
) async -> DistributedReception.GuestListing<Guest>
where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem
{
DistributedReception.GuestListing<Guest>(receptionist: self, key: Key<Guest>(), file: file, line: line)
}
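The new overload lets call sites subscribe by guest type alone instead of constructing a key. A short sketch of both spellings, assuming an actor type registered under the default key as in the multi-node test above (the wrapper function is illustrative):

```swift
// Sketch only: two ways to obtain a listing for guests under the default key.
func watchGuests<Guest>(_ type: Guest.Type, in system: ClusterSystem) async throws
    where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem {
    // Existing spelling: build the reception key explicitly (the multi-node test above does this).
    _ = await system.receptionist.listing(of: DistributedReception.Key(Guest.self))

    // New spelling from this patch: pass the guest type; Key<Guest>() is built internally.
    for try await guest in await system.receptionist.listing(of: Guest.self) {
        print("Discovered \(guest.id) on \(guest.id.uniqueNode)")
    }
}
```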

// 'local' impl for 'listing'
@@ -374,9 +399,9 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
// as new ones come in, they will be reported to this subscription later on
for alreadyRegisteredAtSubscriptionTime in self.storage.registrations(forKey: subscription.key) ?? [] {
if subscription.tryOffer(registration: alreadyRegisteredAtSubscriptionTime) {
self.log.notice("OFFERED \(alreadyRegisteredAtSubscriptionTime.actorID) TO \(subscription)")
self.log.debug("Offered \(alreadyRegisteredAtSubscriptionTime.actorID) to subscription \(subscription)")
} else {
self.log.notice("DROPPED \(alreadyRegisteredAtSubscriptionTime.actorID) TO \(subscription)")
self.log.warning("Dropped \(alreadyRegisteredAtSubscriptionTime.actorID) on subscription \(subscription)")
}
}
}
13 changes: 13 additions & 0 deletions Sources/DistributedActors/Receptionist/DistributedReception.swift
@@ -20,6 +20,19 @@ import Distributed
/// Namespace for public messages related to the DistributedReceptionist.
public enum DistributedReception {}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: ActorID metadata for easy checking-in

extension ActorMetadataKeys {
public var receptionID: Key<String> { "$receptionID" }
}

extension ActorID {
public var hasReceptionID: Bool {
self.metadata.receptionID != nil
}
}
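Since `checkIn` traps when the metadata is missing, a caller that cannot guarantee it may want to check first. A small sketch using the `hasReceptionID` helper added here; the wrapper function itself is illustrative, not part of the library:

```swift
// Illustrative helper, not part of the library: only check in guests that
// actually carry an `@ActorID.Metadata(\.receptionID)` value.
func checkInIfPossible<Guest>(_ guest: Guest, via receptionist: some DistributedReceptionist) async
    where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem {
    guard guest.id.hasReceptionID else {
        return // no receptionID metadata set during init; `checkIn` would fatalError
    }
    await receptionist.checkIn(guest)
}
```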

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: DistributedReception Key

@@ -39,6 +39,11 @@ public protocol DistributedReceptionist: DistributedActor {
// TODO(distributed): should gain a "retain (or not)" version, the receptionist can keep alive actors, but sometimes we don't want that, it depends
) async where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem

func checkIn<Guest>(
_ guest: Guest
// TODO(distributed): should gain a "retain (or not)" version, the receptionist can keep alive actors, but sometimes we don't want that, it depends
) async where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem

/// Returns a "listing" asynchronous sequence which will emit actor references,
/// for every distributed actor that the receptionist discovers for the specific key.
///
@@ -429,22 +434,18 @@ internal final class AnyDistributedReceptionListingSubscription: Hashable, @unchecked Sendable {
// the seen vector was unaffected by the merge, which means that the
// incoming registration version was already seen, and thus we don't need to emit it again
return false
case .happenedAfter:
case .happenedAfter, .concurrent:
[Review comment — Member] Ah, makes sense I think.

// the incoming registration has not yet been seen before,
// which means that we should emit the actor to the stream.
self.onNext(registration.actorID)
return true
case .concurrent:
fatalError("""
It should not be possible for a version vector to be concurrent with a PAST version of itself before the merge
Previously: \(oldSeenRegistrations)
Current: \(self.seenActorRegistrations)
""")
case .happenedBefore:
fatalError("""
It should not be possible for a *merged* version vector to be in the PAST as compared with itself before the merge
Previously: \(oldSeenRegistrations)
Current: \(self.seenActorRegistrations)
Registration: \(registration)
Self: \(self)
""")
}
}
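For background, the cases above come from comparing the subscription's previously seen version vector against the vector after merging in the incoming registration's version. A minimal, self-contained sketch of that kind of causal comparison; this illustrates the general technique and is not the library's actual VersionVector implementation:

```swift
// Toy version vector, for illustration only.
struct ToyVersionVector {
    enum CausalOrder { case same, happenedBefore, happenedAfter, concurrent }

    var clocks: [String: Int] = [:] // replica id -> highest observed counter

    // Point-wise maximum; the result always dominates (or equals) both inputs.
    mutating func merge(_ other: ToyVersionVector) {
        for (replica, counter) in other.clocks {
            clocks[replica] = max(clocks[replica] ?? 0, counter)
        }
    }

    func compare(to other: ToyVersionVector) -> CausalOrder {
        var ahead = false
        var behind = false
        for replica in Set(clocks.keys).union(other.clocks.keys) {
            let lhs = clocks[replica] ?? 0
            let rhs = other.clocks[replica] ?? 0
            if lhs > rhs { ahead = true }
            if lhs < rhs { behind = true }
        }
        switch (ahead, behind) {
        case (false, false): return .same
        case (true, false): return .happenedAfter
        case (false, true): return .happenedBefore
        case (true, true): return .concurrent
        }
    }
}
```

With a plain point-wise-max merge, the merged vector compares as `.same` or `.happenedAfter` against its pre-merge value, which is why the remaining cases used to trap; this change treats an unexpected `.concurrent` result like `.happenedAfter` (emit the registration) instead of crashing.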
2 changes: 1 addition & 1 deletion Sources/DistributedActors/_OrderedSet+Extensions.swift
@@ -20,7 +20,7 @@ extension OrderedSet {
internal func _filter(
_ isIncluded: (Element) throws -> Bool
) rethrows -> Self {
var set = try self.filter(isIncluded)
let set = try self.filter(isIncluded)
return OrderedSet(uncheckedUniqueElements: set)
}
}
1 change: 1 addition & 0 deletions Sources/MultiNodeTestKitRunner/MultiNode+TestSuites.swift
@@ -18,4 +18,5 @@ import MultiNodeTestKit
let MultiNodeTestSuites: [any MultiNodeTestSuite.Type] = [
MultiNodeConductorTests.self,
MultiNodeClusterSingletonTests.self,
MultiNodeReceptionistTests.self,
]
@@ -176,7 +176,7 @@ extension MultiNodeTestKitRunnerBoot {
try grepper.result.wait()
}

let testResult = try interpretNodeTestOutput(
let testResult = try await interpretNodeTestOutput(
result,
nodeName: nodeName,
multiNodeTest: multiNodeTest,
45 changes: 27 additions & 18 deletions Sources/MultiNodeTestKitRunner/boot+MultiNodeTestKitRunner.swift
@@ -122,6 +122,7 @@ struct MultiNodeTestKitRunnerBoot {
.1
}

@MainActor // Main actor only because we want failures to be printed one after another, and not interleaved.
func interpretNodeTestOutput(
_ result: Result<ProgramOutput, Error>,
nodeName: String,
@@ -139,28 +140,36 @@ struct MultiNodeTestKitRunnerBoot {
}

let expectedFailure = expectedFailureRegex != nil
do {
var detectedReason: InterpretedRunResult?
if !expectedFailure {
switch result {
case .failure(let error as MultiNodeProgramError):
var reason: String = "MultiNode test failed, output was dumped."
for line in error.completeOutput {
log("[\(nodeName)](\(multiNodeTest.testName)) \(line)")

if !expectedFailure {
switch result {
case .failure(let error as MultiNodeProgramError):
var reason: String = "MultiNode test failed, output was dumped."
for line in error.completeOutput {
log("[\(nodeName)](\(multiNodeTest.testName)) \(line)")

if line.contains("Fatal error: ") {
reason = line
if line.contains("Fatal error: "), detectedReason == nil {
detectedReason = .outputError(line)
} else if case .outputError(let reasonLines) = detectedReason {
// keep accumulating lines into the reason, after the "Fatal error:" line.
detectedReason = .outputError("\(reasonLines)\n\(line)")
}
}
}
return .outputError(reason)
case .success(let logs):
if settings.dumpNodeLogs == .always {
for line in logs {
log("[\(nodeName)](\(multiNodeTest.testName)) \(line)")
case .success(let logs):
if settings.dumpNodeLogs == .always {
for line in logs {
log("[\(nodeName)](\(multiNodeTest.testName)) \(line)")
}
}
return .passedAsExpected
default:
break
}
return .passedAsExpected
default:
break
}

if let detectedReason {
return detectedReason
}
}

Expand Down