=receptionist Fix receptionist crash on concurrent registrations; those are fine and expected #1059
Changes from 5 commits
f563bbb
084d4db
cc51dba
b557e35
f1b074a
a92cb05
@@ -0,0 +1,85 @@

//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedActors
import MultiNodeTestKit

public final class MultiNodeReceptionistTests: MultiNodeTestSuite {
    public init() {}

    public enum Nodes: String, MultiNodeNodes {
        case first
        case second
        case third
        case fourth
    }

    public static func configureMultiNodeTest(settings: inout MultiNodeTestSettings) {
        settings.dumpNodeLogs = .always

        settings.logCapture.excludeGrep = [
            "SWIMActor.swift", "SWIMInstance.swift",
            "Gossiper+Shell.swift",
        ]

        settings.installPrettyLogger = true
    }

    public static func configureActorSystem(settings: inout ClusterSystemSettings) {
        settings.logging.logLevel = .debug

        settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3)
    }

    public let test_receptionist_checkIn = MultiNodeTest(MultiNodeReceptionistTests.self) { multiNode in
        // *All* nodes spawn an echo actor
        let localEcho = await DistributedEcho(greeting: "Hi from \(multiNode.system.name), ", actorSystem: multiNode.system)

        try await multiNode.checkPoint("Spawned actors") // ------------------------------------------------------------

        let expectedCount = Nodes.allCases.count
        var discovered: Set<DistributedEcho> = []
        for try await actor in await multiNode.system.receptionist.listing(of: .init(DistributedEcho.self)) {
            multiNode.log.notice("Discovered \(actor.id) from \(actor.id.uniqueNode)")
            discovered.insert(actor)

            if discovered.count == expectedCount {
                break
            }
        }

        try await multiNode.checkPoint("All members found \(expectedCount) actors") // ---------------------------------
    }

    distributed actor DistributedEcho {
        typealias ActorSystem = ClusterSystem

        @ActorID.Metadata(\.receptionID)
        var receptionID: String

        private let greeting: String

        init(greeting: String, actorSystem: ActorSystem) async {
            self.actorSystem = actorSystem
            self.greeting = greeting
            self.receptionID = "*"

            await actorSystem.receptionist.checkIn(self)
        }

        distributed func echo(name: String) -> String {
            "echo: \(self.greeting)\(name)! (from node: \(self.id.uniqueNode), id: \(self.id.detailedDescription))"
        }
    }
}
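For a sense of how listed actors get used, here is an illustrative sketch that is not part of this diff: once the listing yields a `DistributedEcho`, any node can call its distributed `echo(name:)`, and the call is implicitly async and throwing when it may cross node boundaries. The helper name `greetDiscoveredEchoes` and the `system` parameter are assumptions made for the example.

```swift
// Sketch only: consume a receptionist listing and call the distributed
// echo(name:) on every DistributedEcho that checks in.
// Assumes a running ClusterSystem and the DistributedEcho actor defined in the test above.
func greetDiscoveredEchoes(on system: ClusterSystem) async throws {
    for try await echo in await system.receptionist.listing(of: .init(MultiNodeReceptionistTests.DistributedEcho.self)) {
        // Cross-node invocations can fail (e.g. the peer became unreachable), hence `try await`.
        let reply = try await echo.echo(name: "visitor")
        print(reply)
        // The listing never completes on its own; real callers break out once satisfied,
        // just like the test above does after it has seen all expected nodes.
    }
}
```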
@@ -295,6 +295,22 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
         }
     }
 
+    public nonisolated func checkIn<Guest>(
+        _ guest: Guest
+    ) async where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem {
+        guard let keyID: String = guest.id.metadata.receptionID else {
+            fatalError("""
+                Attempted to \(#function) distributed actor without `@ActorID.Metadata(\\.receptionID)` set on ActorID!
+                Please set the metadata during actor initialization.
+                """)
+        }
+        let key = DistributedReception.Key(Guest.self, id: keyID)
+
+        await self.whenLocal { myself in
+            await myself._checkIn(guest, with: key)
+        }
+    }
+
     // 'local' implementation of checkIn
     private func _checkIn<Guest>(
         _ guest: Guest,

Review comment on `await self.whenLocal { myself in`:

Member: This reads very nicely 👍

Author: I recently realized we don't always have to do the …
@@ -355,7 +371,16 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
     ) async -> DistributedReception.GuestListing<Guest>
         where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem
     {
-        return DistributedReception.GuestListing<Guest>(receptionist: self, key: key, file: file, line: line)
+        DistributedReception.GuestListing<Guest>(receptionist: self, key: key, file: file, line: line)
     }
 
+    public nonisolated func listing<Guest>(
+        of _: Guest.Type,
+        file: String = #fileID, line: UInt = #line
+    ) async -> DistributedReception.GuestListing<Guest>
+        where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem
+    {
+        DistributedReception.GuestListing<Guest>(receptionist: self, key: Key<Guest>(), file: file, line: line)
+    }
+
     // 'local' impl for 'listing'

@@ -374,9 +399,9 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
             // as new ones come in, they will be reported to this subscription later on
             for alreadyRegisteredAtSubscriptionTime in self.storage.registrations(forKey: subscription.key) ?? [] {
                 if subscription.tryOffer(registration: alreadyRegisteredAtSubscriptionTime) {
-                    self.log.notice("OFFERED \(alreadyRegisteredAtSubscriptionTime.actorID) TO \(subscription)")
+                    self.log.debug("Offered \(alreadyRegisteredAtSubscriptionTime.actorID) to subscription \(subscription)")
                 } else {
-                    self.log.notice("DROPPED \(alreadyRegisteredAtSubscriptionTime.actorID) TO \(subscription)")
+                    self.log.warning("Dropped \(alreadyRegisteredAtSubscriptionTime.actorID) on subscription \(subscription)")
                 }
             }
         }
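As a usage note on the two `listing` flavors, here is a sketch with assumed names rather than code from this PR: the existing overload takes an explicit `DistributedReception.Key`, while the new overload only needs the guest type and falls back to the default `Key<Guest>()`.

```swift
// Sketch only: both listing call shapes, using the DistributedEcho actor from the test file.
// `system` is assumed to be an already-running ClusterSystem.
func listEchoes(on system: ClusterSystem) async throws {
    // Key-based listing with an explicit reception ID (the test's actors check in as "*").
    let byKey = await system.receptionist.listing(
        of: DistributedReception.Key(MultiNodeReceptionistTests.DistributedEcho.self, id: "*")
    )
    for try await echo in byKey {
        print("via key:", echo.id)
        break // listings are endless streams; stop after the first hit
    }

    // Type-based listing via the overload added above, backed by the default Key<Guest>().
    for try await echo in await system.receptionist.listing(of: MultiNodeReceptionistTests.DistributedEcho.self) {
        print("via type:", echo.id)
        break
    }
}
```

Both flavors return the same endless `GuestListing` asynchronous sequence; the only difference is who constructs the key.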
@@ -39,6 +39,11 @@ public protocol DistributedReceptionist: DistributedActor {
         // TODO(distributed): should gain a "retain (or not)" version, the receptionist can keep alive actors, but sometimes we don't want that, it depends
     ) async where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem
 
+    func checkIn<Guest>(
+        _ guest: Guest
+        // TODO(distributed): should gain a "retain (or not)" version, the receptionist can keep alive actors, but sometimes we don't want that, it depends
+    ) async where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem
+
     /// Returns a "listing" asynchronous sequence which will emit actor references,
     /// for every distributed actor that the receptionist discovers for the specific key.
     ///
@@ -429,22 +434,18 @@ internal final class AnyDistributedReceptionListingSubscription: Hashable, @unchecked Sendable {
             // the seen vector was unaffected by the merge, which means that the
             // incoming registration version was already seen, and thus we don't need to emit it again
             return false
-        case .happenedAfter:
+        case .happenedAfter, .concurrent:
             // the incoming registration has not yet been seen before,
             // which means that we should emit the actor to the stream.
             self.onNext(registration.actorID)
             return true
-        case .concurrent:
-            fatalError("""
-            It should not be possible for a version vector to be concurrent with a PAST version of itself before the merge
-            Previously: \(oldSeenRegistrations)
-            Current: \(self.seenActorRegistrations)
-            """)
         case .happenedBefore:
             fatalError("""
             It should not be possible for a *merged* version vector to be in the PAST as compared with itself before the merge
             Previously: \(oldSeenRegistrations)
             Current: \(self.seenActorRegistrations)
+            Registration: \(registration)
+            Self: \(self)
             """)
         }
     }

Review comment on `case .happenedAfter, .concurrent:`:

Member: Ah, makes sense I think.
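Why `.concurrent` is expected here: registrations arrive from multiple nodes, and two registrations that raced on different nodes are causally unrelated, so their version vectors dominate each other in neither direction. That is exactly the `.concurrent` outcome, and it still carries information the subscriber has not seen, so handling it like `.happenedAfter` (emit the actor) is the right call instead of crashing. The toy model below only illustrates that comparison logic; it is not the library's actual `VersionVector` implementation, and all names in it are made up.

```swift
// Toy version vector, for illustration only (not the library's VersionVector API).
// Two nodes bump their own counters independently; neither result dominates the
// other, so comparing them yields `concurrent`, which is expected rather than an error.
struct ToyVersionVector {
    enum Ordering { case same, happenedBefore, happenedAfter, concurrent }

    private(set) var counters: [String: Int] = [:]

    mutating func increment(at replica: String) {
        counters[replica, default: 0] += 1
    }

    // Pointwise maximum, the usual definition of a version vector merge.
    mutating func merge(_ other: ToyVersionVector) {
        for (replica, count) in other.counters {
            counters[replica] = max(counters[replica, default: 0], count)
        }
    }

    func compare(to other: ToyVersionVector) -> Ordering {
        var anyLess = false
        var anyGreater = false
        for replica in Set(counters.keys).union(other.counters.keys) {
            let lhs = counters[replica, default: 0]
            let rhs = other.counters[replica, default: 0]
            if lhs < rhs { anyLess = true }
            if lhs > rhs { anyGreater = true }
        }
        if anyLess && anyGreater { return .concurrent }
        if anyGreater { return .happenedAfter }
        if anyLess { return .happenedBefore }
        return .same
    }
}

func demoConcurrentRegistrations() {
    // Node A's registration is already reflected in what this subscriber has seen:
    var seen = ToyVersionVector()
    seen.increment(at: "nodeA")
    let beforeMerge = seen

    // Node B registers concurrently, without having observed A's registration:
    var incoming = ToyVersionVector()
    incoming.increment(at: "nodeB")

    print(incoming.compare(to: beforeMerge)) // concurrent: racing registrations, neither is "newer"

    seen.merge(incoming)
    print(seen.compare(to: beforeMerge))     // happenedAfter: the merged seen-vector only moves forward
}
```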