From b8c1fe952e09798509fce23d5b29d98f200c87de Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Tue, 7 Jun 2022 16:39:51 -0700 Subject: [PATCH 1/3] Implement remoteCall without ask actors Resolves https://github.com/apple/swift-distributed-actors/issues/947 --- .../Cluster/Association.swift | 6 + .../RemoteClusterActorPersonality.swift | 14 ++ .../Transport/TransportPipelines.swift | 31 ++- Sources/DistributedActors/ClusterSystem.swift | 193 +++++++++++++----- .../InvocationBehavior.swift | 40 +--- Sources/DistributedActors/Refs.swift | 13 -- .../ClusterSystemTests.swift | 2 +- 7 files changed, 191 insertions(+), 108 deletions(-) diff --git a/Sources/DistributedActors/Cluster/Association.swift b/Sources/DistributedActors/Cluster/Association.swift index e2aeeff6a..61f5d335d 100644 --- a/Sources/DistributedActors/Cluster/Association.swift +++ b/Sources/DistributedActors/Cluster/Association.swift @@ -171,6 +171,12 @@ extension Association { let transportEnvelope = TransportEnvelope(envelope: envelope, recipient: recipient) self._send(transportEnvelope, promise: promise) } + + /// Concurrency: safe to invoke from any thread. + func sendInvocation(_ invocation: InvocationMessage, recipient: ActorID, promise: EventLoopPromise? = nil) { + let transportEnvelope = TransportEnvelope(invocation: invocation, recipient: recipient) + self._send(transportEnvelope, promise: promise) + } /// Concurrency: safe to invoke from any thread. func sendSystemMessage(_ message: _SystemMessage, recipient: ActorID, promise: EventLoopPromise? = nil) { diff --git a/Sources/DistributedActors/Cluster/Transport/RemoteClusterActorPersonality.swift b/Sources/DistributedActors/Cluster/Transport/RemoteClusterActorPersonality.swift index 2522dbdc7..8d676b4c9 100644 --- a/Sources/DistributedActors/Cluster/Transport/RemoteClusterActorPersonality.swift +++ b/Sources/DistributedActors/Cluster/Transport/RemoteClusterActorPersonality.swift @@ -101,6 +101,20 @@ public final class _RemoteClusterActorPersonality { self.deadLetters.tell(message, file: file, line: line) } } + + @usableFromInline + func sendInvocation(_ invocation: InvocationMessage, file: String = #file, line: UInt = #line) { + traceLog_Cell("RemoteActorRef(\(self.id)) sendInvocation: \(invocation)") + + switch self.association { + case .association(let association): + association.sendInvocation(invocation, recipient: self.id) + self.instrumentation.actorTold(message: invocation, from: nil) + case .tombstone: + // TODO: metric for dead letter: self.instrumentation.deadLetter(message: message, from: nil) + self.system.deadLetters.tell(DeadLetter(invocation, recipient: self.id), file: file, line: line) + } + } @usableFromInline func sendSystemMessage(_ message: _SystemMessage, file: String = #file, line: UInt = #line) { diff --git a/Sources/DistributedActors/Cluster/Transport/TransportPipelines.swift b/Sources/DistributedActors/Cluster/Transport/TransportPipelines.swift index ca087c6aa..d3aac62f4 100644 --- a/Sources/DistributedActors/Cluster/Transport/TransportPipelines.swift +++ b/Sources/DistributedActors/Cluster/Transport/TransportPipelines.swift @@ -622,7 +622,28 @@ private final class UserMessageHandler: ChannelInboundHandler { /// This ends the processing chain for incoming messages. func channelRead(context: ChannelHandlerContext, data: NIOAny) { - self.deserializeThenDeliver(context, wireEnvelope: self.unwrapInboundIn(data)) + let wireEnvelope = self.unwrapInboundIn(data) + let manifestMessageType = try? self.serializationPool.serialization.summonType(from: wireEnvelope.manifest) + + // FIXME(distributed): we should be able to assume we always get either a remote invocation or reply here + switch manifestMessageType { + case .some(is InvocationMessage.Type): + do { + let invocation = try self.serializationPool.serialization.deserialize(as: InvocationMessage.self, from: wireEnvelope.payload, using: wireEnvelope.manifest) + self.system.receiveInvocation(invocation, recipient: wireEnvelope.recipient, on: context.channel) + } catch { + self.system.log.error("Failed to deserialize [\(InvocationMessage.self)]: \(error)") + } + case .some(is any AnyRemoteCallReply.Type): + do { + let reply = try self.serializationPool.serialization.deserialize(as: (any AnyRemoteCallReply).self, from: wireEnvelope.payload, using: wireEnvelope.manifest) + self.system.receiveRemoteCallReply(reply) + } catch { + self.system.log.error("Failed to deserialize [\(String(describing: manifestMessageType))]: \(error)") + } + default: + self.deserializeThenDeliver(context, wireEnvelope: wireEnvelope) + } } private func deserializeThenDeliver(_ context: ChannelHandlerContext, wireEnvelope: Wire.Envelope) { @@ -858,6 +879,7 @@ internal struct TransportEnvelope: CustomStringConvertible, CustomDebugStringCon /// Note: MAY contain SystemMessageEnvelope case message(Any) // ~~ outbound ~~ + case invocation(InvocationMessage) case systemMessage(_SystemMessage) // ~~ inbound only ~~ case systemMessageEnvelope(SystemMessageEnvelope) @@ -895,6 +917,11 @@ internal struct TransportEnvelope: CustomStringConvertible, CustomDebugStringCon self.recipient = recipient } + init(invocation: InvocationMessage, recipient: ActorID) { + self.storage = .invocation(invocation) + self.recipient = recipient + } + init(systemMessage: _SystemMessage, recipient: ActorID) { self.storage = .systemMessage(systemMessage) self.recipient = recipient @@ -923,6 +950,8 @@ internal struct TransportEnvelope: CustomStringConvertible, CustomDebugStringCon switch self.storage { case .message(let message): return message + case .invocation(let invocation): + return invocation case .systemMessage(let message): return message case .systemMessageEnvelope(let systemEnvelope): diff --git a/Sources/DistributedActors/ClusterSystem.swift b/Sources/DistributedActors/ClusterSystem.swift index d9e00520f..379d98a40 100644 --- a/Sources/DistributedActors/ClusterSystem.swift +++ b/Sources/DistributedActors/ClusterSystem.swift @@ -18,6 +18,7 @@ import CDistributedActorsMailbox import Dispatch import Distributed import DistributedActorsConcurrencyHelpers +import Foundation // for UUID import Logging import NIO @@ -32,6 +33,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable { public typealias InvocationEncoder = ClusterInvocationEncoder public typealias SerializationRequirement = any Codable public typealias ResultHandler = ClusterInvocationResultHandler + internal typealias CallID = UUID public let name: String @@ -89,6 +91,9 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable { return s } } + + private let inFlightCallLock = Lock() + private var _inFlightCalls: [CallID: CheckedContinuation] = [:] // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Receptionist @@ -928,9 +933,6 @@ extension ClusterSystem { _openExistential(watcher, do: doMakeLifecycleWatch) } - let behavior = InvocationBehavior.behavior(instance: Weak(actor)) - let ref = self._spawnDistributedActor(behavior, identifiedBy: actor.id) - self._managedRefs[actor.id] = ref self._managedDistributedActors.insert(actor: actor) } @@ -979,20 +981,18 @@ extension ClusterSystem { throw RemoteCallError.clusterAlreadyShutDown } - let recipient = _ActorRef(.remote(.init(shell: clusterShell, id: actor.id._asRemote, system: self))) - + let recipient = _RemoteClusterActorPersonality(shell: clusterShell, id: actor.id._asRemote, system: self) let arguments = invocation.arguments - let ask: AskResponse> = recipient.ask(timeout: RemoteCall.timeout ?? self.settings.defaultRemoteCallTimeout) { replyTo in + + let reply: RemoteCallReply = try await self.withCallID { callID in let invocation = InvocationMessage( + callID: callID, targetIdentifier: target.identifier, - arguments: arguments, - replyToAddress: replyTo.id + arguments: arguments ) - - return invocation + recipient.sendInvocation(invocation) } - let reply = try await ask.value if let error = reply.thrownError { throw error } @@ -1012,60 +1012,127 @@ extension ClusterSystem { Act.ID == ActorID, Err: Error { - guard let shell = self._cluster else { + guard let clusterShell = self._cluster else { throw RemoteCallError.clusterAlreadyShutDown } - let recipient = _ActorRef(.remote(.init(shell: shell, id: actor.id._asRemote, system: self))) - + let recipient = _RemoteClusterActorPersonality(shell: clusterShell, id: actor.id._asRemote, system: self) let arguments = invocation.arguments - let ask: AskResponse> = recipient.ask(timeout: RemoteCall.timeout ?? self.settings.defaultRemoteCallTimeout) { replyTo in + + let reply: RemoteCallReply<_Done> = try await self.withCallID { callID in let invocation = InvocationMessage( + callID: callID, targetIdentifier: target.identifier, - arguments: arguments, - replyToAddress: replyTo.id + arguments: arguments ) - - return invocation + recipient.sendInvocation(invocation) } - let reply = try await ask.value if let error = reply.thrownError { throw error } } + + private func withCallID( + body: (CallID) -> Void + ) async throws -> Reply + where Reply: AnyRemoteCallReply { + let callID = UUID() + + let timeout = RemoteCall.timeout ?? self.settings.defaultRemoteCallTimeout + let timeoutTask: Task = Task { + await Task.sleep(UInt64(timeout.nanoseconds)) + guard !Task.isCancelled else { + return + } + self.inFlightCallLock.withLockVoid { + if let continuation = self._inFlightCalls.removeValue(forKey: callID) { + continuation.resume(throwing: RemoteCallError.timedOut(TimeoutError(message: "Remote call timed out", timeout: timeout))) + } + } + } + defer { + timeoutTask.cancel() + } + + let reply: any AnyRemoteCallReply = try await withCheckedThrowingContinuation { continuation in + self.inFlightCallLock.withLock { + self._inFlightCalls[callID] = continuation // this is to be resumed from an incoming reply or timeout + } + body(callID) + } + + guard let reply = reply as? Reply else { + // ClusterInvocationResultHandler.onThrow returns RemoteCallReply<_Done> for both + // remoteCallVoid and remoteCall (i.e., it doesn't send back RemoteCallReply). + // The guard check above fails for the latter use-case because of type mismatch. + // The if-block converts the error reply to the proper type then returns it. + if let thrownError = reply.thrownError { + return Reply.init(callID: reply.callID, error: thrownError) + } + + self.log.error("Expected [\(Reply.self)] but got [\(type(of: reply as Any))]") + throw RemoteCallError.invalidReply + } + return reply + } } extension ClusterSystem { - func receiveInvocation(actor: some DistributedActor, message: InvocationMessage) async { + func receiveInvocation(_ invocation: InvocationMessage, recipient: ActorID, on channel: Channel) { guard let shell = self._cluster else { - self.log.error("Cluster has shut down already, yet received message. Message will be dropped: \(message)") + self.log.error("Cluster has shut down already, yet received message. Message will be dropped: \(invocation)") + return + } + + guard let actor = self.resolve(id: recipient) else { + self.log.error("Unable to resolve recipient \(recipient). Message will be dropped: \(invocation)") return } - let target = message.target - - var decoder = ClusterInvocationDecoder(system: self, message: message) - let resultHandler = ClusterInvocationResultHandler( - system: self, - clusterShell: shell, - replyTo: message.replyToAddress - ) + Task { + var decoder = ClusterInvocationDecoder(system: self, message: invocation) - do { - try await executeDistributedTarget( - on: actor, - target: target, - invocationDecoder: &decoder, - handler: resultHandler + let target = invocation.target + let resultHandler = ClusterInvocationResultHandler( + system: self, + clusterShell: shell, + callID: invocation.callID, + channel: channel, + recipient: recipient ) - } catch { - // FIXME(distributed): is this right? + do { - try await resultHandler.onThrow(error: error) + try await executeDistributedTarget( + on: actor, + target: target, + invocationDecoder: &decoder, + handler: resultHandler + ) } catch { - self.log.warning("Unable to invoke result handler for \(message.target) call, error: \(error)") + // FIXME(distributed): is this right? + do { + try await resultHandler.onThrow(error: error) + } catch { + self.log.warning("Unable to invoke result handler for \(invocation.target) call, error: \(error)") + } + } + } + } + + func receiveRemoteCallReply(_ reply: any AnyRemoteCallReply) { + self.inFlightCallLock.withLockVoid { + guard let continuation = self._inFlightCalls.removeValue(forKey: reply.callID) else { + self.log.warning("Missing continuation for remote call \(reply.callID). Reply will be dropped: \(reply)") // this could be because remote call has timed out + return } + continuation.resume(returning: reply) + } + } + + private func resolve(id: ActorID) -> (any DistributedActor)? { + self.namingLock.withLock { + self._managedDistributedActors.get(identifiedBy: id) } } } @@ -1075,60 +1142,73 @@ public struct ClusterInvocationResultHandler: DistributedTargetInvocationResultH let system: ClusterSystem let clusterShell: ClusterShell - let replyToID: ActorID + let callID: ClusterSystem.CallID + let channel: Channel + let recipient: ClusterSystem.ActorID // FIXME(distributed): remove; we need it only because TransportEnvelope requires it - init(system: ClusterSystem, clusterShell: ClusterShell, replyTo: ActorID) { + init(system: ClusterSystem, clusterShell: ClusterShell, callID: ClusterSystem.CallID, channel: Channel, recipient: ClusterSystem.ActorID) { self.system = system self.clusterShell = clusterShell - self.replyToID = replyTo + self.callID = callID + self.channel = channel + self.recipient = recipient } public func onReturn(value: Success) async throws { - let ref = _ActorRef>(.remote(.init(shell: clusterShell, id: replyToID, system: system))) - ref.tell(.init(value: value)) + let reply = RemoteCallReply(callID: self.callID, value: value) + try await channel.writeAndFlush(TransportEnvelope(envelope: Payload(payload: .message(reply)), recipient: self.recipient)) } public func onReturnVoid() async throws { - let ref = _ActorRef>(.remote(.init(shell: clusterShell, id: replyToID, system: system))) - ref.tell(.init(value: _Done.done)) + let reply = RemoteCallReply<_Done>(callID: self.callID, value: .done) + try await channel.writeAndFlush(TransportEnvelope(envelope: Payload(payload: .message(reply)), recipient: self.recipient)) } public func onThrow(error: Err) async throws { self.system.log.warning("Result handler, onThrow: \(error)") - let ref = _ActorRef>(.remote(.init(shell: clusterShell, id: replyToID, system: system))) + let reply: RemoteCallReply<_Done> if let codableError = error as? (Error & Codable) { - ref.tell(.init(error: codableError)) + reply = .init(callID: self.callID, error: codableError) } else { - ref.tell(.init(error: GenericRemoteCallError(message: "Remote call error of [\(type(of: error as Any))] type occurred"))) + reply = .init(callID: self.callID, error: GenericRemoteCallError(message: "Remote call error of [\(type(of: error as Any))] type occurred")) } + try await channel.writeAndFlush(TransportEnvelope(envelope: Payload(payload: .message(reply)), recipient: self.recipient)) } } protocol AnyRemoteCallReply: Codable { associatedtype Value: Codable + typealias CallID = ClusterSystem.CallID + var callID: CallID { get } var value: Value? { get } var thrownError: (any Error & Codable)? { get } - init(value: Value) - init(error: Err) + init(callID: CallID, value: Value) + init(callID: CallID, error: Err) } struct RemoteCallReply: AnyRemoteCallReply { + typealias CallID = ClusterSystem.CallID + + let callID: CallID let value: Value? let thrownError: (any Error & Codable)? - init(value: Value) { + init(callID: CallID, value: Value) { + self.callID = callID self.value = value self.thrownError = nil } - init(error: Err) { + init(callID: CallID, error: Err) { + self.callID = callID self.value = nil self.thrownError = error } enum CodingKeys: String, CodingKey { + case callID = "cid" case value = "v" case wasThrow = "t" case thrownError = "e" @@ -1141,8 +1221,9 @@ struct RemoteCallReply: AnyRemoteCallReply { } let container = try decoder.container(keyedBy: CodingKeys.self) + self.callID = try container.decode(CallID.self, forKey: .callID) + let wasThrow = try container.decodeIfPresent(Bool.self, forKey: .wasThrow) ?? false - if wasThrow { let errorManifest = try container.decode(Serialization.Manifest.self, forKey: .thrownErrorManifest) let summonedErrorType = try context.serialization.summonType(from: errorManifest) @@ -1163,6 +1244,8 @@ struct RemoteCallReply: AnyRemoteCallReply { } var container = encoder.container(keyedBy: CodingKeys.self) + try container.encode(self.callID, forKey: .callID) + if let thrownError = self.thrownError { try container.encode(true, forKey: .wasThrow) let errorManifest = try context.serialization.outboundManifest(type(of: thrownError)) diff --git a/Sources/DistributedActors/InvocationBehavior.swift b/Sources/DistributedActors/InvocationBehavior.swift index 02a9dac23..abc4622c5 100644 --- a/Sources/DistributedActors/InvocationBehavior.swift +++ b/Sources/DistributedActors/InvocationBehavior.swift @@ -18,51 +18,15 @@ import struct Foundation.Data /// Representation of the distributed invocation in the Behavior APIs. /// This needs to be removed eventually as we remove behaviors. public struct InvocationMessage: Sendable, Codable, CustomStringConvertible { + let callID: ClusterSystem.CallID let targetIdentifier: String let arguments: [Data] - var replyToAddress: ActorID var target: RemoteCallTarget { RemoteCallTarget(targetIdentifier) } public var description: String { - "InvocationMessage(target: \(target), arguments: \(arguments.count))" - } -} - -enum InvocationBehavior { - static func behavior(instance weakInstance: Weak) -> _Behavior { - return _Behavior.setup { context in - return ._receiveMessageAsync { (message) async throws -> _Behavior in - guard let instance = weakInstance.actor else { - context.log.warning("Received message \(message) while distributed actor instance was released! Stopping...") - context.system.personalDeadLetters(type: InvocationMessage.self, recipient: context.id).tell(message) - return .stop - } - - await context.system.receiveInvocation(actor: instance, message: message) - return .same - }.receiveSignal { _, signal in - - // We received a signal, but our target actor instance was already released; - // This should not really happen, but let's handle it by stopping the behavior. - guard let instance = weakInstance.actor else { - return .stop - } - - if let terminated = signal as? _Signals.Terminated { - if let watcher = instance as? (any LifecycleWatch) { - let watch = watcher.actorSystem._getLifecycleWatch(watcher: watcher) - watch?.receiveTerminated(terminated) - return .same - } - } - - // Invocation behaviors don't really handle signals at all. - // Watching is done via `LifecycleWatch`. - return .unhandled - } - } + "InvocationMessage(callID: \(callID), target: \(target), arguments: \(arguments.count))" } } diff --git a/Sources/DistributedActors/Refs.swift b/Sources/DistributedActors/Refs.swift index 698376ae6..83dcc9e30 100644 --- a/Sources/DistributedActors/Refs.swift +++ b/Sources/DistributedActors/Refs.swift @@ -237,19 +237,6 @@ extension _ActorRef { // FIXME: can this be removed? public func _tellOrDeadLetter(_ message: Any, file: String = #file, line: UInt = #line) { guard let _message = message as? Message else { - // ClusterInvocationResultHandler.onThrow returns RemoteCallReply<_Done> for both - // remoteCallVoid and remoteCall (i.e., it doesn't send back RemoteCallReply). - // The guard check above fails for the latter use-case because of message type mismatch. - // The logic below converts the error reply to the proper type then sends it to the actor. - if let ReplyType = Message.self as? any AnyRemoteCallReply.Type, - let remoteCallReply = message as? any AnyRemoteCallReply, - let remoteCallError = remoteCallReply.thrownError, - let _message = ReplyType.init(error: remoteCallError) as? Message - { - self.tell(_message, file: file, line: line) - return - } - traceLog_Mailbox(self.path, "_tellOrDeadLetter: [\(message)] failed because of invalid message type, to: \(self); Sent at \(file):\(line)") self._dropAsDeadLetter(message, file: file, line: line) return // TODO: "drop" the message rather than dead letter it? diff --git a/Tests/DistributedActorsTests/ClusterSystemTests.swift b/Tests/DistributedActorsTests/ClusterSystemTests.swift index 9af4c7b6d..0dfbd098a 100644 --- a/Tests/DistributedActorsTests/ClusterSystemTests.swift +++ b/Tests/DistributedActorsTests/ClusterSystemTests.swift @@ -199,7 +199,7 @@ final class ClusterSystemTests: ClusterSystemXCTestCase { // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Remote call API tests - func test_remoteCall_success() async throws { + func test_remoteCall() async throws { let local = await setUpNode("local") { settings in settings.enabled = true settings.serialization.registerInbound(GreeterCodableError.self) From a7c8dd6e560ccf68de4832084ec959c1a8b997d7 Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Tue, 7 Jun 2022 23:47:49 -0700 Subject: [PATCH 2/3] Keep InvocationBehavior --- Sources/DistributedActors/ClusterSystem.swift | 3 ++ .../InvocationBehavior.swift | 38 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/Sources/DistributedActors/ClusterSystem.swift b/Sources/DistributedActors/ClusterSystem.swift index 379d98a40..52f79fcc9 100644 --- a/Sources/DistributedActors/ClusterSystem.swift +++ b/Sources/DistributedActors/ClusterSystem.swift @@ -933,6 +933,9 @@ extension ClusterSystem { _openExistential(watcher, do: doMakeLifecycleWatch) } + let behavior = InvocationBehavior.behavior(instance: Weak(actor)) + let ref = self._spawnDistributedActor(behavior, identifiedBy: actor.id) + self._managedRefs[actor.id] = ref self._managedDistributedActors.insert(actor: actor) } diff --git a/Sources/DistributedActors/InvocationBehavior.swift b/Sources/DistributedActors/InvocationBehavior.swift index abc4622c5..56130713a 100644 --- a/Sources/DistributedActors/InvocationBehavior.swift +++ b/Sources/DistributedActors/InvocationBehavior.swift @@ -30,3 +30,41 @@ public struct InvocationMessage: Sendable, Codable, CustomStringConvertible { "InvocationMessage(callID: \(callID), target: \(target), arguments: \(arguments.count))" } } + +// FIXME(distributed): remove [#957](https://github.com/apple/swift-distributed-actors/issues/957) +enum InvocationBehavior { + static func behavior(instance weakInstance: Weak) -> _Behavior { + return _Behavior.setup { context in + return ._receiveMessageAsync { (message) async throws -> _Behavior in + guard let instance = weakInstance.actor else { + context.log.warning("Received message \(message) while distributed actor instance was released! Stopping...") + context.system.personalDeadLetters(type: InvocationMessage.self, recipient: context.id).tell(message) + return .stop + } + + // `InvocationMessage`s are handled in `UserMessageHandler` +// await context.system.receiveInvocation(actor: instance, message: message) + return .same + }.receiveSignal { _, signal in + + // We received a signal, but our target actor instance was already released; + // This should not really happen, but let's handle it by stopping the behavior. + guard let instance = weakInstance.actor else { + return .stop + } + + if let terminated = signal as? _Signals.Terminated { + if let watcher = instance as? (any LifecycleWatch) { + let watch = watcher.actorSystem._getLifecycleWatch(watcher: watcher) + watch?.receiveTerminated(terminated) + return .same + } + } + + // Invocation behaviors don't really handle signals at all. + // Watching is done via `LifecycleWatch`. + return .unhandled + } + } + } + } From 8eff4df0822f15fc5418d119e458885da391fad5 Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Tue, 7 Jun 2022 23:51:26 -0700 Subject: [PATCH 3/3] Fix formatting --- .../Cluster/Association.swift | 2 +- .../RemoteClusterActorPersonality.swift | 2 +- .../Transport/TransportPipelines.swift | 2 +- Sources/DistributedActors/ClusterSystem.swift | 31 +++++----- .../InvocationBehavior.swift | 60 +++++++++---------- 5 files changed, 49 insertions(+), 48 deletions(-) diff --git a/Sources/DistributedActors/Cluster/Association.swift b/Sources/DistributedActors/Cluster/Association.swift index 61f5d335d..d32e36273 100644 --- a/Sources/DistributedActors/Cluster/Association.swift +++ b/Sources/DistributedActors/Cluster/Association.swift @@ -171,7 +171,7 @@ extension Association { let transportEnvelope = TransportEnvelope(envelope: envelope, recipient: recipient) self._send(transportEnvelope, promise: promise) } - + /// Concurrency: safe to invoke from any thread. func sendInvocation(_ invocation: InvocationMessage, recipient: ActorID, promise: EventLoopPromise? = nil) { let transportEnvelope = TransportEnvelope(invocation: invocation, recipient: recipient) diff --git a/Sources/DistributedActors/Cluster/Transport/RemoteClusterActorPersonality.swift b/Sources/DistributedActors/Cluster/Transport/RemoteClusterActorPersonality.swift index 8d676b4c9..531697e53 100644 --- a/Sources/DistributedActors/Cluster/Transport/RemoteClusterActorPersonality.swift +++ b/Sources/DistributedActors/Cluster/Transport/RemoteClusterActorPersonality.swift @@ -101,7 +101,7 @@ public final class _RemoteClusterActorPersonality { self.deadLetters.tell(message, file: file, line: line) } } - + @usableFromInline func sendInvocation(_ invocation: InvocationMessage, file: String = #file, line: UInt = #line) { traceLog_Cell("RemoteActorRef(\(self.id)) sendInvocation: \(invocation)") diff --git a/Sources/DistributedActors/Cluster/Transport/TransportPipelines.swift b/Sources/DistributedActors/Cluster/Transport/TransportPipelines.swift index d3aac62f4..3257fd164 100644 --- a/Sources/DistributedActors/Cluster/Transport/TransportPipelines.swift +++ b/Sources/DistributedActors/Cluster/Transport/TransportPipelines.swift @@ -624,7 +624,7 @@ private final class UserMessageHandler: ChannelInboundHandler { func channelRead(context: ChannelHandlerContext, data: NIOAny) { let wireEnvelope = self.unwrapInboundIn(data) let manifestMessageType = try? self.serializationPool.serialization.summonType(from: wireEnvelope.manifest) - + // FIXME(distributed): we should be able to assume we always get either a remote invocation or reply here switch manifestMessageType { case .some(is InvocationMessage.Type): diff --git a/Sources/DistributedActors/ClusterSystem.swift b/Sources/DistributedActors/ClusterSystem.swift index 52f79fcc9..c25f51c7f 100644 --- a/Sources/DistributedActors/ClusterSystem.swift +++ b/Sources/DistributedActors/ClusterSystem.swift @@ -91,7 +91,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable { return s } } - + private let inFlightCallLock = Lock() private var _inFlightCalls: [CallID: CheckedContinuation] = [:] @@ -986,7 +986,7 @@ extension ClusterSystem { let recipient = _RemoteClusterActorPersonality(shell: clusterShell, id: actor.id._asRemote, system: self) let arguments = invocation.arguments - + let reply: RemoteCallReply = try await self.withCallID { callID in let invocation = InvocationMessage( callID: callID, @@ -1035,11 +1035,12 @@ extension ClusterSystem { throw error } } - + private func withCallID( body: (CallID) -> Void ) async throws -> Reply - where Reply: AnyRemoteCallReply { + where Reply: AnyRemoteCallReply + { let callID = UUID() let timeout = RemoteCall.timeout ?? self.settings.defaultRemoteCallTimeout @@ -1057,7 +1058,7 @@ extension ClusterSystem { defer { timeoutTask.cancel() } - + let reply: any AnyRemoteCallReply = try await withCheckedThrowingContinuation { continuation in self.inFlightCallLock.withLock { self._inFlightCalls[callID] = continuation // this is to be resumed from an incoming reply or timeout @@ -1073,7 +1074,7 @@ extension ClusterSystem { if let thrownError = reply.thrownError { return Reply.init(callID: reply.callID, error: thrownError) } - + self.log.error("Expected [\(Reply.self)] but got [\(type(of: reply as Any))]") throw RemoteCallError.invalidReply } @@ -1087,7 +1088,7 @@ extension ClusterSystem { self.log.error("Cluster has shut down already, yet received message. Message will be dropped: \(invocation)") return } - + guard let actor = self.resolve(id: recipient) else { self.log.error("Unable to resolve recipient \(recipient). Message will be dropped: \(invocation)") return @@ -1122,7 +1123,7 @@ extension ClusterSystem { } } } - + func receiveRemoteCallReply(_ reply: any AnyRemoteCallReply) { self.inFlightCallLock.withLockVoid { guard let continuation = self._inFlightCalls.removeValue(forKey: reply.callID) else { @@ -1132,7 +1133,7 @@ extension ClusterSystem { continuation.resume(returning: reply) } } - + private func resolve(id: ActorID) -> (any DistributedActor)? { self.namingLock.withLock { self._managedDistributedActors.get(identifiedBy: id) @@ -1159,12 +1160,12 @@ public struct ClusterInvocationResultHandler: DistributedTargetInvocationResultH public func onReturn(value: Success) async throws { let reply = RemoteCallReply(callID: self.callID, value: value) - try await channel.writeAndFlush(TransportEnvelope(envelope: Payload(payload: .message(reply)), recipient: self.recipient)) + try await self.channel.writeAndFlush(TransportEnvelope(envelope: Payload(payload: .message(reply)), recipient: self.recipient)) } public func onReturnVoid() async throws { let reply = RemoteCallReply<_Done>(callID: self.callID, value: .done) - try await channel.writeAndFlush(TransportEnvelope(envelope: Payload(payload: .message(reply)), recipient: self.recipient)) + try await self.channel.writeAndFlush(TransportEnvelope(envelope: Payload(payload: .message(reply)), recipient: self.recipient)) } public func onThrow(error: Err) async throws { @@ -1175,7 +1176,7 @@ public struct ClusterInvocationResultHandler: DistributedTargetInvocationResultH } else { reply = .init(callID: self.callID, error: GenericRemoteCallError(message: "Remote call error of [\(type(of: error as Any))] type occurred")) } - try await channel.writeAndFlush(TransportEnvelope(envelope: Payload(payload: .message(reply)), recipient: self.recipient)) + try await self.channel.writeAndFlush(TransportEnvelope(envelope: Payload(payload: .message(reply)), recipient: self.recipient)) } } @@ -1193,7 +1194,7 @@ protocol AnyRemoteCallReply: Codable { struct RemoteCallReply: AnyRemoteCallReply { typealias CallID = ClusterSystem.CallID - + let callID: CallID let value: Value? let thrownError: (any Error & Codable)? @@ -1225,7 +1226,7 @@ struct RemoteCallReply: AnyRemoteCallReply { let container = try decoder.container(keyedBy: CodingKeys.self) self.callID = try container.decode(CallID.self, forKey: .callID) - + let wasThrow = try container.decodeIfPresent(Bool.self, forKey: .wasThrow) ?? false if wasThrow { let errorManifest = try container.decode(Serialization.Manifest.self, forKey: .thrownErrorManifest) @@ -1248,7 +1249,7 @@ struct RemoteCallReply: AnyRemoteCallReply { var container = encoder.container(keyedBy: CodingKeys.self) try container.encode(self.callID, forKey: .callID) - + if let thrownError = self.thrownError { try container.encode(true, forKey: .wasThrow) let errorManifest = try context.serialization.outboundManifest(type(of: thrownError)) diff --git a/Sources/DistributedActors/InvocationBehavior.swift b/Sources/DistributedActors/InvocationBehavior.swift index 56130713a..c0294125b 100644 --- a/Sources/DistributedActors/InvocationBehavior.swift +++ b/Sources/DistributedActors/InvocationBehavior.swift @@ -33,38 +33,38 @@ public struct InvocationMessage: Sendable, Codable, CustomStringConvertible { // FIXME(distributed): remove [#957](https://github.com/apple/swift-distributed-actors/issues/957) enum InvocationBehavior { - static func behavior(instance weakInstance: Weak) -> _Behavior { - return _Behavior.setup { context in - return ._receiveMessageAsync { (message) async throws -> _Behavior in - guard let instance = weakInstance.actor else { - context.log.warning("Received message \(message) while distributed actor instance was released! Stopping...") - context.system.personalDeadLetters(type: InvocationMessage.self, recipient: context.id).tell(message) - return .stop - } + static func behavior(instance weakInstance: Weak) -> _Behavior { + return _Behavior.setup { context in + return ._receiveMessageAsync { (message) async throws -> _Behavior in + guard let instance = weakInstance.actor else { + context.log.warning("Received message \(message) while distributed actor instance was released! Stopping...") + context.system.personalDeadLetters(type: InvocationMessage.self, recipient: context.id).tell(message) + return .stop + } - // `InvocationMessage`s are handled in `UserMessageHandler` + // `InvocationMessage`s are handled in `UserMessageHandler` // await context.system.receiveInvocation(actor: instance, message: message) - return .same - }.receiveSignal { _, signal in + return .same + }.receiveSignal { _, signal in - // We received a signal, but our target actor instance was already released; - // This should not really happen, but let's handle it by stopping the behavior. - guard let instance = weakInstance.actor else { - return .stop - } + // We received a signal, but our target actor instance was already released; + // This should not really happen, but let's handle it by stopping the behavior. + guard let instance = weakInstance.actor else { + return .stop + } - if let terminated = signal as? _Signals.Terminated { - if let watcher = instance as? (any LifecycleWatch) { - let watch = watcher.actorSystem._getLifecycleWatch(watcher: watcher) - watch?.receiveTerminated(terminated) - return .same - } - } + if let terminated = signal as? _Signals.Terminated { + if let watcher = instance as? (any LifecycleWatch) { + let watch = watcher.actorSystem._getLifecycleWatch(watcher: watcher) + watch?.receiveTerminated(terminated) + return .same + } + } - // Invocation behaviors don't really handle signals at all. - // Watching is done via `LifecycleWatch`. - return .unhandled - } - } - } - } + // Invocation behaviors don't really handle signals at all. + // Watching is done via `LifecycleWatch`. + return .unhandled + } + } + } +}