Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Sources/DistributedActors/Cluster/Association.swift
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ extension Association {
let transportEnvelope = TransportEnvelope(envelope: envelope, recipient: recipient)
self._send(transportEnvelope, promise: promise)
}

/// Concurrency: safe to invoke from any thread.
func sendInvocation(_ invocation: InvocationMessage, recipient: ActorID, promise: EventLoopPromise<Void>? = nil) {
let transportEnvelope = TransportEnvelope(invocation: invocation, recipient: recipient)
self._send(transportEnvelope, promise: promise)
}

/// Concurrency: safe to invoke from any thread.
func sendSystemMessage(_ message: _SystemMessage, recipient: ActorID, promise: EventLoopPromise<Void>? = nil) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ public final class _RemoteClusterActorPersonality<Message: ActorMessage> {
self.deadLetters.tell(message, file: file, line: line)
}
}

@usableFromInline
func sendInvocation(_ invocation: InvocationMessage, file: String = #file, line: UInt = #line) {
traceLog_Cell("RemoteActorRef(\(self.id)) sendInvocation: \(invocation)")

switch self.association {
case .association(let association):
association.sendInvocation(invocation, recipient: self.id)
self.instrumentation.actorTold(message: invocation, from: nil)
case .tombstone:
// TODO: metric for dead letter: self.instrumentation.deadLetter(message: message, from: nil)
self.system.deadLetters.tell(DeadLetter(invocation, recipient: self.id), file: file, line: line)
}
}

@usableFromInline
func sendSystemMessage(_ message: _SystemMessage, file: String = #file, line: UInt = #line) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,28 @@ private final class UserMessageHandler: ChannelInboundHandler {

/// This ends the processing chain for incoming messages.
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
self.deserializeThenDeliver(context, wireEnvelope: self.unwrapInboundIn(data))
let wireEnvelope = self.unwrapInboundIn(data)
let manifestMessageType = try? self.serializationPool.serialization.summonType(from: wireEnvelope.manifest)

// FIXME(distributed): we should be able to assume we always get either a remote invocation or reply here
switch manifestMessageType {
case .some(is InvocationMessage.Type):
do {
let invocation = try self.serializationPool.serialization.deserialize(as: InvocationMessage.self, from: wireEnvelope.payload, using: wireEnvelope.manifest)
self.system.receiveInvocation(invocation, recipient: wireEnvelope.recipient, on: context.channel)
} catch {
self.system.log.error("Failed to deserialize [\(InvocationMessage.self)]: \(error)")
}
case .some(is any AnyRemoteCallReply.Type):
do {
let reply = try self.serializationPool.serialization.deserialize(as: (any AnyRemoteCallReply).self, from: wireEnvelope.payload, using: wireEnvelope.manifest)
self.system.receiveRemoteCallReply(reply)
} catch {
self.system.log.error("Failed to deserialize [\(String(describing: manifestMessageType))]: \(error)")
}
default:
self.deserializeThenDeliver(context, wireEnvelope: wireEnvelope)
}
}

private func deserializeThenDeliver(_ context: ChannelHandlerContext, wireEnvelope: Wire.Envelope) {
Expand Down Expand Up @@ -858,6 +879,7 @@ internal struct TransportEnvelope: CustomStringConvertible, CustomDebugStringCon
/// Note: MAY contain SystemMessageEnvelope
case message(Any)
// ~~ outbound ~~
case invocation(InvocationMessage)
case systemMessage(_SystemMessage)
// ~~ inbound only ~~
case systemMessageEnvelope(SystemMessageEnvelope)
Expand Down Expand Up @@ -895,6 +917,11 @@ internal struct TransportEnvelope: CustomStringConvertible, CustomDebugStringCon
self.recipient = recipient
}

init(invocation: InvocationMessage, recipient: ActorID) {
self.storage = .invocation(invocation)
self.recipient = recipient
}

init(systemMessage: _SystemMessage, recipient: ActorID) {
self.storage = .systemMessage(systemMessage)
self.recipient = recipient
Expand Down Expand Up @@ -923,6 +950,8 @@ internal struct TransportEnvelope: CustomStringConvertible, CustomDebugStringCon
switch self.storage {
case .message(let message):
return message
case .invocation(let invocation):
return invocation
case .systemMessage(let message):
return message
case .systemMessageEnvelope(let systemEnvelope):
Expand Down
Loading