Skip to content

Commit 2081527

Browse files
authored
Address remaining feedback in #1012 (#1015)
1 parent cd57b91 commit 2081527

File tree

10 files changed

+37
-45
lines changed

10 files changed

+37
-45
lines changed

IntegrationTests/tests_01_cluster/it_Clustered_swim_suspension_reachability/main.swift

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,6 @@ let system = await ClusterSystem("System") { settings in
4141
settings.downingStrategy = .none
4242
}
4343

44-
let ref = try system._spawn(
45-
"streamWatcher",
46-
of: Cluster.Event.self,
47-
.receive { context, event in
48-
context.log.info("Event: \(event)")
49-
return .same
50-
}
51-
)
52-
5344
Task {
5445
for await event in system.cluster.events {
5546
system.log.info("Event: \(event)")

Sources/DistributedActors/Cluster/ClusterEventStream.swift

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,27 @@ public struct ClusterEventStream: AsyncSequence {
4141
self.actor = nil
4242
}
4343

44-
func subscribe(_ ref: _ActorRef<Cluster.Event>, file: String = #filePath, line: UInt = #line) async {
44+
nonisolated func subscribe(_ ref: _ActorRef<Cluster.Event>, file: String = #filePath, line: UInt = #line) {
45+
Task {
46+
await self._subscribe(ref, file: file, line: line)
47+
}
48+
}
49+
50+
func _subscribe(_ ref: _ActorRef<Cluster.Event>, file: String = #filePath, line: UInt = #line) async {
4551
guard let actor = self.actor else { return }
4652

4753
await actor.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
4854
__secretlyKnownToBeLocal.subscribe(ref)
4955
}
5056
}
5157

52-
func unsubscribe(_ ref: _ActorRef<Cluster.Event>, file: String = #filePath, line: UInt = #line) async {
58+
nonisolated func unsubscribe(_ ref: _ActorRef<Cluster.Event>, file: String = #filePath, line: UInt = #line) {
59+
Task {
60+
await self._unsubscribe(ref, file: file, line: line)
61+
}
62+
}
63+
64+
func _unsubscribe(_ ref: _ActorRef<Cluster.Event>, file: String = #filePath, line: UInt = #line) async {
5365
guard let actor = self.actor else { return }
5466

5567
await actor.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead

Sources/DistributedActors/Cluster/Leadership.swift

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,7 @@ extension Leadership {
112112
var behavior: _Behavior<Cluster.Event> {
113113
.setup { context in
114114
context.log.trace("Configured with \(self.election)")
115-
Task {
116-
await context.system.cluster.events.subscribe(context.myself)
117-
}
115+
context.system.cluster.events.subscribe(context.myself)
118116

119117
// FIXME: we have to add "own node" since we're not getting the .snapshot... so we have to manually act as if..
120118
_ = self.membership.applyMembershipChange(Cluster.MembershipChange(node: context.system.cluster.uniqueNode, previousStatus: nil, toStatus: .joining))

Sources/DistributedActors/Cluster/NodeDeathWatcher.swift

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,7 @@ enum NodeDeathWatcherShell {
214214
() // ignore other changes, we only need to react on nodes becoming DOWN
215215
}
216216
}
217-
Task {
218-
await context.system.cluster.events.subscribe(onClusterEventRef)
219-
}
217+
context.system.cluster.events.subscribe(onClusterEventRef)
220218

221219
return NodeDeathWatcherShell.behavior(instance)
222220
}

Sources/DistributedActors/Cluster/Reception/_OperationLogClusterReceptionistBehavior.swift

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,10 @@ public final class _OperationLogClusterReceptionist {
9595
context.log.debug("Initialized receptionist")
9696

9797
// === listen to cluster events ------------------
98-
Task {
99-
await context.system.cluster.events.subscribe(
100-
context.subReceive(Cluster.Event.self) { event in
101-
self.onClusterEvent(context, event: event)
102-
}
103-
)
98+
let onClusterEventRef = context.subReceive(Cluster.Event.self) { event in
99+
self.onClusterEvent(context, event: event)
104100
}
101+
context.system.cluster.events.subscribe(onClusterEventRef)
105102

106103
// === timers ------------------
107104
// periodically gossip to other receptionists with the last seqNr we've seen,

Sources/DistributedActors/Gossip/Gossiper+Shell.swift

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -330,9 +330,7 @@ extension GossipShell {
330330
() // ignore
331331
}
332332
}
333-
Task {
334-
await context.system.cluster.events.subscribe(onClusterEventRef)
335-
}
333+
context.system.cluster.events.subscribe(onClusterEventRef)
336334

337335
case .fromReceptionistListing(let id):
338336
let key = _Reception.Key(_ActorRef<Message>.self, id: id)

Sources/DistributedActorsTestKit/ActorTestKit.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ extension ActorTestKit {
106106
) async -> ActorTestProbe<Cluster.Event> {
107107
let eventStream = self.system.cluster.events
108108
let p = self.makeTestProbe(naming ?? _ActorNaming.prefixed(with: "\(ClusterEventStream.self)-subscriberProbe"), expecting: Cluster.Event.self)
109-
await eventStream.subscribe(p.ref)
109+
await eventStream._subscribe(p.ref)
110110
return p
111111
}
112112
}

Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ final class ClusterAssociationTests: ClusteredActorSystemsXCTestCase {
9797

9898
let firstEventsProbe = self.testKit(first).makeTestProbe(expecting: Cluster.Event.self)
9999
let secondEventsProbe = self.testKit(second).makeTestProbe(expecting: Cluster.Event.self)
100-
await first.cluster.events.subscribe(firstEventsProbe.ref)
101-
await second.cluster.events.subscribe(secondEventsProbe.ref)
100+
await first.cluster.events._subscribe(firstEventsProbe.ref)
101+
await second.cluster.events._subscribe(secondEventsProbe.ref)
102102

103103
first.cluster.join(node: second.cluster.uniqueNode.node)
104104

@@ -113,8 +113,8 @@ final class ClusterAssociationTests: ClusteredActorSystemsXCTestCase {
113113
settings.bindPort = secondPort
114114
}
115115
let secondReplacementEventsProbe = self.testKit(secondReplacement).makeTestProbe(expecting: Cluster.Event.self)
116-
await secondReplacement.cluster.events.subscribe(secondReplacementEventsProbe.ref)
117-
await second.cluster.events.subscribe(secondReplacementEventsProbe.ref)
116+
await secondReplacement.cluster.events._subscribe(secondReplacementEventsProbe.ref)
117+
await second.cluster.events._subscribe(secondReplacementEventsProbe.ref)
118118

119119
// the new replacement node is now going to initiate a handshake with 'first' which knew about the previous
120120
// instance (oldSecond) on the same node; It should accept this new handshake, and ban the previous node.

Tests/DistributedActorsTests/Cluster/ClusterEventStreamTests.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ final class ClusterEventStreamTests: ClusterSystemXCTestCase, @unchecked Sendabl
3737

3838
let eventStream = ClusterEventStream(system, customName: "testClusterEvents")
3939

40-
await eventStream.subscribe(p1.ref) // sub before first -> up was published
40+
await eventStream._subscribe(p1.ref) // sub before first -> up was published
4141
await eventStream.publish(.membershipChange(.init(member: self.memberA, toStatus: .up)))
42-
await eventStream.subscribe(p2.ref)
42+
await eventStream._subscribe(p2.ref)
4343
await eventStream.publish(.membershipChange(.init(member: self.memberB, toStatus: .up)))
4444

4545
// ==== p1 ---------------------
@@ -89,7 +89,7 @@ final class ClusterEventStreamTests: ClusterSystemXCTestCase, @unchecked Sendabl
8989
await eventStream.publish(.membershipChange(.init(member: self.memberA, toStatus: .up)))
9090
await eventStream.publish(.membershipChange(.init(member: self.memberB, toStatus: .joining)))
9191
await eventStream.publish(.membershipChange(.init(member: self.memberB, toStatus: .up)))
92-
await eventStream.subscribe(p1.ref)
92+
await eventStream._subscribe(p1.ref)
9393

9494
// ==== p1 ---------------------
9595

Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsClusteredTests.swift

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,7 @@ final class ClusterLeaderActionsClusteredTests: ClusteredActorSystemsXCTestCase
3535
_ = try first._spawn(
3636
"selfishSingleLeader",
3737
_Behavior<Cluster.Event>.setup { context in
38-
Task {
39-
await context.system.cluster.events.subscribe(context.myself)
40-
}
38+
context.system.cluster.events.subscribe(context.myself)
4139

4240
return .receiveMessage { event in
4341
switch event {
@@ -154,9 +152,9 @@ final class ClusterLeaderActionsClusteredTests: ClusteredActorSystemsXCTestCase
154152
}
155153

156154
let p1 = self.testKit(first).makeTestProbe(expecting: Cluster.Event.self)
157-
await first.cluster.events.subscribe(p1.ref)
155+
await first.cluster.events._subscribe(p1.ref)
158156
let p2 = self.testKit(second).makeTestProbe(expecting: Cluster.Event.self)
159-
await second.cluster.events.subscribe(p2.ref)
157+
await second.cluster.events._subscribe(p2.ref)
160158

161159
first.cluster.join(node: second.cluster.uniqueNode.node)
162160

@@ -221,21 +219,21 @@ final class ClusterLeaderActionsClusteredTests: ClusteredActorSystemsXCTestCase
221219
settings.downingStrategy = .timeout(.init(downUnreachableMembersAfter: .milliseconds(300)))
222220
}
223221
let p1 = testKit(first).makeTestProbe(expecting: Cluster.Event.self)
224-
await first.cluster.events.subscribe(p1.ref)
222+
await first.cluster.events._subscribe(p1.ref)
225223

226224
let second = await setUpNode("second") { settings in
227225
settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 2)
228226
settings.downingStrategy = .timeout(.init(downUnreachableMembersAfter: .milliseconds(300)))
229227
}
230228
let p2 = testKit(second).makeTestProbe(expecting: Cluster.Event.self)
231-
await second.cluster.events.subscribe(p2.ref)
229+
await second.cluster.events._subscribe(p2.ref)
232230

233231
let third = await setUpNode("third") { settings in
234232
settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 2)
235233
settings.downingStrategy = .timeout(.init(downUnreachableMembersAfter: .milliseconds(300)))
236234
}
237235
let p3 = self.testKit(third).makeTestProbe(expecting: Cluster.Event.self)
238-
await third.cluster.events.subscribe(p3.ref)
236+
await third.cluster.events._subscribe(p3.ref)
239237

240238
try await self.joinNodes(node: first, with: second)
241239
try await self.joinNodes(node: second, with: third)
@@ -293,7 +291,7 @@ final class ClusterLeaderActionsClusteredTests: ClusteredActorSystemsXCTestCase
293291
settings.downingStrategy = .timeout(.init(downUnreachableMembersAfter: .milliseconds(200)))
294292
}
295293
let p1 = self.testKit(first).makeTestProbe(expecting: Cluster.Event.self)
296-
await first.cluster.events.subscribe(p1.ref)
294+
await first.cluster.events._subscribe(p1.ref)
297295

298296
let second = await setUpNode("second") { settings in
299297
settings.swim.probeInterval = .milliseconds(300)
@@ -302,7 +300,7 @@ final class ClusterLeaderActionsClusteredTests: ClusteredActorSystemsXCTestCase
302300
settings.downingStrategy = .timeout(.init(downUnreachableMembersAfter: .milliseconds(200)))
303301
}
304302
let p2 = self.testKit(second).makeTestProbe(expecting: Cluster.Event.self)
305-
await second.cluster.events.subscribe(p2.ref)
303+
await second.cluster.events._subscribe(p2.ref)
306304

307305
let third = await setUpNode("third") { settings in
308306
settings.swim.probeInterval = .milliseconds(300)
@@ -311,7 +309,7 @@ final class ClusterLeaderActionsClusteredTests: ClusteredActorSystemsXCTestCase
311309
settings.downingStrategy = .timeout(.init(downUnreachableMembersAfter: .milliseconds(200)))
312310
}
313311
let p3 = self.testKit(third).makeTestProbe(expecting: Cluster.Event.self)
314-
await third.cluster.events.subscribe(p3.ref)
312+
await third.cluster.events._subscribe(p3.ref)
315313

316314
try await self.joinNodes(node: first, with: second)
317315
try await self.joinNodes(node: second, with: third)

0 commit comments

Comments
 (0)