Skip to content

Commit e8003f3

Browse files
committed
+tracing Instrument Invocation handlers using swift-distributed-tracing
+tracing injector/extractor for tracing and InvocationMessage
1 parent 4934208 commit e8003f3

File tree

4 files changed

+127
-40
lines changed

4 files changed

+127
-40
lines changed

Package.swift

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,13 @@ var targets: [PackageDescription.Target] = [
3939
.product(name: "NIOSSL", package: "swift-nio-ssl"),
4040
.product(name: "NIOExtras", package: "swift-nio-extras"),
4141
.product(name: "SwiftProtobuf", package: "swift-protobuf"),
42-
.product(name: "Logging", package: "swift-log"),
43-
.product(name: "Metrics", package: "swift-metrics"),
4442
.product(name: "ServiceDiscovery", package: "swift-service-discovery"),
4543
.product(name: "Backtrace", package: "swift-backtrace"),
4644
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
45+
// Observability
46+
.product(name: "Logging", package: "swift-log"),
47+
.product(name: "Metrics", package: "swift-metrics"),
48+
.product(name: "Tracing", package: "swift-distributed-tracing"),
4749
]
4850
),
4951

@@ -182,7 +184,8 @@ var dependencies: [Package.Dependency] = [
182184
.package(url: "https://github.com/apple/swift-collections", from: "1.0.1"),
183185

184186
// ~~~ Observability ~~~
185-
.package(url: "https://github.com/apple/swift-log", from: "1.0.0"),
187+
.package(url: "https://github.com/apple/swift-log", from: "1.4.0"),
188+
.package(url: "https://github.com/apple/swift-distributed-tracing", branch: "main"),
186189
// swift-metrics 1.x and 2.x are almost API compatible, so most clients should use
187190
.package(url: "https://github.com/apple/swift-metrics", "1.0.0" ..< "3.0.0"),
188191
.package(url: "https://github.com/apple/swift-service-discovery", from: "1.0.0"),

Sources/DistributedCluster/ClusterSystem.swift

Lines changed: 63 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import DistributedActorsConcurrencyHelpers
2121
import Foundation // for UUID
2222
import Logging
2323
import NIO
24+
import Tracing
2425

2526
/// A `ClusterSystem` is a confined space which runs and manages Actors.
2627
///
@@ -1148,28 +1149,38 @@ extension ClusterSystem {
11481149
let recipient = _RemoteClusterActorPersonality<InvocationMessage>(shell: clusterShell, id: actor.id._asRemote, system: self)
11491150
let arguments = invocation.arguments
11501151

1151-
let reply: RemoteCallReply<Res> = try await self.withCallID(on: actor.id, target: target) { callID in
1152-
let invocation = InvocationMessage(
1153-
callID: callID,
1154-
targetIdentifier: target.identifier,
1155-
genericSubstitutions: invocation.genericSubstitutions,
1156-
arguments: arguments
1157-
)
1152+
// -- Pick up the distributed-tracing baggage; It will be injected into the message inside the withCallID body.
1153+
let baggage = Baggage.current ?? .topLevel
1154+
// TODO: we can enrich this with actor and system information here if not already present.
1155+
1156+
return try await InstrumentationSystem.tracer.withSpan("\(target)", baggage: baggage, ofKind: .client) { span in
1157+
let reply: RemoteCallReply<Res> = try await self.withCallID(on: actor.id, target: target) { callID in
1158+
var invocation = InvocationMessage(
1159+
callID: callID,
1160+
targetIdentifier: target.identifier,
1161+
genericSubstitutions: invocation.genericSubstitutions,
1162+
arguments: arguments
1163+
)
11581164

1159-
recipient.sendInvocation(invocation)
1160-
}
1165+
if let baggage {
1166+
InstrumentationSystem.instrument.inject(baggage, into: &invocation, using: .invocationMessage)
1167+
}
11611168

1162-
if let error = reply.thrownError {
1163-
throw error
1164-
}
1165-
guard let value = reply.value else {
1166-
throw RemoteCallError(
1167-
.invalidReply(reply.callID),
1168-
on: actor.id,
1169-
target: target
1170-
)
1169+
recipient.sendInvocation(invocation)
1170+
}
1171+
1172+
if let error = reply.thrownError {
1173+
throw error
1174+
}
1175+
guard let value = reply.value else {
1176+
throw RemoteCallError(
1177+
.invalidReply(reply.callID),
1178+
on: actor.id,
1179+
target: target
1180+
)
1181+
}
1182+
return value
11711183
}
1172-
return value
11731184
}
11741185

11751186
public func remoteCallVoid<Act, Err>(
@@ -1211,18 +1222,29 @@ extension ClusterSystem {
12111222
let recipient = _RemoteClusterActorPersonality<InvocationMessage>(shell: clusterShell, id: actor.id._asRemote, system: self)
12121223
let arguments = invocation.arguments
12131224

1214-
let reply: RemoteCallReply<_Done> = try await self.withCallID(on: actor.id, target: target) { callID in
1215-
let invocation = InvocationMessage(
1216-
callID: callID,
1217-
targetIdentifier: target.identifier,
1218-
genericSubstitutions: invocation.genericSubstitutions,
1219-
arguments: arguments
1220-
)
1221-
recipient.sendInvocation(invocation)
1222-
}
1225+
// -- Pick up the distributed-tracing baggage; It will be injected into the message inside the withCallID body.
1226+
let baggage = Baggage.current ?? .topLevel
1227+
// TODO: we can enrich this with actor and system information here if not already present.
1228+
1229+
return try await InstrumentationSystem.tracer.withSpan("\(target)", baggage: baggage, ofKind: .client) { span in
1230+
let reply: RemoteCallReply<_Done> = try await self.withCallID(on: actor.id, target: target) { callID in
1231+
var invocation = InvocationMessage(
1232+
callID: callID,
1233+
targetIdentifier: target.identifier,
1234+
genericSubstitutions: invocation.genericSubstitutions,
1235+
arguments: arguments
1236+
)
12231237

1224-
if let error = reply.thrownError {
1225-
throw error
1238+
if let baggage {
1239+
InstrumentationSystem.instrument.inject(baggage, into: &invocation, using: .invocationMessage)
1240+
}
1241+
1242+
recipient.sendInvocation(invocation)
1243+
}
1244+
1245+
if let error = reply.thrownError {
1246+
throw error
1247+
}
12261248
}
12271249
}
12281250

@@ -1403,6 +1425,9 @@ extension ClusterSystem {
14031425
return
14041426
}
14051427

1428+
var baggage: Baggage = .topLevel
1429+
InstrumentationSystem.instrument.extract(invocation, into: &baggage, using: .invocationMessage)
1430+
14061431
Task {
14071432
var decoder = ClusterInvocationDecoder(system: self, message: invocation)
14081433

@@ -1420,12 +1445,14 @@ extension ClusterSystem {
14201445
throw DeadLetterError(recipient: recipient)
14211446
}
14221447

1423-
try await executeDistributedTarget(
1424-
on: actor,
1425-
target: target,
1426-
invocationDecoder: &decoder,
1427-
handler: resultHandler
1428-
)
1448+
try await InstrumentationSystem.tracer.withSpan("\(target)", baggage: baggage, ofKind: .server) { span in
1449+
try await executeDistributedTarget(
1450+
on: actor,
1451+
target: target,
1452+
invocationDecoder: &decoder,
1453+
handler: resultHandler
1454+
)
1455+
}
14291456
} catch {
14301457
// FIXME(distributed): is this right?
14311458
do {

Sources/DistributedCluster/InvocationBehavior.swift

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import Distributed
16+
import InstrumentationBaggage
1617
import struct Foundation.Data
1718

1819
/// Representation of the distributed invocation in the Behavior APIs.
@@ -23,12 +24,19 @@ public struct InvocationMessage: Sendable, Codable, CustomStringConvertible {
2324
let genericSubstitutions: [String]
2425
let arguments: [Data]
2526

27+
/// Tracing metadata, injected/extracted by distributed-tracing.
28+
var metadata: [String: String] = [:]
29+
30+
var hasMetadata: Bool {
31+
!self.metadata.isEmpty
32+
}
33+
2634
var target: RemoteCallTarget {
2735
RemoteCallTarget(targetIdentifier)
2836
}
2937

3038
public var description: String {
31-
"InvocationMessage(callID: \(callID), target: \(target), genericSubstitutions: \(genericSubstitutions), arguments: \(arguments.count))"
39+
"InvocationMessage(callID: \(callID), target: \(target), genericSubstitutions: \(genericSubstitutions), arguments: \(arguments.count), metadata: \(metadata))"
3240
}
3341
}
3442

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Distributed Actors open source project
4+
//
5+
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Tracing
16+
17+
// ==== ----------------------------------------------------------------------------------------------------------------
18+
// MARK: Injector
19+
20+
struct InvocationMessageInjector: Tracing.Injector {
21+
typealias Carrier = InvocationMessage
22+
23+
func inject(_ value: String, forKey key: String, into carrier: inout Carrier) {
24+
carrier.metadata[key] = value
25+
}
26+
}
27+
28+
extension Tracing.Injector where Self == InvocationMessageInjector {
29+
static var invocationMessage: InvocationMessageInjector {
30+
InvocationMessageInjector()
31+
}
32+
}
33+
34+
// ==== ----------------------------------------------------------------------------------------------------------------
35+
// MARK: Extractor
36+
37+
struct InvocationMessageExtractor: Tracing.Extractor {
38+
typealias Carrier = InvocationMessage
39+
40+
func extract(key: String, from carrier: Carrier) -> String? {
41+
carrier.metadata[key]
42+
}
43+
}
44+
45+
extension Tracing.Extractor where Self == InvocationMessageExtractor {
46+
static var invocationMessage: InvocationMessageExtractor {
47+
InvocationMessageExtractor()
48+
}
49+
}

0 commit comments

Comments
 (0)