@@ -57,7 +57,9 @@ public enum OnDownActionStrategySettings {
.setup { context in
guard .milliseconds(0) < shutdownDelay else {
context.log.warning("This node was marked as [.down], delay is immediate. Shutting down the system immediately!")
system.shutdown()
Task {
system.shutdown()
}
return .stop
}

@@ -66,7 +68,9 @@ public enum OnDownActionStrategySettings {

return .receiveMessage { _ in
system.log.warning("Shutting down...")
system.shutdown()
Task {
system.shutdown()
}
return .stop
}
}
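Both hunks above make the same change: `system.shutdown()` is no longer called synchronously from inside the behavior, but hopped onto an unstructured `Task`. Presumably this is because shutdown now performs blocking teardown on the calling thread (it takes `shutdownLock` and waits on receptacles, see the `ClusterSystem.swift` changes below) and must not stall the behavior's execution context. A minimal sketch of the pattern, with hypothetical names (`BlockingSystem`, `Directive`, `onDown`) standing in for the real types:

```swift
import Dispatch

/// Hypothetical stand-in for ClusterSystem: `shutdown()` blocks until teardown finishes.
final class BlockingSystem: @unchecked Sendable {
    private let finished = DispatchSemaphore(value: 0)

    func shutdown() {
        // Simulate teardown that completes elsewhere after some time.
        DispatchQueue.global().asyncAfter(deadline: .now() + .milliseconds(50)) {
            self.finished.signal()
        }
        finished.wait() // blocks the caller until teardown is done
    }
}

enum Directive { case stop }

/// The shape of the change: don't block the message-handling path on shutdown;
/// hop the blocking call onto a Task and return `.stop` immediately.
func onDown(_ system: BlockingSystem) -> Directive {
    Task {
        system.shutdown()
    }
    return .stop
}
```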
@@ -50,13 +50,15 @@ public struct DowningStrategyDirective {
public static func startTimer(key: TimerKey, member: Cluster.Member, delay: TimeAmount) -> Self {
.init(.startTimer(key: key, member: member, delay: delay))
}

public static func cancelTimer(key: TimerKey) -> Self {
.init(.cancelTimer(key: key))
}

public static func markAsDown(members: Set<Cluster.Member>) -> Self {
.init(.markAsDown(members))
}

public static func markAsDown(_ member: Cluster.Member) -> Self {
.init(.markAsDown([member]))
}
@@ -22,7 +22,7 @@ extension ClusterSystem {
///
/// This version does not have to match the project version, i.e. a library version `1.5.0` may still be using the protocol version `1.0.0`,
/// as this version number is more about the _wire_ compatibility of the underlying protocol, rather than the library capabilities
public static let protocolVersion: ClusterSystem.Version = ClusterSystem.Version(reserved: 0, major: 1, minor: 0, patch: 0)
public static let protocolVersion = ClusterSystem.Version(reserved: 0, major: 1, minor: 0, patch: 0)
}

// ==== ----------------------------------------------------------------------------------------------------------------
74 changes: 42 additions & 32 deletions Sources/DistributedActors/ClusterSystem.swift
@@ -184,7 +184,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Shutdown
private var shutdownReceptacle = BlockingReceptacle<Error?>()
private let shutdownLock = Lock()
internal let shutdownLock = Lock()

/// Greater than 0 shutdown has been initiated / is in progress.
private let shutdownFlag: ManagedAtomic<Int> = .init(0)
@@ -488,49 +488,53 @@
return Shutdown(receptacle: self.shutdownReceptacle)
}

self.shutdownLock.lock()

/// Down this member as part of shutting down; it may have enough time to notify other nodes on an best effort basis.
if let myselfMember = self.cluster.membershipSnapshot.uniqueMember(self.cluster.uniqueNode) {
self.cluster.down(member: myselfMember)
}

self.settings.plugins.stopAll(self)

queue.async {
self.log.log(level: .debug, "Shutting down actor system [\(self.name)]. All actors will be stopped.", file: #file, function: #function, line: #line)
if let cluster = self._cluster {
let receptacle = BlockingReceptacle<Void>()
cluster.ref.tell(.command(.shutdown(receptacle)))
receptacle.wait()
}
self.userProvider.stopAll()
self.systemProvider.stopAll()
self.dispatcher.shutdown()
self.downing = nil
self.log.log(level: .debug, "Shutting down actor system [\(self.name)]. All actors will be stopped.", file: #file, function: #function, line: #line)
defer {
self.shutdownLock.unlock()
}

self._associationTombstoneCleanupTask?.cancel()
self._associationTombstoneCleanupTask = nil
if let cluster = self._cluster {
let receptacle = BlockingReceptacle<Void>()
cluster.ref.tell(.command(.shutdown(receptacle)))
receptacle.wait()
}
self.userProvider.stopAll()
self.systemProvider.stopAll()
self.dispatcher.shutdown()
self.downing = nil

do {
try self._eventLoopGroup.syncShutdownGracefully()
// self._receptionistRef = self.deadLetters.adapted()
} catch {
self.shutdownReceptacle.offerOnce(error)
afterShutdownCompleted(error)
}
self._associationTombstoneCleanupTask?.cancel()
self._associationTombstoneCleanupTask = nil

/// Only once we've shutdown all dispatchers and loops, we clear cycles between the serialization and system,
/// as they should never be invoked anymore.
/*
self.lazyInitializationLock.withWriterLockVoid {
// self._serialization = nil // FIXME: need to release serialization
}
*/
_ = self._clusterStore.storeIfNilThenLoad(Box(nil))

self.shutdownReceptacle.offerOnce(nil)
afterShutdownCompleted(nil)
do {
try self._eventLoopGroup.syncShutdownGracefully()
// self._receptionistRef = self.deadLetters.adapted()
} catch {
self.shutdownReceptacle.offerOnce(error)
afterShutdownCompleted(error)
}

/// Only once we've shutdown all dispatchers and loops, we clear cycles between the serialization and system,
/// as they should never be invoked anymore.
/*
self.lazyInitializationLock.withWriterLockVoid {
// self._serialization = nil // FIXME: need to release serialization
}
*/
_ = self._clusterStore.storeIfNilThenLoad(Box(nil))

self.shutdownReceptacle.offerOnce(nil)
afterShutdownCompleted(nil)

return Shutdown(receptacle: self.shutdownReceptacle)
}
}
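Flattened out, the reordered shutdown no longer hands the teardown to `queue.async`: it takes `shutdownLock` up front, downs the local member and stops plugins, then performs the cluster-shell shutdown, provider/dispatcher teardown and event-loop shutdown inline, releasing the lock (via `defer`) only once everything has completed. A condensed sketch of that ordering, using simplified stand-in members rather than the real implementation:

```swift
import Foundation

/// Condensed sketch of the reordered shutdown; names are simplified stand-ins
/// and error handling is elided.
final class MiniClusterSystem: @unchecked Sendable {
    /// Stands in for the library's `Lock`; the timer callbacks below take the same
    /// lock, so they cannot run against a half-torn-down system.
    let shutdownLock = NSLock()

    func shutdown() {
        shutdownLock.lock()
        defer { shutdownLock.unlock() }  // held for the entire teardown

        downSelfBestEffort()             // mark this member .down, best effort
        stopPlugins()
        stopClusterShellAndWait()        // tell + wait; previously ran inside queue.async
        stopActorProvidersAndDispatcher()
        shutDownEventLoopGroupGracefully()
        // the real code then offers the shutdown receptacle / runs the completion handler
    }

    private func downSelfBestEffort() {}
    private func stopPlugins() {}
    private func stopClusterShellAndWait() {}
    private func stopActorProvidersAndDispatcher() {}
    private func shutDownEventLoopGroupGracefully() {}
}
```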
@@ -983,6 +987,9 @@ extension ClusterSystem {
guard let clusterShell = _cluster else {
throw RemoteCallError.clusterAlreadyShutDown
}
guard self.shutdownFlag.load(ordering: .relaxed) == 0 else {
throw RemoteCallError.clusterAlreadyShutDown
}

let recipient = _RemoteClusterActorPersonality<InvocationMessage>(shell: clusterShell, id: actor.id._asRemote, system: self)
let arguments = invocation.arguments
@@ -1018,6 +1025,9 @@ extension ClusterSystem {
guard let clusterShell = self._cluster else {
throw RemoteCallError.clusterAlreadyShutDown
}
guard self.shutdownFlag.load(ordering: .relaxed) == 0 else {
throw RemoteCallError.clusterAlreadyShutDown
}

let recipient = _RemoteClusterActorPersonality<InvocationMessage>(shell: clusterShell, id: actor.id._asRemote, system: self)
let arguments = invocation.arguments
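Both remote-call entry points shown above now guard twice: once on the optional cluster shell, and once on the atomic shutdown flag, so calls racing with shutdown fail fast with `clusterAlreadyShutDown` instead of trying to schedule work on event loops that are already being torn down. A small sketch of that fast-path check using the `Atomics` package the file already relies on; everything except `ManagedAtomic` is a made-up name:

```swift
import Atomics

struct AlreadyShutDown: Error {}

/// Made-up wrapper illustrating the fast-path check; only `ManagedAtomic` is real API.
final class RemoteCallGate: @unchecked Sendable {
    /// Greater than zero once shutdown has been initiated (mirrors `shutdownFlag` above).
    private let shutdownFlag = ManagedAtomic<Int>(0)

    func markShuttingDown() {
        shutdownFlag.wrappingIncrement(ordering: .relaxed)
    }

    /// Called before issuing a remote call: a cheap relaxed load, no locking.
    func checkNotShutDown() throws {
        guard shutdownFlag.load(ordering: .relaxed) == 0 else {
            throw AlreadyShutDown()
        }
    }
}
```

The relaxed ordering suggests this is a best-effort fast path rather than the primary synchronization, which the `shutdownLock` changes elsewhere in this PR provide.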
@@ -59,7 +59,6 @@ public struct PluginsSettings {
plugin.stop(system)
}
}

}

extension PluginsSettings {
43 changes: 22 additions & 21 deletions Sources/DistributedActors/Timers+Distributed.swift
@@ -14,7 +14,6 @@

import Dispatch
import Distributed
import Logging
import struct NIO.TimeAmount

@usableFromInline
@@ -48,24 +47,19 @@ public final class ActorTimers<Act: DistributedActor> where Act.ActorSystem == ClusterSystem {
@usableFromInline
internal var installedTimers: [TimerKey: DistributedActorTimer] = [:]

@usableFromInline
internal var log: Logger
// TODO: this is a workaround, we're removing ActorTimers since they can't participate in structured cancellation
weak var actorSystem: Act.ActorSystem?

/// Create a timers instance owned by the passed in actor.
///
/// Does not retain the distributed actor.
///
/// - Parameter myself:
public init(_ myself: Act) {
self.log = Logger(label: "\(myself)") // FIXME(distributed): pick up the actor logger (!!!)
log[metadataKey: "actor/id"] = "\(myself.id.detailedDescription)"
self.ownerID = myself.id
}

deinit {
if installedTimers.count > 0 {
log.debug("\(Self.self) deinit, cancelling [\(installedTimers.count)] timers") // TODO: include actor address
}
self._cancelAll(includeSystemTimers: true)
}

@@ -76,7 +70,6 @@ public final class ActorTimers<Act: DistributedActor> where Act.ActorSystem == ClusterSystem {

internal func _cancelAll(includeSystemTimers: Bool) {
for key in self.installedTimers.keys where includeSystemTimers || !key.isSystemTimer {
// TODO: represent with "system timer key" type?
// TODO: the reason the `_` keys are not cancelled is because we want to cancel timers in _restartPrepare but we need "our restart timer" to remain
self.cancel(for: key)
}
@@ -88,9 +81,6 @@ public final class ActorTimers<Act: DistributedActor> where Act.ActorSystem == ClusterSystem {
@inlinable
public func cancel(for key: TimerKey) {
if let timer = self.installedTimers.removeValue(forKey: key) {
// if system.settings.logging.verboseTimers {
// self.log.trace("Cancel timer [\(key)] with generation [\(timer.generation)]", metadata: self.metadata)
// }
timer.handle.cancel()
}
}
@@ -105,8 +95,6 @@ public final class ActorTimers<Act: DistributedActor> where Act.ActorSystem == ClusterSystem {

/// Starts a timer that will invoke the provided `call` closure on the actor's context after the specified delay.
///
/// Timer keys are used for logging purposes and should descriptively explain the purpose of this timer.
///
/// - Parameters:
/// - key: the key associated with the timer
/// - call: the call that will be made after the `delay` amount of time elapses
@@ -122,8 +110,6 @@ public final class ActorTimers<Act: DistributedActor> where Act.ActorSystem == ClusterSystem {

/// Starts a timer that will periodically invoke the passed in `call` closure on the actor's context.
///
/// Timer keys are used for logging purposes and should descriptively explain the purpose of this timer.
///
/// - Parameters:
/// - key: the key associated with the timer
/// - call: the call that will be executed after the `delay` amount of time elapses
@@ -146,22 +132,37 @@ public final class ActorTimers<Act: DistributedActor> where Act.ActorSystem == ClusterSystem {
) {
self.cancel(for: key)

// let generation = self.nextTimerGen() // TODO(distributed): we're not using generations since we don't have restarts
// let event = DistributedActorTimerEvent(key: key, generation: generation, owner: self.ownerID)
let handle: Cancelable
if repeated {
handle = self.dispatchQueue.scheduleAsync(initialDelay: interval, interval: interval) {
// We take the lock to prevent the system from shutting down
// while we're in the middle of potentially issuing remote calls
// Which may cause: Cannot schedule tasks on an EventLoop that has already shut down.
// The actual solution is outstanding work tracking potentially.
let system = self.actorSystem
system?.shutdownLock.lock()
defer {
system?.shutdownLock.unlock()

Review comment (Member):
@ktoso this isn't legal code, because you may lock and unlock on different threads, which is UB and will likely crash. So you can never hold a mutex/lock across an await. I'd recommend only using the withLock { ... } version, which will prevent you from doing this.

Reply (Member, author):
Oh boy, you're right of course -- moving to DispatchSemaphore. Thanks for spotting this!

}

await call()
}
} else {
handle = self.dispatchQueue.scheduleOnceAsync(delay: interval) {
// We take the lock to prevent the system from shutting down
// while we're in the middle of potentially issuing remote calls
// Which may cause: Cannot schedule tasks on an EventLoop that has already shut down.
// The actual solution is outstanding work tracking potentially.
let system = self.actorSystem
system?.shutdownLock.lock()
defer {
system?.shutdownLock.unlock()
}

await call()
}
}

// if system.settings.logging.verboseTimers {
// self.log.trace("Started timer [\(key)] with generation [\(generation)]", metadata: self.metadata)
// }
self.installedTimers[key] = DistributedActorTimer(key: key, repeated: repeated, handle: handle)
}
}
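The review thread above captures the problem with this first version: the scheduled closure is `async`, so there can be a suspension point between `lock()` and the deferred `unlock()`, and the continuation may resume on a different thread; unlocking a mutex from a thread that does not own it is undefined behaviour. The author's follow-up is to switch to `DispatchSemaphore`, which has no owning thread and may legally be acquired and released on different threads (at the cost of still blocking while waiting). A sketch of the two safe shapes under those assumptions; the types and names here are illustrative, not the library's API:

```swift
import Dispatch
import Foundation

/// Illustrative only; `TimerGuard` is not part of the library.
final class TimerGuard: @unchecked Sendable {
    // Direction mentioned in the reply: a binary semaphore standing in for `shutdownLock`.
    // Unlike a mutex, a semaphore has no owning thread, so it may be acquired before an
    // `await` and released after it, even if the continuation resumes elsewhere.
    private let shutdownGate = DispatchSemaphore(value: 1)

    func runGuarded(_ call: @escaping @Sendable () async -> Void) {
        Task {
            self.shutdownGate.wait()             // note: still blocks the waiting thread
            defer { self.shutdownGate.signal() } // legal regardless of which thread resumes here
            await call()
        }
    }

    // The reviewer's recommendation for mutex-style locks: a `withLock { ... }`-shaped API
    // takes a synchronous closure, so holding the lock across an `await` cannot be written.
    private let stateLock = NSLock()

    func withStateLock<T>(_ body: () throws -> T) rethrows -> T {
        stateLock.lock()
        defer { stateLock.unlock() }
        return try body()
    }
}
```

Blocking a thread on the semaphore is itself a trade-off; the comment in the diff ("The actual solution is outstanding work tracking potentially") points at the longer-term fix.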
2 changes: 1 addition & 1 deletion Sources/DistributedActors/Version.swift
@@ -14,7 +14,7 @@

extension ClusterSystem {
/// Version advertised to other nodes while joining the cluster.
///
///
/// Can be used to determine wire of feature compatibility of nodes joining a cluster.
public struct Version: Equatable, CustomStringConvertible {
/// Exact semantics of the reserved field remain to be defined.
@@ -72,7 +72,6 @@ final class ActorMetricsTests: ClusteredActorSystemsXCTestCase {

sleep(5)
let gauge = try self.metrics.expectGauge("first.measuredActorGroup.mailbox.count")
pprint("Mailbox run visualized via \(gauge): \(gauge.values)")
// we can't really reliably test that we get to some "maximum" since the actor starts processing messages as they come in
gauge.lastValue.shouldEqual(0) // after processing we must always go back to zero
}