Skip to content

Commit d42b708

Browse files
authored
+cluster,metrics SWIM 0.3.0 with metrics, new props.metrics() and mailbox metrics
* =cluster update SWIM to 0.3.0, includes metrics * wip on metrics via props * move storage of metrics into the actor shell; they are "per actor" rather than per props; props are just settings * implement opt-in metrics in mailbox * make metrics test kit less verbose * readjust metrics emitting in mailbox * implement ping roundtrip metrics for SWIM inside the peer * +swim metrics in the SWIMActorShell, roundtrip times and counts * cleanup
1 parent 7a7ec79 commit d42b708

23 files changed

+1018
-187
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ targets.append(
248248
#endif
249249

250250
var dependencies: [Package.Dependency] = [
251-
.package(url: "https://github.com/apple/swift-cluster-membership.git", from: "0.2.0"),
251+
.package(url: "https://github.com/apple/swift-cluster-membership.git", from: "0.3.0"),
252252

253253
.package(url: "https://github.com/apple/swift-nio.git", from: "2.12.0"),
254254
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.2.0"),

Samples/Sources/SampleMetrics/main.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ struct MetricPrinter {
8888
}
8989
}
9090

91-
let props = Props().metrics(group: "talkers")
91+
let props = Props().metrics(group: "talkers", measure: [.deserialization])
9292

9393
let t1 = try system.spawn("talker-1", props: props, Talker.talkTo(another: nil))
9494
let t2 = try system.spawn("talker-2", props: props, Talker.talkTo(another: t1))

Sources/DistributedActors/ActorShell.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ public final class ActorShell<Message: ActorMessage>: ActorContext<Message>, Abs
4949
@usableFromInline
5050
var instrumentation: ActorInstrumentation!
5151

52+
@usableFromInline
53+
let metrics: ActiveActorMetrics
54+
5255
// ==== ------------------------------------------------------------------------------------------------------------
5356
// MARK: Basic ActorContext capabilities
5457

@@ -159,6 +162,8 @@ public final class ActorShell<Message: ActorMessage>: ActorContext<Message>, Abs
159162

160163
self.namingContext = ActorNamingContext()
161164

165+
self.metrics = ActiveActorMetrics(system: system, address: address, props: props.metrics)
166+
162167
// TODO: replace with TestMetrics which we could use to inspect the start/stop counts
163168
#if SACT_TESTS_LEAKS
164169
// We deliberately only count user actors here, because the number of
@@ -945,6 +950,7 @@ internal protocol AbstractShellProtocol: _ActorTreeTraversable {
945950
var _myselfReceivesSystemMessages: _ReceivesSystemMessages { get }
946951
var children: Children { get set } // lock-protected
947952
var asAddressable: AddressableActorRef { get }
953+
var metrics: ActiveActorMetrics { get }
948954
}
949955

950956
extension AbstractShellProtocol {

Sources/DistributedActors/ActorSystem.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ public final class ActorSystem {
3333

3434
// Impl note: Access is synchronized via `namingLock`, since we are (or MAY be) called from outside actors here
3535
// TODO: avoid the lock...
36-
internal var _namingContext = ActorNamingContext()
36+
internal var namingContext = ActorNamingContext()
3737
internal let namingLock = Lock()
3838
internal func withNamingContext<T>(_ block: (inout ActorNamingContext) throws -> T) rethrows -> T {
3939
try self.namingLock.withLock {
40-
try block(&self._namingContext)
40+
try block(&self.namingContext)
4141
}
4242
}
4343

Sources/DistributedActors/ActorSystemSettings.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public struct ActorSystemSettings {
3535
public var cluster: ClusterSettings = .default {
3636
didSet {
3737
self.serialization.localNode = self.cluster.uniqueBindNode
38+
self.metrics.systemName = self.cluster.node.systemName
3839
}
3940
}
4041

Sources/DistributedActors/Cluster/ClusterSettings.swift

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ public struct ClusterSettings {
6161

6262
/// Node representing this node in the cluster.
6363
/// Note that most of the time `uniqueBindNode` is more appropriate, as it includes this node's unique id.
64-
public var node: Node
64+
public var node: Node {
65+
didSet {
66+
self.swim.metrics.systemName = self.node.systemName
67+
}
68+
}
6569

6670
/// `NodeID` to be used when exposing `UniqueNode` for node configured by using these settings.
6771
public var nid: UniqueNodeID
@@ -183,7 +187,12 @@ public struct ClusterSettings {
183187
self.node = node
184188
self.nid = UniqueNodeID.random()
185189
self.tls = tls
190+
186191
self.swim = SWIM.Settings()
187192
self.swim.unreachability = .enabled
193+
if node.systemName != "" {
194+
self.swim.metrics.systemName = node.systemName
195+
}
196+
self.swim.metrics.labelPrefix = "cluster.swim"
188197
}
189198
}

Sources/DistributedActors/Cluster/ClusterShell.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -386,13 +386,13 @@ extension ClusterShell {
386386
let uniqueBindAddress = clusterSettings.uniqueBindNode
387387

388388
let swimBehavior = SWIMActorShell.behavior(settings: clusterSettings.swim, clusterRef: context.myself)
389-
self._swimRef = try context._downcastUnsafe._spawn(SWIMActorShell.naming, props: ._wellKnown, swimBehavior)
389+
self._swimRef = try context.spawn(SWIMActorShell.naming, props: SWIMActorShell.props, swimBehavior)
390390

391391
// automatic leader election, so it may move members: .joining -> .up (and other `LeaderAction`s)
392392
if let leaderElection = context.system.settings.cluster.autoLeaderElection.make(context.system.cluster.settings) {
393393
let leadershipShell = Leadership.Shell(leaderElection)
394394
let leadership = try context.spawn(Leadership.Shell.naming, leadershipShell.behavior)
395-
context.watch(leadership) // if leadership fails fomr some reason, we are in trouble and need to know about it
395+
context.watch(leadership) // if leadership fails for some reason, we are in trouble and need to know about it
396396
}
397397

398398
// .down decisions made by:

Sources/DistributedActors/Cluster/SWIM/SWIMActorShell.swift

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import ClusterMembership
16+
import struct Dispatch.DispatchTime
1617
import enum Dispatch.DispatchTimeInterval
1718
import Logging
1819
import SWIM
@@ -30,6 +31,10 @@ internal struct SWIMActorShell {
3031
self.swim.settings
3132
}
3233

34+
var metrics: SWIM.Metrics {
35+
self.swim.metrics
36+
}
37+
3338
internal init(_ swim: SWIM.Instance, clusterRef: ClusterShell.Ref) {
3439
self.swim = swim
3540
self.clusterRef = clusterRef
@@ -41,14 +46,8 @@ internal struct SWIMActorShell {
4146
/// Initial behavior, kicks off timers and becomes `ready`.
4247
static func behavior(settings: SWIM.Settings, clusterRef: ClusterShell.Ref) -> Behavior<SWIM.Message> {
4348
.setup { context in
44-
// A bit weird dance, but this way we make the instance use the actor's logger;
45-
// This is always what we want inside an actor system anyway;
46-
// And at the same time we do use the configured log level for the entire actor: instance and shell
47-
var settings = settings
48-
context.log.logLevel = settings.logger.logLevel
49-
settings.logger = context.log
5049
let swim = SWIM.Instance(
51-
settings: settings,
50+
settings: Self.customizeSWIMSettings(settings: settings, context: context),
5251
myself: context.myself
5352
)
5453
let shell = SWIMActorShell(swim, clusterRef: clusterRef)
@@ -58,6 +57,18 @@ internal struct SWIMActorShell {
5857
}
5958
}
6059

60+
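/// Returns a copy of the SWIM settings customized for running inside this actor: it uses the actor's logger (at the configured log level) and the actor system's metrics system name.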
61+
private static func customizeSWIMSettings(settings: SWIM.Settings, context: ActorContext<SWIM.Message>) -> SWIM.Settings {
62+
// A bit of a weird dance, but this way we make the instance use the actor's logger;
63+
// This is always what we want inside an actor system anyway;
64+
// And at the same time we do use the configured log level for the entire actor: instance and shell
65+
var settings = settings
66+
context.log.logLevel = settings.logger.logLevel
67+
settings.logger = context.log
68+
settings.metrics.systemName = context.system.settings.metrics.systemName
69+
return settings
70+
}
71+
6172
static func ready(shell: SWIMActorShell) -> Behavior<SWIM.Message> {
6273
.receive { context, wrappedMessage in
6374
switch wrappedMessage {
@@ -115,6 +126,8 @@ internal struct SWIMActorShell {
115126
// MARK: Receiving messages
116127

117128
func receiveRemoteMessage(message: SWIM.RemoteMessage, context: MyselfContext) {
129+
self.metrics.shell.messageInboundCount.increment()
130+
118131
switch message {
119132
case .ping(let replyTo, let payload, let sequenceNumber):
120133
self.handlePing(context: context, replyTo: replyTo, payload: payload, sequenceNumber: sequenceNumber)
@@ -221,9 +234,12 @@ internal struct SWIMActorShell {
221234
"swim/timeout": "\(timeout)",
222235
]))
223236

237+
let pingSentAt = DispatchTime.now()
238+
self.metrics.shell.messageOutboundCount.increment()
224239
target.ping(payload: payload, timeout: timeout, sequenceNumber: sequenceNumber, context: context) { result in
225240
switch result {
226241
case .success(let pingResponse):
242+
self.metrics.shell.pingResponseTime.recordInterval(since: pingSentAt)
227243
self.handlePingResponse(
228244
response: pingResponse,
229245
pingRequestOrigin: pingRequestOrigin,
@@ -259,21 +275,34 @@ internal struct SWIMActorShell {
259275
let firstSuccessful = eventLoop.makePromise(of: SWIM.PingResponse.self)
260276
let pingTimeout = directive.timeout
261277
let peerToPing = directive.target
278+
279+
let startedSendingPingRequestsSentAt: DispatchTime = .now()
280+
let pingRequestResponseTimeFirstTimer = self.swim.metrics.shell.pingRequestResponseTimeFirst
281+
firstSuccessful.futureResult.whenComplete { result in
282+
switch result {
283+
case .success: pingRequestResponseTimeFirstTimer.recordInterval(since: startedSendingPingRequestsSentAt)
284+
case .failure: ()
285+
}
286+
}
287+
262288
for pingRequest in directive.requestDetails {
263289
let peerToPingRequestThrough = pingRequest.peerToPingRequestThrough
264290
let payload = pingRequest.payload
265291
let sequenceNumber = pingRequest.sequenceNumber
266292

267293
context.log.trace("Sending ping request for [\(peerToPing)] to [\(peerToPingRequestThrough)] with payload: \(payload)")
268294

269-
// self.tracelog(.send(to: peerToPingRequestThrough), message: "pingRequest(target: \(nodeToPing), replyTo: \(self.peer), payload: \(payload), sequenceNumber: \(sequenceNumber))")
295+
let pingRequestSentAt: DispatchTime = .now()
270296
let eachReplyPromise = eventLoop.makePromise(of: SWIM.PingResponse.self)
271-
peerToPingRequestThrough.pingRequest(target: peerToPing, payload: payload, from: context.myself, timeout: pingTimeout, sequenceNumber: sequenceNumber) { result in
297+
298+
self.metrics.shell.messageOutboundCount.increment()
299+
peerToPingRequestThrough.pingRequest(target: peerToPing, payload: payload, timeout: pingTimeout, sequenceNumber: sequenceNumber, context: context) { result in
272300
eachReplyPromise.completeWith(result)
273301
}
274302
context.onResultAsync(of: eachReplyPromise.futureResult, timeout: .effectivelyInfinite) { result in
275303
switch result {
276304
case .success(let response):
305+
self.metrics.shell.pingRequestResponseTimeAll.recordInterval(since: pingRequestSentAt)
277306
self.handleEveryPingRequestResponse(response: response, pinged: peerToPing, context: context)
278307
if case .ack = response {
279308
// We only cascade successful ping responses (i.e. `ack`s);
@@ -517,6 +546,12 @@ extension SWIMActorShell {
517546
static let name: String = "swim"
518547
static let naming: ActorNaming = .unique(SWIMActorShell.name)
519548

549+
static var props: Props {
550+
Props
551+
._wellKnown
552+
.metrics(group: "swim.shell", measure: [.serialization, .deserialization])
553+
}
554+
520555
static let protocolPeriodTimerKey = TimerKey("\(SWIMActorShell.name)/periodic-ping")
521556
}
522557

Sources/DistributedActors/Cluster/SWIM/SWIMPeer+Actor.swift

Lines changed: 7 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import ClusterMembership
16+
import CoreMetrics
17+
import struct Dispatch.DispatchTime
1618
import enum Dispatch.DispatchTimeInterval
1719
import SWIM
1820

@@ -60,7 +62,9 @@ extension SWIMPeer {
6062
switch result {
6163
case .success(.remote(.pingResponse(let response))):
6264
switch response {
63-
case .ack, .timeout:
65+
case .ack:
66+
promise.succeed(response)
67+
case .timeout:
6468
promise.succeed(response)
6569
case .nack:
6670
promise.fail(IllegalSWIMMessageTypeError("Unexpected .nack reply to .ping message! Was: \(response)"))
@@ -128,23 +132,7 @@ extension ActorRef: SWIMPeer where Message == SWIM.Message {
128132
sequenceNumber: SWIM.SequenceNumber,
129133
onResponse: @escaping (Result<SWIM.PingResponse, Error>) -> Void
130134
) {
131-
self.ask(for: SWIM.Message.self, timeout: .nanoseconds(timeout.nanoseconds)) { replyTo in
132-
SWIM.Message.remote(.ping(pingOrigin: replyTo, payload: payload, sequenceNumber: sequenceNumber))
133-
}._onComplete { (result: Result<SWIM.Message, Error>) in
134-
switch result {
135-
case .success(.remote(.pingResponse(let response))):
136-
switch response {
137-
case .ack, .timeout:
138-
onResponse(.success(response))
139-
case .nack:
140-
onResponse(.failure(IllegalSWIMMessageTypeError("Unexpected .nack reply to .ping message! Was: \(response)")))
141-
}
142-
case .success(let message):
143-
onResponse(.failure(IllegalSWIMMessageTypeError("Expected .ack, but received unexpected reply to .ping: \(message)")))
144-
case .failure(let error):
145-
onResponse(.failure(error))
146-
}
147-
}
135+
fatalError("Use ping(payload:timeout:sequenceNumber:context:onResponse:) instead")
148136
}
149137

150138
// Implementation note: origin is ignored on purpose, and that's okay since we perform the question via an `ask`
@@ -156,58 +144,10 @@ extension ActorRef: SWIMPeer where Message == SWIM.Message {
156144
sequenceNumber: SWIM.SequenceNumber,
157145
onResponse: @escaping (Result<SWIM.PingResponse, Error>) -> Void
158146
) {
159-
guard let targetRef = target as? SWIM.Ref else {
160-
onResponse(.failure(IllegalSWIMPeerTypeError("Expected target to ge \(SWIM.Ref.self) but was: \(target)")))
161-
return
162-
}
163-
164-
self.ask(for: SWIM.PingRequestOriginRef.Message.self, timeout: .nanoseconds(timeout.nanoseconds)) { replyTo in
165-
SWIM.Message.remote(.pingRequest(target: targetRef, pingRequestOrigin: replyTo, payload: payload, sequenceNumber: sequenceNumber))
166-
}._onComplete { (result: Result<SWIM.Message, Error>) in
167-
switch result {
168-
case .success(.remote(.pingResponse(let response))):
169-
onResponse(.success(response))
170-
case .success(let message):
171-
onResponse(.failure(IllegalSWIMMessageTypeError("Expected .ack, but received unexpected reply to .ping: \(message)")))
172-
case .failure(let error):
173-
onResponse(.failure(error))
174-
}
175-
}
147+
fatalError("Use pingRequest(target:payload:timeout:sequenceNumber:context:onResponse:) instead")
176148
}
177149
}
178150

179-
///// :nodoc:
180-
// extension ActorRef: SWIMPingOriginPeer where Message == SWIM.PingResponse {
181-
// public func ack(
182-
// acknowledging sequenceNumber: SWIM.SequenceNumber,
183-
// target: SWIMPeer,
184-
// incarnation: SWIM.Incarnation,
185-
// payload: SWIM.GossipPayload
186-
// ) {
187-
// guard let targetRef = target as? SWIM.Ref else {
188-
// let error = IllegalSWIMPeerTypeError("Expected target to ge \(SWIM.Ref.self) but was: \(target)")
189-
// fatalError("\(error)")
190-
// }
191-
//
192-
// self.tell(.ack(target: targetRef, incarnation: incarnation, payload: payload, sequenceNumber: sequenceNumber))
193-
// }
194-
// }
195-
//
196-
///// :nodoc:
197-
// extension ActorRef: SWIMPingRequestOriginPeer where Message == SWIM.PingResponse {
198-
// public func nack(
199-
// acknowledging sequenceNumber: SWIM.SequenceNumber,
200-
// target: SWIMPeer
201-
// ) {
202-
// guard let targetRef = target as? SWIM.Ref else {
203-
// let error = IllegalSWIMPeerTypeError("Expected target to ge \(SWIM.Ref.self) but was: \(target)")
204-
// fatalError("\(error)")
205-
// }
206-
//
207-
// self.tell(.nack(target: target, sequenceNumber: sequenceNumber))
208-
// }
209-
// }
210-
211151
/// :nodoc:
212152
extension ActorRef: SWIMPingOriginPeer where Message == SWIM.Message {
213153
public func ack(

0 commit comments

Comments
 (0)