Skip to content

Commit cd57b91

Browse files
yim-leektoso
andauthored
Change generic EventStream<Event> to ClusterEventStream and convert to actor (#1012)
* Change generic EventStream<Event> to ClusterEventStream and convert to actor Resolves #821 * Fix formatting * Make ActorLeakTests green again * ClusterEventStream.init doesn't need to be async * Rename ClusterEventStreamShell to ClusterEventStreamActor * Update Sources/DistributedActors/Cluster/ClusterShell.swift Co-authored-by: Konrad `ktoso` Malawski <[email protected]> * address review feedback Co-authored-by: Konrad `ktoso` Malawski <[email protected]>
1 parent 8194429 commit cd57b91

23 files changed

+314
-579
lines changed

IntegrationTests/tests_01_cluster/it_Clustered_swim_suspension_reachability/main.swift

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,12 @@ let ref = try system._spawn(
4949
return .same
5050
}
5151
)
52-
system.cluster.events.subscribe(ref)
52+
53+
Task {
54+
for await event in system.cluster.events {
55+
system.log.info("Event: \(event)")
56+
}
57+
}
5358

5459
if args.count >= 3 {
5560
print("getting host")

Sources/DistributedActors/Cluster/ClusterControl.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public struct ClusterControl {
2929
///
3030
/// This sequence begins with a snapshot of the current cluster state and continues with events representing changes
3131
/// since the snapshot.
32-
public let events: EventStream<Cluster.Event> // FIXME: make this an AsyncSequence<Cluster.Event>
32+
public let events: ClusterEventStream
3333

3434
/// Offers a snapshot of membership, which may be used to perform ad-hoc tests against the membership.
3535
/// Note that this view may be immediately outdated after checking if, if e.g. a membership change is just being processed.
@@ -74,7 +74,7 @@ public struct ClusterControl {
7474
private let cluster: ClusterShell?
7575
internal let ref: ClusterShell.Ref
7676

77-
init(_ settings: ClusterSystemSettings, cluster: ClusterShell?, clusterRef: ClusterShell.Ref, eventStream: EventStream<Cluster.Event>) {
77+
init(_ settings: ClusterSystemSettings, cluster: ClusterShell?, clusterRef: ClusterShell.Ref, eventStream: ClusterEventStream) {
7878
self.settings = settings
7979
self.cluster = cluster
8080
self.ref = clusterRef

Sources/DistributedActors/Cluster/ClusterEventStream.swift

Lines changed: 181 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the Swift Distributed Actors open source project
44
//
5-
// Copyright (c) 2018-2021 Apple Inc. and the Swift Distributed Actors project authors
5+
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -12,97 +12,195 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import Distributed
1516
import Logging
1617

17-
/// Specialized event stream behavior which takes into account emitting a snapshot event on first subscription,
18-
/// followed by a stream of ``Cluster/Event``s.
18+
/// `ClusterEventStream` manages a set of subscribers and forwards every event published to it to
19+
/// all its subscribers. An actor can subscribe/unsubscribe to the event stream via `AsyncSequence`
20+
/// constructs. Subscribers will be watched and removed in case they terminate.
1921
///
20-
/// This ensures that every subscriber to cluster events never misses any of the membership events, meaning
21-
/// it is possible for anyone to maintain a local up-to-date copy of `Membership` by applying all these events to that copy.
22-
internal enum ClusterEventStream {
23-
enum Shell {
24-
static var behavior: _Behavior<EventStreamShell.Message<Cluster.Event>> {
25-
.setup { context in
26-
27-
// We maintain a snapshot i.e. the "latest version of the membership",
28-
// in order to eagerly publish it to anyone who subscribes immediately,
29-
// followed by joining them to the subsequent ``Cluster/Event`` publishes.
30-
//
31-
// Thanks to this, any subscriber immediately gets a pretty recent view of the membership,
32-
// followed by the usual updates via events. Since all events are published through this
33-
// event stream actor, all subscribers are guaranteed to see events in the right order,
34-
// and not miss any information as long as they apply all events they receive.
35-
var snapshot = Cluster.Membership.empty
36-
var subscribers: [ActorID: _ActorRef<Cluster.Event>] = [:]
37-
var asyncSubscribers: [ObjectIdentifier: (Cluster.Event) -> Void] = [:]
38-
39-
let behavior: _Behavior<EventStreamShell.Message<Cluster.Event>> = .receiveMessage { message in
40-
switch message {
41-
case .subscribe(let ref):
42-
subscribers[ref.id] = ref
43-
context.watch(ref)
44-
context.log.trace("Successfully subscribed [\(ref)], offering membership snapshot")
45-
ref.tell(.snapshot(snapshot))
46-
47-
case .unsubscribe(let ref):
48-
if subscribers.removeValue(forKey: ref.id) != nil {
49-
context.unwatch(ref)
50-
context.log.trace("Successfully unsubscribed [\(ref)]")
51-
} else {
52-
context.log.warning("Received `.unsubscribe` for non-subscriber [\(ref)]")
53-
}
54-
55-
case .publish(let event):
56-
try snapshot.apply(event: event)
57-
58-
for subscriber in subscribers.values {
59-
subscriber.tell(event)
60-
}
61-
for subscriber in asyncSubscribers.values {
62-
subscriber(event)
63-
}
64-
65-
context.log.trace(
66-
"Published event \(event) to \(subscribers.count) subscribers and \(asyncSubscribers.count) async subscribers",
67-
metadata: [
68-
"eventStream/event": "\(reflecting: event)",
69-
"eventStream/subscribers": Logger.MetadataValue.array(subscribers.map {
70-
Logger.MetadataValue.stringConvertible($0.key)
71-
}),
72-
"eventStream/asyncSubscribers": Logger.MetadataValue.array(asyncSubscribers.map {
73-
Logger.MetadataValue.stringConvertible("\($0.key)")
74-
}),
75-
]
76-
)
77-
78-
case .asyncSubscribe(let id, let eventHandler, let `continue`):
79-
asyncSubscribers[id] = eventHandler
80-
context.log.trace("Successfully added async subscriber [\(id)]")
81-
`continue`()
82-
eventHandler(Cluster.Event.snapshot(snapshot))
83-
84-
case .asyncUnsubscribe(let id, let `continue`):
85-
if asyncSubscribers.removeValue(forKey: id) != nil {
86-
context.log.trace("Successfully removed async subscriber [\(id)]")
87-
} else {
88-
context.log.warning("Received `.asyncUnsubscribe` for non-subscriber [\(id)]")
89-
}
90-
`continue`()
91-
}
22+
/// `ClusterEventStream` is only meant to be used locally and does not buffer or redeliver messages.
23+
public struct ClusterEventStream: AsyncSequence {
24+
public typealias Element = Cluster.Event
9225

93-
return .same
94-
}
26+
private let actor: ClusterEventStreamActor?
27+
28+
internal init(_ system: ClusterSystem, customName: String? = nil) {
29+
var props = ClusterEventStreamActor.props
30+
if let customName = customName {
31+
props._knownActorName = customName
32+
}
33+
34+
self.actor = _Props.$forSpawn.withValue(props) {
35+
ClusterEventStreamActor(actorSystem: system)
36+
}
37+
}
38+
39+
// For testing only
40+
internal init() {
41+
self.actor = nil
42+
}
43+
44+
func subscribe(_ ref: _ActorRef<Cluster.Event>, file: String = #filePath, line: UInt = #line) async {
45+
guard let actor = self.actor else { return }
46+
47+
await actor.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
48+
__secretlyKnownToBeLocal.subscribe(ref)
49+
}
50+
}
51+
52+
func unsubscribe(_ ref: _ActorRef<Cluster.Event>, file: String = #filePath, line: UInt = #line) async {
53+
guard let actor = self.actor else { return }
54+
55+
await actor.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
56+
__secretlyKnownToBeLocal.unsubscribe(ref)
57+
}
58+
}
59+
60+
private func subscribe(_ oid: ObjectIdentifier, eventHandler: @escaping (Cluster.Event) -> Void) async {
61+
guard let actor = self.actor else { return }
62+
63+
await actor.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
64+
__secretlyKnownToBeLocal.subscribe(oid, eventHandler: eventHandler)
65+
}
66+
}
67+
68+
private func unsubscribe(_ oid: ObjectIdentifier) async {
69+
guard let actor = self.actor else { return }
70+
71+
await actor.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
72+
__secretlyKnownToBeLocal.unsubscribe(oid)
73+
}
74+
}
75+
76+
func publish(_ event: Cluster.Event, file: String = #filePath, line: UInt = #line) async {
77+
guard let actor = self.actor else { return }
78+
79+
await actor.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
80+
__secretlyKnownToBeLocal.publish(event)
81+
}
82+
}
83+
84+
public func makeAsyncIterator() -> AsyncIterator {
85+
AsyncIterator(self)
86+
}
87+
88+
public class AsyncIterator: AsyncIteratorProtocol {
89+
var underlying: AsyncStream<Cluster.Event>.Iterator!
9590

96-
return behavior.receiveSpecificSignal(_Signals.Terminated.self) { context, signal in
97-
if subscribers.removeValue(forKey: signal.id) != nil {
98-
context.log.trace("Removed subscriber [\(signal.id)], because it terminated")
99-
} else {
100-
context.log.warning("Received unexpected termination signal for non-subscriber [\(signal.id)]")
91+
init(_ eventStream: ClusterEventStream) {
92+
let id = ObjectIdentifier(self)
93+
self.underlying = AsyncStream<Cluster.Event> { continuation in
94+
Task {
95+
await eventStream.subscribe(id) { event in
96+
continuation.yield(event)
10197
}
98+
}
10299

103-
return .same
100+
continuation.onTermination = { _ in
101+
Task {
102+
await eventStream.unsubscribe(id)
103+
}
104104
}
105+
}.makeAsyncIterator()
106+
}
107+
108+
public func next() async -> Cluster.Event? {
109+
await self.underlying.next()
110+
}
111+
}
112+
}
113+
114+
// FIXME(distributed): the only reason this actor is distributed is because of LifecycleWatch
115+
internal distributed actor ClusterEventStreamActor: LifecycleWatch {
116+
typealias ActorSystem = ClusterSystem
117+
118+
static var props: _Props {
119+
var ps = _Props()
120+
ps._knownActorName = "clustEventStream"
121+
ps._systemActor = true
122+
ps._wellKnown = true
123+
return ps
124+
}
125+
126+
// We maintain a snapshot i.e. the "latest version of the membership",
127+
// in order to eagerly publish it to anyone who subscribes immediately,
128+
// followed by joining them to the subsequent ``Cluster/Event`` publishes.
129+
//
130+
// Thanks to this, any subscriber immediately gets a pretty recent view of the membership,
131+
// followed by the usual updates via events. Since all events are published through this
132+
// event stream actor, all subscribers are guaranteed to see events in the right order,
133+
// and not miss any information as long as they apply all events they receive.
134+
private var snapshot = Cluster.Membership.empty
135+
136+
private var subscribers: [ActorID: _ActorRef<Cluster.Event>] = [:]
137+
private var asyncSubscribers: [ObjectIdentifier: (Cluster.Event) -> Void] = [:]
138+
139+
private lazy var log = Logger(actor: self)
140+
141+
internal init(actorSystem: ActorSystem) {
142+
self.actorSystem = actorSystem
143+
}
144+
145+
func subscribe(_ ref: _ActorRef<Cluster.Event>) {
146+
self.subscribers[ref.id] = ref
147+
self.log.trace("Successfully subscribed [\(ref)], offering membership snapshot")
148+
ref.tell(.snapshot(self.snapshot))
149+
}
150+
151+
func unsubscribe(_ ref: _ActorRef<Cluster.Event>) {
152+
if self.subscribers.removeValue(forKey: ref.id) != nil {
153+
self.log.trace("Successfully unsubscribed [\(ref)]")
154+
} else {
155+
self.log.warning("Received `.unsubscribe` for non-subscriber [\(ref)]")
156+
}
157+
}
158+
159+
func subscribe(_ oid: ObjectIdentifier, eventHandler: @escaping (Cluster.Event) -> Void) {
160+
self.asyncSubscribers[oid] = eventHandler
161+
self.log.trace("Successfully added async subscriber [\(oid)], offering membership snapshot")
162+
eventHandler(.snapshot(self.snapshot))
163+
}
164+
165+
func unsubscribe(_ oid: ObjectIdentifier) {
166+
if self.asyncSubscribers.removeValue(forKey: oid) != nil {
167+
self.log.trace("Successfully removed async subscriber [\(oid)]")
168+
} else {
169+
self.log.warning("Received async `.unsubscribe` for non-subscriber [\(oid)]")
170+
}
171+
}
172+
173+
func publish(_ event: Cluster.Event) {
174+
do {
175+
try self.snapshot.apply(event: event)
176+
177+
for subscriber in self.subscribers.values {
178+
subscriber.tell(event)
179+
}
180+
for subscriber in self.asyncSubscribers.values {
181+
subscriber(event)
105182
}
183+
184+
self.log.trace(
185+
"Published event \(event) to \(self.subscribers.count) subscribers and \(self.asyncSubscribers.count) async subscribers",
186+
metadata: [
187+
"eventStream/event": "\(reflecting: event)",
188+
"eventStream/subscribers": Logger.MetadataValue.array(self.subscribers.map {
189+
Logger.MetadataValue.stringConvertible($0.key)
190+
}),
191+
"eventStream/asyncSubscribers": Logger.MetadataValue.array(self.asyncSubscribers.map {
192+
Logger.MetadataValue.stringConvertible("\($0.key)")
193+
}),
194+
]
195+
)
196+
} catch {
197+
self.log.error("Failed to apply [\(event)], error: \(error)")
198+
}
199+
}
200+
201+
distributed func terminated(actor id: ActorID) {
202+
if self.subscribers.removeValue(forKey: id) != nil {
203+
self.log.trace("Removed subscriber [\(id)], because it terminated")
106204
}
107205
}
108206
}

Sources/DistributedActors/Cluster/ClusterShell+LeaderActions.swift

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,12 @@ extension ClusterShell {
9999
}
100100

101101
system.cluster.updateMembershipSnapshot(state.membership)
102-
eventsToPublish.forEach { state.events.publish($0) }
102+
103+
Task { [eventsToPublish, state] in
104+
for event in eventsToPublish {
105+
await state.events.publish(event)
106+
}
107+
}
103108

104109
previousState.log.trace(
105110
"Membership state after leader actions: \(state.membership)",

0 commit comments

Comments
 (0)