Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ let ref = try system._spawn(
return .same
}
)
system.cluster.events.subscribe(ref)

Task {
for await event in system.cluster.events {
system.log.info("Event: \(event)")
}
}

if args.count >= 3 {
print("getting host")
Expand Down
4 changes: 2 additions & 2 deletions Sources/DistributedActors/Cluster/ClusterControl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public struct ClusterControl {
///
/// This sequence begins with a snapshot of the current cluster state and continues with events representing changes
/// since the snapshot.
public let events: EventStream<Cluster.Event> // FIXME: make this an AsyncSequence<Cluster.Event>
public let events: ClusterEventStream

/// Offers a snapshot of membership, which may be used to perform ad-hoc tests against the membership.
/// Note that this view may be immediately outdated after checking if, if e.g. a membership change is just being processed.
Expand Down Expand Up @@ -74,7 +74,7 @@ public struct ClusterControl {
private let cluster: ClusterShell?
internal let ref: ClusterShell.Ref

init(_ settings: ClusterSystemSettings, cluster: ClusterShell?, clusterRef: ClusterShell.Ref, eventStream: EventStream<Cluster.Event>) {
init(_ settings: ClusterSystemSettings, cluster: ClusterShell?, clusterRef: ClusterShell.Ref, eventStream: ClusterEventStream) {
self.settings = settings
self.cluster = cluster
self.ref = clusterRef
Expand Down
264 changes: 181 additions & 83 deletions Sources/DistributedActors/Cluster/ClusterEventStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2021 Apple Inc. and the Swift Distributed Actors project authors
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -12,97 +12,195 @@
//
//===----------------------------------------------------------------------===//

import Distributed
import Logging

/// Specialized event stream behavior which takes into account emitting a snapshot event on first subscription,
/// followed by a stream of ``Cluster/Event``s.
/// `ClusterEventStream` manages a set of subscribers and forwards every event published to it to
/// all its subscribers. An actor can subscribe/unsubscribe to the event stream via `AsyncSequence`
/// constructs. Subscribers will be watched and removed in case they terminate.
///
/// This ensures that every subscriber to cluster events never misses any of the membership events, meaning
/// it is possible for anyone to maintain a local up-to-date copy of `Membership` by applying all these events to that copy.
internal enum ClusterEventStream {
enum Shell {
static var behavior: _Behavior<EventStreamShell.Message<Cluster.Event>> {
.setup { context in

// We maintain a snapshot i.e. the "latest version of the membership",
// in order to eagerly publish it to anyone who subscribes immediately,
// followed by joining them to the subsequent ``Cluster/Event`` publishes.
//
// Thanks to this, any subscriber immediately gets a pretty recent view of the membership,
// followed by the usual updates via events. Since all events are published through this
// event stream actor, all subscribers are guaranteed to see events in the right order,
// and not miss any information as long as they apply all events they receive.
var snapshot = Cluster.Membership.empty
var subscribers: [ActorID: _ActorRef<Cluster.Event>] = [:]
var asyncSubscribers: [ObjectIdentifier: (Cluster.Event) -> Void] = [:]

let behavior: _Behavior<EventStreamShell.Message<Cluster.Event>> = .receiveMessage { message in
switch message {
case .subscribe(let ref):
subscribers[ref.id] = ref
context.watch(ref)
context.log.trace("Successfully subscribed [\(ref)], offering membership snapshot")
ref.tell(.snapshot(snapshot))

case .unsubscribe(let ref):
if subscribers.removeValue(forKey: ref.id) != nil {
context.unwatch(ref)
context.log.trace("Successfully unsubscribed [\(ref)]")
} else {
context.log.warning("Received `.unsubscribe` for non-subscriber [\(ref)]")
}

case .publish(let event):
try snapshot.apply(event: event)

for subscriber in subscribers.values {
subscriber.tell(event)
}
for subscriber in asyncSubscribers.values {
subscriber(event)
}

context.log.trace(
"Published event \(event) to \(subscribers.count) subscribers and \(asyncSubscribers.count) async subscribers",
metadata: [
"eventStream/event": "\(reflecting: event)",
"eventStream/subscribers": Logger.MetadataValue.array(subscribers.map {
Logger.MetadataValue.stringConvertible($0.key)
}),
"eventStream/asyncSubscribers": Logger.MetadataValue.array(asyncSubscribers.map {
Logger.MetadataValue.stringConvertible("\($0.key)")
}),
]
)

case .asyncSubscribe(let id, let eventHandler, let `continue`):
asyncSubscribers[id] = eventHandler
context.log.trace("Successfully added async subscriber [\(id)]")
`continue`()
eventHandler(Cluster.Event.snapshot(snapshot))

case .asyncUnsubscribe(let id, let `continue`):
if asyncSubscribers.removeValue(forKey: id) != nil {
context.log.trace("Successfully removed async subscriber [\(id)]")
} else {
context.log.warning("Received `.asyncUnsubscribe` for non-subscriber [\(id)]")
}
`continue`()
}
/// `ClusterEventStream` is only meant to be used locally and does not buffer or redeliver messages.
public struct ClusterEventStream: AsyncSequence {
public typealias Element = Cluster.Event

return .same
}
private let actor: ClusterEventStreamActor?

internal init(_ system: ClusterSystem, customName: String? = nil) {
var props = ClusterEventStreamActor.props
if let customName = customName {
props._knownActorName = customName
}

self.actor = _Props.$forSpawn.withValue(props) {
ClusterEventStreamActor(actorSystem: system)
}
}

// For testing only
internal init() {
self.actor = nil
}

func subscribe(_ ref: _ActorRef<Cluster.Event>, file: String = #filePath, line: UInt = #line) async {
guard let actor = self.actor else { return }

await actor.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
__secretlyKnownToBeLocal.subscribe(ref)
}
}

func unsubscribe(_ ref: _ActorRef<Cluster.Event>, file: String = #filePath, line: UInt = #line) async {
guard let actor = self.actor else { return }

await actor.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
__secretlyKnownToBeLocal.unsubscribe(ref)
}
}

private func subscribe(_ oid: ObjectIdentifier, eventHandler: @escaping (Cluster.Event) -> Void) async {
guard let actor = self.actor else { return }

await actor.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
__secretlyKnownToBeLocal.subscribe(oid, eventHandler: eventHandler)
}
}

private func unsubscribe(_ oid: ObjectIdentifier) async {
guard let actor = self.actor else { return }

await actor.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
__secretlyKnownToBeLocal.unsubscribe(oid)
}
}

func publish(_ event: Cluster.Event, file: String = #filePath, line: UInt = #line) async {
guard let actor = self.actor else { return }

await actor.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
__secretlyKnownToBeLocal.publish(event)
}
}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(self)
}

public class AsyncIterator: AsyncIteratorProtocol {
var underlying: AsyncStream<Cluster.Event>.Iterator!

return behavior.receiveSpecificSignal(_Signals.Terminated.self) { context, signal in
if subscribers.removeValue(forKey: signal.id) != nil {
context.log.trace("Removed subscriber [\(signal.id)], because it terminated")
} else {
context.log.warning("Received unexpected termination signal for non-subscriber [\(signal.id)]")
init(_ eventStream: ClusterEventStream) {
let id = ObjectIdentifier(self)
self.underlying = AsyncStream<Cluster.Event> { continuation in
Task {
await eventStream.subscribe(id) { event in
continuation.yield(event)
}
}

return .same
continuation.onTermination = { _ in
Task {
await eventStream.unsubscribe(id)
}
}
}.makeAsyncIterator()
}

public func next() async -> Cluster.Event? {
await self.underlying.next()
}
}
}

// FIXME(distributed): the only reason this actor is distributed is because of LifecycleWatch
internal distributed actor ClusterEventStreamActor: LifecycleWatch {
typealias ActorSystem = ClusterSystem

static var props: _Props {
var ps = _Props()
ps._knownActorName = "clustEventStream"
ps._systemActor = true
ps._wellKnown = true
return ps
}

// We maintain a snapshot i.e. the "latest version of the membership",
// in order to eagerly publish it to anyone who subscribes immediately,
// followed by joining them to the subsequent ``Cluster/Event`` publishes.
//
// Thanks to this, any subscriber immediately gets a pretty recent view of the membership,
// followed by the usual updates via events. Since all events are published through this
// event stream actor, all subscribers are guaranteed to see events in the right order,
// and not miss any information as long as they apply all events they receive.
private var snapshot = Cluster.Membership.empty

private var subscribers: [ActorID: _ActorRef<Cluster.Event>] = [:]
private var asyncSubscribers: [ObjectIdentifier: (Cluster.Event) -> Void] = [:]

private lazy var log = Logger(actor: self)

internal init(actorSystem: ActorSystem) {
self.actorSystem = actorSystem
}

func subscribe(_ ref: _ActorRef<Cluster.Event>) {
self.subscribers[ref.id] = ref
self.log.trace("Successfully subscribed [\(ref)], offering membership snapshot")
ref.tell(.snapshot(self.snapshot))
}

func unsubscribe(_ ref: _ActorRef<Cluster.Event>) {
if self.subscribers.removeValue(forKey: ref.id) != nil {
self.log.trace("Successfully unsubscribed [\(ref)]")
} else {
self.log.warning("Received `.unsubscribe` for non-subscriber [\(ref)]")
}
}

func subscribe(_ oid: ObjectIdentifier, eventHandler: @escaping (Cluster.Event) -> Void) {
self.asyncSubscribers[oid] = eventHandler
self.log.trace("Successfully added async subscriber [\(oid)], offering membership snapshot")
eventHandler(.snapshot(self.snapshot))
}

func unsubscribe(_ oid: ObjectIdentifier) {
if self.asyncSubscribers.removeValue(forKey: oid) != nil {
self.log.trace("Successfully removed async subscriber [\(oid)]")
} else {
self.log.warning("Received async `.unsubscribe` for non-subscriber [\(oid)]")
}
}

func publish(_ event: Cluster.Event) {
do {
try self.snapshot.apply(event: event)

for subscriber in self.subscribers.values {
subscriber.tell(event)
}
for subscriber in self.asyncSubscribers.values {
subscriber(event)
}

self.log.trace(
"Published event \(event) to \(self.subscribers.count) subscribers and \(self.asyncSubscribers.count) async subscribers",
metadata: [
"eventStream/event": "\(reflecting: event)",
"eventStream/subscribers": Logger.MetadataValue.array(self.subscribers.map {
Logger.MetadataValue.stringConvertible($0.key)
}),
"eventStream/asyncSubscribers": Logger.MetadataValue.array(self.asyncSubscribers.map {
Logger.MetadataValue.stringConvertible("\($0.key)")
}),
]
)
} catch {
self.log.error("Failed to apply [\(event)], error: \(error)")
}
}

distributed func terminated(actor id: ActorID) {
if self.subscribers.removeValue(forKey: id) != nil {
self.log.trace("Removed subscriber [\(id)], because it terminated")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ extension ClusterShell {
}

system.cluster.updateMembershipSnapshot(state.membership)
eventsToPublish.forEach { state.events.publish($0) }

Task { [eventsToPublish, state] in
for event in eventsToPublish {
await state.events.publish(event)
}
}

previousState.log.trace(
"Membership state after leader actions: \(state.membership)",
Expand Down
Loading