Skip to content

Commit ac99491

Browse files
committed
Timers: protect timers from shutting down system ELG during timer task
1 parent 17201e1 commit ac99491

File tree

8 files changed

+74
-59
lines changed

8 files changed

+74
-59
lines changed

Sources/DistributedActors/Cluster/Downing/DowningSettings.swift

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ public enum OnDownActionStrategySettings {
5757
.setup { context in
5858
guard .milliseconds(0) < shutdownDelay else {
5959
context.log.warning("This node was marked as [.down], delay is immediate. Shutting down the system immediately!")
60-
system.shutdown()
60+
Task {
61+
system.shutdown()
62+
}
6163
return .stop
6264
}
6365

@@ -66,7 +68,9 @@ public enum OnDownActionStrategySettings {
6668

6769
return .receiveMessage { _ in
6870
system.log.warning("Shutting down...")
69-
system.shutdown()
71+
Task {
72+
system.shutdown()
73+
}
7074
return .stop
7175
}
7276
}

Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,15 @@ public struct DowningStrategyDirective {
5050
public static func startTimer(key: TimerKey, member: Cluster.Member, delay: TimeAmount) -> Self {
5151
.init(.startTimer(key: key, member: member, delay: delay))
5252
}
53+
5354
public static func cancelTimer(key: TimerKey) -> Self {
5455
.init(.cancelTimer(key: key))
5556
}
5657

5758
public static func markAsDown(members: Set<Cluster.Member>) -> Self {
5859
.init(.markAsDown(members))
5960
}
61+
6062
public static func markAsDown(_ member: Cluster.Member) -> Self {
6163
.init(.markAsDown([member]))
6264
}

Sources/DistributedActors/Cluster/HandshakeStateMachine.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ extension ClusterSystem {
2222
///
2323
/// This version does not have to match the project version, i.e. a library version `1.5.0` may still be using the protocol version `1.0.0`,
2424
/// as this version number is more about the _wire_ compatibility of the underlying protocol, rather than the library capabilities
25-
public static let protocolVersion: ClusterSystem.Version = ClusterSystem.Version(reserved: 0, major: 1, minor: 0, patch: 0)
25+
public static let protocolVersion = ClusterSystem.Version(reserved: 0, major: 1, minor: 0, patch: 0)
2626
}
2727

2828
// ==== ----------------------------------------------------------------------------------------------------------------

Sources/DistributedActors/ClusterSystem.swift

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
184184
// ==== ----------------------------------------------------------------------------------------------------------------
185185
// MARK: Shutdown
186186
private var shutdownReceptacle = BlockingReceptacle<Error?>()
187-
private let shutdownLock = Lock()
187+
internal let shutdownLock = Lock()
188188

189189
/// Greater than 0 shutdown has been initiated / is in progress.
190190
private let shutdownFlag: ManagedAtomic<Int> = .init(0)
@@ -488,49 +488,53 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
488488
return Shutdown(receptacle: self.shutdownReceptacle)
489489
}
490490

491+
self.shutdownLock.lock()
492+
491493
/// Down this member as part of shutting down; it may have enough time to notify other nodes on an best effort basis.
492494
if let myselfMember = self.cluster.membershipSnapshot.uniqueMember(self.cluster.uniqueNode) {
493495
self.cluster.down(member: myselfMember)
494496
}
495497

496498
self.settings.plugins.stopAll(self)
497499

498-
queue.async {
499-
self.log.log(level: .debug, "Shutting down actor system [\(self.name)]. All actors will be stopped.", file: #file, function: #function, line: #line)
500-
if let cluster = self._cluster {
501-
let receptacle = BlockingReceptacle<Void>()
502-
cluster.ref.tell(.command(.shutdown(receptacle)))
503-
receptacle.wait()
504-
}
505-
self.userProvider.stopAll()
506-
self.systemProvider.stopAll()
507-
self.dispatcher.shutdown()
508-
self.downing = nil
500+
self.log.log(level: .debug, "Shutting down actor system [\(self.name)]. All actors will be stopped.", file: #file, function: #function, line: #line)
501+
defer {
502+
self.shutdownLock.unlock()
503+
}
509504

510-
self._associationTombstoneCleanupTask?.cancel()
511-
self._associationTombstoneCleanupTask = nil
505+
if let cluster = self._cluster {
506+
let receptacle = BlockingReceptacle<Void>()
507+
cluster.ref.tell(.command(.shutdown(receptacle)))
508+
receptacle.wait()
509+
}
510+
self.userProvider.stopAll()
511+
self.systemProvider.stopAll()
512+
self.dispatcher.shutdown()
513+
self.downing = nil
512514

513-
do {
514-
try self._eventLoopGroup.syncShutdownGracefully()
515-
// self._receptionistRef = self.deadLetters.adapted()
516-
} catch {
517-
self.shutdownReceptacle.offerOnce(error)
518-
afterShutdownCompleted(error)
519-
}
515+
self._associationTombstoneCleanupTask?.cancel()
516+
self._associationTombstoneCleanupTask = nil
520517

521-
/// Only once we've shutdown all dispatchers and loops, we clear cycles between the serialization and system,
522-
/// as they should never be invoked anymore.
523-
/*
524-
self.lazyInitializationLock.withWriterLockVoid {
525-
// self._serialization = nil // FIXME: need to release serialization
526-
}
527-
*/
528-
_ = self._clusterStore.storeIfNilThenLoad(Box(nil))
529-
530-
self.shutdownReceptacle.offerOnce(nil)
531-
afterShutdownCompleted(nil)
518+
do {
519+
try self._eventLoopGroup.syncShutdownGracefully()
520+
// self._receptionistRef = self.deadLetters.adapted()
521+
} catch {
522+
self.shutdownReceptacle.offerOnce(error)
523+
afterShutdownCompleted(error)
532524
}
533525

526+
/// Only once we've shutdown all dispatchers and loops, we clear cycles between the serialization and system,
527+
/// as they should never be invoked anymore.
528+
/*
529+
self.lazyInitializationLock.withWriterLockVoid {
530+
// self._serialization = nil // FIXME: need to release serialization
531+
}
532+
*/
533+
_ = self._clusterStore.storeIfNilThenLoad(Box(nil))
534+
535+
self.shutdownReceptacle.offerOnce(nil)
536+
afterShutdownCompleted(nil)
537+
534538
return Shutdown(receptacle: self.shutdownReceptacle)
535539
}
536540
}
@@ -983,6 +987,9 @@ extension ClusterSystem {
983987
guard let clusterShell = _cluster else {
984988
throw RemoteCallError.clusterAlreadyShutDown
985989
}
990+
guard self.shutdownFlag.load(ordering: .relaxed) == 0 else {
991+
throw RemoteCallError.clusterAlreadyShutDown
992+
}
986993

987994
let recipient = _RemoteClusterActorPersonality<InvocationMessage>(shell: clusterShell, id: actor.id._asRemote, system: self)
988995
let arguments = invocation.arguments
@@ -1018,6 +1025,9 @@ extension ClusterSystem {
10181025
guard let clusterShell = self._cluster else {
10191026
throw RemoteCallError.clusterAlreadyShutDown
10201027
}
1028+
guard self.shutdownFlag.load(ordering: .relaxed) == 0 else {
1029+
throw RemoteCallError.clusterAlreadyShutDown
1030+
}
10211031

10221032
let recipient = _RemoteClusterActorPersonality<InvocationMessage>(shell: clusterShell, id: actor.id._asRemote, system: self)
10231033
let arguments = invocation.arguments

Sources/DistributedActors/Plugins/ClusterSystemSettings+Plugins.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ public struct PluginsSettings {
5959
plugin.stop(system)
6060
}
6161
}
62-
6362
}
6463

6564
extension PluginsSettings {

Sources/DistributedActors/Timers+Distributed.swift

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
import Dispatch
1616
import Distributed
17-
import Logging
1817
import struct NIO.TimeAmount
1918

2019
@usableFromInline
@@ -48,24 +47,19 @@ public final class ActorTimers<Act: DistributedActor> where Act.ActorSystem == C
4847
@usableFromInline
4948
internal var installedTimers: [TimerKey: DistributedActorTimer] = [:]
5049

51-
@usableFromInline
52-
internal var log: Logger
50+
// TODO: this is a workaround, we're removing ActorTimers since they can't participate in structured cancellation
51+
weak var actorSystem: Act.ActorSystem?
5352

5453
/// Create a timers instance owned by the passed in actor.
5554
///
5655
/// Does not retain the distributed actor.
5756
///
5857
/// - Parameter myself:
5958
public init(_ myself: Act) {
60-
self.log = Logger(label: "\(myself)") // FIXME(distributed): pick up the actor logger (!!!)
61-
log[metadataKey: "actor/id"] = "\(myself.id.detailedDescription)"
6259
self.ownerID = myself.id
6360
}
6461

6562
deinit {
66-
if installedTimers.count > 0 {
67-
log.debug("\(Self.self) deinit, cancelling [\(installedTimers.count)] timers") // TODO: include actor address
68-
}
6963
self._cancelAll(includeSystemTimers: true)
7064
}
7165

@@ -76,7 +70,6 @@ public final class ActorTimers<Act: DistributedActor> where Act.ActorSystem == C
7670

7771
internal func _cancelAll(includeSystemTimers: Bool) {
7872
for key in self.installedTimers.keys where includeSystemTimers || !key.isSystemTimer {
79-
// TODO: represent with "system timer key" type?
8073
// TODO: the reason the `_` keys are not cancelled is because we want to cancel timers in _restartPrepare but we need "our restart timer" to remain
8174
self.cancel(for: key)
8275
}
@@ -88,9 +81,6 @@ public final class ActorTimers<Act: DistributedActor> where Act.ActorSystem == C
8881
@inlinable
8982
public func cancel(for key: TimerKey) {
9083
if let timer = self.installedTimers.removeValue(forKey: key) {
91-
// if system.settings.logging.verboseTimers {
92-
// self.log.trace("Cancel timer [\(key)] with generation [\(timer.generation)]", metadata: self.metadata)
93-
// }
9484
timer.handle.cancel()
9585
}
9686
}
@@ -105,8 +95,6 @@ public final class ActorTimers<Act: DistributedActor> where Act.ActorSystem == C
10595

10696
/// Starts a timer that will invoke the provided `call` closure on the actor's context after the specified delay.
10797
///
108-
/// Timer keys are used for logging purposes and should descriptively explain the purpose of this timer.
109-
///
11098
/// - Parameters:
11199
/// - key: the key associated with the timer
112100
/// - call: the call that will be made after the `delay` amount of time elapses
@@ -122,8 +110,6 @@ public final class ActorTimers<Act: DistributedActor> where Act.ActorSystem == C
122110

123111
/// Starts a timer that will periodically invoke the passed in `call` closure on the actor's context.
124112
///
125-
/// Timer keys are used for logging purposes and should descriptively explain the purpose of this timer.
126-
///
127113
/// - Parameters:
128114
/// - key: the key associated with the timer
129115
/// - call: the call that will be executed after the `delay` amount of time elapses
@@ -146,22 +132,37 @@ public final class ActorTimers<Act: DistributedActor> where Act.ActorSystem == C
146132
) {
147133
self.cancel(for: key)
148134

149-
// let generation = self.nextTimerGen() // TODO(distributed): we're not using generations since we don't have restarts
150-
// let event = DistributedActorTimerEvent(key: key, generation: generation, owner: self.ownerID)
151135
let handle: Cancelable
152136
if repeated {
153137
handle = self.dispatchQueue.scheduleAsync(initialDelay: interval, interval: interval) {
138+
// We take the lock to prevent the system from shutting down
139+
// while we're in the middle of potentially issuing remote calls
140+
// Which may cause: Cannot schedule tasks on an EventLoop that has already shut down.
141+
// The actual solution is outstanding work tracking potentially.
142+
let system = self.actorSystem
143+
system?.shutdownLock.lock()
144+
defer {
145+
system?.shutdownLock.unlock()
146+
}
147+
154148
await call()
155149
}
156150
} else {
157151
handle = self.dispatchQueue.scheduleOnceAsync(delay: interval) {
152+
// We take the lock to prevent the system from shutting down
153+
// while we're in the middle of potentially issuing remote calls
154+
// Which may cause: Cannot schedule tasks on an EventLoop that has already shut down.
155+
// The actual solution is outstanding work tracking potentially.
156+
let system = self.actorSystem
157+
system?.shutdownLock.lock()
158+
defer {
159+
system?.shutdownLock.unlock()
160+
}
161+
158162
await call()
159163
}
160164
}
161165

162-
// if system.settings.logging.verboseTimers {
163-
// self.log.trace("Started timer [\(key)] with generation [\(generation)]", metadata: self.metadata)
164-
// }
165166
self.installedTimers[key] = DistributedActorTimer(key: key, repeated: repeated, handle: handle)
166167
}
167168
}

Sources/DistributedActors/Version.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
extension ClusterSystem {
1616
/// Version advertised to other nodes while joining the cluster.
17-
///
17+
///
1818
/// Can be used to determine wire or feature compatibility of nodes joining a cluster.
1919
public struct Version: Equatable, CustomStringConvertible {
2020
/// Exact semantics of the reserved field remain to be defined.

Tests/DistributedActorsTests/Metrics/ActorMetricsTests.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ final class ActorMetricsTests: ClusteredActorSystemsXCTestCase {
7272

7373
sleep(5)
7474
let gauge = try self.metrics.expectGauge("first.measuredActorGroup.mailbox.count")
75-
pprint("Mailbox run visualized via \(gauge): \(gauge.values)")
7675
// we can't really reliably test that we get to some "maximum" since the actor starts processing messages as they come in
7776
gauge.lastValue.shouldEqual(0) // after processing we must always go back to zero
7877
}

0 commit comments

Comments
 (0)