Skip to content

Commit 194de5d

Browse files
committed
+sample Add new sample which showcases distributed tracing in cluster
1 parent e8003f3 commit 194de5d

File tree

12 files changed

+529
-4
lines changed

12 files changed

+529
-4
lines changed

Samples/Package.swift

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ var targets: [PackageDescription.Target] = [
2121
name: "SampleDiningPhilosophers",
2222
dependencies: [
2323
.product(name: "DistributedCluster", package: "swift-distributed-actors"),
24+
"_PrettyLogHandler",
2425
],
2526
path: "Sources/SampleDiningPhilosophers",
2627
exclude: [
@@ -29,6 +30,24 @@ var targets: [PackageDescription.Target] = [
2930
]
3031
),
3132

33+
.executableTarget(
34+
name: "SampleClusterTracing",
35+
dependencies: [
36+
.product(name: "DistributedCluster", package: "swift-distributed-actors"),
37+
.product(name: "OpenTelemetry", package: "opentelemetry-swift"),
38+
.product(name: "OtlpGRPCSpanExporting", package: "opentelemetry-swift"),
39+
"_PrettyLogHandler",
40+
],
41+
path: "Sources/SampleClusterTracing"
42+
),
43+
44+
.target(
45+
name: "_PrettyLogHandler",
46+
dependencies: [
47+
.product(name: "DistributedCluster", package: "swift-distributed-actors"),
48+
]
49+
),
50+
3251
/* --- tests --- */
3352

3453
// no-tests placeholder project to not have `swift test` fail on Samples/
@@ -45,6 +64,7 @@ var dependencies: [Package.Dependency] = [
4564
.package(name: "swift-distributed-actors", path: "../"),
4665

4766
// ~~~~~~~ only for samples ~~~~~~~
67+
.package(url: "https://github.com/slashmo/opentelemetry-swift", branch: "main"),
4868
]
4969

5070
let package = Package(
@@ -58,11 +78,14 @@ let package = Package(
5878
],
5979
products: [
6080
/* --- samples --- */
61-
6281
.executable(
6382
name: "SampleDiningPhilosophers",
6483
targets: ["SampleDiningPhilosophers"]
6584
),
85+
.executable(
86+
name: "SampleClusterTracing",
87+
targets: ["SampleClusterTracing"]
88+
),
6689
],
6790

6891
dependencies: dependencies,
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Distributed Actors open source project
4+
//
5+
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import _PrettyLogHandler
16+
import Distributed
17+
import DistributedCluster
18+
import Logging
19+
import NIO
20+
import OpenTelemetry
21+
import OtlpGRPCSpanExporting
22+
import Tracing
23+
24+
// Sleep, with adding a little bit of noise (additional delay) to the duration.
25+
func noisySleep(for duration: ContinuousClock.Duration) async {
26+
var duration = duration + .milliseconds(Int.random(in: 100 ..< 300))
27+
try? await Task.sleep(until: ContinuousClock.now + duration, clock: .continuous)
28+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Distributed Actors open source project
4+
//
5+
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import _PrettyLogHandler
16+
import Distributed
17+
import DistributedCluster
18+
import Logging
19+
import NIO
20+
import Tracing
21+
22+
protocol Chopping {
23+
func chop(_ vegetable: Vegetable) async throws -> Vegetable
24+
}
25+
26+
distributed actor VegetableChopper: Chopping {
27+
@ActorID.Metadata(\.receptionID)
28+
var receptionID: String
29+
30+
init(actorSystem: ActorSystem) async {
31+
self.actorSystem = actorSystem
32+
33+
self.receptionID = "*" // default key for "all of this type"
34+
await actorSystem.receptionist.checkIn(self)
35+
}
36+
37+
distributed func chop(_ vegetable: Vegetable) async throws -> Vegetable {
38+
await InstrumentationSystem.tracer.withSpan(#function) { _ in
39+
await noisySleep(for: .seconds(5))
40+
41+
return vegetable.asChopped
42+
}
43+
}
44+
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Distributed Actors open source project
4+
//
5+
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import _PrettyLogHandler
16+
import Distributed
17+
import DistributedCluster
18+
import Logging
19+
import NIO
20+
import Tracing
21+
22+
distributed actor PrimaryCook: LifecycleWatch {
23+
lazy var log = Logger(actor: self)
24+
25+
var choppers: [ClusterSystem.ActorID: VegetableChopper] = [:]
26+
var waitingForChoppers: (Int, CheckedContinuation<Void, Never>)?
27+
28+
init(actorSystem: ActorSystem) async {
29+
self.actorSystem = actorSystem
30+
31+
_ = self.startChopperListingTask()
32+
}
33+
34+
func startChopperListingTask() -> Task<Void, Never> {
35+
Task {
36+
for await chopper in await actorSystem.receptionist.listing(of: VegetableChopper.self) {
37+
log.notice("Discovered vegetable chopper: \(chopper.id)")
38+
self.choppers[chopper.id] = chopper
39+
40+
/// We implement a simple "if we're waiting for N choppers... let's notify the continuation once that is reached"
41+
/// This would be nice to provide as a fun "active" collection type that can be `.waitFor(...)`-ed.
42+
if let waitingForChoppersCount = self.waitingForChoppers?.0,
43+
choppers.count >= waitingForChoppersCount
44+
{
45+
self.waitingForChoppers?.1.resume()
46+
}
47+
}
48+
}
49+
}
50+
51+
distributed func makeDinner() async throws -> Meal {
52+
try await InstrumentationSystem.tracer.withSpan(#function) { _ in
53+
await noisySleep(for: .milliseconds(200))
54+
55+
log.notice("Cooking dinner, but we need [2] vegetable choppers...! Suspend waiting for nodes to join.")
56+
let (first, second) = try await getChoppers()
57+
async let veggies = try chopVegetables(firstChopper: first, secondChopper: second)
58+
async let meat = marinateMeat()
59+
async let oven = preheatOven(temperature: 350)
60+
// ...
61+
return try await cook(veggies, meat, oven)
62+
}
63+
}
64+
65+
private func getChoppers() async throws -> (some Chopping, some Chopping) {
66+
await withCheckedContinuation { cc in
67+
self.waitingForChoppers = (2, cc)
68+
}
69+
70+
var chopperIDs = self.choppers.keys.makeIterator()
71+
guard let id1 = chopperIDs.next(),
72+
let first = choppers[id1]
73+
else {
74+
throw NotEnoughChoppersError()
75+
}
76+
guard let id2 = chopperIDs.next(),
77+
let second = choppers[id2]
78+
else {
79+
throw NotEnoughChoppersError()
80+
}
81+
82+
return (first, second)
83+
}
84+
85+
// Called by lifecycle watch when a watched actor terminates.
86+
func terminated(actor id: DistributedCluster.ActorID) async {
87+
self.choppers.removeValue(forKey: id)
88+
}
89+
}
90+
91+
func chopVegetables(firstChopper: some Chopping,
92+
secondChopper: some Chopping) async throws -> [Vegetable]
93+
{
94+
try await InstrumentationSystem.tracer.withSpan("chopVegetables") { _ in
95+
// Chop the vegetables...!
96+
//
97+
// However, since chopping is a very difficult operation,
98+
// one chopping task can be performed at the same time on a single service!
99+
// (Imagine that... we cannot parallelize these two tasks, and need to involve another service).
100+
async let carrot = try firstChopper.chop(.carrot(chopped: false))
101+
async let potato = try secondChopper.chop(.potato(chopped: false))
102+
return try await [carrot, potato]
103+
}
104+
}
105+
106+
// func chop(_ vegetable: Vegetable, tracer: any Tracer) async throws -> Vegetable {
107+
// await tracer.withSpan("chop-\(vegetable)") { _ in
108+
// await sleep(for: .seconds(5))
109+
// // ...
110+
// return vegetable // "chopped"
111+
// }
112+
// }
113+
114+
func marinateMeat() async -> Meat {
115+
await noisySleep(for: .milliseconds(620))
116+
117+
return await InstrumentationSystem.tracer.withSpan("marinateMeat") { _ in
118+
await noisySleep(for: .seconds(3))
119+
// ...
120+
return Meat()
121+
}
122+
}
123+
124+
func preheatOven(temperature: Int) async -> Oven {
125+
await InstrumentationSystem.tracer.withSpan("preheatOven") { _ in
126+
// ...
127+
await noisySleep(for: .seconds(6))
128+
return Oven()
129+
}
130+
}
131+
132+
func cook(_: Any, _: Any, _: Any) async -> Meal {
133+
await InstrumentationSystem.tracer.withSpan("cook") { span in
134+
span.addEvent("children-asking-if-done-already")
135+
await noisySleep(for: .seconds(3))
136+
span.addEvent("children-asking-if-done-already-again")
137+
await noisySleep(for: .seconds(2))
138+
// ...
139+
return Meal()
140+
}
141+
}
142+
143+
struct NotEnoughChoppersError: Error {}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Distributed Actors open source project
4+
//
5+
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import _PrettyLogHandler
16+
import Distributed
17+
import DistributedCluster
18+
import Logging
19+
import NIO
20+
import OpenTelemetry
21+
import OtlpGRPCSpanExporting
22+
import Tracing
23+
24+
struct Meal: Sendable, Codable {}
25+
26+
struct Meat: Sendable, Codable {}
27+
28+
struct Oven: Sendable, Codable {}
29+
30+
enum Vegetable: Sendable, Codable {
31+
case potato(chopped: Bool)
32+
case carrot(chopped: Bool)
33+
34+
var asChopped: Self {
35+
switch self {
36+
case .carrot: return .carrot(chopped: true)
37+
case .potato: return .potato(chopped: true)
38+
}
39+
}
40+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Distributed Actors open source project
4+
//
5+
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import _PrettyLogHandler
16+
import Distributed
17+
import DistributedCluster
18+
import Logging
19+
import NIO
20+
import Tracing
21+
22+
struct ChoppingNode {
23+
let system: ClusterSystem
24+
25+
var chopper: VegetableChopper?
26+
27+
init(name: String, port: Int) async {
28+
self.system = await ClusterSystem(name) { settings in
29+
settings.bindPort = port
30+
31+
// We are purposefully making very slow calls, so they show up nicely in tracing:
32+
settings.remoteCall.defaultTimeout = .seconds(20)
33+
}
34+
}
35+
36+
mutating func run() async throws {
37+
monitorMembership(on: self.system)
38+
39+
let leaderEndpoint = Cluster.Endpoint(host: self.system.cluster.endpoint.host, port: 7330)
40+
self.system.log.notice("Joining: \(leaderEndpoint)")
41+
self.system.cluster.join(endpoint: leaderEndpoint)
42+
43+
try await self.system.cluster.up(within: .seconds(30))
44+
self.system.log.notice("Joined!")
45+
46+
let chopper = await VegetableChopper(actorSystem: system)
47+
self.chopper = chopper
48+
self.system.log.notice("Vegetable chopper \(chopper) started!")
49+
50+
for await chopper in await self.system.receptionist.listing(of: VegetableChopper.self) {
51+
self.system.log.warning("GOT: \(chopper.id)")
52+
}
53+
}
54+
}

0 commit comments

Comments
 (0)