diff --git a/Package.swift b/Package.swift index 390d087aa..483c2fa45 100644 --- a/Package.swift +++ b/Package.swift @@ -30,6 +30,7 @@ var targets: [PackageDescription.Target] = [ .product(name: "Metrics", package: "swift-metrics"), .product(name: "ServiceDiscovery", package: "swift-service-discovery"), .product(name: "Backtrace", package: "swift-backtrace"), + .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), ] ), @@ -179,26 +180,27 @@ var targets: [PackageDescription.Target] = [ var dependencies: [Package.Dependency] = [ .package(url: "https://github.com/apple/swift-atomics", from: "1.0.2"), - .package(url: "https://github.com/apple/swift-cluster-membership.git", from: "0.3.0"), + .package(url: "https://github.com/apple/swift-cluster-membership", from: "0.3.0"), - .package(url: "https://github.com/apple/swift-nio.git", from: "2.40.0"), - .package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.2.0"), - .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.16.1"), + .package(url: "https://github.com/apple/swift-nio", from: "2.40.0"), + .package(url: "https://github.com/apple/swift-nio-extras", from: "1.2.0"), + .package(url: "https://github.com/apple/swift-nio-ssl", from: "2.16.1"), - .package(url: "https://github.com/apple/swift-protobuf.git", from: "1.7.0"), + .package(url: "https://github.com/apple/swift-protobuf", from: "1.7.0"), // ~~~ backtraces ~~~ // TODO: optimally, library should not pull swift-backtrace - .package(url: "https://github.com/swift-server/swift-backtrace.git", from: "1.1.1"), + .package(url: "https://github.com/swift-server/swift-backtrace", from: "1.1.1"), - // ~~~ Swift Collections ~~~ - .package(url: "https://github.com/apple/swift-collections.git", from: "1.0.1"), + // ~~~ Swift libraries ~~~ + .package(url: "https://github.com/apple/swift-async-algorithms", from: "0.0.3"), + .package(url: "https://github.com/apple/swift-collections", from: "1.0.1"), // ~~~ Observability ~~~ - .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-log", from: "1.0.0"), // swift-metrics 1.x and 2.x are almost API compatible, so most clients should use - .package(url: "https://github.com/apple/swift-metrics.git", "1.0.0" ..< "3.0.0"), - .package(url: "https://github.com/apple/swift-service-discovery.git", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-metrics", "1.0.0" ..< "3.0.0"), + .package(url: "https://github.com/apple/swift-service-discovery", from: "1.0.0"), // ~~~ SwiftPM Plugins ~~~ .package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"), diff --git a/Samples/Sources/SampleDiningPhilosophers/Philosopher.swift b/Samples/Sources/SampleDiningPhilosophers/Philosopher.swift index f72836215..c59413059 100644 --- a/Samples/Sources/SampleDiningPhilosophers/Philosopher.swift +++ b/Samples/Sources/SampleDiningPhilosophers/Philosopher.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift Distributed Actors open source project // -// Copyright (c) 2018-2021 Apple Inc. and the Swift Distributed Actors project authors +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -24,8 +24,6 @@ distributed actor Philosopher: CustomStringConvertible { private let rightFork: Fork private var state: State = .thinking - private lazy var timers = DistributedActors.ActorTimers(self) - init(name: String, leftFork: Fork, rightFork: Fork, actorSystem: ActorSystem) { self.actorSystem = actorSystem self.name = name @@ -58,15 +56,16 @@ distributed actor Philosopher: CustomStringConvertible { } self.state = .thinking - self.timers.startSingle(key: .becomeHungry, delay: .seconds(1)) { + Task { + try await Task.sleep(until: .now + .seconds(1), clock: .continuous) await self.attemptToTakeForks() } - self.log.info("\(self.self.name) is thinking...") + self.log.info("\(self.name) is thinking...") } distributed func attemptToTakeForks() async { guard self.state == .thinking else { - self.log.error("\(self.self.name) tried to take a fork but was not in the thinking state!") + self.log.error("\(self.name) tried to take a fork but was not in the thinking state!") return } @@ -89,14 +88,14 @@ distributed actor Philosopher: CustomStringConvertible { } self.forkTaken(self.rightFork) } catch { - self.log.info("\(self.self.name) wasn't able to take both forks!") + self.log.info("\(self.name) wasn't able to take both forks!") self.think() } } /// Message sent to oneself after a timer exceeds and we're done `eating` and can become `thinking` again. distributed func stopEating() { - self.log.info("\(self.self.name) is done eating and replaced both forks!") + self.log.info("\(self.name) is done eating and replaced both forks!") Task { do { try await self.leftFork.putBack() @@ -128,10 +127,10 @@ distributed actor Philosopher: CustomStringConvertible { switch fork { case self.leftFork: - self.log.info("\(self.self.name) received their left fork!") + self.log.info("\(self.name) received their left fork!") self.state = .takingForks(leftTaken: true, rightTaken: rightForkIsTaken) case self.rightFork: - self.log.info("\(self.self.name) received their right fork!") + self.log.info("\(self.name) received their right fork!") self.state = .takingForks(leftTaken: leftForkIsTaken, rightTaken: true) default: self.log.error("Received unknown fork! Got: \(fork). Known forks: \(self.leftFork), \(self.rightFork)") @@ -144,18 +143,14 @@ distributed actor Philosopher: CustomStringConvertible { private func becomeEating() { self.state = .eating - self.log.notice("\(self.self.name) began eating!") - self.timers.startSingle(key: .becomeHungry, delay: .seconds(3)) { - await self.stopEating() + self.log.notice("\(self.name) began eating!") + Task { + try await Task.sleep(until: .now + .seconds(3), clock: .continuous) + self.stopEating() } } } -extension TimerKey { - static let becomeHungry: Self = "become-hungry" - static let finishEating: Self = "finish-eating" -} - extension Philosopher { private enum State: Equatable { case thinking diff --git a/Sources/DistributedActors/Cluster/ClusterControl.swift b/Sources/DistributedActors/Cluster/ClusterControl.swift index be9076827..c93ec2eb0 100644 --- a/Sources/DistributedActors/Cluster/ClusterControl.swift +++ b/Sources/DistributedActors/Cluster/ClusterControl.swift @@ -218,7 +218,7 @@ public struct ClusterControl { public func waitFor(_ node: UniqueNode, _ status: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member { try await self.waitForMembershipEventually(within: within) { membership in if status == .down || status == .removed { - if let cluster = self.cluster, let tombstone = cluster.getExistingAssociationTombstone(with: node) { + if let cluster = self.cluster, cluster.getExistingAssociationTombstone(with: node) != nil { return Cluster.Member(node: node, status: .removed).asUnreachable } } @@ -251,7 +251,7 @@ public struct ClusterControl { public func waitFor(_ node: UniqueNode, atLeast atLeastStatus: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member { try await self.waitForMembershipEventually(within: within) { membership in if atLeastStatus == .down || atLeastStatus == .removed { - if let cluster = self.cluster, let tombstone = cluster.getExistingAssociationTombstone(with: node) { + if let cluster = self.cluster, cluster.getExistingAssociationTombstone(with: node) != nil { return Cluster.Member(node: node, status: .removed).asUnreachable } } diff --git a/Sources/DistributedActors/Cluster/ClusterShell.swift b/Sources/DistributedActors/Cluster/ClusterShell.swift index 64e9d9c0b..2adf72aac 100644 --- a/Sources/DistributedActors/Cluster/ClusterShell.swift +++ b/Sources/DistributedActors/Cluster/ClusterShell.swift @@ -943,7 +943,7 @@ extension ClusterShell { "handshake/retryDelay": "\(retryDelay)", ]) context.timers.startSingle( - key: TimerKey("handshake-timer-\(remoteNode)"), + key: _TimerKey("handshake-timer-\(remoteNode)"), message: .command(.retryHandshake(initiated)), delay: retryDelay ) diff --git a/Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift b/Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift index c36bf9ca2..c3ead2ef1 100644 --- a/Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift +++ b/Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift @@ -35,8 +35,8 @@ public struct DowningStrategyDirective { internal enum Repr { case none case markAsDown(Set) - case startTimer(key: TimerKey, member: Cluster.Member, delay: Duration) - case cancelTimer(key: TimerKey) + case startTimer(member: Cluster.Member, delay: Duration) + case cancelTimer(member: Cluster.Member) } internal init(_ underlying: Repr) { @@ -47,12 +47,12 @@ public struct DowningStrategyDirective { .init(.none) } - public static func startTimer(key: TimerKey, member: Cluster.Member, delay: Duration) -> Self { - .init(.startTimer(key: key, member: member, delay: delay)) + public static func startTimer(member: Cluster.Member, delay: Duration) -> Self { + .init(.startTimer(member: member, delay: delay)) } - public static func cancelTimer(key: TimerKey) -> Self { - .init(.cancelTimer(key: key)) + public static func cancelTimer(member: Cluster.Member) -> Self { + .init(.cancelTimer(member: member)) } public static func markAsDown(members: Set) -> Self { @@ -86,8 +86,8 @@ internal distributed actor DowningStrategyShell { /// `Task` for subscribing to cluster events. private var eventsListeningTask: Task? - - private lazy var timers = ActorTimers(self) + /// Timer `Task`s + private var memberTimerTasks: [Cluster.Member: Task] = [:] init(_ strategy: DowningStrategy, system: ActorSystem) async { self.strategy = strategy @@ -113,14 +113,23 @@ internal distributed actor DowningStrategyShell { case .markAsDown(let members): self.markAsDown(members: members) - case .startTimer(let key, let member, let delay): - self.log.trace("Start timer \(key), member: \(member), delay: \(delay)") - self.timers.startSingle(key: key, delay: delay) { + case .startTimer(let member, let delay): + self.log.trace("Start timer for member: \(member), delay: \(delay)") + self.memberTimerTasks[member] = Task { + defer { self.memberTimerTasks.removeValue(forKey: member) } + + guard !Task.isCancelled else { + return + } + + try await Task.sleep(until: .now + delay, clock: .continuous) self.onTimeout(member: member) } - case .cancelTimer(let key): - self.log.trace("Cancel timer \(key)") - self.timers.cancel(for: key) + case .cancelTimer(let member): + self.log.trace("Cancel timer for member: \(member)") + if let timerTask = self.memberTimerTasks.removeValue(forKey: member) { + timerTask.cancel() + } case .none: () // nothing to be done diff --git a/Sources/DistributedActors/Cluster/Downing/TimeoutBasedDowningStrategy.swift b/Sources/DistributedActors/Cluster/Downing/TimeoutBasedDowningStrategy.swift index c0a247599..1dc786917 100644 --- a/Sources/DistributedActors/Cluster/Downing/TimeoutBasedDowningStrategy.swift +++ b/Sources/DistributedActors/Cluster/Downing/TimeoutBasedDowningStrategy.swift @@ -56,7 +56,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy { // it was marked as down by someone, we don't need to track it anymore _ = self._markAsDown.remove(change.member) _ = self._unreachable.remove(change.member) - return .cancelTimer(key: self.timerKey(change.member)) + return .cancelTimer(member: change.member) } else if let replaced = change.replaced { _ = self._markAsDown.remove(replaced) _ = self._unreachable.remove(replaced) @@ -95,7 +95,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy { self._unreachable.insert(member) - return .startTimer(key: self.timerKey(member), member: member, delay: self.settings.downUnreachableMembersAfter) + return .startTimer(member: member, delay: self.settings.downUnreachableMembersAfter) } func onMemberReachable(_ change: Cluster.ReachabilityChange) -> DowningStrategyDirective { @@ -104,16 +104,12 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy { _ = self._markAsDown.remove(member) if self._unreachable.remove(member) != nil { - return .cancelTimer(key: self.timerKey(member)) + return .cancelTimer(member: member) } return .none } - func timerKey(_ member: Cluster.Member) -> TimerKey { - TimerKey(member.uniqueNode) - } - func onLeaderChange(to leader: Cluster.Member?) throws -> DowningStrategyDirective { _ = try self.membership.applyLeadershipChange(to: leader) @@ -129,7 +125,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy { self._markAsDown.remove(member) if self._unreachable.remove(member) != nil { - return .cancelTimer(key: self.timerKey(member)) + return .cancelTimer(member: member) } return .none diff --git a/Sources/DistributedActors/Cluster/Reception/OperationLog.swift b/Sources/DistributedActors/Cluster/Reception/OperationLog.swift index 530b38031..75017c90c 100644 --- a/Sources/DistributedActors/Cluster/Reception/OperationLog.swift +++ b/Sources/DistributedActors/Cluster/Reception/OperationLog.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift Distributed Actors open source project // -// Copyright (c) 2020 Apple Inc. and the Swift Distributed Actors project authors +// Copyright (c) 2020-2022 Apple Inc. and the Swift Distributed Actors project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -34,6 +34,7 @@ internal class OpLog { self.maxSeqNr = 0 } + @discardableResult func add(_ op: Op) -> SequencedOp { self.maxSeqNr += 1 let sequencedOp = SequencedOp(sequenceRange: .single(self.maxSeqNr), op: op) diff --git a/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift b/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift index 9f4c3414e..1dc5f1d94 100644 --- a/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift +++ b/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import AsyncAlgorithms import Distributed import Logging @@ -207,12 +208,8 @@ public distributed actor OpLogDistributedReceptionist: DistributedReceptionist, // ==== ------------------------------------------------------------------------------------------------------------ // MARK: Timers - static let slowACKReplicationTick: TimerKey = "slow-ack-replication-tick" - static let fastACKReplicationTick: TimerKey = "fast-ack-replication-tick" - - static let localPublishLocalListingsTick: TimerKey = "publish-local-listings-tick" - - private lazy var timers = ActorTimers(self) + private var slowACKReplicationTimerTask: Task? + private var flushTimerTasks: [Int: Task] = [:] /// Important: This safeguards us from the following write amplification scenario: /// @@ -264,18 +261,18 @@ public distributed actor OpLogDistributedReceptionist: DistributedReceptionist, // === timers ------------------ // periodically gossip to other receptionists with the last seqNr we've seen, // and if it happens to be outdated by then this will cause a push from that node. - self.timers.startPeriodic( - key: Self.slowACKReplicationTick, - interval: actorSystem.settings.receptionist.ackPullReplicationIntervalSlow - ) { - await self.periodicAckTick() + self.slowACKReplicationTimerTask = Task { + for await _ in AsyncTimerSequence.repeating(every: self.actorSystem.settings.receptionist.ackPullReplicationIntervalSlow, clock: .continuous) { + self.periodicAckTick() + } } self.log.debug("Initialized receptionist") } deinit { - eventsListeningTask?.cancel() + self.eventsListeningTask?.cancel() + self.slowACKReplicationTimerTask?.cancel() } } @@ -301,7 +298,6 @@ extension OpLogDistributedReceptionist: LifecycleWatch { let key = key.asAnyKey let id = guest.id - let ref = actorSystem._resolveUntyped(id: guest.id) guard id._isLocal || (id.uniqueNode == actorSystem.cluster.uniqueNode) else { self.log.warning(""" @@ -410,24 +406,25 @@ extension OpLogDistributedReceptionist: LifecycleWatch { extension OpLogDistributedReceptionist { func ensureDelayedListingFlush(of key: AnyDistributedReceptionKey) { - let timerKey = self.flushTimerKey(key) - if self.storage.registrations(forKey: key)?.isEmpty ?? true { self.log.debug("notify now, no need to schedule delayed flush") self.notifySubscribers(of: key) return // no need to schedule, there are no registered actors at all, we eagerly emit this info } - guard !self.timers.exists(key: timerKey) else { + let timerTaskKey = key.hashValue + guard self.flushTimerTasks[timerTaskKey] == nil else { self.log.debug("timer exists") return // timer exists nothing to do } // TODO: also flush when a key has seen e.g. 100 changes? let flushDelay = actorSystem.settings.receptionist.listingFlushDelay - // timers.startSingle(key: timerKey, message: _ReceptionistDelayedListingFlushTick(key: key), delay: flushDelay) self.log.debug("schedule delayed flush") - self.timers.startSingle(key: timerKey, delay: flushDelay) { + self.flushTimerTasks[timerTaskKey] = Task { + defer { self.flushTimerTasks.removeValue(forKey: timerTaskKey) } + + try await Task.sleep(until: .now + flushDelay, clock: .continuous) self.onDelayedListingFlushTick(key: key) } } @@ -457,10 +454,6 @@ extension OpLogDistributedReceptionist { } } } - - private func flushTimerKey(_ key: AnyDistributedReceptionKey) -> TimerKey { - "flush-\(key.hashValue)" - } } // ==== ---------------------------------------------------------------------------------------------------------------- @@ -815,7 +808,7 @@ extension OpLogDistributedReceptionist { let wasRegisteredWithKeys = self.storage.removeFromKeyMappings(equalityHackRef.asAnyDistributedActor) for key in wasRegisteredWithKeys.registeredUnderKeys { - self.addOperation(.remove(key: key, identity: id)) + _ = self.addOperation(.remove(key: key, identity: id)) self.publishListings(forKey: key) } diff --git a/Sources/DistributedActors/Cluster/Reception/_OperationLogClusterReceptionistBehavior.swift b/Sources/DistributedActors/Cluster/Reception/_OperationLogClusterReceptionistBehavior.swift index 0caa1cc99..0c6a89e56 100644 --- a/Sources/DistributedActors/Cluster/Reception/_OperationLogClusterReceptionistBehavior.swift +++ b/Sources/DistributedActors/Cluster/Reception/_OperationLogClusterReceptionistBehavior.swift @@ -26,10 +26,7 @@ public final class _OperationLogClusterReceptionist { // ==== ------------------------------------------------------------------------------------------------------------ // MARK: Timer Keys - static let slowACKReplicationTick: TimerKey = "slow-ack-replication-tick" - static let fastACKReplicationTick: TimerKey = "fast-ack-replication-tick" - - static let localPublishLocalListingsTick: TimerKey = "publish-local-listings-tick" + static let slowACKReplicationTick: _TimerKey = "slow-ack-replication-tick" // ==== ------------------------------------------------------------------------------------------------------------ // MARK: State @@ -263,7 +260,7 @@ extension _OperationLogClusterReceptionist { } } - private func flushTimerKey(_ key: AnyReceptionKey) -> TimerKey { + private func flushTimerKey(_ key: AnyReceptionKey) -> _TimerKey { "flush-\(key.hashValue)" } } diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIMActorShell.swift b/Sources/DistributedActors/Cluster/SWIM/SWIMActorShell.swift index 17f7f72a0..2fc1c7eac 100644 --- a/Sources/DistributedActors/Cluster/SWIM/SWIMActorShell.swift +++ b/Sources/DistributedActors/Cluster/SWIM/SWIMActorShell.swift @@ -552,7 +552,7 @@ extension SWIMActorShell { .metrics(group: "swim.shell", measure: [.serialization, .deserialization]) } - static let protocolPeriodTimerKey = TimerKey("\(SWIMActorShell.name)/periodic-ping") + static let protocolPeriodTimerKey = _TimerKey("\(SWIMActorShell.name)/periodic-ping") } extension ActorID { diff --git a/Sources/DistributedActors/Gossip/Gossiper+Shell.swift b/Sources/DistributedActors/Gossip/Gossiper+Shell.swift index d2c71a7f7..fcb1e1655 100644 --- a/Sources/DistributedActors/Gossip/Gossiper+Shell.swift +++ b/Sources/DistributedActors/Gossip/Gossiper+Shell.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift Distributed Actors open source project // -// Copyright (c) 2020 Apple Inc. and the Swift Distributed Actors project authors +// Copyright (c) 2020-2022 Apple Inc. and the Swift Distributed Actors project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -14,7 +14,7 @@ import Logging -private let gossipTickKey: TimerKey = "gossip-tick" +private let gossipTickKey: _TimerKey = "gossip-tick" /// :nodoc: /// diff --git a/Sources/DistributedActors/Supervision.swift b/Sources/DistributedActors/Supervision.swift index 8b75b9202..75e4c16d8 100644 --- a/Sources/DistributedActors/Supervision.swift +++ b/Sources/DistributedActors/Supervision.swift @@ -841,7 +841,7 @@ internal enum SupervisionRestartDelayedBehavior { @usableFromInline static func after(delay: Duration, with replacement: _Behavior) -> _Behavior { .setup { context in - context.timers._startResumeTimer(key: TimerKey("restartBackoff", isSystemTimer: true), delay: delay, resumeWith: WakeUp()) + context.timers._startResumeTimer(key: _TimerKey("restartBackoff", isSystemTimer: true), delay: delay, resumeWith: WakeUp()) return .suspend { (result: Result) in traceLog_Supervision("RESTART BACKOFF TRIGGER") diff --git a/Sources/DistributedActors/Timers+Distributed.swift b/Sources/DistributedActors/Timers+Distributed.swift deleted file mode 100644 index c024872b0..000000000 --- a/Sources/DistributedActors/Timers+Distributed.swift +++ /dev/null @@ -1,167 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Distributed Actors open source project -// -// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Dispatch -import Distributed -import DistributedActorsConcurrencyHelpers -import struct NIO.TimeAmount - -struct DistributedActorTimer { - @usableFromInline - let key: TimerKey - @usableFromInline - let repeated: Bool - @usableFromInline - let handle: Cancelable -} - -struct DistributedActorTimerEvent { - let key: TimerKey - let owner: ActorID -} - -/// Creates and manages timers which may only be accessed from the actor that owns it. -/// -/// _BehaviorTimers are bound to this objects lifecycle, i.e. when the actor owning this object is deallocated, -/// and the `ActorTimers` are deallocated as well, all timers associated with it are cancelled. -@available(*, deprecated, message: "Actor timers cannot participate in structured cancellation, and will be replaced with patterns using swift-async-algorithms (see Timer)") -public final class ActorTimers where Act.ActorSystem == ClusterSystem { - internal let ownerID: ActorID - - internal let dispatchQueue = DispatchQueue.global() - - internal let installedTimersLock = Lock() - internal var installedTimers: [TimerKey: DistributedActorTimer] = [:] - - // TODO: this is a workaround, we're removing ActorTimers since they can't participate in structured cancellation - weak var actorSystem: Act.ActorSystem? - - /// Create a timers instance owned by the passed in actor. - /// - /// Does not retain the distributed actor. - /// - /// - Parameter myself: - public init(_ myself: Act) { - self.ownerID = myself.id - } - - deinit { - self._cancelAll(includeSystemTimers: true) - } - - /// Cancel all timers currently stored in this ``ActorTimers`` instance. - public func cancelAll() { - self._cancelAll(includeSystemTimers: false) - } - - internal func _cancelAll(includeSystemTimers: Bool) { - self.installedTimersLock.withLockVoid { - for key in self.installedTimers.keys where includeSystemTimers || !key.isSystemTimer { - // TODO: the reason the `_` keys are not cancelled is because we want to cancel timers in _restartPrepare but we need "our restart timer" to remain - self.cancel(for: key) - } - } - } - - /// Cancels timer associated with the given key. - /// - /// - Parameter key: key of the timer to cancel - public func cancel(for key: TimerKey) { - if let timer = self.installedTimersLock.withLock { self.installedTimers.removeValue(forKey: key) } { - timer.handle.cancel() - } - } - - /// Checks for the existence of a scheduler timer for given key (single or periodic). - /// - /// - Returns: true if timer exists, false otherwise - public func exists(key: TimerKey) -> Bool { - self.installedTimersLock.withLock { - self.installedTimers[key] != nil - } - } - - /// Starts a timer that will invoke the provided `call` closure on the actor's context after the specified delay. - /// - /// - Parameters: - /// - key: the key associated with the timer - /// - call: the call that will be made after the `delay` amount of time elapses - /// - delay: the delay after which the message will be sent - public func startSingle( - key: TimerKey, - delay: Duration, - @_inheritActorContext @_implicitSelfCapture call: @Sendable @escaping () async -> Void - ) { - self.start(key: key, call: call, interval: delay, repeated: false) - } - - /// Starts a timer that will periodically invoke the passed in `call` closure on the actor's context. - /// - /// - Parameters: - /// - key: the key associated with the timer - /// - call: the call that will be executed after the `delay` amount of time elapses - /// - interval: the interval with which the message will be sent - public func startPeriodic( - key: TimerKey, - interval: Duration, - @_inheritActorContext @_implicitSelfCapture call: @Sendable @escaping () async -> Void - ) { - self.start(key: key, call: call, interval: interval, repeated: true) - } - - internal func start( - key: TimerKey, - @_inheritActorContext @_implicitSelfCapture call: @Sendable @escaping () async -> Void, - interval: Duration, - repeated: Bool - ) { - let handle: Cancelable - if repeated { - handle = self.dispatchQueue.scheduleAsync(initialDelay: interval, interval: interval) { - // We take the lock to prevent the system from shutting down - // while we're in the middle of potentially issuing remote calls - // Which may cause: Cannot schedule tasks on an EventLoop that has already shut down. - // The actual solution is outstanding work tracking potentially. - let system = self.actorSystem - system?.shutdownSemaphore.wait() - defer { - system?.shutdownSemaphore.signal() - } - - await call() - } - } else { - handle = self.dispatchQueue.scheduleOnceAsync(delay: interval) { - // We take the lock to prevent the system from shutting down - // while we're in the middle of potentially issuing remote calls - // Which may cause: Cannot schedule tasks on an EventLoop that has already shut down. - // The actual solution is outstanding work tracking potentially. - let system = self.actorSystem - system?.shutdownSemaphore.wait() - defer { - system?.shutdownSemaphore.signal() - } - - await call() - - // The single timer is done. Remove it so it can be installed again if needed. - self.cancel(for: key) - } - } - - self.installedTimersLock.withLockVoid { - self.installedTimers[key] = DistributedActorTimer(key: key, repeated: repeated, handle: handle) - } - } -} diff --git a/Sources/DistributedActors/WeakActorDictionary.swift b/Sources/DistributedActors/WeakActorDictionary.swift index c9ce8f8d3..dab2bc40d 100644 --- a/Sources/DistributedActors/WeakActorDictionary.swift +++ b/Sources/DistributedActors/WeakActorDictionary.swift @@ -45,7 +45,7 @@ struct WeakActorDictionary { guard let knownActor = container.actor else { // the actor was released -- let's remove the container while we're here - self.removeActor(identifiedBy: id) + _ = self.removeActor(identifiedBy: id) return nil } diff --git a/Sources/DistributedActors/_BehaviorTimers.swift b/Sources/DistributedActors/_BehaviorTimers.swift index d9883bb98..e9d25c003 100644 --- a/Sources/DistributedActors/_BehaviorTimers.swift +++ b/Sources/DistributedActors/_BehaviorTimers.swift @@ -19,8 +19,7 @@ import struct NIO.TimeAmount @usableFromInline struct Timer { // FIXME(distributed): deprecate and remove in favor of DistributedActorTimers - @usableFromInline - let key: TimerKey + let key: _TimerKey @usableFromInline let message: Message? @usableFromInline @@ -33,22 +32,22 @@ struct Timer { // FIXME(distributed): deprecate and remove in favor of @usableFromInline struct TimerEvent { - let key: TimerKey + let key: _TimerKey let generation: Int let owner: ActorID } -/// A `TimerKey` is used to identify a timer. It can be stored and re-used. +/// A `_TimerKey` is used to identify a timer. It can be stored and re-used. /// /// Example: /// -/// let timerKey = TimerKey("my-key") +/// let timerKey = _TimerKey("my-key") /// timers.startPeriodicTimer(key: timerKey, message: "ping", interval: .seconds(1)) /// // ... /// timers.cancelTimer(forKey: timerKey) /// -@available(*, deprecated, message: "Actor timers cannot participate in structured cancellation, and will be replaced with patterns using swift-async-algorithms (see Timer)") -public struct TimerKey: Hashable { +// TODO: replace timers with AsyncTimerSequence from swift-async-algorithms +internal struct _TimerKey: Hashable { private let identifier: AnyHashable @usableFromInline @@ -64,17 +63,17 @@ public struct TimerKey: Hashable { } } -extension TimerKey: CustomStringConvertible { +extension _TimerKey: CustomStringConvertible { public var description: String { if self.isSystemTimer { - return "TimerKey(\(self.identifier), isSystemTimer: \(self.isSystemTimer))" + return "_TimerKey(\(self.identifier), isSystemTimer: \(self.isSystemTimer))" } else { - return "TimerKey(\(self.identifier.base))" + return "_TimerKey(\(self.identifier.base))" } } } -extension TimerKey: ExpressibleByStringLiteral, ExpressibleByStringInterpolation { +extension _TimerKey: ExpressibleByStringLiteral, ExpressibleByStringInterpolation { public init(stringLiteral value: StringLiteralType) { self.init(value) } @@ -87,8 +86,7 @@ public final class _BehaviorTimers { // TODO: eventually replace with our own scheduler implementation @usableFromInline internal let dispatchQueue = DispatchQueue.global() - @usableFromInline - internal var installedTimers: [TimerKey: Timer] = [:] + internal var installedTimers: [_TimerKey: Timer] = [:] @usableFromInline internal unowned var context: _ActorContext @@ -114,7 +112,7 @@ public final class _BehaviorTimers { /// Cancels timer associated with the given key. /// /// - Parameter key: key of the timer to cancel - public func cancel(for key: TimerKey) { + internal func cancel(for key: _TimerKey) { if let timer = self.installedTimers.removeValue(forKey: key) { if context.system.settings.logging.verboseTimers { self.context.log.trace("Cancel timer [\(key)] with generation [\(timer.generation)]", metadata: self.metadata) @@ -126,8 +124,7 @@ public final class _BehaviorTimers { /// Checks for the existence of a scheduler timer for given key (single or periodic). /// /// - Returns: true if timer exists, false otherwise - @inlinable - public func exists(key: TimerKey) -> Bool { + internal func exists(key: _TimerKey) -> Bool { self.installedTimers[key] != nil } @@ -137,8 +134,7 @@ public final class _BehaviorTimers { /// - key: the key associated with the timer /// - message: the message that will be sent to `myself` /// - delay: the delay after which the message will be sent - @inlinable - public func startSingle(key: TimerKey, message: Message, delay: Duration) { + internal func startSingle(key: _TimerKey, message: Message, delay: Duration) { self.start(key: key, message: message, interval: delay, repeated: false) } @@ -148,13 +144,11 @@ public final class _BehaviorTimers { /// - key: the key associated with the timer /// - message: the message that will be sent to `myself` /// - interval: the interval with which the message will be sent - @inlinable - public func startPeriodic(key: TimerKey, message: Message, interval: Duration) { + internal func startPeriodic(key: _TimerKey, message: Message, interval: Duration) { self.start(key: key, message: message, interval: interval, repeated: true) } - @usableFromInline - internal func start(key: TimerKey, message: Message, interval: Duration, repeated: Bool) { + internal func start(key: _TimerKey, message: Message, interval: Duration, repeated: Bool) { self.cancel(for: key) let generation = self.nextTimerGen() @@ -228,7 +222,7 @@ extension _BehaviorTimers { /// Dangerous version of `_startTimer` which allows scheduling a `.resume` system message (directly!) with a token `T`, after a time `delay`. /// This can be used e.g. to implement restarting an actor after a backoff delay. - internal func _startResumeTimer(key: TimerKey, delay: Duration, resumeWith token: T) { + internal func _startResumeTimer(key: _TimerKey, delay: Duration, resumeWith token: T) { assert(key.isSystemTimer, "_startResumeTimer MUST ONLY be used by system internal tasks, and keys MUST be `_` prefixed. Key was: \(key)") self.cancel(for: key) diff --git a/Tests/DistributedActorsDocumentationTests/DeathWatchDocExamples.swift b/Tests/DistributedActorsDocumentationTests/DeathWatchDocExamples.swift index 30b49ef3a..426994e9c 100644 --- a/Tests/DistributedActorsDocumentationTests/DeathWatchDocExamples.swift +++ b/Tests/DistributedActorsDocumentationTests/DeathWatchDocExamples.swift @@ -129,7 +129,7 @@ class DeathWatchDocExamples { false } // tag::handling_termination_deathwatch[] - let concedeTimer: TimerKey = "concede-timer" + let concedeTimer: _TimerKey = "concede-timer" _ = _Behavior.receive { context, command in switch command { diff --git a/Tests/DistributedActorsTests/Cluster/MembershipGossipClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/MembershipGossipClusteredTests.swift index 23306f306..5a2cf474d 100644 --- a/Tests/DistributedActorsTests/Cluster/MembershipGossipClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/MembershipGossipClusteredTests.swift @@ -28,7 +28,7 @@ final class MembershipGossipClusteredTests: ClusteredActorSystemsXCTestCase { "/system/receptionist", ] settings.excludeGrep = [ - "TimerKey", + "_TimerKey", "schedule next gossip", "Gossip payload updated", ] diff --git a/Tests/DistributedActorsTests/Cluster/ShootTheOtherNodeClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/ShootTheOtherNodeClusteredTests.swift index a3dd74225..35d5b3b1a 100644 --- a/Tests/DistributedActorsTests/Cluster/ShootTheOtherNodeClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/ShootTheOtherNodeClusteredTests.swift @@ -20,7 +20,7 @@ import XCTest final class ShootTheOtherNodeClusteredTests: ClusteredActorSystemsXCTestCase { override func configureLogCapture(settings: inout LogCapture.Settings) { settings.excludeGrep = [ - "TimerKey", + "_TimerKey", ] settings.excludeActorPaths = [ "/system/cluster/swim", diff --git a/Tests/DistributedActorsTests/TimersTests.swift b/Tests/DistributedActorsTests/TimersTests.swift index 55cee63f9..ed21a70a6 100644 --- a/Tests/DistributedActorsTests/TimersTests.swift +++ b/Tests/DistributedActorsTests/TimersTests.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift Distributed Actors open source project // -// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -20,15 +20,15 @@ import XCTest final class TimersTests: ClusterSystemXCTestCase { func testTimerKey_shouldPrintNicely() { - TimerKey("Hello").description.shouldEqual("TimerKey(Hello)") - TimerKey("Hello", isSystemTimer: true).description.shouldEqual("TimerKey(Hello, isSystemTimer: true)") + _TimerKey("Hello").description.shouldEqual("_TimerKey(Hello)") + _TimerKey("Hello", isSystemTimer: true).description.shouldEqual("_TimerKey(Hello, isSystemTimer: true)") } func test_startSingleTimer_shouldSendSingleMessage() throws { let p: ActorTestProbe = self.testKit.makeTestProbe() let behavior: _Behavior = .setup { context in - context.timers.startSingle(key: TimerKey("message"), message: "fromTimer", delay: .microseconds(100)) + context.timers.startSingle(key: _TimerKey("message"), message: "fromTimer", delay: .microseconds(100)) return .receiveMessage { message in p.tell(message) return .same @@ -45,7 +45,7 @@ final class TimersTests: ClusterSystemXCTestCase { let behavior: _Behavior = .setup { context in var i = 0 - context.timers.startPeriodic(key: TimerKey("message"), message: "fromTimer", interval: .milliseconds(10)) + context.timers.startPeriodic(key: _TimerKey("message"), message: "fromTimer", interval: .milliseconds(10)) return .receiveMessage { message in i += 1 p.tell(message) @@ -70,13 +70,13 @@ final class TimersTests: ClusterSystemXCTestCase { let behavior: _Behavior = .setup { context in var i = 0 - context.timers.startPeriodic(key: TimerKey("message"), message: "fromTimer", interval: .milliseconds(10)) + context.timers.startPeriodic(key: _TimerKey("message"), message: "fromTimer", interval: .milliseconds(10)) return .receiveMessage { message in i += 1 p.tell(message) if i >= 5 { - context.timers.cancel(for: TimerKey("message")) + context.timers.cancel(for: _TimerKey("message")) } return .same } @@ -97,9 +97,9 @@ final class TimersTests: ClusterSystemXCTestCase { // amount of time, so the timer is triggered and sends the message. // Because we cancel the timer in the same run, the message should // not be processed and the probe should not receive a message. - context.timers.startSingle(key: TimerKey("message"), message: "fromTimer", delay: .nanoseconds(0)) + context.timers.startSingle(key: _TimerKey("message"), message: "fromTimer", delay: .nanoseconds(0)) DistributedActors._Thread.sleep(.milliseconds(10)) // FIXME(swift): replace with Task.sleep - context.timers.cancel(for: TimerKey("message")) + context.timers.cancel(for: _TimerKey("message")) return .receiveMessage { message in p.tell(message) return .same @@ -114,9 +114,9 @@ final class TimersTests: ClusterSystemXCTestCase { let p: ActorTestProbe = self.testKit.makeTestProbe() let behavior: _Behavior = .setup { context in - context.timers.startPeriodic(key: TimerKey("message"), message: "fromTimer", interval: .milliseconds(10)) - context.timers.startPeriodic(key: TimerKey("message2"), message: "fromTimer2", interval: .milliseconds(50)) - context.timers.startPeriodic(key: TimerKey("message3"), message: "fromTimer3", interval: .milliseconds(50)) + context.timers.startPeriodic(key: _TimerKey("message"), message: "fromTimer", interval: .milliseconds(10)) + context.timers.startPeriodic(key: _TimerKey("message2"), message: "fromTimer2", interval: .milliseconds(50)) + context.timers.startPeriodic(key: _TimerKey("message3"), message: "fromTimer3", interval: .milliseconds(50)) return .receiveMessage { message in p.tell(message) context.timers.cancelAll() @@ -133,7 +133,7 @@ final class TimersTests: ClusterSystemXCTestCase { let p: ActorTestProbe = self.testKit.makeTestProbe() let behavior: _Behavior = .setup { context in - context.timers.startPeriodic(key: TimerKey("message", isSystemTimer: true), message: "fromSystemTimer", interval: .milliseconds(10)) + context.timers.startPeriodic(key: _TimerKey("message", isSystemTimer: true), message: "fromSystemTimer", interval: .milliseconds(10)) return .receiveMessage { message in p.tell(message) context.timers.cancelAll()