
Commit 04a1f1f

Reshape lifecycle watch api to be completely retain cycle safe (#961)
* !watch Safer lifecycle watch API
* multi node deathwatch for distributed actors
* +testkit improve testkit for lifecycle tests
* !docker use 5.7 nightlies
* 5.7 "nightly" workarounds; beta 1 has no issues
* workaround fragile closures with generic actors /and inheriting context
1 parent 6f6bdc4 commit 04a1f1f

19 files changed: +462, -98 lines changed

Package.swift

Lines changed: 15 additions & 7 deletions

@@ -226,15 +226,23 @@ let products: [PackageDescription.Product] = [
     ),
 ]
 
+// This is a workaround since current published nightly docker images don't have the latest Swift availabilities yet
+let platforms: [SupportedPlatform]?
+#if os(Linux)
+platforms = nil
+#else
+platforms = [
+    // we require the 'distributed actor' language and runtime feature:
+    .iOS(.v16),
+    .macOS(.v13),
+    .tvOS(.v16),
+    .watchOS(.v9),
+]
+#endif
+
 var package = Package(
     name: "swift-distributed-actors",
-    platforms: [
-        // we require the 'distributed actor' language and runtime feature:
-        .iOS(.v16),
-        .macOS(.v13),
-        .tvOS(.v16),
-        .watchOS(.v9),
-    ],
+    platforms: platforms,
     products: products,
 
     dependencies: dependencies,

Sources/DistributedActors/ActorLogging.swift

Lines changed: 1 addition & 1 deletion

@@ -76,7 +76,7 @@ internal final class LoggingContext {
 extension Logger {
     /// Create a logger specific to this actor.
     public init<Act: DistributedActor>(actor: Act) where Act.ActorSystem == ClusterSystem {
-        var log = Logger(label: "\(actor.id)")
+        var log = actor.actorSystem.settings.logging.baseLogger
         log[metadataKey: "actor/path"] = "\(actor.id.path)"
         log[metadataKey: "actor/id"] = "\(actor.id)"
         self = log

Sources/DistributedActors/ActorRefProvider.swift

Lines changed: 2 additions & 2 deletions

@@ -109,15 +109,15 @@ extension RemoteActorRefProvider {
 
             return self.localProvider._resolve(context: context)
         case .remote:
-            return self._resolveAsRemoteRef(context, remoteAddress: context.id)
+            return self._resolveAsRemoteRef(context, remoteAddress: context.id._asRemote)
         }
     }
 
     public func _resolveUntyped(context: _ResolveContext<Never>) -> _AddressableActorRef {
         if self.localNode == context.id.uniqueNode {
            return self.localProvider._resolveUntyped(context: context)
         } else {
-            return _AddressableActorRef(self._resolveAsRemoteRef(context, remoteAddress: context.id))
+            return _AddressableActorRef(self._resolveAsRemoteRef(context, remoteAddress: context.id._asRemote))
         }
     }

Lines changed: 155 additions & 0 deletions

@@ -0,0 +1,155 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the Swift Distributed Actors open source project
+//
+// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import Distributed
+import Logging
+
+/// Implements ``LifecycleWatch`` semantics in presence of ``Node`` failures.
+///
+/// Depends on a failure detector (e.g. SWIM) to actually detect a node failure, however once detected,
+/// it handles notifying all _local_ actors which have watched at least one actor on the terminating node.
+///
+/// ### Implementation
+/// In order to avoid every actor having to subscribe to cluster events and individually handle the relationship between those
+/// and individually watched actors, the watcher handles subscribing for cluster events on behalf of actors which watch
+/// other actors on remote nodes, and messages them upon a node becoming down.
+///
+/// Actor which is notified automatically when a remote actor is `context.watch()`-ed.
+///
+/// Allows manually mocking membership changes to trigger terminated notifications.
+internal actor DistributedNodeDeathWatcher {
+    // TODO(distributed): actually use this actor rather than the behavior
+
+    typealias ActorSystem = ClusterSystem
+
+    private let log: Logger
+
+    private let selfNode: UniqueNode
+    private var membership: Cluster.Membership = .empty
+
+    /// Members which have been `removed`
+    // TODO: clear after a few days, or some max count of nodes, use sorted set for this
+    private var nodeTombstones: Set<UniqueNode> = []
+
+    /// Mapping between remote node, and actors which have watched some actors on given remote node.
+    private var remoteWatchCallbacks: [UniqueNode: Set<WatcherAndCallback>] = [:]
+
+    private var eventListenerTask: Task<Void, Error>?
+
+    init(actorSystem: ActorSystem) async {
+        var log = actorSystem.log
+        self.log = log
+        self.selfNode = actorSystem.cluster.uniqueNode
+        // initialized
+
+        let events = actorSystem.cluster.events
+        self.eventListenerTask = Task {
+            for try await event in events {
+                switch event {
+                case .membershipChange(let change):
+                    self.membershipChanged(change)
+                case .snapshot(let membership):
+                    let diff = Cluster.Membership._diff(from: .empty, to: membership)
+                    for change in diff.changes {
+                        self.membershipChanged(change)
+                    }
+                case .leadershipChange, .reachabilityChange:
+                    break // ignore those, they don't affect downing
+                }
+            }
+        }
+    }
+
+    func watchActor(
+        on remoteNode: UniqueNode,
+        by watcher: ClusterSystem.ActorID,
+        whenTerminated nodeTerminatedFn: @escaping @Sendable (UniqueNode) async -> Void
+    ) {
+        guard !self.nodeTombstones.contains(remoteNode) else {
+            // the system the watcher is attempting to watch has terminated before the watch has been processed,
+            // thus we have to immediately reply with a termination system message, as otherwise it would never receive one
+            Task {
+                await nodeTerminatedFn(remoteNode)
+            }
+            return
+        }
+
+        let record = WatcherAndCallback(watcherID: watcher, callback: nodeTerminatedFn)
+        self.remoteWatchCallbacks[remoteNode, default: []].insert(record)
+    }
+
+    func removeWatcher(id: ClusterSystem.ActorID) {
+        // TODO: this can be optimized a bit more I suppose, with a reverse lookup table
+        let removeMe = WatcherAndCallback(watcherID: id, callback: { _ in () })
+        for (node, var watcherAndCallbacks) in self.remoteWatchCallbacks {
+            if watcherAndCallbacks.remove(removeMe) != nil {
+                self.remoteWatchCallbacks[node] = watcherAndCallbacks
+            }
+        }
+    }
+
+    func cleanupTombstone(node: UniqueNode) {
+        _ = self.nodeTombstones.remove(node)
+    }
+
+    func membershipChanged(_ change: Cluster.MembershipChange) {
+        guard let change = self.membership.applyMembershipChange(change) else {
+            return // no change, nothing to act on
+        }
+
+        // TODO: make sure we only handle ONCE?
+        if change.status >= .down {
+            // can be: down, leaving or removal.
+            // on any of those we want to ensure we handle the "down"
+            self.handleAddressDown(change)
+        }
+    }
+
+    func handleAddressDown(_ change: Cluster.MembershipChange) {
+        let terminatedNode = change.node
+
+        if let watchers = self.remoteWatchCallbacks.removeValue(forKey: terminatedNode) {
+            for watcher in watchers {
+                Task {
+                    await watcher.callback(terminatedNode)
+                }
+            }
+        }
+
+        // we need to keep a tombstone, so we can immediately reply with a terminated,
+        // in case another watch was just in progress of being made
+        self.nodeTombstones.insert(terminatedNode)
+    }
+
+    func cancel() {
+        self.eventListenerTask?.cancel()
+        self.eventListenerTask = nil
+    }
+}
+
+extension DistributedNodeDeathWatcher {
+    struct WatcherAndCallback: Hashable {
+        /// Address of the local watcher which had issued this watch
+        let watcherID: ClusterSystem.ActorID
+        let callback: @Sendable (UniqueNode) async -> Void
+
+        func hash(into hasher: inout Hasher) {
+            hasher.combine(self.watcherID)
+        }
+
+        static func == (lhs: WatcherAndCallback, rhs: WatcherAndCallback) -> Bool {
+            lhs.watcherID == rhs.watcherID
+        }
+    }
+}
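
For orientation, a minimal usage sketch of this internal actor follows; it is not part of the commit, and `system`, `watcherID` and `remoteNode` are illustrative placeholders (the real wiring inside `ClusterSystem` may differ):

```swift
// Hedged sketch: assumes code inside the DistributedActors module, where
// DistributedNodeDeathWatcher, ClusterSystem and UniqueNode are visible.
func registerNodeWatch(
    system: ClusterSystem,
    watcherID: ClusterSystem.ActorID,
    remoteNode: UniqueNode
) async {
    // The watcher subscribes itself to cluster events in its async initializer.
    let deathWatcher = await DistributedNodeDeathWatcher(actorSystem: system)

    // Register interest: the callback fires once `remoteNode` is declared at least .down,
    // or immediately if the node already has a tombstone.
    await deathWatcher.watchActor(on: remoteNode, by: watcherID) { terminatedNode in
        print("node terminated: \(terminatedNode)")
    }

    // Remove the watcher's callbacks again, e.g. once the watching actor has resigned its ID.
    await deathWatcher.removeWatcher(id: watcherID)
}
```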

Sources/DistributedActors/Cluster/NodeDeathWatcher.swift

Lines changed: 22 additions & 0 deletions

@@ -131,13 +131,23 @@ internal final class NodeDeathWatcherInstance: NodeDeathWatcher {
 
     func handleAddressDown(_ change: Cluster.MembershipChange) {
         let terminatedNode = change.node
+
+        // ref
         if let watchers = self.remoteWatchers.removeValue(forKey: terminatedNode) {
             for ref in watchers {
                 // we notify each actor that was watching this remote address
                 ref._sendSystemMessage(.nodeTerminated(terminatedNode))
             }
         }
 
+        if let watchers = self.remoteWatchCallbacks.removeValue(forKey: terminatedNode) {
+            for watcher in watchers {
+                Task {
+                    await watcher.callback(terminatedNode)
+                }
+            }
+        }
+
         // we need to keep a tombstone, so we can immediately reply with a terminated,
         // in case another watch was just in progress of being made
         self.nodeTombstones.insert(terminatedNode)
@@ -186,8 +196,20 @@ enum NodeDeathWatcherShell {
 
         context.system.cluster.events.subscribe(context.subReceive(Cluster.Event.self) { event in
             switch event {
+            case .snapshot(let membership):
+                context.log.info("Membership snapshot: \(membership)")
+                let diff = Cluster.Membership._diff(from: .empty, to: membership)
+                for change in diff.changes {
+                    instance.onMembershipChanged(change)
+                }
+
            case .membershipChange(let change) where change.isAtLeast(.down):
+                context.log.info("Node down: \(change)!")
                instance.handleAddressDown(change)
+            case .membershipChange(let change):
+                context.log.info("Node change: \(change)!")
+                instance.onMembershipChanged(change)
+
            default:
                () // ignore other changes, we only need to react on nodes becoming DOWN
            }
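
Both the behavior-based shell above and the new `DistributedNodeDeathWatcher` follow the same event-handling pattern: a membership snapshot is replayed as a diff against an empty membership, and any change to at least `.down` is treated as the node having terminated. A hedged, free-standing sketch of that pattern (the two handler parameters are illustrative stand-ins, not APIs from this commit):

```swift
// Sketch only: relies on Cluster.Event, Cluster.Membership and Cluster.MembershipChange
// as used in the diffs above; the handler closures are hypothetical.
func handleClusterEvent(
    _ event: Cluster.Event,
    onMembershipChanged: (Cluster.MembershipChange) -> Void,
    onNodeDown: (Cluster.MembershipChange) -> Void
) {
    switch event {
    case .snapshot(let membership):
        // Replay the snapshot as individual changes, diffed against an empty membership.
        let diff = Cluster.Membership._diff(from: .empty, to: membership)
        for change in diff.changes {
            onMembershipChanged(change)
        }
    case .membershipChange(let change) where change.isAtLeast(.down):
        // down, leaving or removal: all imply the node should be treated as dead
        onNodeDown(change)
    case .membershipChange(let change):
        onMembershipChanged(change)
    default:
        () // leadership or reachability changes do not affect downing
    }
}
```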

Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift

Lines changed: 3 additions & 6 deletions

@@ -317,7 +317,7 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
         if self.storage.addRegistration(sequenced: sequenced, key: key, guest: guest) {
             // self.instrumentation.actorRegistered(key: key, id: id) // TODO(distributed): make the instrumentation calls compatible with distributed actor based types
 
-            watchTermination(of: guest) { onActorTerminated(identity: $0) }
+            watchTermination(of: guest)
 
             self.log.debug(
                 "Registered [\(id)] for key [\(key)]",
@@ -562,9 +562,7 @@ extension OpLogDistributedReceptionist {
         // We resolve a stub that we cannot really ever send messages to, but we can "watch" it
         let resolved = try! actorSystem._resolveStub(identity: identity) // TODO(distributed): remove the throwing here?
 
-        watchTermination(of: resolved) {
-            onActorTerminated(identity: $0)
-        }
+        watchTermination(of: resolved)
         if self.storage.addRegistration(sequenced: sequenced, key: key, guest: resolved) {
             // self.instrumentation.actorRegistered(key: key, id: id) // TODO(distributed): make the instrumentation calls compatible with distributed actor based types
         }
@@ -798,8 +796,7 @@ extension OpLogDistributedReceptionist {
 // MARK: Termination handling
 
 extension OpLogDistributedReceptionist {
-    // func onActorTerminated(terminated: Signals.Terminated) {
-    func onActorTerminated(identity id: ID) {
+    public distributed func terminated(actor id: ID) {
         if id == ActorID._receptionist(on: id.uniqueNode, for: .distributedActors) {
             self.log.debug("Watched receptionist terminated: \(id)")
             self.receptionistTerminated(identity: id)

Sources/DistributedActors/Cluster/Transport/RemoteClusterActorPersonality.swift

Lines changed: 3 additions & 1 deletion

@@ -69,7 +69,9 @@ public final class _RemoteClusterActorPersonality<Message: Codable> {
     private var _cachedAssociation: ManagedAtomicLazyReference<Association>
 
     init(shell: ClusterShell, id: ActorID, system: ClusterSystem) {
-        precondition(id._isRemote, "RemoteActorRef MUST be remote. ActorID was: \(String(reflecting: id))")
+        if !id._isRemote {
+            let _: Void = fatalErrorBacktrace("RemoteActorRef MUST be remote. ActorID was: \(id.detailedDescription)")
+        }
 
         self._cachedAssociation = ManagedAtomicLazyReference()

Sources/DistributedActors/ClusterSystem.swift

Lines changed: 4 additions & 1 deletion

@@ -392,11 +392,14 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
         /// Starts plugins after the system is fully initialized
         await self.settings.plugins.startAll(self)
 
-        self.log.info("ClusterSystem [\(self.name)] initialized, listening on: \(self.settings.uniqueBindNode)")
         if settings.enabled {
+            self.log.info("ClusterSystem [\(self.name)] initialized, listening on: \(self.settings.uniqueBindNode): \(self.cluster.ref)")
+
             self.log.info("Setting in effect: .autoLeaderElection: \(self.settings.autoLeaderElection)")
             self.log.info("Setting in effect: .downingStrategy: \(self.settings.downingStrategy)")
             self.log.info("Setting in effect: .onDownAction: \(self.settings.onDownAction)")
+        } else {
+            self.log.info("ClusterSystem [\(self.name)] initialized; Cluster disabled, not listening for connections.")
         }
     }

Sources/DistributedActors/DistributedActors.docc/Lifecycle.md

Lines changed: 15 additions & 9 deletions

@@ -24,10 +24,12 @@ distributed actor Romeo: LifecycleWatch {
     }
 
     distributed func watch(_ romeo: Romeo) {
-        watchTermination(of: romeo) { terminatedID in
-            probe.tell("Oh no! \(terminatedID) is dead!")
-            // TODO: Drink poison
-        }
+        watchTermination(of: romeo)
+    }
+
+    distributed func terminated(actor id: ActorID) async {
+        print("Oh no! \(id) is dead!")
+        // *Drinks poison*
     }
 }
 
@@ -37,20 +39,24 @@ distributed actor Juliet: LifecycleWatch {
     }
 
     distributed func watch(_ romeo: Romeo) {
-        watchTermination(of: romeo) { terminatedID in
-            probe.tell("Oh no! \(terminatedID) is dead!")
-            // TODO: Stab through heart
-        }
+        watchTermination(of: romeo)
+    }
+
+    distributed func terminated(actor id: ActorID) async {
+        print("Oh no! \(id) is dead!")
+        // *Stabs through heart*
     }
 }
 ```
 
-The ``LifecycleWatch/watchTermination(of:whenTerminated:file:line:)`` API purposefully does not use async/await because that would cause `romeo` to be retained as this function suspends. Instead, we allow it to complete and once the romeo actor is determined terminated, we get called back with its ``ActorID``.
+The ``LifecycleWatch/watchTermination(of:file:line:)`` API purposefully does not use async/await, because that would cause `romeo` to be retained while this function suspends. Instead, we allow it, and the function calling it (which keeps a reference to `Romeo`), to complete; once the romeo actor is determined to have terminated, we are called back with its ``ActorID`` in the separate ``terminated(actor:file:line:)`` method.
 
 This API offers the same semantics regardless of where the actors are located, and always triggers the termination handling once the watched actor is considered to have terminated.
 
 In case the watched actor is _local_, its termination is tied to Swift's ref-counting mechanisms, and an actor is terminated as soon as there are no more strong references to it in a system. It then is deinitialized, and the actor system's `resignID(actor.id)` is triggered, causing propagation to all the other actors which have been watching that actor.
 
+You can also stop watching an actor by calling ``unwatchTermination(of:file:line:)``.
+
 In case the watched actor is _remote_, termination may happen because of two reasons:
 - either its reference count _on the remote system_ dropped to zero and it followed the same deinitialization steps as just described in the local case;
 - or, the entire node the distributed actor was located on has been declared ``Cluster/MemberStatus/down`` and therefore the actor is assumed terminated (regardless if it really has deinitialized or not).
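
Putting the documentation changes together, a conformance under the reshaped API might look as follows. This is a hedged sketch assembled only from the snippets above; it assumes the library is imported as `DistributedActors`, the explicit initializers and `print` bodies are illustrative, and `LifecycleWatch` may have requirements not shown here:

```swift
import Distributed
import DistributedActors // assumed module name for this library

distributed actor Romeo: LifecycleWatch {
    typealias ActorSystem = ClusterSystem

    init(actorSystem: ActorSystem) {
        self.actorSystem = actorSystem
    }
}

distributed actor Juliet: LifecycleWatch {
    typealias ActorSystem = ClusterSystem

    init(actorSystem: ActorSystem) {
        self.actorSystem = actorSystem
    }

    distributed func watch(_ romeo: Romeo) {
        // Registers interest only; no closure is stored, so neither `self`
        // nor `romeo` can be accidentally retained by the watch call.
        watchTermination(of: romeo)
    }

    distributed func stopWatching(_ romeo: Romeo) {
        unwatchTermination(of: romeo)
    }

    // Invoked once `romeo` is determined to have terminated, whether it
    // deinitialized locally or its node was declared down.
    distributed func terminated(actor id: ActorID) async {
        print("Oh no! \(id) is dead!")
    }
}
```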
