Skip to content

Commit 87c9239

Browse files
authored
!context store lifecycle in DistributedActorContext; internal only (#976)
* !context store lifecycle in DistributedActorContext; internal only * cleanup * wip * formatting * trying CI workarounds * workarounds * formatting
1 parent 9c2c273 commit 87c9239

File tree

14 files changed

+454
-328
lines changed

14 files changed

+454
-328
lines changed

Sources/ActorSingletonPlugin/ActorSingletonManager.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ extension ActorSingletonManager {
100100

101101
extension ActorID {
102102
static func _singletonManager(name: String, on node: UniqueNode) -> ActorID {
103-
ActorID(local: node, path: ._singletonManager(name: name), incarnation: .wellKnown)
103+
._make(local: node, path: ._singletonManager(name: name), incarnation: .wellKnown)
104104
}
105105
}
106106

Sources/ActorSingletonPlugin/ActorSingletonProxy.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ extension ActorSingletonProxy {
239239

240240
extension ActorID {
241241
static func _singletonProxy(name: String, remote node: UniqueNode) -> ActorID {
242-
.init(remote: node, path: ._singletonProxy(name: name), incarnation: .wellKnown)
242+
._make(remote: node, path: ._singletonProxy(name: name), incarnation: .wellKnown)
243243
}
244244
}
245245

Sources/DistributedActors/ActorAddress.swift renamed to Sources/DistributedActors/ActorID.swift

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ extension ClusterSystem {
9999
/// - SeeAlso: `ActorTags` for a detailed discussion of some frequently used tags.
100100
public var tags: ActorTags
101101

102+
/// Internal "actor context" which is used as storage for additional cluster actor features, such as watching.
103+
internal var context: DistributedActorContext
104+
102105
/// Underlying path representation, not attached to a specific Actor instance.
103106
// FIXME(distributed): make optional
104107
public var path: ActorPath {
@@ -123,30 +126,59 @@ extension ClusterSystem {
123126
/// Uniquely identifies the specific "incarnation" of this actor.
124127
public let incarnation: ActorIncarnation
125128

126-
/// :nodoc:
127-
public init(local node: UniqueNode, path: ActorPath?, incarnation: ActorIncarnation) {
129+
// TODO(distributed): remove this workaround; only exists for old ActorSingletonManager
130+
public static func _make(local node: UniqueNode, path: ActorPath?, incarnation: ActorIncarnation) -> Self {
131+
.init(local: node, path: path, incarnation: incarnation)
132+
}
133+
134+
// TODO(distributed): remove this initializer, as it is only for Behavior actors
135+
init(local node: UniqueNode, path: ActorPath?, incarnation: ActorIncarnation) {
136+
self.context = .init(lifecycle: nil)
128137
self._location = .local(node)
129138
self.tags = ActorTags()
130139
self.incarnation = incarnation
131140
if let path {
132141
self.tags[ActorTags.path] = path
133142
}
143+
traceLog_DeathWatch("Made ID: \(self)")
134144
}
135145

136-
/// :nodoc:
137-
public init(remote node: UniqueNode, path: ActorPath?, incarnation: ActorIncarnation) {
146+
// TODO(distributed): remove this workaround; only exists for old ActorSingletonManager
147+
public static func _make(remote node: UniqueNode, path: ActorPath?, incarnation: ActorIncarnation) -> Self {
148+
.init(remote: node, path: path, incarnation: incarnation)
149+
}
150+
151+
// TODO(distributed): remove this initializer, as it is only for Behavior actors
152+
init(remote node: UniqueNode, path: ActorPath?, incarnation: ActorIncarnation) {
153+
self.context = .init(lifecycle: nil)
138154
self._location = .remote(node)
139155
self.incarnation = incarnation
140156
self.tags = ActorTags()
141157
if let path {
142158
self.tags[ActorTags.path] = path
143159
}
160+
traceLog_DeathWatch("Made ID: \(self)")
144161
}
145162

146-
/// :nodoc:
147-
public init<Act>(local node: UniqueNode, type: Act.Type, incarnation: ActorIncarnation)
163+
public init<Act>(remote node: UniqueNode, type: Act.Type, incarnation: ActorIncarnation)
148164
where Act: DistributedActor, Act.ActorSystem == ClusterSystem
149165
{
166+
self.context = .init(lifecycle: nil)
167+
self._location = .remote(node)
168+
self.incarnation = incarnation
169+
self.tags = ActorTags()
170+
// TODO: avoid mangling names on every spawn?
171+
if let mangledName = _mangledTypeName(type) {
172+
self.tags[ActorTags.type] = .init(mangledName: mangledName)
173+
}
174+
traceLog_DeathWatch("Made ID: \(self)")
175+
}
176+
177+
init<Act>(local node: UniqueNode, type: Act.Type, incarnation: ActorIncarnation,
178+
context: DistributedActorContext)
179+
where Act: DistributedActor, Act.ActorSystem == ClusterSystem
180+
{
181+
self.context = context
150182
self._location = .local(node)
151183
self.tags = ActorTags()
152184
self.incarnation = incarnation
@@ -155,18 +187,36 @@ extension ClusterSystem {
155187
if let mangledName = _mangledTypeName(type) {
156188
self.tags[ActorTags.type] = .init(mangledName: mangledName)
157189
}
190+
traceLog_DeathWatch("Made ID: \(self)")
158191
}
159192

160-
/// :nodoc:
161-
public init(remote node: UniqueNode, type: (some DistributedActor).Type, incarnation: ActorIncarnation) {
193+
init<Act>(remote node: UniqueNode, type: Act.Type, incarnation: ActorIncarnation,
194+
context: DistributedActorContext)
195+
where Act: DistributedActor, Act.ActorSystem == ClusterSystem
196+
{
197+
self.context = context
162198
self._location = .remote(node)
163199
self.incarnation = incarnation
164200
self.tags = ActorTags()
165201
// TODO: avoid mangling names on every spawn?
166202
if let mangledName = _mangledTypeName(type) {
167203
self.tags[ActorTags.type] = .init(mangledName: mangledName)
168204
}
205+
traceLog_DeathWatch("Made ID: \(self)")
169206
}
207+
208+
internal var withoutContext: Self {
209+
var copy = self
210+
copy.context = .init(lifecycle: nil)
211+
return copy
212+
}
213+
}
214+
}
215+
216+
extension DistributedActor where ActorSystem == ClusterSystem {
217+
/// INTERNAL: Provides the actor context for use within this actor.
218+
internal var context: DistributedActorContext {
219+
self.id.context
170220
}
171221
}
172222

@@ -237,7 +287,6 @@ extension ActorID {
237287

238288
extension ActorID {
239289
/// :nodoc:
240-
@inlinable
241290
public var _isLocal: Bool {
242291
switch self._location {
243292
case .local: return true
@@ -246,20 +295,17 @@ extension ActorID {
246295
}
247296

248297
/// :nodoc:
249-
@inlinable
250298
public var _isRemote: Bool {
251299
!self._isLocal
252300
}
253301

254302
/// :nodoc:
255-
@inlinable
256303
public var _asRemote: Self {
257304
let remote = Self(remote: self.uniqueNode, path: self.path, incarnation: self.incarnation)
258305
return remote
259306
}
260307

261308
/// :nodoc:
262-
@inlinable
263309
public var _asLocal: Self {
264310
let local = Self(local: self.uniqueNode, path: self.path, incarnation: self.incarnation)
265311
return local

Sources/DistributedActors/ClusterSystem.swift

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,6 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
5555
// TODO: collapse it with the other initialization lock; the other one is not needed now I think?
5656
private let initLock = Lock()
5757

58-
internal let lifecycleWatchLock = Lock()
59-
internal var _lifecycleWatches: [ActorID: LifecycleWatchContainer] = [:]
60-
6158
private var _associationTombstoneCleanupTask: RepeatedTask?
6259

6360
private let dispatcher: InternalMessageDispatcher
@@ -907,17 +904,27 @@ extension ClusterSystem {
907904
where Act: DistributedActor
908905
{
909906
let props = _Props.forSpawn // task-local read for any properties this actor should have
910-
let address = try! self._reserveName(type: Act.self, props: props)
907+
var id = try! self._reserveName(type: Act.self, props: props)
908+
909+
let lifecycleContainer: LifecycleWatchContainer?
910+
if Act.self is (any(LifecycleWatch).Type) {
911+
lifecycleContainer = LifecycleWatchContainer(watcherID: id.withoutContext, actorSystem: self)
912+
} else {
913+
lifecycleContainer = nil
914+
}
915+
traceLog_DeathWatch("Make LifecycleWatchContainer for \(id):::: \(lifecycleContainer)")
916+
917+
id.context = .init(lifecycle: lifecycleContainer)
911918

912919
self.log.warning("Assign identity", metadata: [
913920
"actor/type": "\(actorType)",
914-
"actor/id": "\(address)",
915-
"actor/id/uniqueNode": "\(address.uniqueNode)",
921+
"actor/id": "\(id)",
922+
"actor/id/uniqueNode": "\(id.uniqueNode)",
916923
])
917924

918925
return self.namingLock.withLock {
919-
self._reservedNames.insert(address)
920-
return address
926+
self._reservedNames.insert(id)
927+
return id
921928
}
922929
}
923930

@@ -931,12 +938,12 @@ extension ClusterSystem {
931938
defer { self.namingLock.unlock() }
932939
precondition(self._reservedNames.remove(actor.id) != nil, "Attempted to ready an identity that was not reserved: \(actor.id)")
933940

934-
if let watcher = actor as? any LifecycleWatch {
935-
func doMakeLifecycleWatch<Watcher: LifecycleWatch & DistributedActor>(watcher: Watcher) {
936-
_ = self._makeLifecycleWatch(watcher: watcher)
937-
}
938-
_openExistential(watcher, do: doMakeLifecycleWatch)
939-
}
941+
// if let watcher = actor as? any LifecycleWatch {
942+
// func doMakeLifecycleWatch<Watcher: LifecycleWatch & DistributedActor>(watcher: Watcher) {
943+
// _ = LifecycleWatchContainer(watcher)
944+
// }
945+
// _openExistential(watcher, do: doMakeLifecycleWatch)
946+
// }
940947

941948
let behavior = InvocationBehavior.behavior(instance: Weak(actor))
942949
let ref = self._spawnDistributedActor(behavior, identifiedBy: actor.id)
@@ -953,11 +960,12 @@ extension ClusterSystem {
953960
ref._sendSystemMessage(.stop, file: #file, line: #line)
954961
}
955962
}
956-
self.lifecycleWatchLock.withLockVoid {
957-
if let watch = self._lifecycleWatches.removeValue(forKey: id) {
958-
watch.notifyWatchersWeDied()
959-
}
960-
}
963+
id.context.terminate()
964+
// self.lifecycleWatchLock.withLockVoid {
965+
// if let watch = self._lifecycleWatches.removeValue(forKey: id) {
966+
// watch.notifyWatchersWeDied()
967+
// }
968+
// }
961969
self.namingLock.withLockVoid {
962970
self._managedRefs.removeValue(forKey: id) // TODO: should not be necessary in the future
963971
_ = self._managedDistributedActors.removeActor(identifiedBy: id)
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Distributed Actors open source project
4+
//
5+
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Distributed
16+
17+
/// Internal context object used by the actor system to support per-actor state, such as necessary to implement lifecycle watch etc.
18+
///
19+
/// Access must be carefully managed to only be performed from the actor itself, once the context has been initialized.
20+
/// And also it is allowed to modify this state once the system receives it in the `resignID` (as the context is carried inside the ID),
21+
/// as at that point in time it can no longer be used–by the now deallocated–actor itself.
22+
public final class DistributedActorContext {
23+
let lifecycle: LifecycleWatchContainer?
24+
25+
init(lifecycle: LifecycleWatchContainer?) {
26+
self.lifecycle = lifecycle
27+
traceLog_DeathWatch("Create context; Lifecycle: \(lifecycle)")
28+
}
29+
30+
/// Invoked by the actor system when the owning actor is terminating, so we can clean up all stored data
31+
func terminate() {
32+
guard let lifecycle = self.lifecycle else {
33+
return
34+
}
35+
traceLog_DeathWatch("Terminate: \(lifecycle.watcherID)")
36+
lifecycle.clear()
37+
}
38+
}

Sources/DistributedActors/InvocationBehavior.swift

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ enum InvocationBehavior {
4343
}
4444

4545
// `InvocationMessage`s are handled in `UserMessageHandler`
46-
// await context.system.receiveInvocation(actor: instance, message: message)
47-
return .same
46+
// old impl: await context.system.receiveInvocation(actor: instance, message: message)
47+
return fatalErrorBacktrace("We don't invoke distributed actors via the behavior runtime anymore! ")
4848
}.receiveSignal { _, signal in
4949

5050
// We received a signal, but our target actor instance was already released;
@@ -55,8 +55,13 @@ enum InvocationBehavior {
5555

5656
if let terminated = signal as? _Signals.Terminated {
5757
if let watcher = instance as? (any LifecycleWatch) {
58-
let watch = watcher.actorSystem._getLifecycleWatch(watcher: watcher)
59-
watch?.receiveTerminated(terminated)
58+
Task {
59+
await instance.whenLocal { __secretlyKnownToBeLocalK in
60+
let __secretlyKnownToBeLocal: any LifecycleWatch = __secretlyKnownToBeLocalK as! any LifecycleWatch
61+
await __secretlyKnownToBeLocal._receiveActorTerminated(id: terminated.id)
62+
}
63+
}
64+
6065
return .same
6166
}
6267
}

0 commit comments

Comments
 (0)