From 2fb8d2abe0cf3a9ea9931d771327aa85673fc2df Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Tue, 5 Jul 2022 23:03:25 -0700 Subject: [PATCH 1/8] Use AsyncTimerSequence from swift-async-algorithms Resolves https://github.com/apple/swift-distributed-actors/issues/958 --- Package.swift | 24 +++++----- .../Cluster/Downing/DowningStrategy.swift | 45 ++++++++++++------ .../Downing/TimeoutBasedDowningStrategy.swift | 12 ++--- .../OperationLogDistributedReceptionist.swift | 47 ++++++++++--------- ...rationLogClusterReceptionistBehavior.swift | 3 -- 5 files changed, 71 insertions(+), 60 deletions(-) diff --git a/Package.swift b/Package.swift index 390d087aa..483c2fa45 100644 --- a/Package.swift +++ b/Package.swift @@ -30,6 +30,7 @@ var targets: [PackageDescription.Target] = [ .product(name: "Metrics", package: "swift-metrics"), .product(name: "ServiceDiscovery", package: "swift-service-discovery"), .product(name: "Backtrace", package: "swift-backtrace"), + .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), ] ), @@ -179,26 +180,27 @@ var targets: [PackageDescription.Target] = [ var dependencies: [Package.Dependency] = [ .package(url: "https://github.com/apple/swift-atomics", from: "1.0.2"), - .package(url: "https://github.com/apple/swift-cluster-membership.git", from: "0.3.0"), + .package(url: "https://github.com/apple/swift-cluster-membership", from: "0.3.0"), - .package(url: "https://github.com/apple/swift-nio.git", from: "2.40.0"), - .package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.2.0"), - .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.16.1"), + .package(url: "https://github.com/apple/swift-nio", from: "2.40.0"), + .package(url: "https://github.com/apple/swift-nio-extras", from: "1.2.0"), + .package(url: "https://github.com/apple/swift-nio-ssl", from: "2.16.1"), - .package(url: "https://github.com/apple/swift-protobuf.git", from: "1.7.0"), + .package(url: "https://github.com/apple/swift-protobuf", from: "1.7.0"), // ~~~ backtraces ~~~ // TODO: optimally, library should not pull swift-backtrace - .package(url: "https://github.com/swift-server/swift-backtrace.git", from: "1.1.1"), + .package(url: "https://github.com/swift-server/swift-backtrace", from: "1.1.1"), - // ~~~ Swift Collections ~~~ - .package(url: "https://github.com/apple/swift-collections.git", from: "1.0.1"), + // ~~~ Swift libraries ~~~ + .package(url: "https://github.com/apple/swift-async-algorithms", from: "0.0.3"), + .package(url: "https://github.com/apple/swift-collections", from: "1.0.1"), // ~~~ Observability ~~~ - .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-log", from: "1.0.0"), // swift-metrics 1.x and 2.x are almost API compatible, so most clients should use - .package(url: "https://github.com/apple/swift-metrics.git", "1.0.0" ..< "3.0.0"), - .package(url: "https://github.com/apple/swift-service-discovery.git", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-metrics", "1.0.0" ..< "3.0.0"), + .package(url: "https://github.com/apple/swift-service-discovery", from: "1.0.0"), // ~~~ SwiftPM Plugins ~~~ .package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"), diff --git a/Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift b/Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift index c36bf9ca2..b1e2b1971 100644 --- a/Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift +++ b/Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import AsyncAlgorithms import Distributed import Logging @@ -35,8 +36,8 @@ public struct DowningStrategyDirective { internal enum Repr { case none case markAsDown(Set) - case startTimer(key: TimerKey, member: Cluster.Member, delay: Duration) - case cancelTimer(key: TimerKey) + case startTimer(member: Cluster.Member, delay: Duration) + case cancelTimer(member: Cluster.Member) } internal init(_ underlying: Repr) { @@ -47,12 +48,12 @@ public struct DowningStrategyDirective { .init(.none) } - public static func startTimer(key: TimerKey, member: Cluster.Member, delay: Duration) -> Self { - .init(.startTimer(key: key, member: member, delay: delay)) + public static func startTimer(member: Cluster.Member, delay: Duration) -> Self { + .init(.startTimer(member: member, delay: delay)) } - public static func cancelTimer(key: TimerKey) -> Self { - .init(.cancelTimer(key: key)) + public static func cancelTimer(member: Cluster.Member) -> Self { + .init(.cancelTimer(member: member)) } public static func markAsDown(members: Set) -> Self { @@ -86,8 +87,8 @@ internal distributed actor DowningStrategyShell { /// `Task` for subscribing to cluster events. private var eventsListeningTask: Task? - - private lazy var timers = ActorTimers(self) + /// `Task` for timers + private var memberTimerTasks: [Cluster.Member: Task] = [:] init(_ strategy: DowningStrategy, system: ActorSystem) async { self.strategy = strategy @@ -101,6 +102,9 @@ internal distributed actor DowningStrategyShell { deinit { self.eventsListeningTask?.cancel() + self.memberTimerTasks.values.forEach { timerTask in + timerTask.cancel() + } } func receiveClusterEvent(_ event: Cluster.Event) throws { @@ -113,20 +117,31 @@ internal distributed actor DowningStrategyShell { case .markAsDown(let members): self.markAsDown(members: members) - case .startTimer(let key, let member, let delay): - self.log.trace("Start timer \(key), member: \(member), delay: \(delay)") - self.timers.startSingle(key: key, delay: delay) { - self.onTimeout(member: member) + case .startTimer(let member, let delay): + self.log.trace("Start timer for member: \(member), delay: \(delay)") + self.memberTimerTasks[member] = Task { + for await _ in AsyncTimerSequence(interval: delay, clock: ContinuousClock()) { + self.onTimeout(member: member) + // Single-shot; cancel task immediately after it has been fired + self.cancelTimer(for: member) + break + } } - case .cancelTimer(let key): - self.log.trace("Cancel timer \(key)") - self.timers.cancel(for: key) + case .cancelTimer(let member): + self.log.trace("Cancel timer for member: \(member)") + self.cancelTimer(for: member) case .none: () // nothing to be done } } + private func cancelTimer(for member: Cluster.Member) { + if let timerTask = self.memberTimerTasks.removeValue(forKey: member) { + timerTask.cancel() + } + } + func markAsDown(members: Set) { for member in members { self.log.info( diff --git a/Sources/DistributedActors/Cluster/Downing/TimeoutBasedDowningStrategy.swift b/Sources/DistributedActors/Cluster/Downing/TimeoutBasedDowningStrategy.swift index c0a247599..1dc786917 100644 --- a/Sources/DistributedActors/Cluster/Downing/TimeoutBasedDowningStrategy.swift +++ b/Sources/DistributedActors/Cluster/Downing/TimeoutBasedDowningStrategy.swift @@ -56,7 +56,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy { // it was marked as down by someone, we don't need to track it anymore _ = self._markAsDown.remove(change.member) _ = self._unreachable.remove(change.member) - return .cancelTimer(key: self.timerKey(change.member)) + return .cancelTimer(member: change.member) } else if let replaced = change.replaced { _ = self._markAsDown.remove(replaced) _ = self._unreachable.remove(replaced) @@ -95,7 +95,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy { self._unreachable.insert(member) - return .startTimer(key: self.timerKey(member), member: member, delay: self.settings.downUnreachableMembersAfter) + return .startTimer(member: member, delay: self.settings.downUnreachableMembersAfter) } func onMemberReachable(_ change: Cluster.ReachabilityChange) -> DowningStrategyDirective { @@ -104,16 +104,12 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy { _ = self._markAsDown.remove(member) if self._unreachable.remove(member) != nil { - return .cancelTimer(key: self.timerKey(member)) + return .cancelTimer(member: member) } return .none } - func timerKey(_ member: Cluster.Member) -> TimerKey { - TimerKey(member.uniqueNode) - } - func onLeaderChange(to leader: Cluster.Member?) throws -> DowningStrategyDirective { _ = try self.membership.applyLeadershipChange(to: leader) @@ -129,7 +125,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy { self._markAsDown.remove(member) if self._unreachable.remove(member) != nil { - return .cancelTimer(key: self.timerKey(member)) + return .cancelTimer(member: member) } return .none diff --git a/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift b/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift index 9f4c3414e..035fe3f87 100644 --- a/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift +++ b/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import AsyncAlgorithms import Distributed import Logging @@ -207,12 +208,8 @@ public distributed actor OpLogDistributedReceptionist: DistributedReceptionist, // ==== ------------------------------------------------------------------------------------------------------------ // MARK: Timers - static let slowACKReplicationTick: TimerKey = "slow-ack-replication-tick" - static let fastACKReplicationTick: TimerKey = "fast-ack-replication-tick" - - static let localPublishLocalListingsTick: TimerKey = "publish-local-listings-tick" - - private lazy var timers = ActorTimers(self) + private var slowACKReplicationTimerTask: Task? + private var flushTimerTasks: [Int: Task] = [:] /// Important: This safeguards us from the following write amplification scenario: /// @@ -264,18 +261,22 @@ public distributed actor OpLogDistributedReceptionist: DistributedReceptionist, // === timers ------------------ // periodically gossip to other receptionists with the last seqNr we've seen, // and if it happens to be outdated by then this will cause a push from that node. - self.timers.startPeriodic( - key: Self.slowACKReplicationTick, - interval: actorSystem.settings.receptionist.ackPullReplicationIntervalSlow - ) { - await self.periodicAckTick() + self.slowACKReplicationTimerTask = Task { + for await _ in AsyncTimerSequence.repeating(every: self.actorSystem.settings.receptionist.ackPullReplicationIntervalSlow, clock: ContinuousClock()) { + self.periodicAckTick() + } } self.log.debug("Initialized receptionist") } deinit { - eventsListeningTask?.cancel() + self.eventsListeningTask?.cancel() + self.slowACKReplicationTimerTask?.cancel() + // FIXME: this crashes tests (ClusterAssociationTests.test_association_sameAddressNodeJoin_shouldOverrideExistingNode) +// self.flushTimerTasks.values.forEach { timerTask in +// timerTask.cancel() +// } } } @@ -301,7 +302,6 @@ extension OpLogDistributedReceptionist: LifecycleWatch { let key = key.asAnyKey let id = guest.id - let ref = actorSystem._resolveUntyped(id: guest.id) guard id._isLocal || (id.uniqueNode == actorSystem.cluster.uniqueNode) else { self.log.warning(""" @@ -410,25 +410,30 @@ extension OpLogDistributedReceptionist: LifecycleWatch { extension OpLogDistributedReceptionist { func ensureDelayedListingFlush(of key: AnyDistributedReceptionKey) { - let timerKey = self.flushTimerKey(key) - if self.storage.registrations(forKey: key)?.isEmpty ?? true { self.log.debug("notify now, no need to schedule delayed flush") self.notifySubscribers(of: key) return // no need to schedule, there are no registered actors at all, we eagerly emit this info } - guard !self.timers.exists(key: timerKey) else { + let timerTaskKey = key.hashValue + guard self.flushTimerTasks[timerTaskKey] == nil else { self.log.debug("timer exists") return // timer exists nothing to do } // TODO: also flush when a key has seen e.g. 100 changes? let flushDelay = actorSystem.settings.receptionist.listingFlushDelay - // timers.startSingle(key: timerKey, message: _ReceptionistDelayedListingFlushTick(key: key), delay: flushDelay) self.log.debug("schedule delayed flush") - self.timers.startSingle(key: timerKey, delay: flushDelay) { - self.onDelayedListingFlushTick(key: key) + self.flushTimerTasks[timerTaskKey] = Task { + for await _ in AsyncTimerSequence(interval: flushDelay, clock: ContinuousClock()) { + self.onDelayedListingFlushTick(key: key) + // Single-shot; cancel task immediately after it has been fired + if let timerTask = self.flushTimerTasks.removeValue(forKey: timerTaskKey) { + timerTask.cancel() + } + break + } } } @@ -457,10 +462,6 @@ extension OpLogDistributedReceptionist { } } } - - private func flushTimerKey(_ key: AnyDistributedReceptionKey) -> TimerKey { - "flush-\(key.hashValue)" - } } // ==== ---------------------------------------------------------------------------------------------------------------- diff --git a/Sources/DistributedActors/Cluster/Reception/_OperationLogClusterReceptionistBehavior.swift b/Sources/DistributedActors/Cluster/Reception/_OperationLogClusterReceptionistBehavior.swift index 0caa1cc99..aedeaa52b 100644 --- a/Sources/DistributedActors/Cluster/Reception/_OperationLogClusterReceptionistBehavior.swift +++ b/Sources/DistributedActors/Cluster/Reception/_OperationLogClusterReceptionistBehavior.swift @@ -27,9 +27,6 @@ public final class _OperationLogClusterReceptionist { // MARK: Timer Keys static let slowACKReplicationTick: TimerKey = "slow-ack-replication-tick" - static let fastACKReplicationTick: TimerKey = "fast-ack-replication-tick" - - static let localPublishLocalListingsTick: TimerKey = "publish-local-listings-tick" // ==== ------------------------------------------------------------------------------------------------------------ // MARK: State From 7c2b908152eccdec262267ec58ef727605f9fc75 Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Wed, 6 Jul 2022 01:11:47 -0700 Subject: [PATCH 2/8] Update Samples --- .../Philosopher.swift | 45 +++-- .../Timers+Distributed.swift | 167 ------------------ 2 files changed, 28 insertions(+), 184 deletions(-) delete mode 100644 Sources/DistributedActors/Timers+Distributed.swift diff --git a/Samples/Sources/SampleDiningPhilosophers/Philosopher.swift b/Samples/Sources/SampleDiningPhilosophers/Philosopher.swift index f72836215..9e5d61bf7 100644 --- a/Samples/Sources/SampleDiningPhilosophers/Philosopher.swift +++ b/Samples/Sources/SampleDiningPhilosophers/Philosopher.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift Distributed Actors open source project // -// Copyright (c) 2018-2021 Apple Inc. and the Swift Distributed Actors project authors +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import AsyncAlgorithms import Distributed import DistributedActors import Logging @@ -24,7 +25,8 @@ distributed actor Philosopher: CustomStringConvertible { private let rightFork: Fork private var state: State = .thinking - private lazy var timers = DistributedActors.ActorTimers(self) + private var becomeHungryTimerTask: Task? + private var finishEatingTimerTask: Task? init(name: String, leftFork: Fork, rightFork: Fork, actorSystem: ActorSystem) { self.actorSystem = actorSystem @@ -58,15 +60,19 @@ distributed actor Philosopher: CustomStringConvertible { } self.state = .thinking - self.timers.startSingle(key: .becomeHungry, delay: .seconds(1)) { - await self.attemptToTakeForks() + self.becomeHungryTimerTask = Task { + for await _ in AsyncTimerSequence(interval: .seconds(1), clock: ContinuousClock()) { + await self.attemptToTakeForks() + self.becomeHungryTimerTask?.cancel() + break + } } - self.log.info("\(self.self.name) is thinking...") + self.log.info("\(self.name) is thinking...") } distributed func attemptToTakeForks() async { guard self.state == .thinking else { - self.log.error("\(self.self.name) tried to take a fork but was not in the thinking state!") + self.log.error("\(self.name) tried to take a fork but was not in the thinking state!") return } @@ -89,14 +95,14 @@ distributed actor Philosopher: CustomStringConvertible { } self.forkTaken(self.rightFork) } catch { - self.log.info("\(self.self.name) wasn't able to take both forks!") + self.log.info("\(self.name) wasn't able to take both forks!") self.think() } } /// Message sent to oneself after a timer exceeds and we're done `eating` and can become `thinking` again. distributed func stopEating() { - self.log.info("\(self.self.name) is done eating and replaced both forks!") + self.log.info("\(self.name) is done eating and replaced both forks!") Task { do { try await self.leftFork.putBack() @@ -128,10 +134,10 @@ distributed actor Philosopher: CustomStringConvertible { switch fork { case self.leftFork: - self.log.info("\(self.self.name) received their left fork!") + self.log.info("\(self.name) received their left fork!") self.state = .takingForks(leftTaken: true, rightTaken: rightForkIsTaken) case self.rightFork: - self.log.info("\(self.self.name) received their right fork!") + self.log.info("\(self.name) received their right fork!") self.state = .takingForks(leftTaken: leftForkIsTaken, rightTaken: true) default: self.log.error("Received unknown fork! Got: \(fork). Known forks: \(self.leftFork), \(self.rightFork)") @@ -144,16 +150,21 @@ distributed actor Philosopher: CustomStringConvertible { private func becomeEating() { self.state = .eating - self.log.notice("\(self.self.name) began eating!") - self.timers.startSingle(key: .becomeHungry, delay: .seconds(3)) { - await self.stopEating() + self.log.notice("\(self.name) began eating!") + self.finishEatingTimerTask = Task { + for await _ in AsyncTimerSequence(interval: .seconds(3), clock: ContinuousClock()) { + self.stopEating() + self.finishEatingTimerTask?.cancel() + break + } } } -} -extension TimerKey { - static let becomeHungry: Self = "become-hungry" - static let finishEating: Self = "finish-eating" + deinit { + // FIXME: these are async +// self.becomeHungryTimerTask?.cancel() +// self.finishEatingTimerTask?.cancel() + } } extension Philosopher { diff --git a/Sources/DistributedActors/Timers+Distributed.swift b/Sources/DistributedActors/Timers+Distributed.swift deleted file mode 100644 index c024872b0..000000000 --- a/Sources/DistributedActors/Timers+Distributed.swift +++ /dev/null @@ -1,167 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Distributed Actors open source project -// -// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Dispatch -import Distributed -import DistributedActorsConcurrencyHelpers -import struct NIO.TimeAmount - -struct DistributedActorTimer { - @usableFromInline - let key: TimerKey - @usableFromInline - let repeated: Bool - @usableFromInline - let handle: Cancelable -} - -struct DistributedActorTimerEvent { - let key: TimerKey - let owner: ActorID -} - -/// Creates and manages timers which may only be accessed from the actor that owns it. -/// -/// _BehaviorTimers are bound to this objects lifecycle, i.e. when the actor owning this object is deallocated, -/// and the `ActorTimers` are deallocated as well, all timers associated with it are cancelled. -@available(*, deprecated, message: "Actor timers cannot participate in structured cancellation, and will be replaced with patterns using swift-async-algorithms (see Timer)") -public final class ActorTimers where Act.ActorSystem == ClusterSystem { - internal let ownerID: ActorID - - internal let dispatchQueue = DispatchQueue.global() - - internal let installedTimersLock = Lock() - internal var installedTimers: [TimerKey: DistributedActorTimer] = [:] - - // TODO: this is a workaround, we're removing ActorTimers since they can't participate in structured cancellation - weak var actorSystem: Act.ActorSystem? - - /// Create a timers instance owned by the passed in actor. - /// - /// Does not retain the distributed actor. - /// - /// - Parameter myself: - public init(_ myself: Act) { - self.ownerID = myself.id - } - - deinit { - self._cancelAll(includeSystemTimers: true) - } - - /// Cancel all timers currently stored in this ``ActorTimers`` instance. - public func cancelAll() { - self._cancelAll(includeSystemTimers: false) - } - - internal func _cancelAll(includeSystemTimers: Bool) { - self.installedTimersLock.withLockVoid { - for key in self.installedTimers.keys where includeSystemTimers || !key.isSystemTimer { - // TODO: the reason the `_` keys are not cancelled is because we want to cancel timers in _restartPrepare but we need "our restart timer" to remain - self.cancel(for: key) - } - } - } - - /// Cancels timer associated with the given key. - /// - /// - Parameter key: key of the timer to cancel - public func cancel(for key: TimerKey) { - if let timer = self.installedTimersLock.withLock { self.installedTimers.removeValue(forKey: key) } { - timer.handle.cancel() - } - } - - /// Checks for the existence of a scheduler timer for given key (single or periodic). - /// - /// - Returns: true if timer exists, false otherwise - public func exists(key: TimerKey) -> Bool { - self.installedTimersLock.withLock { - self.installedTimers[key] != nil - } - } - - /// Starts a timer that will invoke the provided `call` closure on the actor's context after the specified delay. - /// - /// - Parameters: - /// - key: the key associated with the timer - /// - call: the call that will be made after the `delay` amount of time elapses - /// - delay: the delay after which the message will be sent - public func startSingle( - key: TimerKey, - delay: Duration, - @_inheritActorContext @_implicitSelfCapture call: @Sendable @escaping () async -> Void - ) { - self.start(key: key, call: call, interval: delay, repeated: false) - } - - /// Starts a timer that will periodically invoke the passed in `call` closure on the actor's context. - /// - /// - Parameters: - /// - key: the key associated with the timer - /// - call: the call that will be executed after the `delay` amount of time elapses - /// - interval: the interval with which the message will be sent - public func startPeriodic( - key: TimerKey, - interval: Duration, - @_inheritActorContext @_implicitSelfCapture call: @Sendable @escaping () async -> Void - ) { - self.start(key: key, call: call, interval: interval, repeated: true) - } - - internal func start( - key: TimerKey, - @_inheritActorContext @_implicitSelfCapture call: @Sendable @escaping () async -> Void, - interval: Duration, - repeated: Bool - ) { - let handle: Cancelable - if repeated { - handle = self.dispatchQueue.scheduleAsync(initialDelay: interval, interval: interval) { - // We take the lock to prevent the system from shutting down - // while we're in the middle of potentially issuing remote calls - // Which may cause: Cannot schedule tasks on an EventLoop that has already shut down. - // The actual solution is outstanding work tracking potentially. - let system = self.actorSystem - system?.shutdownSemaphore.wait() - defer { - system?.shutdownSemaphore.signal() - } - - await call() - } - } else { - handle = self.dispatchQueue.scheduleOnceAsync(delay: interval) { - // We take the lock to prevent the system from shutting down - // while we're in the middle of potentially issuing remote calls - // Which may cause: Cannot schedule tasks on an EventLoop that has already shut down. - // The actual solution is outstanding work tracking potentially. - let system = self.actorSystem - system?.shutdownSemaphore.wait() - defer { - system?.shutdownSemaphore.signal() - } - - await call() - - // The single timer is done. Remove it so it can be installed again if needed. - self.cancel(for: key) - } - } - - self.installedTimersLock.withLockVoid { - self.installedTimers[key] = DistributedActorTimer(key: key, repeated: repeated, handle: handle) - } - } -} From 5477873d65338160023a2332fad33cacba07e9c1 Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Wed, 6 Jul 2022 01:16:33 -0700 Subject: [PATCH 3/8] Rename TimerKey to _TimerKey; remove deprecation warning --- .../Cluster/ClusterShell.swift | 2 +- ...rationLogClusterReceptionistBehavior.swift | 4 +-- .../Cluster/SWIM/SWIMActorShell.swift | 2 +- .../Gossip/Gossiper+Shell.swift | 4 +-- Sources/DistributedActors/Supervision.swift | 2 +- .../DistributedActors/_BehaviorTimers.swift | 34 +++++++++---------- .../DeathWatchDocExamples.swift | 2 +- .../MembershipGossipClusteredTests.swift | 2 +- .../ShootTheOtherNodeClusteredTests.swift | 2 +- .../DistributedActorsTests/TimersTests.swift | 26 +++++++------- 10 files changed, 40 insertions(+), 40 deletions(-) diff --git a/Sources/DistributedActors/Cluster/ClusterShell.swift b/Sources/DistributedActors/Cluster/ClusterShell.swift index 64e9d9c0b..2adf72aac 100644 --- a/Sources/DistributedActors/Cluster/ClusterShell.swift +++ b/Sources/DistributedActors/Cluster/ClusterShell.swift @@ -943,7 +943,7 @@ extension ClusterShell { "handshake/retryDelay": "\(retryDelay)", ]) context.timers.startSingle( - key: TimerKey("handshake-timer-\(remoteNode)"), + key: _TimerKey("handshake-timer-\(remoteNode)"), message: .command(.retryHandshake(initiated)), delay: retryDelay ) diff --git a/Sources/DistributedActors/Cluster/Reception/_OperationLogClusterReceptionistBehavior.swift b/Sources/DistributedActors/Cluster/Reception/_OperationLogClusterReceptionistBehavior.swift index aedeaa52b..0c6a89e56 100644 --- a/Sources/DistributedActors/Cluster/Reception/_OperationLogClusterReceptionistBehavior.swift +++ b/Sources/DistributedActors/Cluster/Reception/_OperationLogClusterReceptionistBehavior.swift @@ -26,7 +26,7 @@ public final class _OperationLogClusterReceptionist { // ==== ------------------------------------------------------------------------------------------------------------ // MARK: Timer Keys - static let slowACKReplicationTick: TimerKey = "slow-ack-replication-tick" + static let slowACKReplicationTick: _TimerKey = "slow-ack-replication-tick" // ==== ------------------------------------------------------------------------------------------------------------ // MARK: State @@ -260,7 +260,7 @@ extension _OperationLogClusterReceptionist { } } - private func flushTimerKey(_ key: AnyReceptionKey) -> TimerKey { + private func flushTimerKey(_ key: AnyReceptionKey) -> _TimerKey { "flush-\(key.hashValue)" } } diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIMActorShell.swift b/Sources/DistributedActors/Cluster/SWIM/SWIMActorShell.swift index 17f7f72a0..2fc1c7eac 100644 --- a/Sources/DistributedActors/Cluster/SWIM/SWIMActorShell.swift +++ b/Sources/DistributedActors/Cluster/SWIM/SWIMActorShell.swift @@ -552,7 +552,7 @@ extension SWIMActorShell { .metrics(group: "swim.shell", measure: [.serialization, .deserialization]) } - static let protocolPeriodTimerKey = TimerKey("\(SWIMActorShell.name)/periodic-ping") + static let protocolPeriodTimerKey = _TimerKey("\(SWIMActorShell.name)/periodic-ping") } extension ActorID { diff --git a/Sources/DistributedActors/Gossip/Gossiper+Shell.swift b/Sources/DistributedActors/Gossip/Gossiper+Shell.swift index d2c71a7f7..fcb1e1655 100644 --- a/Sources/DistributedActors/Gossip/Gossiper+Shell.swift +++ b/Sources/DistributedActors/Gossip/Gossiper+Shell.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift Distributed Actors open source project // -// Copyright (c) 2020 Apple Inc. and the Swift Distributed Actors project authors +// Copyright (c) 2020-2022 Apple Inc. and the Swift Distributed Actors project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -14,7 +14,7 @@ import Logging -private let gossipTickKey: TimerKey = "gossip-tick" +private let gossipTickKey: _TimerKey = "gossip-tick" /// :nodoc: /// diff --git a/Sources/DistributedActors/Supervision.swift b/Sources/DistributedActors/Supervision.swift index 8b75b9202..75e4c16d8 100644 --- a/Sources/DistributedActors/Supervision.swift +++ b/Sources/DistributedActors/Supervision.swift @@ -841,7 +841,7 @@ internal enum SupervisionRestartDelayedBehavior { @usableFromInline static func after(delay: Duration, with replacement: _Behavior) -> _Behavior { .setup { context in - context.timers._startResumeTimer(key: TimerKey("restartBackoff", isSystemTimer: true), delay: delay, resumeWith: WakeUp()) + context.timers._startResumeTimer(key: _TimerKey("restartBackoff", isSystemTimer: true), delay: delay, resumeWith: WakeUp()) return .suspend { (result: Result) in traceLog_Supervision("RESTART BACKOFF TRIGGER") diff --git a/Sources/DistributedActors/_BehaviorTimers.swift b/Sources/DistributedActors/_BehaviorTimers.swift index d9883bb98..7e674e5af 100644 --- a/Sources/DistributedActors/_BehaviorTimers.swift +++ b/Sources/DistributedActors/_BehaviorTimers.swift @@ -20,7 +20,7 @@ import struct NIO.TimeAmount @usableFromInline struct Timer { // FIXME(distributed): deprecate and remove in favor of DistributedActorTimers @usableFromInline - let key: TimerKey + let key: _TimerKey @usableFromInline let message: Message? @usableFromInline @@ -33,22 +33,22 @@ struct Timer { // FIXME(distributed): deprecate and remove in favor of @usableFromInline struct TimerEvent { - let key: TimerKey + let key: _TimerKey let generation: Int let owner: ActorID } -/// A `TimerKey` is used to identify a timer. It can be stored and re-used. +/// A `_TimerKey` is used to identify a timer. It can be stored and re-used. /// /// Example: /// -/// let timerKey = TimerKey("my-key") +/// let timerKey = _TimerKey("my-key") /// timers.startPeriodicTimer(key: timerKey, message: "ping", interval: .seconds(1)) /// // ... /// timers.cancelTimer(forKey: timerKey) /// -@available(*, deprecated, message: "Actor timers cannot participate in structured cancellation, and will be replaced with patterns using swift-async-algorithms (see Timer)") -public struct TimerKey: Hashable { +// TODO: replace timers with AsyncTimerSequence from swift-async-algorithms +public struct _TimerKey: Hashable { private let identifier: AnyHashable @usableFromInline @@ -64,17 +64,17 @@ public struct TimerKey: Hashable { } } -extension TimerKey: CustomStringConvertible { +extension _TimerKey: CustomStringConvertible { public var description: String { if self.isSystemTimer { - return "TimerKey(\(self.identifier), isSystemTimer: \(self.isSystemTimer))" + return "_TimerKey(\(self.identifier), isSystemTimer: \(self.isSystemTimer))" } else { - return "TimerKey(\(self.identifier.base))" + return "_TimerKey(\(self.identifier.base))" } } } -extension TimerKey: ExpressibleByStringLiteral, ExpressibleByStringInterpolation { +extension _TimerKey: ExpressibleByStringLiteral, ExpressibleByStringInterpolation { public init(stringLiteral value: StringLiteralType) { self.init(value) } @@ -88,7 +88,7 @@ public final class _BehaviorTimers { @usableFromInline internal let dispatchQueue = DispatchQueue.global() @usableFromInline - internal var installedTimers: [TimerKey: Timer] = [:] + internal var installedTimers: [_TimerKey: Timer] = [:] @usableFromInline internal unowned var context: _ActorContext @@ -114,7 +114,7 @@ public final class _BehaviorTimers { /// Cancels timer associated with the given key. /// /// - Parameter key: key of the timer to cancel - public func cancel(for key: TimerKey) { + public func cancel(for key: _TimerKey) { if let timer = self.installedTimers.removeValue(forKey: key) { if context.system.settings.logging.verboseTimers { self.context.log.trace("Cancel timer [\(key)] with generation [\(timer.generation)]", metadata: self.metadata) @@ -127,7 +127,7 @@ public final class _BehaviorTimers { /// /// - Returns: true if timer exists, false otherwise @inlinable - public func exists(key: TimerKey) -> Bool { + public func exists(key: _TimerKey) -> Bool { self.installedTimers[key] != nil } @@ -138,7 +138,7 @@ public final class _BehaviorTimers { /// - message: the message that will be sent to `myself` /// - delay: the delay after which the message will be sent @inlinable - public func startSingle(key: TimerKey, message: Message, delay: Duration) { + public func startSingle(key: _TimerKey, message: Message, delay: Duration) { self.start(key: key, message: message, interval: delay, repeated: false) } @@ -149,12 +149,12 @@ public final class _BehaviorTimers { /// - message: the message that will be sent to `myself` /// - interval: the interval with which the message will be sent @inlinable - public func startPeriodic(key: TimerKey, message: Message, interval: Duration) { + public func startPeriodic(key: _TimerKey, message: Message, interval: Duration) { self.start(key: key, message: message, interval: interval, repeated: true) } @usableFromInline - internal func start(key: TimerKey, message: Message, interval: Duration, repeated: Bool) { + internal func start(key: _TimerKey, message: Message, interval: Duration, repeated: Bool) { self.cancel(for: key) let generation = self.nextTimerGen() @@ -228,7 +228,7 @@ extension _BehaviorTimers { /// Dangerous version of `_startTimer` which allows scheduling a `.resume` system message (directly!) with a token `T`, after a time `delay`. /// This can be used e.g. to implement restarting an actor after a backoff delay. - internal func _startResumeTimer(key: TimerKey, delay: Duration, resumeWith token: T) { + internal func _startResumeTimer(key: _TimerKey, delay: Duration, resumeWith token: T) { assert(key.isSystemTimer, "_startResumeTimer MUST ONLY be used by system internal tasks, and keys MUST be `_` prefixed. Key was: \(key)") self.cancel(for: key) diff --git a/Tests/DistributedActorsDocumentationTests/DeathWatchDocExamples.swift b/Tests/DistributedActorsDocumentationTests/DeathWatchDocExamples.swift index 30b49ef3a..426994e9c 100644 --- a/Tests/DistributedActorsDocumentationTests/DeathWatchDocExamples.swift +++ b/Tests/DistributedActorsDocumentationTests/DeathWatchDocExamples.swift @@ -129,7 +129,7 @@ class DeathWatchDocExamples { false } // tag::handling_termination_deathwatch[] - let concedeTimer: TimerKey = "concede-timer" + let concedeTimer: _TimerKey = "concede-timer" _ = _Behavior.receive { context, command in switch command { diff --git a/Tests/DistributedActorsTests/Cluster/MembershipGossipClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/MembershipGossipClusteredTests.swift index 23306f306..5a2cf474d 100644 --- a/Tests/DistributedActorsTests/Cluster/MembershipGossipClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/MembershipGossipClusteredTests.swift @@ -28,7 +28,7 @@ final class MembershipGossipClusteredTests: ClusteredActorSystemsXCTestCase { "/system/receptionist", ] settings.excludeGrep = [ - "TimerKey", + "_TimerKey", "schedule next gossip", "Gossip payload updated", ] diff --git a/Tests/DistributedActorsTests/Cluster/ShootTheOtherNodeClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/ShootTheOtherNodeClusteredTests.swift index a3dd74225..35d5b3b1a 100644 --- a/Tests/DistributedActorsTests/Cluster/ShootTheOtherNodeClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/ShootTheOtherNodeClusteredTests.swift @@ -20,7 +20,7 @@ import XCTest final class ShootTheOtherNodeClusteredTests: ClusteredActorSystemsXCTestCase { override func configureLogCapture(settings: inout LogCapture.Settings) { settings.excludeGrep = [ - "TimerKey", + "_TimerKey", ] settings.excludeActorPaths = [ "/system/cluster/swim", diff --git a/Tests/DistributedActorsTests/TimersTests.swift b/Tests/DistributedActorsTests/TimersTests.swift index 55cee63f9..ed21a70a6 100644 --- a/Tests/DistributedActorsTests/TimersTests.swift +++ b/Tests/DistributedActorsTests/TimersTests.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift Distributed Actors open source project // -// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -20,15 +20,15 @@ import XCTest final class TimersTests: ClusterSystemXCTestCase { func testTimerKey_shouldPrintNicely() { - TimerKey("Hello").description.shouldEqual("TimerKey(Hello)") - TimerKey("Hello", isSystemTimer: true).description.shouldEqual("TimerKey(Hello, isSystemTimer: true)") + _TimerKey("Hello").description.shouldEqual("_TimerKey(Hello)") + _TimerKey("Hello", isSystemTimer: true).description.shouldEqual("_TimerKey(Hello, isSystemTimer: true)") } func test_startSingleTimer_shouldSendSingleMessage() throws { let p: ActorTestProbe = self.testKit.makeTestProbe() let behavior: _Behavior = .setup { context in - context.timers.startSingle(key: TimerKey("message"), message: "fromTimer", delay: .microseconds(100)) + context.timers.startSingle(key: _TimerKey("message"), message: "fromTimer", delay: .microseconds(100)) return .receiveMessage { message in p.tell(message) return .same @@ -45,7 +45,7 @@ final class TimersTests: ClusterSystemXCTestCase { let behavior: _Behavior = .setup { context in var i = 0 - context.timers.startPeriodic(key: TimerKey("message"), message: "fromTimer", interval: .milliseconds(10)) + context.timers.startPeriodic(key: _TimerKey("message"), message: "fromTimer", interval: .milliseconds(10)) return .receiveMessage { message in i += 1 p.tell(message) @@ -70,13 +70,13 @@ final class TimersTests: ClusterSystemXCTestCase { let behavior: _Behavior = .setup { context in var i = 0 - context.timers.startPeriodic(key: TimerKey("message"), message: "fromTimer", interval: .milliseconds(10)) + context.timers.startPeriodic(key: _TimerKey("message"), message: "fromTimer", interval: .milliseconds(10)) return .receiveMessage { message in i += 1 p.tell(message) if i >= 5 { - context.timers.cancel(for: TimerKey("message")) + context.timers.cancel(for: _TimerKey("message")) } return .same } @@ -97,9 +97,9 @@ final class TimersTests: ClusterSystemXCTestCase { // amount of time, so the timer is triggered and sends the message. // Because we cancel the timer in the same run, the message should // not be processed and the probe should not receive a message. - context.timers.startSingle(key: TimerKey("message"), message: "fromTimer", delay: .nanoseconds(0)) + context.timers.startSingle(key: _TimerKey("message"), message: "fromTimer", delay: .nanoseconds(0)) DistributedActors._Thread.sleep(.milliseconds(10)) // FIXME(swift): replace with Task.sleep - context.timers.cancel(for: TimerKey("message")) + context.timers.cancel(for: _TimerKey("message")) return .receiveMessage { message in p.tell(message) return .same @@ -114,9 +114,9 @@ final class TimersTests: ClusterSystemXCTestCase { let p: ActorTestProbe = self.testKit.makeTestProbe() let behavior: _Behavior = .setup { context in - context.timers.startPeriodic(key: TimerKey("message"), message: "fromTimer", interval: .milliseconds(10)) - context.timers.startPeriodic(key: TimerKey("message2"), message: "fromTimer2", interval: .milliseconds(50)) - context.timers.startPeriodic(key: TimerKey("message3"), message: "fromTimer3", interval: .milliseconds(50)) + context.timers.startPeriodic(key: _TimerKey("message"), message: "fromTimer", interval: .milliseconds(10)) + context.timers.startPeriodic(key: _TimerKey("message2"), message: "fromTimer2", interval: .milliseconds(50)) + context.timers.startPeriodic(key: _TimerKey("message3"), message: "fromTimer3", interval: .milliseconds(50)) return .receiveMessage { message in p.tell(message) context.timers.cancelAll() @@ -133,7 +133,7 @@ final class TimersTests: ClusterSystemXCTestCase { let p: ActorTestProbe = self.testKit.makeTestProbe() let behavior: _Behavior = .setup { context in - context.timers.startPeriodic(key: TimerKey("message", isSystemTimer: true), message: "fromSystemTimer", interval: .milliseconds(10)) + context.timers.startPeriodic(key: _TimerKey("message", isSystemTimer: true), message: "fromSystemTimer", interval: .milliseconds(10)) return .receiveMessage { message in p.tell(message) context.timers.cancelAll() From f0d11b201a79d27337694d56279fbd2499f31557 Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Wed, 6 Jul 2022 01:20:16 -0700 Subject: [PATCH 4/8] Make _TimerKey internal --- Sources/DistributedActors/_BehaviorTimers.swift | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/Sources/DistributedActors/_BehaviorTimers.swift b/Sources/DistributedActors/_BehaviorTimers.swift index 7e674e5af..e9d25c003 100644 --- a/Sources/DistributedActors/_BehaviorTimers.swift +++ b/Sources/DistributedActors/_BehaviorTimers.swift @@ -19,7 +19,6 @@ import struct NIO.TimeAmount @usableFromInline struct Timer { // FIXME(distributed): deprecate and remove in favor of DistributedActorTimers - @usableFromInline let key: _TimerKey @usableFromInline let message: Message? @@ -48,7 +47,7 @@ struct TimerEvent { /// timers.cancelTimer(forKey: timerKey) /// // TODO: replace timers with AsyncTimerSequence from swift-async-algorithms -public struct _TimerKey: Hashable { +internal struct _TimerKey: Hashable { private let identifier: AnyHashable @usableFromInline @@ -87,7 +86,6 @@ public final class _BehaviorTimers { // TODO: eventually replace with our own scheduler implementation @usableFromInline internal let dispatchQueue = DispatchQueue.global() - @usableFromInline internal var installedTimers: [_TimerKey: Timer] = [:] @usableFromInline internal unowned var context: _ActorContext @@ -114,7 +112,7 @@ public final class _BehaviorTimers { /// Cancels timer associated with the given key. /// /// - Parameter key: key of the timer to cancel - public func cancel(for key: _TimerKey) { + internal func cancel(for key: _TimerKey) { if let timer = self.installedTimers.removeValue(forKey: key) { if context.system.settings.logging.verboseTimers { self.context.log.trace("Cancel timer [\(key)] with generation [\(timer.generation)]", metadata: self.metadata) @@ -126,8 +124,7 @@ public final class _BehaviorTimers { /// Checks for the existence of a scheduler timer for given key (single or periodic). /// /// - Returns: true if timer exists, false otherwise - @inlinable - public func exists(key: _TimerKey) -> Bool { + internal func exists(key: _TimerKey) -> Bool { self.installedTimers[key] != nil } @@ -137,8 +134,7 @@ public final class _BehaviorTimers { /// - key: the key associated with the timer /// - message: the message that will be sent to `myself` /// - delay: the delay after which the message will be sent - @inlinable - public func startSingle(key: _TimerKey, message: Message, delay: Duration) { + internal func startSingle(key: _TimerKey, message: Message, delay: Duration) { self.start(key: key, message: message, interval: delay, repeated: false) } @@ -148,12 +144,10 @@ public final class _BehaviorTimers { /// - key: the key associated with the timer /// - message: the message that will be sent to `myself` /// - interval: the interval with which the message will be sent - @inlinable - public func startPeriodic(key: _TimerKey, message: Message, interval: Duration) { + internal func startPeriodic(key: _TimerKey, message: Message, interval: Duration) { self.start(key: key, message: message, interval: interval, repeated: true) } - @usableFromInline internal func start(key: _TimerKey, message: Message, interval: Duration, repeated: Bool) { self.cancel(for: key) From ef06ce6360ecf61e62f8f2d3ff049195cc5b78d4 Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Wed, 6 Jul 2022 11:57:26 -0700 Subject: [PATCH 5/8] Update samples per feedback --- .../Philosopher.swift | 28 ++++--------------- 1 file changed, 6 insertions(+), 22 deletions(-) diff --git a/Samples/Sources/SampleDiningPhilosophers/Philosopher.swift b/Samples/Sources/SampleDiningPhilosophers/Philosopher.swift index 9e5d61bf7..c59413059 100644 --- a/Samples/Sources/SampleDiningPhilosophers/Philosopher.swift +++ b/Samples/Sources/SampleDiningPhilosophers/Philosopher.swift @@ -12,7 +12,6 @@ // //===----------------------------------------------------------------------===// -import AsyncAlgorithms import Distributed import DistributedActors import Logging @@ -25,9 +24,6 @@ distributed actor Philosopher: CustomStringConvertible { private let rightFork: Fork private var state: State = .thinking - private var becomeHungryTimerTask: Task? - private var finishEatingTimerTask: Task? - init(name: String, leftFork: Fork, rightFork: Fork, actorSystem: ActorSystem) { self.actorSystem = actorSystem self.name = name @@ -60,12 +56,9 @@ distributed actor Philosopher: CustomStringConvertible { } self.state = .thinking - self.becomeHungryTimerTask = Task { - for await _ in AsyncTimerSequence(interval: .seconds(1), clock: ContinuousClock()) { - await self.attemptToTakeForks() - self.becomeHungryTimerTask?.cancel() - break - } + Task { + try await Task.sleep(until: .now + .seconds(1), clock: .continuous) + await self.attemptToTakeForks() } self.log.info("\(self.name) is thinking...") } @@ -151,20 +144,11 @@ distributed actor Philosopher: CustomStringConvertible { private func becomeEating() { self.state = .eating self.log.notice("\(self.name) began eating!") - self.finishEatingTimerTask = Task { - for await _ in AsyncTimerSequence(interval: .seconds(3), clock: ContinuousClock()) { - self.stopEating() - self.finishEatingTimerTask?.cancel() - break - } + Task { + try await Task.sleep(until: .now + .seconds(3), clock: .continuous) + self.stopEating() } } - - deinit { - // FIXME: these are async -// self.becomeHungryTimerTask?.cancel() -// self.finishEatingTimerTask?.cancel() - } } extension Philosopher { From 78accc3f6a679e3d8e64e8d4e4aede759d815613 Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Wed, 6 Jul 2022 12:13:42 -0700 Subject: [PATCH 6/8] Clean up DowningStrategy per feedback --- .../Cluster/Downing/DowningStrategy.swift | 28 ++++++++----------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift b/Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift index b1e2b1971..c3ead2ef1 100644 --- a/Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift +++ b/Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift @@ -12,7 +12,6 @@ // //===----------------------------------------------------------------------===// -import AsyncAlgorithms import Distributed import Logging @@ -87,7 +86,7 @@ internal distributed actor DowningStrategyShell { /// `Task` for subscribing to cluster events. private var eventsListeningTask: Task? - /// `Task` for timers + /// Timer `Task`s private var memberTimerTasks: [Cluster.Member: Task] = [:] init(_ strategy: DowningStrategy, system: ActorSystem) async { @@ -102,9 +101,6 @@ internal distributed actor DowningStrategyShell { deinit { self.eventsListeningTask?.cancel() - self.memberTimerTasks.values.forEach { timerTask in - timerTask.cancel() - } } func receiveClusterEvent(_ event: Cluster.Event) throws { @@ -120,28 +116,26 @@ internal distributed actor DowningStrategyShell { case .startTimer(let member, let delay): self.log.trace("Start timer for member: \(member), delay: \(delay)") self.memberTimerTasks[member] = Task { - for await _ in AsyncTimerSequence(interval: delay, clock: ContinuousClock()) { - self.onTimeout(member: member) - // Single-shot; cancel task immediately after it has been fired - self.cancelTimer(for: member) - break + defer { self.memberTimerTasks.removeValue(forKey: member) } + + guard !Task.isCancelled else { + return } + + try await Task.sleep(until: .now + delay, clock: .continuous) + self.onTimeout(member: member) } case .cancelTimer(let member): self.log.trace("Cancel timer for member: \(member)") - self.cancelTimer(for: member) + if let timerTask = self.memberTimerTasks.removeValue(forKey: member) { + timerTask.cancel() + } case .none: () // nothing to be done } } - private func cancelTimer(for member: Cluster.Member) { - if let timerTask = self.memberTimerTasks.removeValue(forKey: member) { - timerTask.cancel() - } - } - func markAsDown(members: Set) { for member in members { self.log.info( From 27ce9ec740497f7e9ab4dbb4ada7f196218bfeb4 Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Wed, 6 Jul 2022 12:34:12 -0700 Subject: [PATCH 7/8] Update receptionist per feedback --- .../OperationLogDistributedReceptionist.swift | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift b/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift index 035fe3f87..48e7d01d4 100644 --- a/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift +++ b/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift @@ -262,7 +262,7 @@ public distributed actor OpLogDistributedReceptionist: DistributedReceptionist, // periodically gossip to other receptionists with the last seqNr we've seen, // and if it happens to be outdated by then this will cause a push from that node. self.slowACKReplicationTimerTask = Task { - for await _ in AsyncTimerSequence.repeating(every: self.actorSystem.settings.receptionist.ackPullReplicationIntervalSlow, clock: ContinuousClock()) { + for await _ in AsyncTimerSequence.repeating(every: self.actorSystem.settings.receptionist.ackPullReplicationIntervalSlow, clock: .continuous) { self.periodicAckTick() } } @@ -273,10 +273,6 @@ public distributed actor OpLogDistributedReceptionist: DistributedReceptionist, deinit { self.eventsListeningTask?.cancel() self.slowACKReplicationTimerTask?.cancel() - // FIXME: this crashes tests (ClusterAssociationTests.test_association_sameAddressNodeJoin_shouldOverrideExistingNode) -// self.flushTimerTasks.values.forEach { timerTask in -// timerTask.cancel() -// } } } @@ -426,14 +422,10 @@ extension OpLogDistributedReceptionist { let flushDelay = actorSystem.settings.receptionist.listingFlushDelay self.log.debug("schedule delayed flush") self.flushTimerTasks[timerTaskKey] = Task { - for await _ in AsyncTimerSequence(interval: flushDelay, clock: ContinuousClock()) { - self.onDelayedListingFlushTick(key: key) - // Single-shot; cancel task immediately after it has been fired - if let timerTask = self.flushTimerTasks.removeValue(forKey: timerTaskKey) { - timerTask.cancel() - } - break - } + defer { self.flushTimerTasks.removeValue(forKey: timerTaskKey) } + + try await Task.sleep(until: .now + flushDelay, clock: .continuous) + self.onDelayedListingFlushTick(key: key) } } From 7faef59ea1736a433e14c1aa9e756cf925972f04 Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Wed, 6 Jul 2022 13:51:18 -0700 Subject: [PATCH 8/8] More warning fixes --- Sources/DistributedActors/Cluster/ClusterControl.swift | 4 ++-- .../DistributedActors/Cluster/Reception/OperationLog.swift | 3 ++- .../Reception/OperationLogDistributedReceptionist.swift | 2 +- Sources/DistributedActors/WeakActorDictionary.swift | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Sources/DistributedActors/Cluster/ClusterControl.swift b/Sources/DistributedActors/Cluster/ClusterControl.swift index be9076827..c93ec2eb0 100644 --- a/Sources/DistributedActors/Cluster/ClusterControl.swift +++ b/Sources/DistributedActors/Cluster/ClusterControl.swift @@ -218,7 +218,7 @@ public struct ClusterControl { public func waitFor(_ node: UniqueNode, _ status: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member { try await self.waitForMembershipEventually(within: within) { membership in if status == .down || status == .removed { - if let cluster = self.cluster, let tombstone = cluster.getExistingAssociationTombstone(with: node) { + if let cluster = self.cluster, cluster.getExistingAssociationTombstone(with: node) != nil { return Cluster.Member(node: node, status: .removed).asUnreachable } } @@ -251,7 +251,7 @@ public struct ClusterControl { public func waitFor(_ node: UniqueNode, atLeast atLeastStatus: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member { try await self.waitForMembershipEventually(within: within) { membership in if atLeastStatus == .down || atLeastStatus == .removed { - if let cluster = self.cluster, let tombstone = cluster.getExistingAssociationTombstone(with: node) { + if let cluster = self.cluster, cluster.getExistingAssociationTombstone(with: node) != nil { return Cluster.Member(node: node, status: .removed).asUnreachable } } diff --git a/Sources/DistributedActors/Cluster/Reception/OperationLog.swift b/Sources/DistributedActors/Cluster/Reception/OperationLog.swift index 530b38031..75017c90c 100644 --- a/Sources/DistributedActors/Cluster/Reception/OperationLog.swift +++ b/Sources/DistributedActors/Cluster/Reception/OperationLog.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift Distributed Actors open source project // -// Copyright (c) 2020 Apple Inc. and the Swift Distributed Actors project authors +// Copyright (c) 2020-2022 Apple Inc. and the Swift Distributed Actors project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -34,6 +34,7 @@ internal class OpLog { self.maxSeqNr = 0 } + @discardableResult func add(_ op: Op) -> SequencedOp { self.maxSeqNr += 1 let sequencedOp = SequencedOp(sequenceRange: .single(self.maxSeqNr), op: op) diff --git a/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift b/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift index 48e7d01d4..1dc5f1d94 100644 --- a/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift +++ b/Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift @@ -808,7 +808,7 @@ extension OpLogDistributedReceptionist { let wasRegisteredWithKeys = self.storage.removeFromKeyMappings(equalityHackRef.asAnyDistributedActor) for key in wasRegisteredWithKeys.registeredUnderKeys { - self.addOperation(.remove(key: key, identity: id)) + _ = self.addOperation(.remove(key: key, identity: id)) self.publishListings(forKey: key) } diff --git a/Sources/DistributedActors/WeakActorDictionary.swift b/Sources/DistributedActors/WeakActorDictionary.swift index c9ce8f8d3..dab2bc40d 100644 --- a/Sources/DistributedActors/WeakActorDictionary.swift +++ b/Sources/DistributedActors/WeakActorDictionary.swift @@ -45,7 +45,7 @@ struct WeakActorDictionary { guard let knownActor = container.actor else { // the actor was released -- let's remove the container while we're here - self.removeActor(identifiedBy: id) + _ = self.removeActor(identifiedBy: id) return nil }