Skip to content

Commit f77cf87

Browse files
committed
Make ActorLeakTests green again
1 parent 36d720e commit f77cf87

File tree

3 files changed

+22
-7
lines changed

3 files changed

+22
-7
lines changed

Sources/DistributedActors/Cluster/ClusterEventStream.swift

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,15 @@ public struct ClusterEventStream: AsyncSequence {
2525

2626
private let shell: ClusterEventStreamShell?
2727

28-
internal init(_ system: ClusterSystem) {
29-
self.shell = ClusterEventStreamShell(actorSystem: system)
28+
internal init(_ system: ClusterSystem, customName: String? = nil) async {
29+
var props = ClusterEventStreamShell.props
30+
if let customName = customName {
31+
props._knownActorName = customName
32+
}
33+
34+
self.shell = await _Props.$forSpawn.withValue(props) {
35+
ClusterEventStreamShell(actorSystem: system)
36+
}
3037
}
3138

3239
// For testing only
@@ -108,6 +115,14 @@ public struct ClusterEventStream: AsyncSequence {
108115
internal distributed actor ClusterEventStreamShell: LifecycleWatch {
109116
typealias ActorSystem = ClusterSystem
110117

118+
static var props: _Props {
119+
var ps = _Props()
120+
ps._knownActorName = "clustEventStream"
121+
ps._systemActor = true
122+
ps._wellKnown = true
123+
return ps
124+
}
125+
111126
// We maintain a snapshot i.e. the "latest version of the membership",
112127
// in order to eagerly publish it to anyone who subscribes immediately,
113128
// followed by joining them to the subsequent ``Cluster/Event`` publishes.

Sources/DistributedActors/ClusterSystem.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
305305
}
306306

307307
if !settings.enabled {
308-
let clusterEvents = ClusterEventStream(self)
308+
let clusterEvents = await ClusterEventStream(self)
309309
_ = self._clusterStore.storeIfNilThenLoad(Box(nil))
310310
_ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, cluster: nil, clusterRef: self.deadLetters.adapted(), eventStream: clusterEvents)))
311311
}
@@ -317,7 +317,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
317317
if let cluster = self._cluster {
318318
// try!-safe, this will spawn under /system/... which we have full control over,
319319
// and there /system namespace and it is known there will be no conflict for this name
320-
let clusterEvents = ClusterEventStream(self)
320+
let clusterEvents = await ClusterEventStream(self)
321321
let clusterRef = try! cluster.start(system: self, clusterEvents: clusterEvents) // only spawns when cluster is initialized
322322
_ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, cluster: cluster, clusterRef: clusterRef, eventStream: clusterEvents)))
323323

Tests/DistributedActorsTests/Cluster/ClusterEventStreamTests.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ final class ClusterEventStreamTests: ClusterSystemXCTestCase, @unchecked Sendabl
3535
let p1 = self.testKit.makeTestProbe(expecting: Cluster.Event.self)
3636
let p2 = self.testKit.makeTestProbe(expecting: Cluster.Event.self)
3737

38-
let eventStream = ClusterEventStream(system)
38+
let eventStream = await ClusterEventStream(system, customName: "testClusterEvents")
3939

4040
await eventStream.subscribe(p1.ref) // sub before first -> up was published
4141
await eventStream.publish(.membershipChange(.init(member: self.memberA, toStatus: .up)))
@@ -83,7 +83,7 @@ final class ClusterEventStreamTests: ClusterSystemXCTestCase, @unchecked Sendabl
8383
func test_clusterEventStream_collapseManyEventsIntoSnapshot() async throws {
8484
let p1 = self.testKit.makeTestProbe(expecting: Cluster.Event.self)
8585

86-
let eventStream = ClusterEventStream(system)
86+
let eventStream = await ClusterEventStream(system, customName: "testClusterEvents")
8787

8888
await eventStream.publish(.membershipChange(.init(member: self.memberA, toStatus: .joining)))
8989
await eventStream.publish(.membershipChange(.init(member: self.memberA, toStatus: .up)))
@@ -106,7 +106,7 @@ final class ClusterEventStreamTests: ClusterSystemXCTestCase, @unchecked Sendabl
106106
}
107107

108108
func test_clusterEventStream_collapseManyEventsIntoSnapshot_async() async throws {
109-
let eventStream = ClusterEventStream(system)
109+
let eventStream = await ClusterEventStream(system, customName: "testClusterEvents")
110110

111111
// Publish events to change membership
112112
await eventStream.publish(.membershipChange(.init(member: self.memberA, toStatus: .joining)))

0 commit comments

Comments
 (0)