diff --git a/Protos/ActorID.proto b/Protos/ActorID.proto index 91a7665c2..4b554a9e1 100644 --- a/Protos/ActorID.proto +++ b/Protos/ActorID.proto @@ -18,10 +18,10 @@ option optimize_for = SPEED; option swift_prefix = "_Proto"; message ActorID { - UniqueNode node = 1; - ActorPath path = 2; - uint32 incarnation = 3; - // TODO: encode tags + UniqueNode node = 1; + ActorPath path = 2; + uint32 incarnation = 3; + map metadata = 4; } message ActorPath { @@ -29,7 +29,7 @@ message ActorPath { } message UniqueNode { - Node node = 1; + Node node = 1; uint64 nid = 2; } diff --git a/Sources/DistributedActors/ActorID.swift b/Sources/DistributedActors/ActorID.swift index 8e7fd2dde..3d65d4a0e 100644 --- a/Sources/DistributedActors/ActorID.swift +++ b/Sources/DistributedActors/ActorID.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import Distributed +import Foundation // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: ActorID @@ -20,12 +21,6 @@ import Distributed /// Convenience alias for ``ClusterSystem/ActorID``. 
public typealias ActorID = ClusterSystem.ActorID -extension DistributedActor where ActorSystem == ClusterSystem { - public nonisolated var metadata: ActorMetadata { - self.id.metadata - } -} - extension ClusterSystem.ActorID { @propertyWrapper public struct Metadata { @@ -33,7 +28,7 @@ extension ClusterSystem.ActorID { let id: String public init(_ keyPath: KeyPath>) { - let key = ActorMetadataKeys()[keyPath: keyPath] + let key = ActorMetadataKeys.__instance[keyPath: keyPath] self.id = key.id self.keyType = type(of: key) } @@ -64,9 +59,13 @@ extension ClusterSystem.ActorID { let metadata = myself.id.metadata let key = myself[keyPath: storageKeyPath] if let value = metadata[key.id] { - fatalError("Attempted to override ActorID Metadata for key \(key.id):\(key.keyType) which already had value: \(value); with new value: \(String(describing: newValue))") + fatalError("Attempted to override ActorID Metadata for key \(key.id):\(key.keyType) which already had value: [\(value)] with new value: [\(String(describing: newValue))]") } metadata[key.id] = newValue + + if key.id == ActorMetadataKeys.__instance.wellKnown.id { + myself.actorSystem._wellKnownActorReady(myself) + } } } } @@ -143,6 +142,10 @@ extension ClusterSystem { } } + #if DEBUG + private var debugID = UUID() + #endif + /// Collection of tags associated with this actor identity. /// /// Tags MAY be transferred to other peers as the identity is replicated, however they are not necessary to uniquely identify the actor. @@ -157,8 +160,7 @@ extension ClusterSystem { internal var context: DistributedActorContext /// Underlying path representation, not attached to a specific Actor instance. - // FIXME(distributed): make optional - public var path: ActorPath { + public var path: ActorPath { // FIXME(distributed): make optional get { guard let path = metadata.path else { fatalError("FIXME: ActorTags.path was not set on \(self.incarnation)! 
NOTE THAT PATHS ARE TO BECOME OPTIONAL!!!") // FIXME(distributed): must be removed @@ -166,14 +168,13 @@ extension ClusterSystem { return path } set { - self.metadata[ActorMetadataKeys().path.id] = newValue + self.metadata[ActorMetadataKeys.__instance.path.id] = newValue } } /// Returns the name of the actor represented by this path. /// This is equal to the last path segments string representation. - // FIXME(distributed): make optional - public var name: String { + public var name: String { // FIXME(distributed): make optional self.path.name } @@ -186,7 +187,7 @@ extension ClusterSystem { self._location = .local(node) self.incarnation = incarnation if let path { - self.context.metadata[ActorMetadataKeys().path.id] = path + self.context.metadata.path = path } traceLog_DeathWatch("Made ID: \(self)") } @@ -197,7 +198,7 @@ extension ClusterSystem { self._location = .remote(node) self.incarnation = incarnation if let path { - self.context.metadata[ActorMetadataKeys().path.id] = path + self.context.metadata.path = path } traceLog_DeathWatch("Made ID: \(self)") } @@ -263,7 +264,14 @@ extension ClusterSystem { } extension DistributedActor where ActorSystem == ClusterSystem { - /// INTERNAL: Provides the actor context for use within this actor. + public nonisolated var metadata: ActorMetadata { + self.id.metadata + } +} + +extension DistributedActor where ActorSystem == ClusterSystem { + /// Provides the actor context for use within this actor. + /// The context must not be mutated concurrently with the owning actor, however the things it stores may provide additional synchronization to make this safe. 
internal var context: DistributedActorContext { self.id.context } @@ -271,14 +279,41 @@ extension DistributedActor where ActorSystem == ClusterSystem { extension ActorID: Hashable { public static func == (lhs: ActorID, rhs: ActorID) -> Bool { - lhs.incarnation == rhs.incarnation && // quickest to check if the incarnations are the same - // if they happen to be equal, we don't know yet for sure if it's the same actor or not, as incarnation is just a random ID - // thus we need to compare the node and path as well - lhs.uniqueNode == rhs.uniqueNode && lhs.path == rhs.path + // Check the metadata based well-known identity names. + // + // The legacy "well known path" is checked using the normal path below, + // since it is implemented as incarnation == 0, and an unique path. + if let lhsWellKnownName = lhs.metadata.wellKnown { + if let rhsWellKnownName = rhs.metadata.wellKnown { + // If we're comparing "well known" actors, we ignore the concrete incarnation, + // and compare the well known name instead. This works for example for "$receptionist" + // and other well known names, that can be resolved using them, without an incarnation number. 
+ if lhsWellKnownName == rhsWellKnownName, lhs.uniqueNode == rhs.uniqueNode { + return true + } + } else { + // 'lhs' WAS well known, but 'rhs' was not + return false + } + } else if rhs.metadata.wellKnown != nil { + // 'lhs' was NOT well known, but 'rhs' was: + return false + } + + // quickest to check if the incarnations are the same + // if they happen to be equal, we don't know yet for sure if it's the same actor or not, + // as incarnation is just a random ID thus we need to compare the node and path as well + return lhs.incarnation == rhs.incarnation && + lhs.uniqueNode == rhs.uniqueNode && + lhs.path == rhs.path } public func hash(into hasher: inout Hasher) { - hasher.combine(self.incarnation) + if let wellKnownName = self.metadata.wellKnown { + hasher.combine(wellKnownName) + } else { + hasher.combine(self.incarnation) + } hasher.combine(self.uniqueNode) hasher.combine(self.path) } @@ -290,7 +325,22 @@ extension ActorID: CustomStringConvertible { if self._isRemote { res += "\(self.uniqueNode)" } - res += "\(self.path)" + if let path = self.metadata.path { + // this is ready for making paths optional already -- and behavior removals + res += "\(path)" + } else { + res += "\(self.incarnation)" + } + + if !self.metadata.isEmpty { + // TODO: we special case the "just a path" metadata to not break existing ActorRef tests + if self.metadata.count == 1, self.metadata.path != nil { + return res + } + + res += self.metadata.description + } + return res } @@ -301,23 +351,33 @@ extension ActorID: CustomStringConvertible { } res += "\(self.path)" - if self.incarnation == ActorIncarnation.wellKnown { - return res - } else { - return "\(res)#\(self.incarnation.value)" + if self.incarnation != ActorIncarnation.wellKnown { + res += "#\(self.incarnation.value)" } + + if !self.metadata.isEmpty { + res += self.metadata.description + } + + return res } + /// Prints all information contained in the ID, including `incarnation` and all `metadata`. 
public var fullDescription: String { var res = "" res += "\(reflecting: self.uniqueNode)" res += "\(self.path)" + res += "#\(self.incarnation.value)" - if self.incarnation == ActorIncarnation.wellKnown { - return res - } else { - return "\(res)#\(self.incarnation.value)" + if !self.metadata.isEmpty { + res += self.metadata.description } + + #if DEBUG + res += "{debugID:\(debugID)}" + #endif + + return res } } @@ -517,11 +577,11 @@ extension ActorPath { public static let _system: ActorPath = try! ActorPath(root: "system") internal func makeLocalID(on node: UniqueNode, incarnation: ActorIncarnation) -> ActorID { - .init(local: node, path: self, incarnation: incarnation) + ActorID(local: node, path: self, incarnation: incarnation) } internal func makeRemoteID(on node: UniqueNode, incarnation: ActorIncarnation) -> ActorID { - .init(remote: node, path: self, incarnation: incarnation) + ActorID(remote: node, path: self, incarnation: incarnation) } } @@ -704,7 +764,8 @@ public struct ActorIncarnation: Equatable, Hashable, ExpressibleByIntegerLiteral extension ActorIncarnation { /// To be used ONLY by special actors whose existence is wellKnown and identity never-changing. /// Examples: `/system/deadLetters` or `/system/cluster`. - public static let wellKnown: ActorIncarnation = .init(0) + @available(*, deprecated, message: "Useful only with behavior actors, will be removed entirely") + internal static let wellKnown: ActorIncarnation = .init(0) public static func random() -> ActorIncarnation { ActorIncarnation(UInt32.random(in: UInt32(1) ... 
UInt32.max)) @@ -903,70 +964,6 @@ extension UniqueNodeID { } } -// ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: Codable ActorAddress - -extension ActorID: Codable { - public func encode(to encoder: Encoder) throws { - let metadataSettings = encoder.actorSerializationContext?.system.settings.actorMetadata - let encodeCustomMetadata = - metadataSettings?.encodeCustomMetadata ?? ({ _, _ in () }) - - var container = encoder.container(keyedBy: ActorCoding.CodingKeys.self) - try container.encode(self.uniqueNode, forKey: ActorCoding.CodingKeys.node) - try container.encode(self.path, forKey: ActorCoding.CodingKeys.path) // TODO: remove as we remove the tree - try container.encode(self.incarnation, forKey: ActorCoding.CodingKeys.incarnation) - - if !self.metadata.isEmpty { - var metadataContainer = container.nestedContainer(keyedBy: ActorCoding.MetadataKeys.self, forKey: ActorCoding.CodingKeys.metadata) - - let keys = ActorMetadataKeys() - if (metadataSettings == nil || metadataSettings!.propagateMetadata.contains(keys.path.id)), - let value = self.metadata.path - { - try metadataContainer.encode(value, forKey: ActorCoding.MetadataKeys.path) - } - if (metadataSettings == nil || metadataSettings!.propagateMetadata.contains(keys.type.id)), - let value = self.metadata.type - { - try metadataContainer.encode(value, forKey: ActorCoding.MetadataKeys.type) - } - - try encodeCustomMetadata(self.metadata, &metadataContainer) - } - } - - public init(from decoder: Decoder) throws { - let container = try decoder.container(keyedBy: ActorCoding.CodingKeys.self) - let node = try container.decode(UniqueNode.self, forKey: ActorCoding.CodingKeys.node) - let path = try container.decodeIfPresent(ActorPath.self, forKey: ActorCoding.CodingKeys.path) - let incarnation = try container.decode(UInt32.self, forKey: ActorCoding.CodingKeys.incarnation) - - self.init(remote: node, path: path, incarnation: 
ActorIncarnation(incarnation)) - - // Decode any tags: - if let metadataContainer = try? container.nestedContainer(keyedBy: ActorCoding.MetadataKeys.self, forKey: ActorCoding.CodingKeys.metadata) { - // tags container found, try to decode all known tags: - - // FIXME: implement decoding tags/metadata in general - - if let context = decoder.actorSerializationContext { - let decodeCustomMetadata = context.system.settings.actorMetadata.decodeCustomMetadata - try decodeCustomMetadata(metadataContainer, self.metadata) - -// for (key, value) in try decodeCustomMetadata(metadataContainer) { -// func store(_: K.Type) { -// if let value = tag.value as? K.Value { -// self.metadata[K.self] = value -// } -// } -// _openExistential(key, do: store) // the `as` here is required, because: inferred result type 'any ActorTagKey.Type' requires explicit coercion due to loss of generic requirements -// } - } - } - } -} - // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Path errors diff --git a/Sources/DistributedActors/ActorIDMetadataSettings.swift b/Sources/DistributedActors/ActorIDMetadataSettings.swift index f2b14c13e..093fac5a6 100644 --- a/Sources/DistributedActors/ActorIDMetadataSettings.swift +++ b/Sources/DistributedActors/ActorIDMetadataSettings.swift @@ -45,8 +45,9 @@ internal struct ActorIDMetadataSettings { /// What type of tags, known and defined by the cluster system itself, should be automatically propagated. /// Other types of tags, such as user-defined tags, must be propagated by declaring apropriate functions for ``encodeCustomMetadata`` and ``decodeCustomMetadata``. 
internal var propagateMetadata: Set = [ - ActorMetadataKeys().path.id, - ActorMetadataKeys().type.id, + ActorMetadataKeys.__instance.path.id, + ActorMetadataKeys.__instance.type.id, + ActorMetadataKeys.__instance.wellKnown.id, ] internal var encodeCustomMetadata: (ActorMetadata, inout KeyedEncodingContainer) throws -> Void = diff --git a/Sources/DistributedActors/ActorMetadata.swift b/Sources/DistributedActors/ActorMetadata.swift index cbf3cb5f1..fa25fddb2 100644 --- a/Sources/DistributedActors/ActorMetadata.swift +++ b/Sources/DistributedActors/ActorMetadata.swift @@ -18,19 +18,60 @@ import Distributed // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: ActorMetadata +/// Namespace for ``ActorID`` metadata. public struct ActorMetadataKeys { public typealias Key = ActorMetadataKey + + private init() {} + + /// Necessary for key-path based property wrapper APIs. + internal static var __instance: Self { .init() } } +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Metadata Keys: Well Known + extension ActorMetadataKeys { - var path: Key { "$path" } + internal var path: Key { "$path" } + + /// Actor metadata which impacts how actors with this ID are resolved. + /// + /// Rather than resolving them by their concrete incarnation (unique id), identifiers with + /// the ``wellKnown`` metadata are resolved by their "well known name". + /// + /// In practice this means that it is possible to resolve a concrete well-known instance on a remote host, + /// without ever exchanging information betwen those peers and obtaining the targets exact ID. + /// + /// This is necessary for certain actors like the failure detectors, the cluster receptionist, or other actors + /// which must be interacted with right away, without prior knowlage. 
+ /// + /// **WARNING:** Do not use this mechanism for "normal" actors, as it makes their addressess "guessable", + /// which is bad from a security and system independence stand point. Please use the cluster receptionist instead. + public var wellKnown: Key { "$wellKnown" } +} - var type: Key { "$type" } - struct ActorTypeTagValue: Codable { // FIXME: improve representation to be more efficient +extension ActorID { + internal var isWellKnown: Bool { + self.metadata.wellKnown != nil + } +} + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Metadata Keys: Type + +extension ActorMetadataKeys { + /// The type of the distributed actor identified by this ``ActorID``. + /// Used only for human radability and debugging purposes, does not participate in equality checks of an actor ID. + internal var type: Key { "$type" } // TODO: remove Tag from name + internal struct ActorTypeTagValue: Codable, CustomStringConvertible { // FIXME: improve representation to be more efficient let mangledName: String var simpleName: String { _typeByName(self.mangledName).map { "\($0)" } ?? self.mangledName } + + var description: String { + self.simpleName + } } } @@ -64,7 +105,8 @@ public final class ActorMetadata: CustomStringConvertible, CustomDebugStringConv get { self.lock.wait() defer { lock.signal() } - let key = ActorMetadataKeys()[keyPath: dynamicMember] + + let key = ActorMetadataKeys.__instance[keyPath: dynamicMember] let id = key.id guard let v = self._storage[id] else { return nil @@ -74,10 +116,11 @@ public final class ActorMetadata: CustomStringConvertible, CustomDebugStringConv set { self.lock.wait() defer { lock.signal() } - let key = ActorMetadataKeys()[keyPath: dynamicMember] + + let key = ActorMetadataKeys.__instance[keyPath: dynamicMember] let id = key.id if let existing = self._storage[id] { - fatalError("Existing ActorID [\(id)] metadata, cannot be replaced. 
Was: [\(existing)], newValue: [\(optional: newValue))]") + fatalError("Existing ActorID metadata, cannot be replaced. Was: [\(existing)], newValue: [\(optional: newValue))]") } self._storage[id] = newValue } @@ -136,30 +179,3 @@ public struct ActorMetadataKey: Hashable, Expressible self.id = value } } - -///// Used to tag actor identities with additional information. -// public protocol ActorMetadataProtocol: Sendable where Value == Key.Value { -// /// Type of the actor tag key, used to obtain an actor tag instance. -// associatedtype Key: ActorTagKey -// -// /// Type of the value stored by this tag. -// associatedtype Value -// -// var value: Value { get } -// } -// -// @available(*, deprecated, message: "remove this") -// public protocol ActorTagKey: Sendable { -// associatedtype Value: Sendable & Codable -// static var id: String { get } -// } -// -//// ==== ---------------------------------------------------------------------------------------------------------------- -// -// extension ActorMetadataProtocol { -// /// String representation of the unique key tag identity, equal to `Key.id`. -// /// -// /// Tag keys should be unique, and must not start with $ unless they are declared by the ClusterSystem itself. 
-// public var id: String { Key.id } -// public var keyType: Key.Type { Key.self } -// } diff --git a/Sources/DistributedActors/ActorRef+Ask.swift b/Sources/DistributedActors/ActorRef+Ask.swift index 00c65d884..c0f5c1825 100644 --- a/Sources/DistributedActors/ActorRef+Ask.swift +++ b/Sources/DistributedActors/ActorRef+Ask.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import Distributed +import Foundation import class NIO.EventLoopFuture import struct NIO.EventLoopPromise import struct NIO.Scheduled @@ -183,7 +184,7 @@ extension AskResponse: _AsyncResult { let eventLoop = nioFuture.eventLoop let promise: EventLoopPromise = eventLoop.makePromise() let timeoutTask = eventLoop.scheduleTask(in: timeout.toNIO) { - promise.fail(RemoteCallError.timedOut(TimeoutError(message: "\(type(of: self)) timed out after \(timeout.prettyDescription)", timeout: timeout))) + promise.fail(RemoteCallError.timedOut(UUID(), TimeoutError(message: "\(type(of: self)) timed out after \(timeout.prettyDescription)", timeout: timeout))) } nioFuture.whenFailure { timeoutTask.cancel() @@ -263,7 +264,7 @@ internal enum AskActor { Ask was initiated from function [\(function)] in [\(file):\(line)] and \ expected response of type [\(String(reflecting: ResponseType.self))]. """ - completable.fail(RemoteCallError.timedOut(TimeoutError(message: errorMessage, timeout: timeout))) + completable.fail(RemoteCallError.timedOut(UUID(), TimeoutError(message: errorMessage, timeout: timeout))) // FIXME: Hack to stop from subReceive. Should we allow this somehow? // Maybe add a SubReceiveContext or similar? 
diff --git a/Sources/DistributedActors/Cluster/ClusterShell.swift b/Sources/DistributedActors/Cluster/ClusterShell.swift index a71a362c0..d9912647d 100644 --- a/Sources/DistributedActors/Cluster/ClusterShell.swift +++ b/Sources/DistributedActors/Cluster/ClusterShell.swift @@ -1324,11 +1324,15 @@ extension ClusterShell { extension ActorID { static func _clusterShell(on node: UniqueNode) -> ActorID { - ActorPath._clusterShell.makeRemoteID(on: node, incarnation: .wellKnown) + let id = ActorPath._clusterShell.makeRemoteID(on: node, incarnation: .wellKnown) + // id.metadata.wellKnown = "$cluster" + return id } static func _clusterGossip(on node: UniqueNode) -> ActorID { - ActorPath._clusterGossip.makeRemoteID(on: node, incarnation: .wellKnown) + let id = ActorPath._clusterGossip.makeRemoteID(on: node, incarnation: .wellKnown) + // id.metadata.wellKnown = "$gossip" + return id } } diff --git a/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift b/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift index fa5e6dbfb..164e91d05 100644 --- a/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift +++ b/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift @@ -151,6 +151,9 @@ public distributed actor OpLogDistributedReceptionist: DistributedReceptionist, public typealias ActorSystem = ClusterSystem + @ActorID.Metadata(\.wellKnown) + var wellKnownName: String + // TODO: remove this typealias ReceptionistRef = OpLogDistributedReceptionist typealias Key = DistributedReception.Key where Guest.ActorSystem == ClusterSystem @@ -230,9 +233,9 @@ public distributed actor OpLogDistributedReceptionist: DistributedReceptionist, static var props: _Props { var ps = _Props() - ps._knownActorName = ActorPath.distributedActorReceptionist.name ps._systemActor = true ps._wellKnown = true + ps._knownActorName = ActorPath.distributedActorReceptionist.name return ps } @@ 
-250,6 +253,9 @@ public distributed actor OpLogDistributedReceptionist: DistributedReceptionist, self.appliedSequenceNrs = .empty // === listen to cluster events ------------------ + self.wellKnownName = ActorPath.distributedActorReceptionist.name + assert(self.id.path.description == "/system/receptionist") // TODO(distributed): remove when we remove paths entirely + self.eventsListeningTask = Task.detached { try await self.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead for try await event in system.cluster.events { @@ -339,28 +345,24 @@ extension OpLogDistributedReceptionist: LifecycleWatch { } public nonisolated func listing( - of key: DistributedReception.Key + of key: DistributedReception.Key, + file: String = #fileID, line: UInt = #line ) async -> DistributedReception.GuestListing where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem { - let res = await self.whenLocal { _ in - DistributedReception.GuestListing(receptionist: self, key: key) - } - - guard let r = res else { - return .init(receptionist: self, key: key) - } - - return r + return DistributedReception.GuestListing(receptionist: self, key: key, file: file, line: line) } + // 'local' impl for 'listing' func _listing( - subscription: AnyDistributedReceptionListingSubscription + subscription: AnyDistributedReceptionListingSubscription, + file: String = #fileID, line: UInt = #line ) { if self.storage.addSubscription(key: subscription.key, subscription: subscription) { // self.instrumentation.actorSubscribed(key: anyKey, id: self.id._unwrapActorID) // FIXME: remove the address parameter, it does not make sense anymore - self.log.trace("Subscribed async sequence to \(subscription.key) actors", metadata: [ + self.log.trace("Subscribed async sequence to \(subscription.key)", metadata: [ "subscription/key": "\(subscription.key)", + "subscription/callSite": "\(file):\(line)", ]) } } @@ -625,7 +627,7 @@ 
extension OpLogDistributedReceptionist { // peerReceptionistRef.tell(ack) Task { do { - assert(self.id.path.description.contains("/system/receptionist")) +// assert(self.id.path.description.contains("/system/receptionist"), "Receptionist path did not include /system/receptionist, was: \(self.id.fullDescription)") try await peerReceptionistRef.ackOps(until: latestAppliedSeqNrFromPeer, by: self) } catch { switch error { @@ -700,7 +702,18 @@ extension OpLogDistributedReceptionist { /// Receive an Ack and potentially continue streaming ops to peer if still pending operations available. distributed func ackOps(until: UInt64, by peer: ReceptionistRef) { - guard var replayer = self.peerReceptionistReplayers[peer] else { + var replayer = self.peerReceptionistReplayers[peer] + + if replayer == nil, until == 0 { + self.log.debug("Received message from \(peer), but no replayer available, create one ad-hoc now", metadata: [ + "peer": "\(peer.id.uniqueNode)", + ]) + // TODO: Generally we should trigger a `onNewClusterMember` but seems we got a message before that triggered + // Seems ordering became less strict here with DA unfortunately...? + replayer = self.ops.replay(from: .beginning) + } + + guard var replayer = replayer else { self.log.trace("Received a confirmation until \(until) from \(peer) but no replayer available for it, ignoring", metadata: [ "receptionist/peer/confirmed": "\(until)", "receptionist/peer": "\(peer.id)", @@ -883,8 +896,8 @@ extension OpLogDistributedReceptionist { func pruneClusterMember(removedNode: UniqueNode) { self.log.trace("Pruning cluster member: \(removedNode)") - let terminatedReceptionistAddress = ActorID._receptionist(on: removedNode, for: .distributedActors) - let equalityHackPeer = try! 
Self.resolve(id: terminatedReceptionistAddress, using: actorSystem) // try!-safe because we know the address is correct and remote + let terminatedReceptionistID = ActorID._receptionist(on: removedNode, for: .distributedActors) + let equalityHackPeer = try! Self.resolve(id: terminatedReceptionistID, using: actorSystem) // try!-safe because we know the address is correct and remote guard self.peerReceptionistReplayers.removeValue(forKey: equalityHackPeer) != nil else { // we already removed it, so no need to keep scanning for it. @@ -896,8 +909,8 @@ extension OpLogDistributedReceptionist { // clear observations; we only get them directly from the origin node, so since it has been downed // we will never receive more observations from it. - _ = self.observedSequenceNrs.pruneReplica(.actorID(terminatedReceptionistAddress)) - _ = self.appliedSequenceNrs.pruneReplica(.actorID(terminatedReceptionistAddress)) + _ = self.observedSequenceNrs.pruneReplica(.actorID(terminatedReceptionistID)) + _ = self.appliedSequenceNrs.pruneReplica(.actorID(terminatedReceptionistID)) // clear state any registrations still lingering about the now-known-to-be-down node let pruned = self.storage.pruneNode(removedNode) diff --git a/Sources/DistributedActors/Cluster/Transport/TransportPipelines.swift b/Sources/DistributedActors/Cluster/Transport/TransportPipelines.swift index df884b199..8f46620df 100644 --- a/Sources/DistributedActors/Cluster/Transport/TransportPipelines.swift +++ b/Sources/DistributedActors/Cluster/Transport/TransportPipelines.swift @@ -281,7 +281,7 @@ private final class WireEnvelopeHandler: ChannelDuplexHandler { let envelope = try self.serialization.deserialize(as: Wire.Envelope.self, from: .nioByteBuffer(buffer), using: knownSpecializedWireEnvelopeManifest) context.fireChannelRead(self.wrapInboundOut(envelope)) } catch { - self.log.error("Failed to deserialize: \(data)", metadata: [ + self.log.error("Failed to deserialize message", metadata: [ "buffer": "\(buffer)", 
"error": "\(error)", ]) diff --git a/Sources/DistributedActors/ClusterSystem.swift b/Sources/DistributedActors/ClusterSystem.swift index f2f3709bc..2e3fdb222 100644 --- a/Sources/DistributedActors/ClusterSystem.swift +++ b/Sources/DistributedActors/ClusterSystem.swift @@ -16,7 +16,7 @@ import Atomics import Backtrace import CDistributedActorsMailbox import Dispatch -import Distributed +@_exported import Distributed import DistributedActorsConcurrencyHelpers import Foundation // for UUID import Logging @@ -64,6 +64,9 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable { private var _managedDistributedActors: WeakActorDictionary = .init() private var _reservedNames: Set = [] + typealias WellKnownName = String + private var _managedWellKnownDistributedActors: [WellKnownName: any DistributedActor] = [:] + // TODO: converge into one tree // Note: This differs from Akka, we do full separate trees here private var systemProvider: _ActorRefProvider! @@ -858,39 +861,76 @@ extension ClusterSystem { public func resolve(id: ActorID, as actorType: Act.Type) throws -> Act? where Act: DistributedActor { - self.log.trace("Resolve: \(id)") + if self.settings.logging.verboseResolve { + self.log.trace("Resolve: \(id)") + } + // If it has an interceptor installed, we must pretend to resolve it as "remote", + // though the actual messages will be delivered to the interceptor, + // and not necessarily a remote destination. 
if let interceptor = id.context.remoteCallInterceptor { - self.log.trace("Resolved \(id) as intercepted", metadata: ["interceptor": "\(interceptor)"]) + if self.settings.logging.verboseResolve { + self.log.trace("Resolved \(id) as intercepted", metadata: ["interceptor": "\(interceptor)"]) + } return nil } + // If the actor is not located on this node, immediately resolve as "remote" guard self.cluster.uniqueNode == id.uniqueNode else { - self.log.trace("Resolved \(id) as remote, on node: \(id.uniqueNode)") + if self.settings.logging.verboseResolve { + self.log.trace("Resolved \(id) as remote, on node: \(id.uniqueNode)") + } return nil } - return try self.namingLock.withLock { - guard let managed = self._managedDistributedActors.get(identifiedBy: id) else { - log.trace("Resolved as remote reference", metadata: [ - "actor/id": "\(id)", - ]) - throw DeadLetterError(recipient: id) + // Is it a well-known actor? If so, we need to special handle the resolution. + if let wellKnownName = id.metadata.wellKnown { + let wellKnownActor = self.namingLock.withLock { + return self._managedWellKnownDistributedActors[wellKnownName] } - if let resolved = managed as? Act { - log.info("Resolved as local instance", metadata: [ - "actor/id": "\(id)", - "actor": "\(resolved)", - ]) - return resolved - } else { - log.trace("Resolved as remote reference", metadata: [ - "actor/id": "\(id)", + if let wellKnownActor { + guard let wellKnownActor = wellKnownActor as? Act else { + self.log.trace("Resolved as local well-known instance, however did not match expected type: \(Act.self), well-known: '\(wellKnownName)", metadata: [ + "actor/id": "\(wellKnownActor.id)", + "actor/type": "\(type(of: wellKnownActor))", + "expected/type": "\(Act.self)", + ]) + + throw DeadLetterError(recipient: id) + } + + // Oh look, it's that well known actor, that goes by the name "wellKnownName"! 
+ self.log.trace("Resolved as local well-known instance: '\(wellKnownName)", metadata: [ + "actor/id": "\(wellKnownActor.id)", ]) - return nil + return wellKnownActor } } + + // Resolve using the usual id lookup method + let managed = self.namingLock.withLock { + return self._managedDistributedActors.get(identifiedBy: id) + } + + guard let managed = managed else { + throw DeadLetterError(recipient: id) + } + + guard let resolved = managed as? Act else { + self.log.trace("Resolved actor identity, however did not match expected type: \(Act.self)", metadata: [ + "actor/id": "\(id)", + "actor/type": "\(type(of: managed))", + "expected/type": "\(Act.self)", + ]) + return nil + } + + self.log.trace("Resolved as local instance", metadata: [ + "actor/id": "\(id)", + "actor": "\(resolved)", + ]) + return resolved } public func assignID(_ actorType: Act.Type) -> ClusterSystem.ActorID @@ -932,16 +972,20 @@ extension ClusterSystem { ) } - self.log.warning("Assign identity", metadata: [ + if let wellKnownName = props._wellKnownName { + id.metadata.wellKnown = wellKnownName + } + + self.log.trace("Assign identity", metadata: [ "actor/type": "\(actorType)", "actor/id": "\(id)", - "actor/id/uniqueNode": "\(id.uniqueNode)", ]) - return self.namingLock.withLock { - self._reservedNames.insert(id) - return id - } +// return self.namingLock.withLock { +// self._reservedNames.insert(id) +// return id +// } + return id } public func actorReady(_ actor: Act) where Act: DistributedActor, Act.ID == ActorID { @@ -952,12 +996,41 @@ extension ClusterSystem { self.namingLock.lock() defer { self.namingLock.unlock() } - precondition(self._reservedNames.remove(actor.id) != nil, "Attempted to ready an identity that was not reserved: \(actor.id)") + if !actor.id.isWellKnown { + precondition(self._reservedNames.remove(actor.id) != nil, "Attempted to ready an identity that was not reserved: \(actor.id)") + } + // Spawn a behavior actor for it: let behavior = InvocationBehavior.behavior(instance: 
Weak(actor)) let ref = self._spawnDistributedActor(behavior, identifiedBy: actor.id) + + // Store references self._managedRefs[actor.id] = ref self._managedDistributedActors.insert(actor: actor) + + if let wellKnownName = actor.id.metadata.wellKnown { + self._managedWellKnownDistributedActors[wellKnownName] = actor + } + } + + /// Advertise to the cluster system that a "well known" distributed actor has become ready. + /// Store it in a special lookup table and enable looking it up by its unique well-known name identity. + public func _wellKnownActorReady(_ actor: Act) where Act: DistributedActor, Act.ActorSystem == ClusterSystem { + self.namingLock.withLockVoid { +// guard self._managedDistributedActors.get(identifiedBy: actor.id) != nil else { +// preconditionFailure("Attempted to register well known actor, before it was ready; Unable to resolve \(actor.id.detailedDescription)") +// } + + guard let wellKnownName = actor.id.metadata.wellKnown else { + preconditionFailure("Attempted to register actor as well-known but had no well-known name: \(actor.id)") + } + + log.trace("Actor ready, well-known as: \(wellKnownName)", metadata: [ + "actor/id": "\(actor.id)", + ]) + + self._managedWellKnownDistributedActors[wellKnownName] = actor + } } /// Called during actor deinit/destroy. 
@@ -973,7 +1046,23 @@ extension ClusterSystem { self.namingLock.withLockVoid { self._managedRefs.removeValue(forKey: id) // TODO: should not be necessary in the future + + // Remove the weak actor reference _ = self._managedDistributedActors.removeActor(identifiedBy: id) + + // Well-known actors are held strongly and should be released using `releaseWellKnownActorID` + } + } + + public func releaseWellKnownActorID(_ id: ActorID) { + guard let wellKnownName = id.metadata.wellKnown else { + preconditionFailure("Attempted to release well-known ActorID, but ID was not well known: \(id.fullDescription)") + } + + self.namingLock.withLockVoid { + log.debug("Released well-known ActorID explicitly: \(id), it is expected to resignID soon") // TODO: add checking that we indeed have resigned the ID (the actor has terminated), or we can log a warning if it has not. + + _ = self._managedWellKnownDistributedActors.removeValue(forKey: wellKnownName) } } } @@ -1004,7 +1093,7 @@ extension ClusterSystem { } // ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: Remote Calls +// MARK: Outbound Remote Calls extension ClusterSystem { public func makeInvocationEncoder() -> InvocationEncoder { @@ -1024,9 +1113,19 @@ extension ClusterSystem { Res: Codable { if let interceptor = actor.id.context.remoteCallInterceptor { + self.log.warning("INTERCEPTOR remote call \(actor.id)...") + print("[\(self.cluster.uniqueNode)] INTERCEPTOR remote call \(actor.id)...") return try await interceptor.interceptRemoteCall(on: actor, target: target, invocation: &invocation, throwing: throwing, returning: returning) } + guard actor.id.uniqueNode != self.cluster.uniqueNode else { + // It actually is a remote call, so redirect it to local call-path. + // Such calls can happen when we deal with interceptors and proxies; + // To make their lives easier, we centralize the noticing when a call is local and dispatch it from here. 
+ self.log.warning("ACTUALLY LOCAL CALL: \(target) on \(actor.id)") + return try await self.localCall(on: actor, target: target, invocation: &invocation, throwing: throwing, returning: returning) + } + guard let clusterShell = _cluster else { throw RemoteCallError.clusterAlreadyShutDown } @@ -1044,14 +1143,18 @@ extension ClusterSystem { genericSubstitutions: invocation.genericSubstitutions, arguments: arguments ) + + print("[\(self.cluster.uniqueNode)] SEND INVOCATION: \(invocation) TO \(recipient.id.fullDescription)") + log.warning("[\(self.cluster.uniqueNode)] SEND INVOCATION: \(invocation) TO \(recipient.id.fullDescription)") recipient.sendInvocation(invocation) } if let error = reply.thrownError { + print("[\(self.cluster.uniqueNode)] reply error: \(error)") throw error } guard let value = reply.value else { - throw RemoteCallError.invalidReply + throw RemoteCallError.invalidReply(reply.callID) } return value } @@ -1130,7 +1233,9 @@ extension ClusterSystem { error = RemoteCallError.clusterAlreadyShutDown } else { error = RemoteCallError.timedOut( - TimeoutError(message: "Remote call [\(callID)] to [\(target)](\(actorID)) timed out", timeout: timeout)) + callID, + TimeoutError(message: "Remote call [\(callID)] to [\(target)](\(actorID)) timed out", timeout: timeout) + ) } continuation.resume(throwing: error) @@ -1157,14 +1262,74 @@ extension ClusterSystem { } self.log.error("Expected [\(Reply.self)] but got [\(type(of: reply as Any))]") - throw RemoteCallError.invalidReply + throw RemoteCallError.invalidReply(callID) } return reply } } +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Local "proxied" calls + +extension ClusterSystem { + /// Able to direct a `remoteCall` initiated call, right into a local invocation. + /// This is used to perform proxying to local actors, for such features like the cluster singleton or similar. 
+ internal func localCall( + on actor: Act, + target: RemoteCallTarget, + invocation: inout InvocationEncoder, + throwing: Err.Type, + returning: Res.Type + ) async throws -> Res + where Act: DistributedActor, + Act.ID == ActorID, + Err: Error, + Res: Codable + { + print("[\(self.cluster.uniqueNode)] ACT: \(actor.id.fullDescription)") + precondition( + self.cluster.uniqueNode == actor.id.uniqueNode, + "Attempted to localCall an actor whose ID was a different node: [\(actor.id)], current node: \(self.cluster.uniqueNode)" + ) +// precondition(!__isRemoteActor(actor), +// "Attempted to localCall a remote actor! \(actor.id)") + self.log.trace("Execute local call", metadata: [ + "actor/id": "\(actor.id.fullDescription)", + "target": "\(target)", + ]) + + let anyReturn = try await withCheckedThrowingContinuation { cc in + Task { [invocation] in + var directDecoder = ClusterInvocationDecoder(system: self, invocation: invocation) + let directReturnHandler = ClusterInvocationResultHandler(directReturnContinuation: cc) + + try await executeDistributedTarget( + on: actor, + target: target, + invocationDecoder: &directDecoder, + handler: directReturnHandler + ) + } + } + + guard let wellTypedReturn = anyReturn as? Res else { + throw RemoteCallError.illegalReplyType(UUID(), expected: Res.self, got: type(of: anyReturn)) + } + + return wellTypedReturn + } +} + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Inbound Remote Calls + extension ClusterSystem { func receiveInvocation(_ invocation: InvocationMessage, recipient: ActorID, on channel: Channel) { + self.log.trace("Receive invocation: \(invocation) to: \(recipient.detailedDescription)", metadata: [ + "recipient/id": "\(recipient.detailedDescription)", + "invocation": "\(invocation)", + ]) + guard let shell = self._cluster else { self.log.error("Cluster has shut down already, yet received message. 
Message will be dropped: \(invocation)") return @@ -1182,7 +1347,7 @@ extension ClusterSystem { ) do { - guard let actor = self.resolve(id: recipient) else { + guard let actor = self.resolveLocalAnyDistributedActor(id: recipient) else { self.deadLetters.tell(DeadLetter(invocation, recipient: recipient)) throw DeadLetterError(recipient: recipient) } @@ -1214,10 +1379,60 @@ extension ClusterSystem { } } - private func resolve(id: ActorID) -> (any DistributedActor)? { - self.namingLock.withLock { + private func resolveLocalAnyDistributedActor(id: ActorID) -> (any DistributedActor)? { + if self.settings.logging.verboseResolve { + self.log.trace("Resolve as any DistributedActor: \(id)") + } + + // If it has an interceptor installed, we must pretend to resolve it as "remote", + // though the actual messages will be delivered to the interceptor, + // and not necessarily a remote destination. + if let interceptor = id.context.remoteCallInterceptor { + if self.settings.logging.verboseResolve { + self.log.trace("Resolved \(id) as intercepted", metadata: ["interceptor": "\(interceptor)"]) + } + return nil + } + + // If the actor is not located on this node, immediately resolve as "remote" + guard self.cluster.uniqueNode == id.uniqueNode else { + self.log.trace("Resolve local failed, ID is for a remote host: \(id.uniqueNode)", metadata: ["actor/id": "\(id)"]) + return nil + } + + // Is it a well-known actor? If so, we need to special handle the resolution. 
+ if let wellKnownName = id.metadata.wellKnown { + let wellKnownActor = self.namingLock.withLock { + return self._managedWellKnownDistributedActors[wellKnownName] + } + + return self.namingLock.withLock { + if let wellKnownActor { + self.log.trace("Resolved \(id) well-known actor: \(wellKnownName)") + return wellKnownActor + } else { + self.log.trace("Resolve failed, no alive actor for well-known ID", metadata: [ + "actor/id": "\(id)", + "wellKnown/actors": "\(self._managedWellKnownDistributedActors.keys)", + ]) + return nil + } + } + } + + // Resolve using the usual id lookup method + let managed = self.namingLock.withLock { self._managedDistributedActors.get(identifiedBy: id) } + + guard let managed = managed else { + self.log.trace("Resolve failed, no alive actor for ID", metadata: [ + "actor/id": "\(id)", + ]) + return nil + } + + return managed } } @@ -1248,7 +1463,9 @@ public struct ClusterInvocationResultHandler: DistributedTargetInvocationResultH case .localDirectReturn(let directReturnContinuation): directReturnContinuation.resume(returning: value) - case .remoteCall(_, let callID, let channel, let recipient): + case .remoteCall(let system, let callID, let channel, let recipient): + system.log.debug("Result handler, onReturn", metadata: ["call/id": "\(callID)"]) + let reply = RemoteCallReply(callID: callID, value: value) try await channel.writeAndFlush(TransportEnvelope(envelope: Payload(payload: .message(reply)), recipient: recipient)) } @@ -1259,7 +1476,9 @@ public struct ClusterInvocationResultHandler: DistributedTargetInvocationResultH case .localDirectReturn(let directReturnContinuation): directReturnContinuation.resume(returning: ()) - case .remoteCall(_, let callID, let channel, let recipient): + case .remoteCall(let system, let callID, let channel, let recipient): + system.log.debug("Result handler, onReturnVoid", metadata: ["call/id": "\(callID)"]) + let reply = RemoteCallReply<_Done>(callID: callID, value: .done) try await 
channel.writeAndFlush(TransportEnvelope(envelope: Payload(payload: .message(reply)), recipient: recipient)) } @@ -1271,7 +1490,7 @@ public struct ClusterInvocationResultHandler: DistributedTargetInvocationResultH directReturnContinuation.resume(throwing: error) case .remoteCall(let system, let callID, let channel, let recipient): - system.log.warning("Result handler, onThrow: \(error)") + system.log.debug("Result handler, onThrow: \(error)", metadata: ["call/id": "\(callID)"]) let reply: RemoteCallReply<_Done> if let codableError = error as? (Error & Codable) { reply = .init(callID: callID, error: codableError) @@ -1400,8 +1619,9 @@ internal struct LazyStart { enum RemoteCallError: DistributedActorSystemError { case clusterAlreadyShutDown - case timedOut(TimeoutError) - case invalidReply + case timedOut(ClusterSystem.CallID, TimeoutError) + case invalidReply(ClusterSystem.CallID) + case illegalReplyType(ClusterSystem.CallID, expected: Any.Type, got: Any.Type) } /// Allows for configuring of remote calls by setting task-local values around a remote call being made. 
diff --git a/Sources/DistributedActors/ClusterSystemSettings.swift b/Sources/DistributedActors/ClusterSystemSettings.swift index d907544f5..97925c023 100644 --- a/Sources/DistributedActors/ClusterSystemSettings.swift +++ b/Sources/DistributedActors/ClusterSystemSettings.swift @@ -323,8 +323,10 @@ public struct LoggingSettings { /// Log all detailed timer start/cancel events internal var verboseTimers = false - /// Log all actor `spawn` events + /// Log all actor creation events (`assignID`, `actorReady`) internal var verboseSpawning = false + + internal var verboseResolve = false } // ==== ---------------------------------------------------------------------------------------------------------------- diff --git a/Sources/DistributedActors/DeadLetters.swift b/Sources/DistributedActors/DeadLetters.swift index 5de386160..951a060d7 100644 --- a/Sources/DistributedActors/DeadLetters.swift +++ b/Sources/DistributedActors/DeadLetters.swift @@ -194,7 +194,7 @@ public final class DeadLetterOffice { } metadata["actor/path"] = Logger.MetadataValue.stringConvertible(deadID.path) - recipientString = "to [\(String(reflecting: recipient.detailedDescription))]" + recipientString = "to [\(recipient.detailedDescription)]" } else { recipientString = "" } @@ -260,6 +260,51 @@ extension ActorPath { // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Errors -public struct DeadLetterError: DistributedActorSystemError, Codable { +/// Dead letter errors may be transported back to remote callers, to indicate the recipient they tried to contact is no longer alive. +public struct DeadLetterError: DistributedActorSystemError, CustomStringConvertible, Hashable, Codable { public let recipient: ClusterSystem.ActorID + + internal let file: String? 
+ internal let line: UInt + + init(recipient: ClusterSystem.ActorID, file: String = #fileID, line: UInt = #line) { + self.recipient = recipient + self.file = file + self.line = line + } + + public func hash(into hasher: inout Hasher) { + self.recipient.hash(into: &hasher) + } + + public static func == (lhs: Self, rhs: Self) -> Bool { + lhs.recipient == rhs.recipient + } + + enum CodingKeys: CodingKey { + case recipient + case file + case line + } + + public init(from decoder: Decoder) throws { + let container = try decoder.container(keyedBy: CodingKeys.self) + self.recipient = try container.decode(ClusterSystem.ActorID.self, forKey: .recipient) + self.file = nil + self.line = 0 + } + + public func encode(to encoder: Encoder) throws { + var container = encoder.container(keyedBy: CodingKeys.self) + try container.encode(self.recipient, forKey: .recipient) + // We do NOT serialize source location, on purpose. + } + + public var description: String { + if let file { + return "\(Self.self)(recipient: \(self.recipient), location: \(file):\(self.line))" + } + + return "\(Self.self)(\(self.recipient))" + } } diff --git a/Sources/DistributedActors/Pattern/WorkerPool.swift b/Sources/DistributedActors/Pattern/WorkerPool.swift index 63ff338e3..c06e157d0 100644 --- a/Sources/DistributedActors/Pattern/WorkerPool.swift +++ b/Sources/DistributedActors/Pattern/WorkerPool.swift @@ -83,7 +83,13 @@ public distributed actor WorkerPool: DistributedWorke /// Control for waiting and getting notified for new worker. 
private var newWorkerContinuations: [CheckedContinuation] = [] - init(settings: WorkerPoolSettings, system: ActorSystem) async { + public init(selector: Selector, actorSystem: ActorSystem) async throws { + try await self.init(settings: .init(selector: selector), actorSystem: actorSystem) + } + + public init(settings: WorkerPoolSettings, actorSystem system: ActorSystem) async throws { + try settings.validate() + self.actorSystem = system self.whenAllWorkersTerminated = settings.whenAllWorkersTerminated self.logLevel = settings.logLevel @@ -166,21 +172,6 @@ public distributed actor WorkerPool: DistributedWorke self.roundRobinPos = 0 } - // ==== ------------------------------------------------------------------------------------------------------------ - // MARK: Public API, spawning the pool - - // TODO: how can we move the spawn somewhere else so we don't have to pass in the system or context? - // TODO: round robin or what strategy? - public static func _spawn( - _ system: ActorSystem, - select selector: WorkerPool.Selector, - file: String = #filePath, line: UInt = #line - ) async throws -> WorkerPool { - // TODO: pass in settings rather than create them here - let settings = try WorkerPoolSettings(selector: selector).validate() - return await WorkerPool(settings: settings, system: system) - } - public nonisolated var description: String { "\(Self.self)(\(self.id))" } @@ -241,6 +232,7 @@ public struct WorkerPoolSettings where Worker.ActorSy } } + @discardableResult public func validate() throws -> WorkerPoolSettings { switch self.selector { case .static(let workers) where workers.isEmpty: diff --git a/Sources/DistributedActors/Plugins/ClusterSingleton/ClusterSingleton.swift b/Sources/DistributedActors/Plugins/ClusterSingleton/ClusterSingleton.swift index 9104b7e2c..cf1eba3b9 100644 --- a/Sources/DistributedActors/Plugins/ClusterSingleton/ClusterSingleton.swift +++ b/Sources/DistributedActors/Plugins/ClusterSingleton/ClusterSingleton.swift @@ -37,10 +37,13 @@ 
internal protocol ClusterSingletonBossProtocol { /// determine the node that the singleton runs on. If the singleton falls on *this* node, `ClusterSingletonBoss` /// will spawn the actual singleton actor. Otherwise, `ClusterSingletonBoss` will hand over the singleton /// whenever the node changes. -internal distributed actor ClusterSingletonBoss: ClusterSingletonBossProtocol where Act.ActorSystem == ClusterSystem { +internal distributed actor ClusterSingletonBoss: ClusterSingletonBossProtocol { typealias ActorSystem = ClusterSystem typealias CallID = UUID + @ActorID.Metadata(\.wellKnown) + var wellKnownName: String + private let settings: ClusterSingletonSettings /// The strategy that determines which node the singleton will be allocated. @@ -49,12 +52,14 @@ internal distributed actor ClusterSingletonBoss: /// If `nil`, then this instance will be proxy-only and it will never run the actual actor. let singletonFactory: ((ClusterSystem) async throws -> Act)? - /// The node that the singleton runs on + /// The node that the singleton runs on. private var targetNode: UniqueNode? + /// The target singleton instance we should forward invocations to. + /// /// The concrete distributed actor instance (the "singleton") if this node is indeed hosting it, /// or nil otherwise - meaning that the singleton instance is actually located on another member. - private var singleton: Act? + private var targetSingleton: Act? private var allocationStatus: AllocationStatus = .pending private var allocationTimeoutTask: Task? 
@@ -78,6 +83,8 @@ internal distributed actor ClusterSingletonBoss: self.singletonFactory = singletonFactory self.buffer = RemoteCallBuffer(capacity: settings.bufferCapacity) + self.wellKnownName = "$singletonBoss-\(settings.name)" + if system.settings.enabled { self.clusterEventsSubscribeTask = Task { // Subscribe to ``Cluster/Event`` in order to update `targetNode` @@ -108,7 +115,7 @@ internal distributed actor ClusterSingletonBoss: private func updateTargetNode(node: UniqueNode?) async throws { guard self.targetNode != node else { - self.log.debug("Skip updating target node. New node is already the same as current targetNode.", metadata: self.metadata()) + self.log.trace("Skip updating target node. New node is already the same as current targetNode.", metadata: self.metadata()) return } @@ -137,26 +144,54 @@ internal distributed actor ClusterSingletonBoss: self.log.debug("Take over singleton [\(self.settings.name)] from [\(String(describing: from))]", metadata: self.metadata()) + if let existing = self.targetSingleton { + self.log.warning("Singleton taking over from \(String(describing: from)), however local active instance already available: \(existing) (\(existing.id)). 
This is suspicious, we should have only activated the instance once we became active.") + return + } + // TODO: (optimization) tell `from` node that this node is taking over (https://github.com/apple/swift-distributed-actors/issues/329) - let singleton = try await _Props.$forSpawn.withValue(_Props.singleton(settings: self.settings)) { + let props = _Props.singletonInstance(settings: self.settings) + let singleton = try await _Props.$forSpawn.withValue(props) { try await singletonFactory(self.actorSystem) } + self.log.trace("Spawned singleton instance: \(singleton.id.fullDescription)", metadata: self.metadata()) + assert(singleton.id.metadata.wellKnown == self.settings.name, "Singleton instance assigned ID is not well-known, but should be") + self.updateSingleton(singleton) } private func handOver(to: UniqueNode?) { self.log.debug("Hand over singleton [\(self.settings.name)] to [\(String(describing: to))]", metadata: self.metadata()) - // TODO: (optimization) tell `to` node that this node is handing off (https://github.com/apple/swift-distributed-actors/issues/329) - self.updateSingleton(nil) + guard let instance = self.targetSingleton else { + return // we're done, we never activated it at all + } + + Task { + // we ask the singleton to passivate, it may want to flush some writes or similar. + // TODO: potentially do some timeout on this? + await instance.whenLocal { __secretlyKnownToBeLocal in + await __secretlyKnownToBeLocal.passivateSingleton() + } + + // TODO: (optimization) tell `to` node that this node is handing off (https://github.com/apple/swift-distributed-actors/issues/329) + // Finally, release the singleton -- it should not have been refered to strongly by anyone else, + // causing the instance to be released. TODO: we could assert that we have released it soon after here (it's ID must be resigned). + self.actorSystem.releaseWellKnownActorID(instance.id) + self.updateSingleton(nil) + } } private func updateSingleton(node: UniqueNode?) 
throws { switch node { case .some(let node) where node == self.actorSystem.cluster.uniqueNode: - () - case .some(let node): - let singleton = try Act.resolve(id: .singleton(Act.self, settings: self.settings, remote: node), using: self.actorSystem) + break + case .some(let otherNode): + var targetSingletonID = ActorID(remote: otherNode, type: Act.self, incarnation: .wellKnown) + targetSingletonID.metadata.wellKnown = self.settings.name // FIXME: rather, use the BOSS as the target + targetSingletonID.path = self.id.path + + let singleton = try Act.resolve(id: targetSingletonID, using: self.actorSystem) self.updateSingleton(singleton) case .none: self.updateSingleton(nil) @@ -164,8 +199,8 @@ internal distributed actor ClusterSingletonBoss: } private func updateSingleton(_ newSingleton: Act?) { - self.log.debug("Update singleton from [\(String(describing: self.singleton))] to [\(String(describing: newSingleton))], with \(self.buffer.count) remote calls pending", metadata: self.metadata()) - self.singleton = newSingleton + self.log.debug("Update singleton from [\(String(describing: self.targetSingleton))] to [\(String(describing: newSingleton))], with \(self.buffer.count) remote calls pending", metadata: self.metadata()) + self.targetSingleton = newSingleton // Unstash messages if we have the singleton guard let singleton = newSingleton else { @@ -178,58 +213,20 @@ internal distributed actor ClusterSingletonBoss: self.allocationTimeoutTask?.cancel() self.allocationTimeoutTask = nil + // FIXME: the callIDs are not used in the actual call making (!) 
while let (callID, continuation) = self.buffer.take() { self.log.debug("Flushing remote call [\(callID)] to [\(singleton)]", metadata: self.metadata()) continuation.resume(returning: singleton) } } - func forwardOrStashRemoteCall( - target: RemoteCallTarget, - invocation: ActorSystem.InvocationEncoder, - throwing: Err.Type, - returning: Res.Type - ) async throws -> Res - where Err: Error, - Res: Codable - { - let singleton = try await self.findSingleton() - self.log.trace("Forwarding invocation [\(invocation)] to [\(singleton)]", metadata: self.metadata()) - - var invocation = invocation // can't be inout param - return try await singleton.actorSystem.remoteCall( - on: singleton, - target: target, - invocation: &invocation, - throwing: throwing, - returning: returning - ) - } - - func forwardOrStashRemoteCallVoid( - target: RemoteCallTarget, - invocation: ActorSystem.InvocationEncoder, - throwing: Err.Type - ) async throws where Err: Error { - let singleton = try await self.findSingleton() - self.log.trace("Forwarding invocation [\(invocation)] to [\(singleton)]", metadata: self.metadata()) - - var invocation = invocation // can't be inout param - return try await singleton.actorSystem.remoteCallVoid( - on: singleton, - target: target, - invocation: &invocation, - throwing: throwing - ) - } - private func findSingleton() async throws -> Act { guard self.allocationStatus != .timedOut else { throw ClusterSingletonError.allocationTimeout } // If singleton is available, forward remote call to it right away. - if let singleton = self.singleton { + if let singleton = self.targetSingleton { return singleton } @@ -258,11 +255,9 @@ internal distributed actor ClusterSingletonBoss: } nonisolated func stop() async { - Task { - await self.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead - // TODO: perhaps we can figure out where `to` is next and hand over gracefully? 
- __secretlyKnownToBeLocal.handOver(to: nil) - } + await self.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead + // TODO: perhaps we can figure out where `to` is next and hand over gracefully? + __secretlyKnownToBeLocal.handOver(to: nil) } } @@ -348,8 +343,8 @@ extension ClusterSingletonBoss { ] metadata["targetNode"] = "\(String(describing: self.targetNode?.debugDescription))" - if let singleton = self.singleton { - metadata["singleton"] = "\(singleton.id)" + if let targetSingleton = self.targetSingleton { + metadata["targetSingleton"] = "\(targetSingleton.id)" } return metadata @@ -357,45 +352,70 @@ extension ClusterSingletonBoss { } // ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: Singleton ID and props - -extension ActorID { - static func singleton( - _ type: Act.Type, - settings: ClusterSingletonSettings, - remote node: UniqueNode - ) throws -> ActorID - where Act: ClusterSingletonProtocol, - Act.ActorSystem == ClusterSystem - { - var id = ActorID(remote: node, type: type, incarnation: .wellKnown) - id.path = try ActorPath._user.appending(settings.clusterSingletonID) - return id - } -} +// MARK: _Props extension _Props { - static func singleton(settings: ClusterSingletonSettings) -> _Props { - _Props().singleton(settings: settings) + internal static func singletonInstance(settings: ClusterSingletonSettings) -> _Props { + _Props().singletonInstance(settings: settings) } - func singleton(settings: ClusterSingletonSettings) -> _Props { - var props = self._asWellKnown - props._knownActorName = settings.clusterSingletonID + internal func singletonInstance(settings: ClusterSingletonSettings) -> _Props { + var props = self + props._knownActorName = settings.name + props._wellKnownName = settings.name return props } } -extension ClusterSingletonSettings { - var clusterSingletonID: String { - 
"singleton-\(self.name)" - } -} - // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Remote call interceptor -struct ClusterSingletonRemoteCallInterceptor: RemoteCallInterceptor where Singleton.ActorSystem == ClusterSystem { +extension ClusterSingletonBoss { + /// Handles the incoming message by either stashing or forwarding to the singleton. + func forwardOrStashRemoteCall( + target: RemoteCallTarget, + invocation: ActorSystem.InvocationEncoder, + throwing: Err.Type, + returning: Res.Type + ) async throws -> Res + where Err: Error, + Res: Codable + { + let singleton = try await self.findSingleton() + self.log.trace("Forwarding invocation [\(invocation)] to [\(singleton) @ \(singleton.id.detailedDescription)]", metadata: self.metadata()) + self.log.trace("Remote call on: singleton.actorSystem \(singleton.actorSystem)") + + var invocation = invocation // can't be inout param + return try await singleton.actorSystem.remoteCall( + on: singleton, + target: target, + invocation: &invocation, + throwing: throwing, + returning: returning + ) + } + + /// Handles the incoming message by either stashing or forwarding to the singleton. 
+ func forwardOrStashRemoteCallVoid( + target: RemoteCallTarget, + invocation: ActorSystem.InvocationEncoder, + throwing: Err.Type + ) async throws where Err: Error { + let singleton = try await self.findSingleton() + self.log.trace("Forwarding invocation [\(invocation)] to [\(singleton) @ \(singleton.id.detailedDescription)]", metadata: self.metadata()) + self.log.trace("Remote call on: singleton.actorSystem \(singleton.actorSystem)") + + var invocation = invocation // can't be inout param + return try await singleton.actorSystem.remoteCallVoid( + on: singleton, + target: target, + invocation: &invocation, + throwing: throwing + ) + } +} + +struct ClusterSingletonRemoteCallInterceptor: RemoteCallInterceptor { let system: ClusterSystem let singletonBoss: ClusterSingletonBoss diff --git a/Sources/DistributedActors/Plugins/ClusterSingleton/ClusterSingletonPlugin.swift b/Sources/DistributedActors/Plugins/ClusterSingleton/ClusterSingletonPlugin.swift index e9913a4ca..02f830c68 100644 --- a/Sources/DistributedActors/Plugins/ClusterSingleton/ClusterSingletonPlugin.swift +++ b/Sources/DistributedActors/Plugins/ClusterSingleton/ClusterSingletonPlugin.swift @@ -14,7 +14,20 @@ import Distributed -public protocol ClusterSingletonProtocol: DistributedActor where ActorSystem == ClusterSystem {} +public protocol ClusterSingletonProtocol: DistributedActor where ActorSystem == ClusterSystem { + /// The singleton should no longer be active on this cluster member. + /// + /// Invoked by the cluster singleton manager when it is determined that this member should no longer + /// be hosting this singleton instance. The singleton upon receiving this call, should either cease activity, + /// or take steps to terminate itself entirely. 
+ func passivateSingleton() async +} + +extension ClusterSingletonProtocol { + public func passivateSingleton() async { + // nothing by default + } +} // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Cluster singleton plugin @@ -33,66 +46,58 @@ public protocol ClusterSingletonProtocol: DistributedActor where ActorSystem == public actor ClusterSingletonPlugin { private var singletons: [String: (proxyID: ActorID, boss: any ClusterSingletonBossProtocol)] = [:] - private var system: ClusterSystem! + private var actorSystem: ClusterSystem! public func proxy( - of type: Act.Type, - name: String - ) async throws -> Act - where Act: ClusterSingletonProtocol - { - let settings = ClusterSingletonSettings(name: name) - return try await self.proxy(of: type, settings: settings, makeInstance: nil) + _ type: Act.Type, + name: String, + settings: ClusterSingletonSettings = .init() + ) async throws -> Act where Act: ClusterSingletonProtocol { + var settings = settings + settings.name = name + return try await self._get(type, settings: settings, system: self.actorSystem, makeInstance: nil) } - public func proxy( - of type: Act.Type, - settings: ClusterSingletonSettings, + /// Configures the singleton plugin to host instances of this actor. + public func host( + _ type: Act.Type = Act.self, + name: String, + settings: ClusterSingletonSettings = .init(), makeInstance factory: ((ClusterSystem) async throws -> Act)? 
= nil - ) async throws -> Act - where Act: ClusterSingletonProtocol, - Act.ActorSystem == ClusterSystem - { - let known = self.singletons[settings.name] + ) async throws -> Act where Act: ClusterSingletonProtocol { + var settings = settings + settings.name = name + return try await self._get(type, settings: settings, system: self.actorSystem, makeInstance: factory) + } + + internal func _get( + _ type: Act.Type, + settings: ClusterSingletonSettings, + system: ClusterSystem, + makeInstance factory: ((ClusterSystem) async throws -> Act)? + ) async throws -> Act where Act: ClusterSingletonProtocol { + let singletonName = settings.name + guard !singletonName.isEmpty else { + fatalError("ClusterSingleton \(Act.self) must have specified unique name!") + } + + let known = self.singletons[singletonName] if let existingID = known?.proxyID { - return try Act.resolve(id: existingID, using: self.system) + return try Act.resolve(id: existingID, using: system) } // Spawn the singleton boss (one per singleton per node) let boss = try await ClusterSingletonBoss( settings: settings, - system: self.system, + system: system, factory ) + let interceptor = ClusterSingletonRemoteCallInterceptor(system: system, singletonBoss: boss) + let proxied = try system.interceptCalls(to: type, metadata: ActorMetadata(), interceptor: interceptor) - let interceptor = ClusterSingletonRemoteCallInterceptor(system: self.system, singletonBoss: boss) - let proxied = try self.system.interceptCalls(to: type, metadata: ActorMetadata(), interceptor: interceptor) - - self.singletons[settings.name] = (proxied.id, boss) - + self.singletons[singletonName] = (proxied.id, boss) return proxied } - - public func host( - of type: Act.Type = Act.self, - name: String, - makeInstance factory: @escaping (ClusterSystem) async throws -> Act - ) async throws -> Act - where Act: ClusterSingletonProtocol - { - let settings = ClusterSingletonSettings(name: name) - return try await self.host(of: type, settings: settings, 
makeInstance: factory) - } - - public func host( - of type: Act.Type = Act.self, - settings: ClusterSingletonSettings, - makeInstance factory: @escaping (ClusterSystem) async throws -> Act - ) async throws -> Act - where Act: ClusterSingletonProtocol - { - try await self.proxy(of: type, settings: settings, makeInstance: factory) - } } // ==== ---------------------------------------------------------------------------------------------------------------- @@ -106,10 +111,11 @@ extension ClusterSingletonPlugin: _Plugin { } public func start(_ system: ClusterSystem) async throws { - self.system = system + self.actorSystem = system } public func stop(_ system: ClusterSystem) async { + self.actorSystem = nil for (_, (_, boss)) in self.singletons { await boss.stop() } diff --git a/Sources/DistributedActors/Plugins/ClusterSingleton/ClusterSingletonSettings.swift b/Sources/DistributedActors/Plugins/ClusterSingleton/ClusterSingletonSettings.swift index 7faa4c032..42339d8e5 100644 --- a/Sources/DistributedActors/Plugins/ClusterSingleton/ClusterSingletonSettings.swift +++ b/Sources/DistributedActors/Plugins/ClusterSingleton/ClusterSingletonSettings.swift @@ -12,10 +12,13 @@ // //===----------------------------------------------------------------------===// -/// Settings for a singleton. +/// Settings for a `ClusterSingleton`. public struct ClusterSingletonSettings { - /// Unique name for the singleton. - public let name: String + /// Unique name for the singleton, used to identify the conceptual singleton in the cluster. + /// E.g. there is always one "boss" instance in the cluster, regardless where it is incarnated. + /// + /// The name property is set on a settings object while creating a singleton reference (e.g. using `host` or `proxy`). + public internal(set) var name: String = "" /// Capacity of temporary message buffer in case singleton is unavailable. /// If the buffer becomes full, the *oldest* messages would be disposed to make room for the newer messages. 
@@ -32,9 +35,7 @@ public struct ClusterSingletonSettings { /// we stop stashing calls and throw error. public var allocationTimeout: Duration = .seconds(30) - public init(name: String) { - self.name = name - } + public init() {} } /// Singleton node allocation strategies. diff --git a/Sources/DistributedActors/Plugins/ClusterSystemSettings+Plugins.swift b/Sources/DistributedActors/Plugins/ClusterSystemSettings+Plugins.swift index bc886be34..6f95c5573 100644 --- a/Sources/DistributedActors/Plugins/ClusterSystemSettings+Plugins.swift +++ b/Sources/DistributedActors/Plugins/ClusterSystemSettings+Plugins.swift @@ -22,7 +22,7 @@ public struct _PluginsSettings { public init() {} - /// Adds a `Plugin`. + /// Adds a `_Plugin`. /// /// - Note: A plugin that depends on others should be added *after* its dependencies. /// - Faults, when plugin of the exact same `PluginKey` is already included in the settings. @@ -44,6 +44,7 @@ public struct _PluginsSettings { internal func startAll(_ system: ClusterSystem) async { for plugin in self.plugins { do { + system.log.info("Starting cluster system plugin: \(plugin.key)") try await plugin.start(system) } catch { fatalError("Failed to start plugin \(plugin.key)! Error: \(error)") diff --git a/Sources/DistributedActors/Props.swift b/Sources/DistributedActors/Props.swift index 2f8cb534f..8e0798253 100644 --- a/Sources/DistributedActors/Props.swift +++ b/Sources/DistributedActors/Props.swift @@ -46,6 +46,13 @@ public struct _Props: @unchecked Sendable { /// only if a single incarnation of actor will ever exist under the given path. internal var _wellKnown: Bool = false + /// Sets the ``ActorMetadataKeys/wellKnown`` key to this name during spawning. + internal var _wellKnownName: String? { + willSet { + self._wellKnown = newValue != nil + } + } + /// INTERNAL API: Internal system actor, spawned under the /system namespace. /// This is likely to go away as we remove the actor tree, and move completely to 'distributed actor'. 
internal var _systemActor: Bool = false diff --git a/Sources/DistributedActors/Protobuf/ActorID+Serialization.swift b/Sources/DistributedActors/Protobuf/ActorID+Serialization.swift index d6e74f157..5d3cd5158 100644 --- a/Sources/DistributedActors/Protobuf/ActorID+Serialization.swift +++ b/Sources/DistributedActors/Protobuf/ActorID+Serialization.swift @@ -12,6 +12,91 @@ // //===----------------------------------------------------------------------===// +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Codable ActorID + +extension ActorID: Codable { + public func encode(to encoder: Encoder) throws { + let metadataSettings = encoder.actorSerializationContext?.system.settings.actorMetadata + let encodeCustomMetadata = + metadataSettings?.encodeCustomMetadata ?? ({ _, _ in () }) + + var container = encoder.container(keyedBy: ActorCoding.CodingKeys.self) + try container.encode(self.uniqueNode, forKey: ActorCoding.CodingKeys.node) + try container.encode(self.path, forKey: ActorCoding.CodingKeys.path) // TODO: remove as we remove the tree + try container.encode(self.incarnation, forKey: ActorCoding.CodingKeys.incarnation) + + if !self.metadata.isEmpty { + var metadataContainer = container.nestedContainer(keyedBy: ActorCoding.MetadataKeys.self, forKey: ActorCoding.CodingKeys.metadata) + + let keys = ActorMetadataKeys.__instance + func shouldPropagate(_ key: ActorMetadataKey, metadata: ActorMetadata) -> V? { + if metadataSettings == nil || metadataSettings!.propagateMetadata.contains(key.id) { + if let value = metadata[key.id] { + let value = value as! 
V // as!-safe, the keys guarantee we only store well typed values in metadata + return value + } + } + return nil + } + + // Handle well known metadata types + if let value = shouldPropagate(keys.path, metadata: self.metadata) { + try metadataContainer.encode(value, forKey: ActorCoding.MetadataKeys.path) + } + if let value = shouldPropagate(keys.type, metadata: self.metadata) { + try metadataContainer.encode(value.mangledName, forKey: ActorCoding.MetadataKeys.type) + } + if let value = shouldPropagate(keys.wellKnown, metadata: self.metadata) { + try metadataContainer.encode(value, forKey: ActorCoding.MetadataKeys.wellKnown) + } + + try encodeCustomMetadata(self.metadata, &metadataContainer) + } + } + + public init(from decoder: Decoder) throws { + let container = try decoder.container(keyedBy: ActorCoding.CodingKeys.self) + let node = try container.decode(UniqueNode.self, forKey: ActorCoding.CodingKeys.node) + let path = try container.decodeIfPresent(ActorPath.self, forKey: ActorCoding.CodingKeys.path) + let incarnation = try container.decode(UInt32.self, forKey: ActorCoding.CodingKeys.incarnation) + + self.init(remote: node, path: path, incarnation: ActorIncarnation(incarnation)) + + // Decode any tags: + if let metadataContainer = try? container.nestedContainer(keyedBy: ActorCoding.MetadataKeys.self, forKey: ActorCoding.CodingKeys.metadata) { + // tags container found, try to decode all known tags: + + let metadata = ActorMetadata() + if let value = try? metadataContainer.decodeIfPresent(ActorPath.self, forKey: ActorCoding.MetadataKeys.path) { + metadata.path = value + } + if let value = try? metadataContainer.decodeIfPresent(String.self, forKey: ActorCoding.MetadataKeys.type) { + metadata.type = .init(mangledName: value) + } + if let value = try? 
metadataContainer.decodeIfPresent(String.self, forKey: ActorCoding.MetadataKeys.wellKnown) { + metadata.wellKnown = value + } + + if let context = decoder.actorSerializationContext { + let decodeCustomMetadata = context.system.settings.actorMetadata.decodeCustomMetadata + try decodeCustomMetadata(metadataContainer, self.metadata) + +// for (key, value) in try decodeCustomMetadata(metadataContainer) { +// func store(_: K.Type) { +// if let value = tag.value as? K.Value { +// self.metadata[K.self] = value +// } +// }v +// _openExistential(key, do: store) // the `as` here is required, because: inferred result type 'any ActorTagKey.Type' requires explicit coercion due to loss of generic requirements +// } + } + + self.context = .init(lifecycle: nil, remoteCallInterceptor: nil, metadata: metadata) + } + } +} + // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: _ProtoActorID @@ -19,23 +104,84 @@ extension ActorID: _ProtobufRepresentable { public typealias ProtobufRepresentation = _ProtoActorID public func toProto(context: Serialization.Context) throws -> _ProtoActorID { - var id = _ProtoActorID() + let metadataSettings = context.system.settings.actorMetadata + let encodeCustomMetadata = metadataSettings.encodeCustomMetadata + + var proto = _ProtoActorID() let node = self.uniqueNode - id.node = try node.toProto(context: context) + proto.node = try node.toProto(context: context) + + proto.path.segments = self.segments.map(\.value) + proto.incarnation = self.incarnation.value + + if !self.metadata.isEmpty { + let keys = ActorMetadataKeys.__instance + func shouldPropagate(_ key: ActorMetadataKey, metadata: ActorMetadata) -> V? { + if metadataSettings.propagateMetadata.contains(key.id) { + if let value = metadata[key.id] { + let value = value as! 
V // as!-safe, the keys guarantee we only store well typed values in metadata + return value + } + } + return nil + } - id.path.segments = self.segments.map(\.value) - id.incarnation = self.incarnation.value + // Handle well known metadata types + if let value = shouldPropagate(keys.path, metadata: self.metadata) { + let serialized = try context.serialization.serialize(value) + proto.metadata[keys.path.id] = serialized.buffer.readData() + } + if let value = shouldPropagate(keys.type, metadata: self.metadata) { + let serialized = try context.serialization.serialize(value.mangledName) + proto.metadata[keys.type.id] = serialized.buffer.readData() + } + if let value = shouldPropagate(keys.wellKnown, metadata: self.metadata) { + let serialized = try context.serialization.serialize(value) + proto.metadata[keys.wellKnown.id] = serialized.buffer.readData() + } - return id + // FIXME: implement custom metadata transporting https://github.com/apple/swift-distributed-actors/issues/987 + } + + return proto } public init(fromProto proto: _ProtoActorID, context: Serialization.Context) throws { let uniqueNode: UniqueNode = try .init(fromProto: proto.node, context: context) - // TODO: make Error let path = try ActorPath(proto.path.segments.map { try ActorPathSegment($0) }) self.init(remote: uniqueNode, path: path, incarnation: ActorIncarnation(proto.incarnation)) + + // Handle well known metadata + if !proto.metadata.isEmpty { + let keys = ActorMetadataKeys.__instance + + // Path is handled already explicitly in the above ActorID initializer + // TODO: Uncomment impl when we move to entirely not using paths at all: + // if let data = proto.metadata[keys.type.id] { + // let manifest = Serialization.Manifest.stringSerializerManifest + // let serialized = Serialization.Serialized(manifest: manifest, buffer: .data(data)) + // if let value = try? 
context.serialization.deserialize(as: String.self, from: serialized) { + // self.metadata.type = .init(mangledName: value) + // } + // } + if let data = proto.metadata[keys.type.id] { + let manifest = Serialization.Manifest.stringSerializerManifest + let serialized = Serialization.Serialized(manifest: manifest, buffer: .data(data)) + if let value = try? context.serialization.deserialize(as: String.self, from: serialized) { + self.metadata.type = .init(mangledName: value) + } + } + + if let data = proto.metadata[keys.wellKnown.id] { + let manifest = Serialization.Manifest.stringSerializerManifest + let serialized = Serialization.Serialized(manifest: manifest, buffer: .data(data)) + if let value = try? context.serialization.deserialize(as: String.self, from: serialized) { + self.metadata.wellKnown = value + } + } + } } } diff --git a/Sources/DistributedActors/Protobuf/ActorID.pb.swift b/Sources/DistributedActors/Protobuf/ActorID.pb.swift index ddb908063..49183d46f 100644 --- a/Sources/DistributedActors/Protobuf/ActorID.pb.swift +++ b/Sources/DistributedActors/Protobuf/ActorID.pb.swift @@ -56,12 +56,16 @@ public struct _ProtoActorID { /// Clears the value of `path`. Subsequent reads from it will return its default value. public mutating func clearPath() {_uniqueStorage()._path = nil} - /// TODO: encode tags public var incarnation: UInt32 { get {return _storage._incarnation} set {_uniqueStorage()._incarnation = newValue} } + public var metadata: Dictionary { + get {return _storage._metadata} + set {_uniqueStorage()._metadata = newValue} + } + public var unknownFields = SwiftProtobuf.UnknownStorage() public init() {} @@ -133,12 +137,14 @@ extension _ProtoActorID: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementa 1: .same(proto: "node"), 2: .same(proto: "path"), 3: .same(proto: "incarnation"), + 4: .same(proto: "metadata"), ] fileprivate class _StorageClass { var _node: _ProtoUniqueNode? = nil var _path: _ProtoActorPath? 
= nil var _incarnation: UInt32 = 0 + var _metadata: Dictionary = [:] static let defaultInstance = _StorageClass() @@ -148,6 +154,7 @@ extension _ProtoActorID: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementa _node = source._node _path = source._path _incarnation = source._incarnation + _metadata = source._metadata } } @@ -166,6 +173,7 @@ extension _ProtoActorID: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementa case 1: try decoder.decodeSingularMessageField(value: &_storage._node) case 2: try decoder.decodeSingularMessageField(value: &_storage._path) case 3: try decoder.decodeSingularUInt32Field(value: &_storage._incarnation) + case 4: try decoder.decodeMapField(fieldType: SwiftProtobuf._ProtobufMap.self, value: &_storage._metadata) default: break } } @@ -183,6 +191,9 @@ extension _ProtoActorID: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementa if _storage._incarnation != 0 { try visitor.visitSingularUInt32Field(value: _storage._incarnation, fieldNumber: 3) } + if !_storage._metadata.isEmpty { + try visitor.visitMapField(fieldType: SwiftProtobuf._ProtobufMap.self, value: _storage._metadata, fieldNumber: 4) + } } try unknownFields.traverse(visitor: &visitor) } @@ -195,6 +206,7 @@ extension _ProtoActorID: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementa if _storage._node != rhs_storage._node {return false} if _storage._path != rhs_storage._path {return false} if _storage._incarnation != rhs_storage._incarnation {return false} + if _storage._metadata != rhs_storage._metadata {return false} return true } if !storagesAreEqual {return false} diff --git a/Sources/DistributedActors/Receptionist/DistributedReceptionist.swift b/Sources/DistributedActors/Receptionist/DistributedReceptionist.swift index 5647a5798..5269eb69a 100644 --- a/Sources/DistributedActors/Receptionist/DistributedReceptionist.swift +++ b/Sources/DistributedActors/Receptionist/DistributedReceptionist.swift @@ -43,7 +43,7 @@ public protocol DistributedReceptionist: 
DistributedActor { /// /// It emits both values for already existing, checked-in before the listing was created, /// actors; as well as new actors which are checked-in while the listing was already subscribed to. - func listing(of key: DistributedReception.Key) async -> DistributedReception.GuestListing + func listing(of key: DistributedReception.Key, file: String, line: UInt) async -> DistributedReception.GuestListing where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem /// Perform a *single* lookup for a distributed actor identified by the passed in `key`. @@ -54,6 +54,19 @@ public protocol DistributedReceptionist: DistributedActor { where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem } +extension DistributedReceptionist { + /// Returns a "listing" asynchronous sequence which will emit actor references, + /// for every distributed actor that the receptionist discovers for the specific key. + /// + /// It emits both values for already existing, checked-in before the listing was created, + /// actors; as well as new actors which are checked-in while the listing was already subscribed to. 
+ func listing(of key: DistributedReception.Key, file: String = #file, line: UInt = #line) async -> DistributedReception.GuestListing + where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem + { + await self.listing(of: key, file: file, line: line) + } +} + extension DistributedReception { public struct GuestListing: AsyncSequence, Sendable where Guest.ActorSystem == ClusterSystem { public typealias Element = Guest @@ -61,19 +74,29 @@ extension DistributedReception { let receptionist: OpLogDistributedReceptionist let key: DistributedReception.Key - init(receptionist: OpLogDistributedReceptionist, key: DistributedReception.Key) { + // Location where the subscription was created + let file: String + let line: UInt + + init(receptionist: OpLogDistributedReceptionist, key: DistributedReception.Key, + file: String, line: UInt) + { self.receptionist = receptionist self.key = key + self.file = file + self.line = line } public func makeAsyncIterator() -> AsyncIterator { - AsyncIterator(receptionist: self.receptionist, key: self.key) + AsyncIterator(receptionist: self.receptionist, key: self.key, file: self.file, line: self.line) } public class AsyncIterator: AsyncIteratorProtocol { var underlying: AsyncStream.Iterator! 
- init(receptionist __secretlyKnownToBeLocal: OpLogDistributedReceptionist, key: DistributedReception.Key) { + init(receptionist __secretlyKnownToBeLocal: OpLogDistributedReceptionist, key: DistributedReception.Key, + file: String, line: UInt) + { self.underlying = AsyncStream { continuation in let anySubscribe = AnyDistributedReceptionListingSubscription( subscriptionID: ObjectIdentifier(self), @@ -89,7 +112,7 @@ extension DistributedReception { ) Task { - await __secretlyKnownToBeLocal._listing(subscription: anySubscribe) + await __secretlyKnownToBeLocal._listing(subscription: anySubscribe, file: file, line: line) } continuation.onTermination = { @Sendable termination in diff --git a/Sources/DistributedActors/Receptionist/Receptionist.swift b/Sources/DistributedActors/Receptionist/Receptionist.swift index 79085e494..fbc9484b1 100644 --- a/Sources/DistributedActors/Receptionist/Receptionist.swift +++ b/Sources/DistributedActors/Receptionist/Receptionist.swift @@ -325,7 +325,9 @@ extension ActorID { case .actorRefs: return ActorPath.actorRefReceptionist.makeRemoteID(on: node, incarnation: .wellKnown) case .distributedActors: - return ActorPath.distributedActorReceptionist.makeRemoteID(on: node, incarnation: .wellKnown) + let id = ActorPath.distributedActorReceptionist.makeRemoteID(on: node, incarnation: .wellKnown) + id.metadata.wellKnown = ActorPath.distributedActorReceptionist.name + return id } } } diff --git a/Sources/DistributedActors/Serialization/ActorRef+Serialization.swift b/Sources/DistributedActors/Serialization/ActorRef+Serialization.swift index 763c93f09..7785907e3 100644 --- a/Sources/DistributedActors/Serialization/ActorRef+Serialization.swift +++ b/Sources/DistributedActors/Serialization/ActorRef+Serialization.swift @@ -32,12 +32,14 @@ public enum ActorCoding { public enum MetadataKeys: CodingKey { case path case type + case wellKnown case custom(String) public init?(stringValue: String) { switch stringValue { - case "path": self = .path - case 
"type": self = .type + case "$path": self = .path + case "$type": self = .type + case "$wellKnown": self = .wellKnown default: self = .custom(stringValue) } } @@ -46,7 +48,8 @@ public enum ActorCoding { switch self { case .path: return 0 case .type: return 1 - case .custom: return 2 + case .wellKnown: return 2 + case .custom: return 64 } } @@ -56,8 +59,9 @@ public enum ActorCoding { public var stringValue: String { switch self { - case .path: return "path" - case .type: return "type" + case .path: return "$path" + case .type: return "$type" + case .wellKnown: return "$wellKnown" case .custom(let id): return id } } diff --git a/Sources/DistributedActors/Serialization/Serialization+Invocation.swift b/Sources/DistributedActors/Serialization/Serialization+Invocation.swift index 9de662349..06b185a90 100644 --- a/Sources/DistributedActors/Serialization/Serialization+Invocation.swift +++ b/Sources/DistributedActors/Serialization/Serialization+Invocation.swift @@ -21,8 +21,9 @@ import SwiftProtobuf public struct ClusterInvocationEncoder: DistributedTargetInvocationEncoder { public typealias SerializationRequirement = any Codable - var genericSubstitutions: [String] = [] + var arguments: [Data] = [] + var genericSubstitutions: [String] = [] var throwing: Bool = false let system: ClusterSystem @@ -31,6 +32,12 @@ public struct ClusterInvocationEncoder: DistributedTargetInvocationEncoder { self.system = system } + init(system: ClusterSystem, arguments: [Data]) { + self.system = system + self.arguments = arguments + self.throwing = false + } + public mutating func recordGenericSubstitution(_ type: T.Type) throws { let typeName = _mangledTypeName(type) ?? 
_typeName(type) self.genericSubstitutions.append(typeName) @@ -56,9 +63,10 @@ public struct ClusterInvocationEncoder: DistributedTargetInvocationEncoder { public struct ClusterInvocationDecoder: DistributedTargetInvocationDecoder { public typealias SerializationRequirement = any Codable + let system: ClusterSystem let state: _State enum _State { - case remoteCall(system: ClusterSystem, message: InvocationMessage) + case remoteCall(message: InvocationMessage) // Potentially used by interceptors, when invoking a local target directly case localProxyCall(ClusterSystem.InvocationEncoder) } @@ -66,17 +74,19 @@ public struct ClusterInvocationDecoder: DistributedTargetInvocationDecoder { var argumentIdx = 0 public init(system: ClusterSystem, message: InvocationMessage) { - self.state = .remoteCall(system: system, message: message) + self.system = system + self.state = .remoteCall(message: message) } - internal init(invocation: ClusterSystem.InvocationEncoder) { + init(system: ClusterSystem, invocation: ClusterSystem.InvocationEncoder) { + self.system = system self.state = .localProxyCall(invocation) } public mutating func decodeGenericSubstitutions() throws -> [Any.Type] { let genericSubstitutions: [String] switch self.state { - case .remoteCall(_, let message): + case .remoteCall(let message): genericSubstitutions = message.genericSubstitutions case .localProxyCall(let invocation): genericSubstitutions = invocation.genericSubstitutions @@ -91,13 +101,15 @@ public struct ClusterInvocationDecoder: DistributedTargetInvocationDecoder { } public mutating func decodeNextArgument() throws -> Argument { + let argumentData: Data + switch self.state { - case .remoteCall(let system, let message): - guard message.arguments.count > self.argumentIdx else { + case .remoteCall(let message): + guard self.argumentIdx < message.arguments.count else { throw SerializationError.notEnoughArgumentsEncoded(expected: self.argumentIdx + 1, have: message.arguments.count) } - let argumentData = 
message.arguments[self.argumentIdx] + argumentData = message.arguments[self.argumentIdx] self.argumentIdx += 1 // FIXME: make incoming manifest @@ -111,13 +123,24 @@ public struct ClusterInvocationDecoder: DistributedTargetInvocationDecoder { return argument case .localProxyCall(let invocation): - guard invocation.arguments.count > self.argumentIdx else { + // TODO: potentially able to optimize and avoid serialization round trip for such calls + guard self.argumentIdx < invocation.arguments.count else { throw SerializationError.notEnoughArgumentsEncoded(expected: self.argumentIdx + 1, have: invocation.arguments.count) } + argumentData = invocation.arguments[self.argumentIdx] self.argumentIdx += 1 - return invocation.arguments[self.argumentIdx] as! Argument } + + // FIXME: make incoming manifest + let manifest = try self.system.serialization.outboundManifest(Argument.self) + + let serialized = Serialization.Serialized( + manifest: manifest, + buffer: Serialization.Buffer.data(argumentData) + ) + let argument = try self.system.serialization.deserialize(as: Argument.self, from: serialized) + return argument } public mutating func decodeErrorType() throws -> Any.Type? 
{ diff --git a/Sources/DistributedActors/Serialization/Serialization+PrimitiveSerializers.swift b/Sources/DistributedActors/Serialization/Serialization+PrimitiveSerializers.swift index a10b0ceb1..f0547723d 100644 --- a/Sources/DistributedActors/Serialization/Serialization+PrimitiveSerializers.swift +++ b/Sources/DistributedActors/Serialization/Serialization+PrimitiveSerializers.swift @@ -21,6 +21,11 @@ import Foundation // for Codable // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: String Serializer +extension Serialization.Manifest { + public static let stringSerializerManifest: Self = + .init(serializerID: .specializedWithTypeHint, hint: "S") +} + @usableFromInline internal class StringSerializer: Serializer { private let allocate: ByteBufferAllocator diff --git a/Sources/DistributedActorsTestKit/ActorTestKit.swift b/Sources/DistributedActorsTestKit/ActorTestKit.swift index 7d82b400a..b49850f6f 100644 --- a/Sources/DistributedActorsTestKit/ActorTestKit.swift +++ b/Sources/DistributedActorsTestKit/ActorTestKit.swift @@ -136,18 +136,23 @@ extension ActorTestKit { var lastError: Error? var polledTimes = 0 - ActorTestKit.enterRepeatableContext() - while deadline.hasTimeLeft() { - do { - polledTimes += 1 - let res = try block() - return res - } catch { - lastError = error - usleep(useconds_t(interval.microseconds)) + let res: T? = ActorTestKit.withRepeatableContext { + while deadline.hasTimeLeft() { + do { + polledTimes += 1 + let res = try block() + return res + } catch { + lastError = error + usleep(useconds_t(interval.microseconds)) + } } + return nil + } + + if let res { + return res } - ActorTestKit.leaveRepeatableContext() let error = EventuallyError(callSite, duration, polledTimes, lastError: lastError) if !ActorTestKit.isInRepeatableContext() { @@ -177,18 +182,23 @@ extension ActorTestKit { var lastError: Error? 
var polledTimes = 0 - ActorTestKit.enterRepeatableContext() - while deadline.hasTimeLeft() { - do { - polledTimes += 1 - let res = try await block() - return res - } catch { - lastError = error - usleep(useconds_t(interval.microseconds)) + let res: T? = try await ActorTestKit.withRepeatableContext { + while deadline.hasTimeLeft() { + do { + polledTimes += 1 + let res = try await block() + return res + } catch { + lastError = error + usleep(useconds_t(interval.microseconds)) + } } + return nil + } + + if let res { + return res } - ActorTestKit.leaveRepeatableContext() let error = EventuallyError(callSite, duration, polledTimes, lastError: lastError) if !ActorTestKit.isInRepeatableContext() { @@ -479,22 +489,21 @@ extension ActorTestKit { // immediately fail the test, but instead lets the `ActorTestKit.eventually` // block handle it. extension ActorTestKit { - internal static let threadLocalContextKey: String = "SACT_TESTKIT_REPEATABLE_CONTEXT" + @TaskLocal + static var repeatableContextCounter = 0 - // Sets a flag that can be checked with `isInRepeatableContext`, to avoid - // failing a test from within blocks that continuously check conditions, - // e.g. `ActorTestKit.eventually`. This is safe to use in nested calls. - internal static func enterRepeatableContext() { - let currentDepth = self.currentRepeatableContextDepth - Foundation.Thread.current.threadDictionary[self.threadLocalContextKey] = currentDepth + 1 + internal static func withRepeatableContext(body: () async throws -> T) async rethrows -> T { + let currentRepeatableContextCounter = Self.repeatableContextCounter + return try await Self.$repeatableContextCounter.withValue(currentRepeatableContextCounter + 1) { + try await body() + } } - // Unsets the flag and causes `isInRepeatableContext` to return `false`. - // This is safe to use in nested calls. 
- internal static func leaveRepeatableContext() { - let currentDepth = self.currentRepeatableContextDepth - precondition(currentDepth > 0, "Imbalanced `leaveRepeatableContext` detected. Depth was \(currentDepth)") - Foundation.Thread.current.threadDictionary[self.threadLocalContextKey] = currentDepth - 1 + internal static func withRepeatableContext(body: () throws -> T) rethrows -> T { + let currentRepeatableContextCounter = Self.repeatableContextCounter + return try Self.$repeatableContextCounter.withValue(currentRepeatableContextCounter + 1) { + try body() + } } // Returns `true` if we are currently executing a code clock that repeatedly @@ -502,20 +511,7 @@ extension ActorTestKit { // to avoid failing the test on the first iteration in e.g. an // `ActorTestKit.eventually` block. internal static func isInRepeatableContext() -> Bool { - self.currentRepeatableContextDepth > 0 - } - - // Returns the current depth of nested repeatable context calls. - internal static var currentRepeatableContextDepth: Int { - guard let currentValue = Foundation.Thread.current.threadDictionary[self.threadLocalContextKey] else { - return 0 - } - - guard let intValue = currentValue as? 
Int else { - fatalError("Expected value under key [\(self.threadLocalContextKey)] to be [\(Int.self)], but found [\(currentValue)]:\(type(of: currentValue))") - } - - return intValue + Self.repeatableContextCounter > 0 } } diff --git a/Tests/DistributedActorsTests/ActorAskTests.swift b/Tests/DistributedActorsTests/ActorAskTests.swift index 5abb2f4df..4c89717ab 100644 --- a/Tests/DistributedActorsTests/ActorAskTests.swift +++ b/Tests/DistributedActorsTests/ActorAskTests.swift @@ -179,10 +179,9 @@ final class ActorAskTests: ClusterSystemXCTestCase { } ) - var msg = "timedOut(DistributedActors.TimeoutError(" - msg += "message: \"AskResponse timed out after 100ms\", " - msg += "timeout: 0.1 seconds))" - try p.expectMessage(msg) + let message = try p.expectMessage() + message.shouldStartWith(prefix: "timedOut(") + message.shouldContain("DistributedActors.TimeoutError(message: \"AskResponse timed out after 100ms\", timeout: 0.1 seconds))") } func test_ask_onDeadLetters_shouldPutMessageIntoDeadLetters() async throws { diff --git a/Tests/DistributedActorsTests/ActorIDMetadataTests.swift b/Tests/DistributedActorsTests/ActorIDMetadataTests.swift index d4ec8f446..06272c335 100644 --- a/Tests/DistributedActorsTests/ActorIDMetadataTests.swift +++ b/Tests/DistributedActorsTests/ActorIDMetadataTests.swift @@ -13,7 +13,7 @@ //===----------------------------------------------------------------------===// import Distributed -import DistributedActors +@testable import DistributedActors import DistributedActorsTestKit import XCTest @@ -23,7 +23,7 @@ extension ActorMetadataKeys { } public protocol ExampleClusterSingletonProtocol: DistributedActor { - var singletonID: String { get } + var exampleSingletonID: String { get } /// Must be implemented by providing a metadata property wrapper. // var _singletonID: ActorID.Metadata { get } // FIXME: property wrapper bug? 
Property '_singletonID' must be as accessible as its enclosing type because it matches a requirement in protocol 'ClusterSingletonProtocol' @@ -32,13 +32,17 @@ public protocol ExampleClusterSingletonProtocol: DistributedActor { distributed actor ThereCanBeOnlyOneClusterSingleton: ExampleClusterSingletonProtocol { typealias ActorSystem = ClusterSystem + @ActorID.Metadata(\.wellKnown) + public var wellKnownName: String + @ActorID.Metadata(\.exampleClusterSingletonID) - public var singletonID: String + public var exampleSingletonID: String // TODO(swift): impossible to assign initial value here, as _enclosingInstance is not available yet "the-one" init(actorSystem: ActorSystem) async { self.actorSystem = actorSystem - self.singletonID = "the-boss" + self.exampleSingletonID = "singer-1234" + self.wellKnownName = "singer-1234" } } @@ -84,6 +88,56 @@ final class ActorIDMetadataTests: ClusteredActorSystemsXCTestCase { let system = await setUpNode("first") let singleton = await ThereCanBeOnlyOneClusterSingleton(actorSystem: system) - singleton.metadata.exampleClusterSingletonID.shouldEqual("the-boss") + singleton.metadata.exampleClusterSingletonID.shouldEqual("singer-1234") + } + + func test_metadata_wellKnown_coding() async throws { + let system = await setUpNode("first") + let singleton = await ThereCanBeOnlyOneClusterSingleton(actorSystem: system) + + let encoded = try JSONEncoder().encode(singleton) + let encodedString = String(data: encoded, encoding: .utf8)! + encodedString.shouldContain("\"$wellKnown\":\"singer-1234\"") + + let back = try! 
JSONDecoder().decode(ActorID.self, from: encoded) + back.metadata.wellKnown.shouldEqual("singer-1234") + } + + func test_metadata_wellKnown_proto() async throws { + let system = await setUpNode("first") + let singleton = await ThereCanBeOnlyOneClusterSingleton(actorSystem: system) + + let context = Serialization.Context(log: system.log, system: system, allocator: .init()) + let encoded = try singleton.id.toProto(context: context) + + let back = try ActorID(fromProto: encoded, context: context) + back.metadata.wellKnown.shouldEqual(singleton.id.metadata.wellKnown) + } + + func test_metadata_wellKnown_equality() async throws { + let system = await setUpNode("first") + + let singleton = await ThereCanBeOnlyOneClusterSingleton(actorSystem: system) + + let madeUpID = ActorID(local: system.cluster.uniqueNode, path: singleton.id.path, incarnation: .wellKnown) + madeUpID.metadata.wellKnown = singleton.id.metadata.wellKnown! + + singleton.id.shouldEqual(madeUpID) + singleton.id.hashValue.shouldEqual(madeUpID.hashValue) + + let set: Set = [singleton.id, madeUpID] + set.count.shouldEqual(1) + } + + func test_metadata_userDefined_coding() async throws { + let system = await setUpNode("first") + let singleton = await ThereCanBeOnlyOneClusterSingleton(actorSystem: system) + + let encoded = try JSONEncoder().encode(singleton) + let encodedString = String(data: encoded, encoding: .utf8)! + encodedString.shouldContain("\"$wellKnown\":\"singer-1234\"") + + let back = try! 
JSONDecoder().decode(ActorID.self, from: encoded) + back.metadata.wellKnown.shouldEqual("singer-1234") } } diff --git a/Tests/DistributedActorsTests/ActorIDTests.swift b/Tests/DistributedActorsTests/ActorIDTests.swift index 939ea1458..b59b91e02 100644 --- a/Tests/DistributedActorsTests/ActorIDTests.swift +++ b/Tests/DistributedActorsTests/ActorIDTests.swift @@ -27,7 +27,7 @@ final class ActorIDTests: ClusteredActorSystemsXCTestCase { "\(id.path)".shouldEqual("/user/hello") "\(id.path.name)".shouldEqual("hello") - id.detailedDescription.shouldEqual("/user/hello#8888") + id.detailedDescription.shouldEqual("/user/hello#8888[\"$path\": /user/hello]") String(reflecting: id).shouldEqual("/user/hello") String(reflecting: id.name).shouldEqual("\"hello\"") String(reflecting: id.path).shouldEqual("/user/hello") @@ -42,7 +42,7 @@ final class ActorIDTests: ClusteredActorSystemsXCTestCase { let remoteNode = UniqueNode(systemName: "system", host: "127.0.0.1", port: 1234, nid: UniqueNodeID(11111)) let remote = ActorID(remote: remoteNode, path: id.path, incarnation: ActorIncarnation(8888)) - remote.detailedDescription.shouldEqual("sact://system:11111@127.0.0.1:1234/user/hello#8888") + remote.detailedDescription.shouldEqual("sact://system:11111@127.0.0.1:1234/user/hello#8888[\"$path\": /user/hello]") String(reflecting: remote).shouldEqual("sact://system@127.0.0.1:1234/user/hello") "\(remote)".shouldEqual("sact://system@127.0.0.1:1234/user/hello") "\(remote.name)".shouldEqual("hello") @@ -155,7 +155,7 @@ final class ActorIDTests: ClusteredActorSystemsXCTestCase { serializedJson.shouldContain(#""incarnation":1"#) serializedJson.shouldContain(#""node":["sact","one","127.0.0.1",1234,11111]"#) - serializedJson.shouldContain(#""metadata":{"path":{"path":["user","a"]}}"#) + serializedJson.shouldContain(#""path":{"path":["user","a"]}"#) serializedJson.shouldNotContain(#"$test":"test-value""#) let back = try JSONDecoder().decode(ActorID.self, from: data) @@ -178,7 +178,7 @@ final class 
ActorIDTests: ClusteredActorSystemsXCTestCase { // TODO: improve serialization format of identities to be more compact serializedJson.shouldContain(#""incarnation":1"#) serializedJson.shouldContain(#""node":["sact","one","127.0.0.1",1234,11111]"#) - serializedJson.shouldContain(#""metadata":{"path":{"path":["user","a"]}}"#) + serializedJson.shouldContain(#""path":{"path":["user","a"]}"#) serializedJson.shouldNotContain(#"$test":"test-value""#) } @@ -190,11 +190,11 @@ final class ActorIDTests: ClusteredActorSystemsXCTestCase { let system = await self.setUpNode("test_serializing_ActorAddress_propagateCustomTag") { settings in settings.bindPort = 1234 settings.actorMetadata.encodeCustomMetadata = { metadata, container in - try container.encodeIfPresent(metadata.test, forKey: ActorCoding.MetadataKeys.custom(ActorMetadataKeys().test.id)) + try container.encodeIfPresent(metadata.test, forKey: ActorCoding.MetadataKeys.custom(ActorMetadataKeys.__instance.test.id)) } settings.actorMetadata.decodeCustomMetadata = { container, metadata in - if let value = try container.decodeIfPresent(String.self, forKey: .custom(ActorMetadataKeys().test.id)) { + if let value = try container.decodeIfPresent(String.self, forKey: .custom(ActorMetadataKeys.__instance.test.id)) { metadata.test = value } } @@ -207,7 +207,7 @@ final class ActorIDTests: ClusteredActorSystemsXCTestCase { serializedJson.shouldContain(#""incarnation":1"#) serializedJson.shouldContain(#""node":["sact","one","127.0.0.1",1234,11111]"#) serializedJson.shouldContain(#""path":{"path":["user","a"]}"#) - serializedJson.shouldContain("\"\(ActorMetadataKeys().test.id)\":\"\(a.metadata.test!)\"") + serializedJson.shouldContain("\"\(ActorMetadataKeys.__instance.test.id)\":\"\(a.metadata.test!)\"") } } diff --git a/Tests/DistributedActorsTests/Cluster/Reception/OpLogDistributedReceptionistClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/Reception/OpLogDistributedReceptionistClusteredTests.swift index 1998bbf3e..1d1472494 
100644 --- a/Tests/DistributedActorsTests/Cluster/Reception/OpLogDistributedReceptionistClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/Reception/OpLogDistributedReceptionistClusteredTests.swift @@ -85,19 +85,19 @@ final class OpLogDistributedReceptionistClusteredTests: ClusteredActorSystemsXCT // MARK: Sync func test_shouldReplicateRegistrations() async throws { - let (local, remote) = await self.setUpPair() - let testKit: ActorTestKit = self.testKit(local) - try await self.joinNodes(node: local, with: remote) + let (first, second) = await self.setUpPair() + let testKit = self.testKit(first) + try await self.joinNodes(node: first, with: second) let probe = testKit.makeTestProbe(expecting: String.self) - // Create forwarder on 'local' - let forwarder = StringForwarder(probe: probe, actorSystem: local) + // Create forwarder on 'first' + let forwarder = StringForwarder(probe: probe, actorSystem: first) // subscribe on `remote` let subscriberProbe = testKit.makeTestProbe("subscriber", expecting: StringForwarder.self) let subscriptionTask = Task { - for try await forwarder in await remote.receptionist.listing(of: .stringForwarders) { + for try await forwarder in await second.receptionist.listing(of: .stringForwarders) { subscriberProbe.tell(forwarder) } } @@ -105,8 +105,9 @@ final class OpLogDistributedReceptionistClusteredTests: ClusteredActorSystemsXCT subscriptionTask.cancel() } - // checkIn on `local` - await local.receptionist.checkIn(forwarder, with: .stringForwarders) + // checkIn on `first` + await first.receptionist.checkIn(forwarder, with: .stringForwarders) + first.log.notice("Checked in: \(forwarder)") try await Task { let found = try subscriberProbe.expectMessage() diff --git a/Tests/DistributedActorsTests/ClusterSystemTests.swift b/Tests/DistributedActorsTests/ClusterSystemTests.swift index c6e17b5ad..21833ae4a 100644 --- a/Tests/DistributedActorsTests/ClusterSystemTests.swift +++ b/Tests/DistributedActorsTests/ClusterSystemTests.swift @@ 
-345,7 +345,7 @@ final class ClusterSystemTests: ClusterSystemXCTestCase { } } - guard case RemoteCallError.timedOut(let timeoutError) = error else { + guard case RemoteCallError.timedOut(_, let timeoutError) = error else { throw testKit.fail("Expected RemoteCallError.timedOut, got \(error)") } guard timeoutError.timeout == .milliseconds(200) else { @@ -371,7 +371,7 @@ final class ClusterSystemTests: ClusterSystemXCTestCase { } } - guard case RemoteCallError.timedOut(let timeoutError) = error else { + guard case RemoteCallError.timedOut(_, let timeoutError) = error else { throw testKit.fail("Expected RemoteCallError.timedOut, got \(error)") } guard timeoutError.timeout == .milliseconds(200) else { @@ -393,7 +393,7 @@ final class ClusterSystemTests: ClusterSystemXCTestCase { let message: String = "hello" let value = try await shouldNotThrow { - try await remoteGreeterRef.echo(message) + try await remoteGreeterRef.genericEcho(message) } value.shouldEqual(message) } @@ -434,7 +434,7 @@ private distributed actor Greeter { try await self.muted() } - distributed func echo(_ message: T) -> T { + distributed func genericEcho(_ message: T) -> T { message } } diff --git a/Tests/DistributedActorsTests/DeadLetterTests.swift b/Tests/DistributedActorsTests/DeadLetterTests.swift index 5cba503c7..3a48b826d 100644 --- a/Tests/DistributedActorsTests/DeadLetterTests.swift +++ b/Tests/DistributedActorsTests/DeadLetterTests.swift @@ -30,7 +30,7 @@ final class DeadLetterTests: ClusterSystemXCTestCase { office.deliver("Hello") - try self.logCapture.awaitLogContaining(self.testKit, text: "was not delivered to [\"/user/someone") + try self.logCapture.awaitLogContaining(self.testKit, text: "was not delivered to [/user/someone") } // ==== ------------------------------------------------------------------------------------------------------------ diff --git a/Tests/DistributedActorsTests/DistributedReceptionistTests.swift b/Tests/DistributedActorsTests/DistributedReceptionistTests.swift index 
c663a3d7f..95dbef2d4 100644 --- a/Tests/DistributedActorsTests/DistributedReceptionistTests.swift +++ b/Tests/DistributedActorsTests/DistributedReceptionistTests.swift @@ -111,12 +111,12 @@ extension DistributedReception.Key { final class DistributedReceptionistTests: ClusterSystemXCTestCase { let receptionistBehavior = _OperationLogClusterReceptionist(settings: .default).behavior - func test_receptionist_mustHaveWellKnownAddress() throws { + func test_receptionist_mustHaveWellKnownID() throws { let opLogReceptionist = system.receptionist - let receptionistAddress = opLogReceptionist.id + let id = opLogReceptionist.id - receptionistAddress.detailedDescription.shouldEqual("/system/receptionist") - receptionistAddress.incarnation.shouldEqual(.wellKnown) + id.metadata.wellKnown.shouldEqual("receptionist") + id.incarnation.shouldEqual(.wellKnown) } func test_receptionist_shouldRespondWithRegisteredRefsForKey() throws { diff --git a/Tests/DistributedActorsTests/InterceptorTests.swift b/Tests/DistributedActorsTests/InterceptorTests.swift index f12415ccf..5de537b6c 100644 --- a/Tests/DistributedActorsTests/InterceptorTests.swift +++ b/Tests/DistributedActorsTests/InterceptorTests.swift @@ -296,7 +296,7 @@ private struct GreeterRemoteCallInterceptor: RemoteCallInterceptor { func interceptRemoteCall( on actor: Act, target: RemoteCallTarget, - invocation _invocation: inout ClusterSystem.InvocationEncoder, + invocation: inout ClusterSystem.InvocationEncoder, throwing: Err.Type, returning: Res.Type ) async throws -> Res @@ -310,9 +310,8 @@ private struct GreeterRemoteCallInterceptor: RemoteCallInterceptor { } let anyReturn = try await withCheckedThrowingContinuation { (cc: CheckedContinuation) in - let invocation = _invocation - Task { - var directDecoder = ClusterInvocationDecoder(invocation: invocation) + Task { [invocation] in + var directDecoder = ClusterInvocationDecoder(system: actor.actorSystem as! 
ClusterSystem, invocation: invocation) let directReturnHandler = ClusterInvocationResultHandler(directReturnContinuation: cc) try await self.greeter.actorSystem.executeDistributedTarget( @@ -329,7 +328,7 @@ private struct GreeterRemoteCallInterceptor: RemoteCallInterceptor { func interceptRemoteCallVoid( on actor: Act, target: RemoteCallTarget, - invocation _invocation: inout ClusterSystem.InvocationEncoder, + invocation: inout ClusterSystem.InvocationEncoder, throwing: Err.Type ) async throws where Act: DistributedActor, @@ -341,9 +340,8 @@ private struct GreeterRemoteCallInterceptor: RemoteCallInterceptor { } _ = try await withCheckedThrowingContinuation { (cc: CheckedContinuation) in - let invocation = _invocation - Task { - var directDecoder = ClusterInvocationDecoder(invocation: invocation) + Task { [invocation] in + var directDecoder = ClusterInvocationDecoder(system: actor.actorSystem as! ClusterSystem, invocation: invocation) let directReturnHandler = ClusterInvocationResultHandler(directReturnContinuation: cc) try await self.greeter.actorSystem.executeDistributedTarget( diff --git a/Tests/DistributedActorsTests/Metrics/ActorMemoryTests.swift b/Tests/DistributedActorsTests/Metrics/ActorMemoryTests.swift index 62f58263d..8c09d9929 100644 --- a/Tests/DistributedActorsTests/Metrics/ActorMemoryTests.swift +++ b/Tests/DistributedActorsTests/Metrics/ActorMemoryTests.swift @@ -24,8 +24,8 @@ final class ActorMemoryTests: XCTestCase { func test_osx_actorShell_instanceSize() { #if os(OSX) - class_getInstanceSize(_ActorShell.self).shouldEqual(608) - class_getInstanceSize(_ActorShell.self).shouldEqual(608) + class_getInstanceSize(_ActorShell.self).shouldEqual(664) + class_getInstanceSize(_ActorShell.self).shouldEqual(664) #else print("Skipping test_osx_actorShell_instanceSize as requires Objective-C runtime") #endif diff --git a/Tests/DistributedActorsTests/MetricsTestKit/MetricsTestKit.swift b/Tests/DistributedActorsTests/MetricsTestKit/MetricsTestKit.swift index 
66f71274c..5959c1b45 100644 --- a/Tests/DistributedActorsTests/MetricsTestKit/MetricsTestKit.swift +++ b/Tests/DistributedActorsTests/MetricsTestKit/MetricsTestKit.swift @@ -26,7 +26,7 @@ // //===----------------------------------------------------------------------===// -@testable import CoreMetrics +import CoreMetrics import DistributedActors @testable import Metrics import XCTest @@ -126,7 +126,7 @@ extension TestMetrics { // MARK: Counter public func expectCounter(_ metric: Counter) throws -> TestCounter { - metric.handler as! TestCounter + metric._handler as! TestCounter } public func expectCounter(_ label: String, _ dimensions: [(String, String)] = []) throws -> TestCounter { @@ -159,7 +159,7 @@ extension TestMetrics { // MARK: Recorder public func expectRecorder(_ metric: Recorder) throws -> TestRecorder { - metric.handler as! TestRecorder + metric._handler as! TestRecorder } public func expectRecorder(_ label: String, _ dimensions: [(String, String)] = []) throws -> TestRecorder { @@ -177,7 +177,7 @@ extension TestMetrics { // MARK: Timer public func expectTimer(_ metric: Timer) throws -> TestTimer { - metric.handler as! TestTimer + metric._handler as! 
TestTimer } public func expectTimer(_ label: String, _ dimensions: [(String, String)] = []) throws -> TestTimer { diff --git a/Tests/DistributedActorsTests/Pattern/WorkerPoolTests.swift b/Tests/DistributedActorsTests/Pattern/WorkerPoolTests.swift index f1ab61ede..4a0eac729 100644 --- a/Tests/DistributedActorsTests/Pattern/WorkerPoolTests.swift +++ b/Tests/DistributedActorsTests/Pattern/WorkerPoolTests.swift @@ -23,7 +23,8 @@ final class WorkerPoolTests: ClusterSystemXCTestCase { func test_workerPool_registerNewlyStartedActors() async throws { let workerKey = DistributedReception.Key(Greeter.self, id: "request-workers") - let workers = try await WorkerPool._spawn(self.system, select: .dynamic(workerKey)) + let settings = WorkerPoolSettings(selector: .dynamic(workerKey)) + let workers = try await WorkerPool(settings: settings, actorSystem: system) let pA: ActorTestProbe = self.testKit.makeTestProbe("pA") let pB: ActorTestProbe = self.testKit.makeTestProbe("pB") @@ -72,7 +73,7 @@ final class WorkerPoolTests: ClusterSystemXCTestCase { let workerKey = DistributedReception.Key(Greeter.self, id: "request-workers") - let workers = try await WorkerPool._spawn(self.system, select: .dynamic(workerKey)) + let workers = try await WorkerPool(selector: .dynamic(workerKey), actorSystem: system) let pA: ActorTestProbe = self.testKit.makeTestProbe("pA") let pB: ActorTestProbe = self.testKit.makeTestProbe("pB") @@ -163,7 +164,7 @@ final class WorkerPoolTests: ClusterSystemXCTestCase { var workerC: Greeter? 
= Greeter(probe: pC, actorSystem: self.system) // !-safe since we initialize workers above - let workers = try await WorkerPool._spawn(self.system, select: .static([workerA!, workerB!, workerC!])) + let workers = try await WorkerPool(settings: .init(selector: .static([workerA!, workerB!, workerC!])), actorSystem: system) let workerProbes: [ClusterSystem.ActorID: ActorTestProbe] = [ workerA!.id: pA, @@ -226,7 +227,7 @@ final class WorkerPoolTests: ClusterSystemXCTestCase { func test_workerPool_static_throwOnEmptyInitialSet() async throws { let error = try await shouldThrow { - let _: WorkerPool = try await WorkerPool._spawn(self.system, select: .static([])) + let _: WorkerPool = try await WorkerPool(selector: .static([]), actorSystem: system) } guard case WorkerPoolError.emptyStaticWorkerPool(let errorMessage) = error else { diff --git a/Tests/DistributedActorsTests/Plugins/ClusterSingleton/ClusterSingletonPluginClusteredTests.swift b/Tests/DistributedActorsTests/Plugins/ClusterSingleton/ClusterSingletonPluginClusteredTests.swift index 082f8fd13..cbb0ff95f 100644 --- a/Tests/DistributedActorsTests/Plugins/ClusterSingleton/ClusterSingletonPluginClusteredTests.swift +++ b/Tests/DistributedActorsTests/Plugins/ClusterSingleton/ClusterSingletonPluginClusteredTests.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import AsyncAlgorithms import Distributed @testable import DistributedActors import DistributedActorsTestKit @@ -23,13 +24,12 @@ final class ClusterSingletonPluginClusteredTests: ClusteredActorSystemsXCTestCas "/system/cluster/swim", "/system/cluster", "/system/cluster/gossip", + "/system/receptionist", ] } func test_singletonByClusterLeadership_happyPath() async throws { - throw XCTSkip("!!! 
Skipping test \(#function) !!!") // FIXME(distributed): disable test until https://github.com/apple/swift-distributed-actors/pull/1001 - - var singletonSettings = ClusterSingletonSettings(name: TheSingleton.name) + var singletonSettings = ClusterSingletonSettings() singletonSettings.allocationStrategy = .byLeadership let first = await self.setUpNode("first") { settings in @@ -49,13 +49,14 @@ final class ClusterSingletonPluginClusteredTests: ClusteredActorSystemsXCTestCas } // Bring up `ClusterSingletonBoss` before setting up cluster (https://github.com/apple/swift-distributed-actors/issues/463) - let ref1 = try await first.singleton.host(of: TheSingleton.self, settings: singletonSettings) { actorSystem in + let name = "the-one" + let ref1 = try await first.singleton.host(name: name, settings: singletonSettings) { actorSystem in TheSingleton(greeting: "Hello-1", actorSystem: actorSystem) } - let ref2 = try await second.singleton.host(of: TheSingleton.self, settings: singletonSettings) { actorSystem in + let ref2 = try await second.singleton.host(name: name, settings: singletonSettings) { actorSystem in TheSingleton(greeting: "Hello-2", actorSystem: actorSystem) } - let ref3 = try await third.singleton.host(of: TheSingleton.self, settings: singletonSettings) { actorSystem in + let ref3 = try await third.singleton.host(name: name, settings: singletonSettings) { actorSystem in TheSingleton(greeting: "Hello-3", actorSystem: actorSystem) } @@ -63,15 +64,227 @@ final class ClusterSingletonPluginClusteredTests: ClusteredActorSystemsXCTestCas third.cluster.join(node: first.cluster.uniqueNode.node) // `first` will be the leader (lowest address) and runs the singleton - try await self.ensureNodes(.up, on: first, nodes: second.cluster.uniqueNode, third.cluster.uniqueNode) + try await self.ensureNodes(.up, on: first, within: .seconds(10), nodes: second.cluster.uniqueNode, third.cluster.uniqueNode) try await self.assertSingletonRequestReply(first, singleton: ref1, greetingName: 
"Alice", expectedPrefix: "Hello-1 Alice!") try await self.assertSingletonRequestReply(second, singleton: ref2, greetingName: "Bob", expectedPrefix: "Hello-1 Bob!") try await self.assertSingletonRequestReply(third, singleton: ref3, greetingName: "Charlie", expectedPrefix: "Hello-1 Charlie!") } + func test_singletonByClusterLeadership_stashMessagesIfNoLeader() async throws { + var singletonSettings = ClusterSingletonSettings() + singletonSettings.allocationStrategy = .byLeadership + singletonSettings.allocationTimeout = .seconds(15) + + let first = await self.setUpNode("first") { settings in + settings.node.port = 7111 + settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) + settings += ClusterSingletonPlugin() + } + let second = await self.setUpNode("second") { settings in + settings.node.port = 8222 + settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) + settings += ClusterSingletonPlugin() + } + let third = await self.setUpNode("third") { settings in + settings.node.port = 9333 + settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) + settings += ClusterSingletonPlugin() + } + + // Bring up `ClusterSingletonBoss`. No leader yet so singleton is not available. 
+ let name = "the-one" + let ref1 = try await first.singleton.host(name: name, settings: singletonSettings) { actorSystem in + TheSingleton(greeting: "Hello-1", actorSystem: actorSystem) + } + let ref2 = try await second.singleton.host(name: name, settings: singletonSettings) { actorSystem in + TheSingleton(greeting: "Hello-2", actorSystem: actorSystem) + } + let ref3 = try await third.singleton.host(name: name, settings: singletonSettings) { actorSystem in + TheSingleton(greeting: "Hello-3", actorSystem: actorSystem) + } + + enum TaskType { + case cluster + case remoteCall + } + + func requestReplyTask(singleton: TheSingleton, greetingName: String, expectedPrefix: String) -> (@Sendable () async throws -> TaskType) { + { + let reply = try await singleton.greet(name: greetingName) + reply.shouldStartWith(prefix: expectedPrefix) + return .remoteCall + } + } + + try await withThrowingTaskGroup(of: TaskType.self) { group in + group.addTask { + // Set up the cluster + first.cluster.join(node: second.cluster.uniqueNode.node) + third.cluster.join(node: first.cluster.uniqueNode.node) + + // `first` will be the leader (lowest address) and runs the singleton. + // + // No need to `ensureNodes` status. A leader should only be selected when all three nodes have joined and are up, + // and it's possible for `ensureNodes` to return positive response *after* singleton has been allocated, + // which means stashed calls have started getting processed and that would cause the test to fail. 
+ + return TaskType.cluster + } + + // Remote calls should be stashed until singleton is allocated + group.addTask(operation: requestReplyTask(singleton: ref1, greetingName: "Alice", expectedPrefix: "Hello-1 Alice!")) + group.addTask(operation: requestReplyTask(singleton: ref2, greetingName: "Bob", expectedPrefix: "Hello-1 Bob!")) + group.addTask(operation: requestReplyTask(singleton: ref3, greetingName: "Charlie", expectedPrefix: "Hello-1 Charlie!")) + + var taskTypes = [TaskType]() + for try await taskType in group { + taskTypes.append(taskType) + } + + guard taskTypes.first == .cluster else { + throw TestError("Received reply to remote call reply before singleton is allocated") + } + } + } + + func test_singletonByClusterLeadership_withLeaderChange() async throws { + var singletonSettings = ClusterSingletonSettings() + singletonSettings.allocationStrategy = .byLeadership + singletonSettings.allocationTimeout = .seconds(15) + + let first = await self.setUpNode("first") { settings in + settings.node.port = 7111 + settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) + settings += ClusterSingletonPlugin() + } + let second = await self.setUpNode("second") { settings in + settings.node.port = 8222 + settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) + settings += ClusterSingletonPlugin() + } + let third = await self.setUpNode("third") { settings in + settings.node.port = 9333 + settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) + settings += ClusterSingletonPlugin() + } + let fourth = await self.setUpNode("fourth") { settings in + settings.node.port = 7444 + settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) + settings += ClusterSingletonPlugin() + } + + // Bring up `ClusterSingletonBoss` + let name = "the-one" + let ref1 = try await first.singleton.host(name: name, settings: singletonSettings) { actorSystem in + TheSingleton(greeting: "Hello-1", actorSystem: actorSystem) + } + let ref2 = try 
await second.singleton.host(name: name, settings: singletonSettings) { actorSystem in + TheSingleton(greeting: "Hello-2", actorSystem: actorSystem) + } + let ref3 = try await third.singleton.host(name: name, settings: singletonSettings) { actorSystem in + TheSingleton(greeting: "Hello-3", actorSystem: actorSystem) + } + _ = try await fourth.singleton.host(name: name, settings: singletonSettings) { actorSystem in + TheSingleton(greeting: "Hello-4", actorSystem: actorSystem) + } + + first.cluster.join(node: second.cluster.uniqueNode.node) + third.cluster.join(node: first.cluster.uniqueNode.node) + + // `first` will be the leader (lowest address) and runs the singleton + try await self.ensureNodes(.up, on: first, within: .seconds(10), nodes: second.cluster.uniqueNode, third.cluster.uniqueNode) + pinfo("Nodes up: \([first.cluster.uniqueNode, second.cluster.uniqueNode, third.cluster.uniqueNode])") + + try await self.assertSingletonRequestReply(first, singleton: ref1, greetingName: "Alice", expectedPrefix: "Hello-1 Alice!") + try await self.assertSingletonRequestReply(second, singleton: ref2, greetingName: "Bob", expectedPrefix: "Hello-1 Bob!") + try await self.assertSingletonRequestReply(third, singleton: ref3, greetingName: "Charlie", expectedPrefix: "Hello-1 Charlie!") + pinfo("All three nodes communicated with singleton") + + let firstNode = first.cluster.uniqueNode + first.cluster.leave() + + // Make sure that `second` and `third` see `first` as down and become leader-less + try await self.assertMemberStatus(on: second, node: firstNode, is: .down, within: .seconds(10)) + try await self.assertMemberStatus(on: third, node: firstNode, is: .down, within: .seconds(10)) + + try self.testKit(second).eventually(within: .seconds(10)) { + try self.assertLeaderNode(on: second, is: nil) + try self.assertLeaderNode(on: third, is: nil) + } + pinfo("Node \(firstNode) left cluster...") + + // `fourth` will become the new leader and singleton + pinfo("Node 
\(fourth.cluster.uniqueNode) joining cluster...") + fourth.cluster.join(node: second.cluster.uniqueNode.node) + let start = ContinuousClock.Instant.now + + // No leader so singleton is not available, messages sent should be stashed + func requestReplyTask(singleton: TheSingleton, greetingName: String) -> Task<[String], Error> { + Task { + try await withThrowingTaskGroup(of: String.self) { group in + var attempt = 0 + for await _ in AsyncTimerSequence.repeating(every: .seconds(1), clock: .continuous) { + attempt += 1 + let message = "\(greetingName) (\(attempt))" + group.addTask { + pnote(" Sending: \(message) -> \(singleton) (it may be terminated/not-re-pointed yet)") + return try await singleton.greet(name: message) + } + } + + var replies = [String]() + for try await reply in group { + replies.append(reply) + } + return replies + } + } + } + + let ref2Task = requestReplyTask(singleton: ref2, greetingName: "Bob") + let ref3Task = requestReplyTask(singleton: ref3, greetingName: "Charlie") + + try await self.ensureNodes(.up, on: second, within: .seconds(10), nodes: third.cluster.uniqueNode, fourth.cluster.uniqueNode) + pinfo("Fourth node joined, will become leader; Members now: \([fourth.cluster.uniqueNode, second.cluster.uniqueNode, third.cluster.uniqueNode])") + + ref2Task.cancel() + ref3Task.cancel() + + let got2 = try await ref2Task.value + pinfo("Received replies (by \(ref2)) from singleton: \(got2)") + + let got2First = got2.first + got2First.shouldNotBeNil() + got2First!.shouldStartWith(prefix: "Hello-4 Bob") + + if got2First!.starts(with: "Hello-4 Bob (1)!") { + pinfo(" No messages were lost! 
Total \(got2.count) deliveries.") + } else { + pinfo(" Initial messages may have been lost, delivered message: \(String(describing: got2First))") + } + + let got3 = try await ref3Task.value + pinfo("Received replies (by \(ref3)) from singleton: \(got3)") + + let got3First = got3.first + got3First.shouldNotBeNil() + got3First!.shouldStartWith(prefix: "Hello-4 Charlie") + + if got3First!.starts(with: "Hello-4 Charlie (1)!") { + pinfo(" No messages were lost! Total \(got3.count) deliveries.") + } else { + pinfo(" Initial messages may have been lost, delivered message: \(String(describing: got3First))") + } + + let stop = ContinuousClock.Instant.now + pinfo("Singleton re-pointing took: \((stop - start).prettyDescription)") + + pinfo("Nodes communicated successfully with singleton on [fourth]") + } + func test_remoteCallShouldFailAfterAllocationTimedOut() async throws { - var singletonSettings = ClusterSingletonSettings(name: TheSingleton.name) + var singletonSettings = ClusterSingletonSettings() singletonSettings.allocationStrategy = .byLeadership singletonSettings.allocationTimeout = .milliseconds(100) @@ -87,10 +300,11 @@ final class ClusterSingletonPluginClusteredTests: ClusteredActorSystemsXCTestCas } // Bring up `ClusterSingletonBoss` before setting up cluster (https://github.com/apple/swift-distributed-actors/issues/463) - _ = try await first.singleton.host(of: TheSingleton.self, settings: singletonSettings) { actorSystem in + let name = "the-one" + _ = try await first.singleton.host(name: name, settings: singletonSettings) { actorSystem in TheSingleton(greeting: "Hello-1", actorSystem: actorSystem) } - let ref2 = try await second.singleton.host(of: TheSingleton.self, settings: singletonSettings) { actorSystem in + let ref2 = try await second.singleton.host(name: name, settings: singletonSettings) { actorSystem in TheSingleton(greeting: "Hello-2", actorSystem: actorSystem) } @@ -122,191 +336,6 @@ final class ClusterSingletonPluginClusteredTests: 
ClusteredActorSystemsXCTestCas } } - /* - - func test_singletonByClusterLeadership_stashMessagesIfNoLeader() throws { - var singletonSettings = ActorSingletonSettings(name: GreeterSingleton.name) - singletonSettings.allocationStrategy = .byLeadership - - let first = self.setUpNode("first") { settings in - settings += ActorSingletonPlugin() - - settings.node.port = 7111 - settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) - settings.serialization.register(GreeterSingleton.Message.self) - } - let second = self.setUpNode("second") { settings in - settings += ActorSingletonPlugin() - - settings.node.port = 8222 - settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) - settings.serialization.register(GreeterSingleton.Message.self) - } - let third = self.setUpNode("third") { settings in - settings += ActorSingletonPlugin() - - settings.node.port = 9333 - settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) - settings.serialization.register(GreeterSingleton.Message.self) - } - - // No leader so singleton is not available, messages sent should be stashed - let replyProbe1 = self.testKit(first).makeTestProbe(expecting: String.self) - let ref1 = try first.singleton.host(GreeterSingleton.Message.self, settings: singletonSettings, GreeterSingleton("Hello-1").behavior) - ref1.tell(.greet(name: "Alice-1", replyTo: replyProbe1.ref)) - - let replyProbe2 = self.testKit(second).makeTestProbe(expecting: String.self) - let ref2 = try second.singleton.host(GreeterSingleton.Message.self, settings: singletonSettings, GreeterSingleton("Hello-2").behavior) - ref2.tell(.greet(name: "Bob-2", replyTo: replyProbe2.ref)) - - let replyProbe3 = self.testKit(third).makeTestProbe(expecting: String.self) - let ref3 = try third.singleton.host(GreeterSingleton.Message.self, settings: singletonSettings, GreeterSingleton("Hello-3").behavior) - ref3.tell(.greet(name: "Charlie-3", replyTo: replyProbe3.ref)) - - try replyProbe1.expectNoMessage(for: 
.milliseconds(200)) - try replyProbe2.expectNoMessage(for: .milliseconds(200)) - try replyProbe3.expectNoMessage(for: .milliseconds(200)) - - first.cluster.join(node: second.cluster.uniqueNode.node) - third.cluster.join(node: second.cluster.uniqueNode.node) - - // `first` becomes the leader (lowest address) and runs the singleton - try self.ensureNodes(.up, nodes: first.cluster.uniqueNode, second.cluster.uniqueNode, third.cluster.uniqueNode) - - try replyProbe1.expectMessage("Hello-1 Alice-1!") - try replyProbe2.expectMessage("Hello-1 Bob-2!") - try replyProbe3.expectMessage("Hello-1 Charlie-3!") - } - - func test_singletonByClusterLeadership_withLeaderChange() throws { - var singletonSettings = ActorSingletonSettings(name: GreeterSingleton.name) - singletonSettings.allocationStrategy = .byLeadership - - let first = self.setUpNode("first") { settings in - settings += ActorSingletonPlugin() - - settings.node.port = 7111 - settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) - settings.serialization.register(GreeterSingleton.Message.self) - } - let second = self.setUpNode("second") { settings in - settings += ActorSingletonPlugin() - - settings.node.port = 8222 - settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) - settings.serialization.register(GreeterSingleton.Message.self) - } - let third = self.setUpNode("third") { settings in - settings += ActorSingletonPlugin() - - settings.node.port = 9333 - settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) - settings.serialization.register(GreeterSingleton.Message.self) - } - let fourth = self.setUpNode("fourth") { settings in - settings += ActorSingletonPlugin() - - settings.node.port = 7444 - settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) - settings.serialization.register(GreeterSingleton.Message.self) - } - - // Bring up `ActorSingletonProxy` before setting up cluster (https://github.com/apple/swift-distributed-actors/issues/463) - let ref1 = 
try first.singleton.host(GreeterSingleton.Message.self, settings: singletonSettings, GreeterSingleton("Hello-1").behavior) - let ref2 = try second.singleton.host(GreeterSingleton.Message.self, settings: singletonSettings, GreeterSingleton("Hello-2").behavior) - let ref3 = try third.singleton.host(GreeterSingleton.Message.self, settings: singletonSettings, GreeterSingleton("Hello-3").behavior) - _ = try fourth.singleton.host(GreeterSingleton.Message.self, settings: singletonSettings, GreeterSingleton("Hello-4").behavior) - - first.cluster.join(node: second.cluster.uniqueNode.node) - third.cluster.join(node: second.cluster.uniqueNode.node) - - try self.ensureNodes(.up, nodes: first.cluster.uniqueNode, second.cluster.uniqueNode, third.cluster.uniqueNode) - pinfo("Nodes up: \([first.cluster.uniqueNode, second.cluster.uniqueNode, third.cluster.uniqueNode])") - - let replyProbe2 = self.testKit(second).makeTestProbe(expecting: String.self) - let replyProbe3 = self.testKit(third).makeTestProbe(expecting: String.self) - - // `first` has the lowest address so it should be the leader and singleton - try self.assertSingletonRequestReply(first, singletonRef: ref1, message: "Alice", expect: "Hello-1 Alice!") - try self.assertSingletonRequestReply(second, singletonRef: ref2, message: "Bob", expect: "Hello-1 Bob!") - try self.assertSingletonRequestReply(third, singletonRef: ref3, message: "Charlie", expect: "Hello-1 Charlie!") - pinfo("All three nodes communicated with singleton") - - let firstNode = first.cluster.uniqueNode - first.cluster.leave() - - // Make sure that `second` and `third` see `first` as down and become leader-less - try self.testKit(second).eventually(within: .seconds(10)) { - try self.assertMemberStatus(on: second, node: firstNode, is: .down) - try self.assertLeaderNode(on: second, is: nil) - } - try self.testKit(third).eventually(within: .seconds(10)) { - try self.assertMemberStatus(on: third, node: firstNode, is: .down) - try self.assertLeaderNode(on: third, 
is: nil) - } - pinfo("Node \(first.cluster.uniqueNode) left cluster...") - - // `fourth` will become the new leader and singleton - pinfo("Node \(fourth.cluster.uniqueNode) joining cluster...") - fourth.cluster.join(node: second.cluster.uniqueNode.node) - let start = ContinuousClock.Instant.now() - - // No leader so singleton is not available, messages sent should be stashed - _ = try second._spawn("teller", of: String.self, .setup { context in - context.timers.startPeriodic(key: "periodic-try-send", message: "tick", interval: .seconds(1)) - var attempt = 0 - - return .receiveMessage { _ in - attempt += 1 - // No leader so singleton is not available, messages sent should be stashed - let m2 = "Bob-2 (\(attempt))" - pnote(" Sending: \(m2) -> \(ref2) (it may be terminated/not-re-pointed yet)") - ref2.tell(.greet(name: m2, replyTo: replyProbe2.ref)) - - let m3 = "Charlie-3 (\(attempt))" - pnote(" Sending: \(m3) -> \(ref3) (it may be terminated/not-re-pointed yet)") - ref3.tell(.greet(name: m3, replyTo: replyProbe3.ref)) - return .same - } - }) - - try self.ensureNodes(.up, on: second, nodes: second.cluster.uniqueNode, third.cluster.uniqueNode, fourth.cluster.uniqueNode) - pinfo("Fourth node joined, will become leader; Members now: \([fourth.cluster.uniqueNode, second.cluster.uniqueNode, third.cluster.uniqueNode])") - - // The stashed messages get routed to new singleton running on `fourth` - let got2 = try replyProbe2.expectMessage() - got2.shouldStartWith(prefix: "Hello-4 Bob-2") - pinfo("Received reply (by \(replyProbe2.id.path)) from singleton: \(got2)") - if got2 == "Hello-4 Bob-2 (1)!" { - var counter = 0 - while try replyProbe2.maybeExpectMessage(within: .milliseconds(100)) != nil { - counter += 1 - } - pinfo(" No messages were lost! 
Including \(counter) more, following the previous delivery.") - } else { - pinfo(" Initial messages may have been lost, delivered message: \(got2)") - } - - let got3 = try replyProbe3.expectMessage() - got3.shouldStartWith(prefix: "Hello-4 Charlie-3") - pinfo("Received reply (by \(replyProbe3.id.path)) from singleton: \(got3)") - if got3 == "Hello-4 Charlie-3 (1)!" { - var counter = 0 - while try replyProbe3.maybeExpectMessage(within: .milliseconds(100)) != nil { - counter += 1 - } - pinfo(" No messages were lost! Including \(counter) more, following the previous delivery.") - } else { - pinfo(" Initial messages may have been lost, delivered message: \(got3)") - } - - let stop = ContinuousClock.Instant.now() - pinfo("Singleton re-pointing took: \((stop - start).prettyDescription)") - - pinfo("Nodes communicated successfully with singleton on [fourth]") - } - */ - /// Since during re-balancing it may happen that a message gets lost, we send messages a few times and only if none "got through" it would be a serious error. 
private func assertSingletonRequestReply(_ system: ClusterSystem, singleton: TheSingleton, greetingName: String, expectedPrefix: String) async throws { let testKit: ActorTestKit = self.testKit(system) @@ -316,7 +345,9 @@ final class ClusterSingletonPluginClusteredTests: ClusteredActorSystemsXCTestCas attempts += 1 do { - let reply = try await singleton.greet(name: greetingName) + let reply = try await RemoteCall.with(timeout: .seconds(1)) { + try await singleton.greet(name: greetingName) + } reply.shouldStartWith(prefix: expectedPrefix) } catch { throw TestError( @@ -332,8 +363,6 @@ final class ClusterSingletonPluginClusteredTests: ClusteredActorSystemsXCTestCas distributed actor TheSingleton: ClusterSingletonProtocol { typealias ActorSystem = ClusterSystem - static let name = "greeter" - private let greeting: String init(greeting: String, actorSystem: ActorSystem) { @@ -342,6 +371,6 @@ distributed actor TheSingleton: ClusterSingletonProtocol { } distributed func greet(name: String) -> String { - "\(self.greeting) \(name)! (from node: \(self.id.uniqueNode)" + "\(self.greeting) \(name)! (from node: \(self.id.uniqueNode), id: \(self.id.detailedDescription))" } } diff --git a/Tests/DistributedActorsTests/Plugins/ClusterSingleton/ClusterSingletonPluginTests.swift b/Tests/DistributedActorsTests/Plugins/ClusterSingleton/ClusterSingletonPluginTests.swift index 76b8bc89e..244281522 100644 --- a/Tests/DistributedActorsTests/Plugins/ClusterSingleton/ClusterSingletonPluginTests.swift +++ b/Tests/DistributedActorsTests/Plugins/ClusterSingleton/ClusterSingletonPluginTests.swift @@ -18,23 +18,23 @@ import XCTest final class ClusterSingletonPluginTests: ClusterSystemXCTestCase { func test_singletonPlugin_clusterDisabled() async throws { - throw XCTSkip("!!! 
Skipping test \(#function) !!!") // FIXME(distributed): disable test until https://github.com/apple/swift-distributed-actors/pull/1001 - // Singleton should work just fine without clustering let test = await setUpNode("test") { settings in settings.enabled = false settings += ClusterSingletonPlugin() } + let name = "the-one" + // singleton.host - let ref = try await test.singleton.host(of: TheSingleton.self, name: TheSingleton.name) { actorSystem in + let ref = try await test.singleton.host(name: name) { actorSystem in TheSingleton(greeting: "Hello", actorSystem: actorSystem) } let reply = try await ref.greet(name: "Charlie") reply.shouldStartWith(prefix: "Hello Charlie!") // singleton.ref (proxy-only) - let proxyRef = try await test.singleton.proxy(of: TheSingleton.self, name: TheSingleton.name) + let proxyRef = try await test.singleton.proxy(TheSingleton.self, name: name) let proxyReply = try await proxyRef.greet(name: "Charlene") proxyReply.shouldStartWith(prefix: "Hello Charlene!") }