Skip to content

Commit 0b24b2f

Browse files
authored
watchTermination should support async 'whenTerminated' (#939)
Resolves #930
1 parent e28f532 commit 0b24b2f

File tree

3 files changed

+48
-6
lines changed

3 files changed

+48
-6
lines changed

Sources/DistributedActors/ClusterSystem.swift

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
169169
return box.value
170170
}
171171

172+
internal var downing: DowningStrategyShell?
173+
172174
// ==== ----------------------------------------------------------------------------------------------------------------
173175
// MARK: Logging
174176

@@ -358,10 +360,14 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
358360
}
359361

360362
// downing strategy (automatic downing)
361-
await _Props.$forSpawn.withValue(DowningStrategyShell.props) {
362-
if let downingStrategy = self.settings.downingStrategy.make(self.settings) {
363-
let downingStrategyShell = await DowningStrategyShell(downingStrategy, system: self)
363+
if settings.enabled {
364+
await _Props.$forSpawn.withValue(DowningStrategyShell.props) {
365+
if let downingStrategy = self.settings.downingStrategy.make(self.settings) {
366+
self.downing = await DowningStrategyShell(downingStrategy, system: self)
367+
}
364368
}
369+
} else {
370+
self.downing = nil
365371
}
366372

367373
#if SACT_TESTS_LEAKS
@@ -486,6 +492,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
486492
self.userProvider.stopAll()
487493
self.systemProvider.stopAll()
488494
self.dispatcher.shutdown()
495+
self.downing = nil
489496

490497
self._associationTombstoneCleanupTask?.cancel()
491498
self._associationTombstoneCleanupTask = nil

Sources/DistributedActors/LifecycleMonitoring/LifecycleWatch.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ extension LifecycleWatch {
2727
@discardableResult
2828
public func watchTermination<Watchee>(
2929
of watchee: Watchee,
30-
@_inheritActorContext @_implicitSelfCapture whenTerminated: @escaping @Sendable (ID) -> Void,
30+
@_inheritActorContext @_implicitSelfCapture whenTerminated: @escaping @Sendable (ID) async -> Void,
3131
file: String = #file, line: UInt = #line
3232
) -> Watchee where Watchee: DistributedActor, Watchee.ActorSystem == ClusterSystem {
3333
// TODO(distributed): reimplement this as self.id as? _ActorContext which will have the watch things.
@@ -163,7 +163,7 @@ extension LifecycleWatchContainer {
163163
/// Performed by the sending side of "watch", therefore the `watcher` should equal `context.myself`
164164
public func termination<Watchee>(
165165
of watchee: Watchee,
166-
@_inheritActorContext @_implicitSelfCapture whenTerminated: @escaping @Sendable (ClusterSystem.ActorID) -> Void,
166+
@_inheritActorContext @_implicitSelfCapture whenTerminated: @escaping @Sendable (ClusterSystem.ActorID) async -> Void,
167167
file: String = #file, line: UInt = #line
168168
) where Watchee: DistributedActor, Watchee.ActorSystem == ClusterSystem {
169169
traceLog_DeathWatch("issue watch: \(watchee) (from \(self.watcherID))")

Tests/DistributedActorsTests/LifecycleWatchTests.swift

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the Swift Distributed Actors open source project
44
//
5-
// Copyright (c) 2018-2021 Apple Inc. and the Swift Distributed Actors project authors
5+
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -71,6 +71,22 @@ distributed actor Juliet: LifecycleWatch, CustomStringConvertible {
7171
}
7272
}
7373

74+
distributed func meetWatchAsyncCallback(
75+
_ romeo: Romeo,
76+
unwatch doUnwatch: Bool
77+
) async throws {
78+
@Sendable
79+
func asyncTerminated(_ terminatedIdentity: ClusterSystem.ActorID) async {
80+
await self.probe.tell("Received terminated: \(terminatedIdentity)")
81+
}
82+
83+
watchTermination(of: romeo, whenTerminated: asyncTerminated)
84+
85+
if doUnwatch {
86+
unwatch(romeo)
87+
}
88+
}
89+
7490
nonisolated var description: String {
7591
"\(Self.self)(\(id))"
7692
}
@@ -99,6 +115,25 @@ final class LifecycleWatchTests: ActorSystemXCTestCase, @unchecked Sendable {
99115
try pj.expectMessage("Received terminated: /user/Romeo-b")
100116
}
101117

118+
func test_watch_shouldTriggerTerminatedWhenWatchedActorDeinits_async() async throws {
119+
let pj = self.testKit.makeTestProbe(expecting: String.self)
120+
let pr = self.testKit.makeTestProbe(expecting: String.self)
121+
let juliet = Juliet(probe: pj, actorSystem: system)
122+
123+
func meet() async throws {
124+
var romeo: Romeo? = Romeo(probe: pr, actorSystem: system)
125+
126+
try await juliet.meetWatchAsyncCallback(romeo!, unwatch: false)
127+
romeo = nil
128+
}
129+
try await meet()
130+
131+
try pj.expectMessage("Juliet init")
132+
try pr.expectMessage("Romeo init")
133+
try pr.expectMessage("Romeo deinit")
134+
try pj.expectMessage("Received terminated: /user/Romeo-b")
135+
}
136+
102137
func test_watchThenUnwatch_shouldTriggerTerminatedWhenWatchedActorDeinits() async throws {
103138
let pj = self.testKit.makeTestProbe(expecting: String.self)
104139
let pr = self.testKit.makeTestProbe(expecting: String.self)

0 commit comments

Comments
 (0)