Skip to content

Commit 2f981fd

Browse files
authored
Use AsyncTimerSequence from swift-async-algorithms (#986)
* Use AsyncTimerSequence from swift-async-algorithms Resolves #958 * Update Samples * Rename TimerKey to _TimerKey; remove deprecation warning * Make _TimerKey internal * Update samples per feedback * Clean up DowningStrategy per feedback * Update receptionist per feedback * More warning fixes
1 parent 8f025cc commit 2f981fd

19 files changed

+114
-294
lines changed

Package.swift

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ var targets: [PackageDescription.Target] = [
3030
.product(name: "Metrics", package: "swift-metrics"),
3131
.product(name: "ServiceDiscovery", package: "swift-service-discovery"),
3232
.product(name: "Backtrace", package: "swift-backtrace"),
33+
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
3334
]
3435
),
3536

@@ -179,26 +180,27 @@ var targets: [PackageDescription.Target] = [
179180
var dependencies: [Package.Dependency] = [
180181
.package(url: "https://github.com/apple/swift-atomics", from: "1.0.2"),
181182

182-
.package(url: "https://github.com/apple/swift-cluster-membership.git", from: "0.3.0"),
183+
.package(url: "https://github.com/apple/swift-cluster-membership", from: "0.3.0"),
183184

184-
.package(url: "https://github.com/apple/swift-nio.git", from: "2.40.0"),
185-
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.2.0"),
186-
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.16.1"),
185+
.package(url: "https://github.com/apple/swift-nio", from: "2.40.0"),
186+
.package(url: "https://github.com/apple/swift-nio-extras", from: "1.2.0"),
187+
.package(url: "https://github.com/apple/swift-nio-ssl", from: "2.16.1"),
187188

188-
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.7.0"),
189+
.package(url: "https://github.com/apple/swift-protobuf", from: "1.7.0"),
189190

190191
// ~~~ backtraces ~~~
191192
// TODO: optimally, library should not pull swift-backtrace
192-
.package(url: "https://github.com/swift-server/swift-backtrace.git", from: "1.1.1"),
193+
.package(url: "https://github.com/swift-server/swift-backtrace", from: "1.1.1"),
193194

194-
// ~~~ Swift Collections ~~~
195-
.package(url: "https://github.com/apple/swift-collections.git", from: "1.0.1"),
195+
// ~~~ Swift libraries ~~~
196+
.package(url: "https://github.com/apple/swift-async-algorithms", from: "0.0.3"),
197+
.package(url: "https://github.com/apple/swift-collections", from: "1.0.1"),
196198

197199
// ~~~ Observability ~~~
198-
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
200+
.package(url: "https://github.com/apple/swift-log", from: "1.0.0"),
199201
// swift-metrics 1.x and 2.x are almost API compatible, so most clients should use
200-
.package(url: "https://github.com/apple/swift-metrics.git", "1.0.0" ..< "3.0.0"),
201-
.package(url: "https://github.com/apple/swift-service-discovery.git", from: "1.0.0"),
202+
.package(url: "https://github.com/apple/swift-metrics", "1.0.0" ..< "3.0.0"),
203+
.package(url: "https://github.com/apple/swift-service-discovery", from: "1.0.0"),
202204

203205
// ~~~ SwiftPM Plugins ~~~
204206
.package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"),

Samples/Sources/SampleDiningPhilosophers/Philosopher.swift

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the Swift Distributed Actors open source project
44
//
5-
// Copyright (c) 2018-2021 Apple Inc. and the Swift Distributed Actors project authors
5+
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -24,8 +24,6 @@ distributed actor Philosopher: CustomStringConvertible {
2424
private let rightFork: Fork
2525
private var state: State = .thinking
2626

27-
private lazy var timers = DistributedActors.ActorTimers<Philosopher>(self)
28-
2927
init(name: String, leftFork: Fork, rightFork: Fork, actorSystem: ActorSystem) {
3028
self.actorSystem = actorSystem
3129
self.name = name
@@ -58,15 +56,16 @@ distributed actor Philosopher: CustomStringConvertible {
5856
}
5957

6058
self.state = .thinking
61-
self.timers.startSingle(key: .becomeHungry, delay: .seconds(1)) {
59+
Task {
60+
try await Task.sleep(until: .now + .seconds(1), clock: .continuous)
6261
await self.attemptToTakeForks()
6362
}
64-
self.log.info("\(self.self.name) is thinking...")
63+
self.log.info("\(self.name) is thinking...")
6564
}
6665

6766
distributed func attemptToTakeForks() async {
6867
guard self.state == .thinking else {
69-
self.log.error("\(self.self.name) tried to take a fork but was not in the thinking state!")
68+
self.log.error("\(self.name) tried to take a fork but was not in the thinking state!")
7069
return
7170
}
7271

@@ -89,14 +88,14 @@ distributed actor Philosopher: CustomStringConvertible {
8988
}
9089
self.forkTaken(self.rightFork)
9190
} catch {
92-
self.log.info("\(self.self.name) wasn't able to take both forks!")
91+
self.log.info("\(self.name) wasn't able to take both forks!")
9392
self.think()
9493
}
9594
}
9695

9796
/// Message sent to oneself after a timer exceeds and we're done `eating` and can become `thinking` again.
9897
distributed func stopEating() {
99-
self.log.info("\(self.self.name) is done eating and replaced both forks!")
98+
self.log.info("\(self.name) is done eating and replaced both forks!")
10099
Task {
101100
do {
102101
try await self.leftFork.putBack()
@@ -128,10 +127,10 @@ distributed actor Philosopher: CustomStringConvertible {
128127

129128
switch fork {
130129
case self.leftFork:
131-
self.log.info("\(self.self.name) received their left fork!")
130+
self.log.info("\(self.name) received their left fork!")
132131
self.state = .takingForks(leftTaken: true, rightTaken: rightForkIsTaken)
133132
case self.rightFork:
134-
self.log.info("\(self.self.name) received their right fork!")
133+
self.log.info("\(self.name) received their right fork!")
135134
self.state = .takingForks(leftTaken: leftForkIsTaken, rightTaken: true)
136135
default:
137136
self.log.error("Received unknown fork! Got: \(fork). Known forks: \(self.leftFork), \(self.rightFork)")
@@ -144,18 +143,14 @@ distributed actor Philosopher: CustomStringConvertible {
144143

145144
private func becomeEating() {
146145
self.state = .eating
147-
self.log.notice("\(self.self.name) began eating!")
148-
self.timers.startSingle(key: .becomeHungry, delay: .seconds(3)) {
149-
await self.stopEating()
146+
self.log.notice("\(self.name) began eating!")
147+
Task {
148+
try await Task.sleep(until: .now + .seconds(3), clock: .continuous)
149+
self.stopEating()
150150
}
151151
}
152152
}
153153

154-
extension TimerKey {
155-
static let becomeHungry: Self = "become-hungry"
156-
static let finishEating: Self = "finish-eating"
157-
}
158-
159154
extension Philosopher {
160155
private enum State: Equatable {
161156
case thinking

Sources/DistributedActors/Cluster/ClusterControl.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ public struct ClusterControl {
218218
public func waitFor(_ node: UniqueNode, _ status: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member {
219219
try await self.waitForMembershipEventually(within: within) { membership in
220220
if status == .down || status == .removed {
221-
if let cluster = self.cluster, let tombstone = cluster.getExistingAssociationTombstone(with: node) {
221+
if let cluster = self.cluster, cluster.getExistingAssociationTombstone(with: node) != nil {
222222
return Cluster.Member(node: node, status: .removed).asUnreachable
223223
}
224224
}
@@ -251,7 +251,7 @@ public struct ClusterControl {
251251
public func waitFor(_ node: UniqueNode, atLeast atLeastStatus: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member {
252252
try await self.waitForMembershipEventually(within: within) { membership in
253253
if atLeastStatus == .down || atLeastStatus == .removed {
254-
if let cluster = self.cluster, let tombstone = cluster.getExistingAssociationTombstone(with: node) {
254+
if let cluster = self.cluster, cluster.getExistingAssociationTombstone(with: node) != nil {
255255
return Cluster.Member(node: node, status: .removed).asUnreachable
256256
}
257257
}

Sources/DistributedActors/Cluster/ClusterShell.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -943,7 +943,7 @@ extension ClusterShell {
943943
"handshake/retryDelay": "\(retryDelay)",
944944
])
945945
context.timers.startSingle(
946-
key: TimerKey("handshake-timer-\(remoteNode)"),
946+
key: _TimerKey("handshake-timer-\(remoteNode)"),
947947
message: .command(.retryHandshake(initiated)),
948948
delay: retryDelay
949949
)

Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ public struct DowningStrategyDirective {
3535
internal enum Repr {
3636
case none
3737
case markAsDown(Set<Cluster.Member>)
38-
case startTimer(key: TimerKey, member: Cluster.Member, delay: Duration)
39-
case cancelTimer(key: TimerKey)
38+
case startTimer(member: Cluster.Member, delay: Duration)
39+
case cancelTimer(member: Cluster.Member)
4040
}
4141

4242
internal init(_ underlying: Repr) {
@@ -47,12 +47,12 @@ public struct DowningStrategyDirective {
4747
.init(.none)
4848
}
4949

50-
public static func startTimer(key: TimerKey, member: Cluster.Member, delay: Duration) -> Self {
51-
.init(.startTimer(key: key, member: member, delay: delay))
50+
public static func startTimer(member: Cluster.Member, delay: Duration) -> Self {
51+
.init(.startTimer(member: member, delay: delay))
5252
}
5353

54-
public static func cancelTimer(key: TimerKey) -> Self {
55-
.init(.cancelTimer(key: key))
54+
public static func cancelTimer(member: Cluster.Member) -> Self {
55+
.init(.cancelTimer(member: member))
5656
}
5757

5858
public static func markAsDown(members: Set<Cluster.Member>) -> Self {
@@ -86,8 +86,8 @@ internal distributed actor DowningStrategyShell {
8686

8787
/// `Task` for subscribing to cluster events.
8888
private var eventsListeningTask: Task<Void, Error>?
89-
90-
private lazy var timers = ActorTimers<DowningStrategyShell>(self)
89+
/// Timer `Task`s
90+
private var memberTimerTasks: [Cluster.Member: Task<Void, Error>] = [:]
9191

9292
init(_ strategy: DowningStrategy, system: ActorSystem) async {
9393
self.strategy = strategy
@@ -113,14 +113,23 @@ internal distributed actor DowningStrategyShell {
113113
case .markAsDown(let members):
114114
self.markAsDown(members: members)
115115

116-
case .startTimer(let key, let member, let delay):
117-
self.log.trace("Start timer \(key), member: \(member), delay: \(delay)")
118-
self.timers.startSingle(key: key, delay: delay) {
116+
case .startTimer(let member, let delay):
117+
self.log.trace("Start timer for member: \(member), delay: \(delay)")
118+
self.memberTimerTasks[member] = Task {
119+
defer { self.memberTimerTasks.removeValue(forKey: member) }
120+
121+
guard !Task.isCancelled else {
122+
return
123+
}
124+
125+
try await Task.sleep(until: .now + delay, clock: .continuous)
119126
self.onTimeout(member: member)
120127
}
121-
case .cancelTimer(let key):
122-
self.log.trace("Cancel timer \(key)")
123-
self.timers.cancel(for: key)
128+
case .cancelTimer(let member):
129+
self.log.trace("Cancel timer for member: \(member)")
130+
if let timerTask = self.memberTimerTasks.removeValue(forKey: member) {
131+
timerTask.cancel()
132+
}
124133

125134
case .none:
126135
() // nothing to be done

Sources/DistributedActors/Cluster/Downing/TimeoutBasedDowningStrategy.swift

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy {
5656
// it was marked as down by someone, we don't need to track it anymore
5757
_ = self._markAsDown.remove(change.member)
5858
_ = self._unreachable.remove(change.member)
59-
return .cancelTimer(key: self.timerKey(change.member))
59+
return .cancelTimer(member: change.member)
6060
} else if let replaced = change.replaced {
6161
_ = self._markAsDown.remove(replaced)
6262
_ = self._unreachable.remove(replaced)
@@ -95,7 +95,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy {
9595

9696
self._unreachable.insert(member)
9797

98-
return .startTimer(key: self.timerKey(member), member: member, delay: self.settings.downUnreachableMembersAfter)
98+
return .startTimer(member: member, delay: self.settings.downUnreachableMembersAfter)
9999
}
100100

101101
func onMemberReachable(_ change: Cluster.ReachabilityChange) -> DowningStrategyDirective {
@@ -104,16 +104,12 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy {
104104

105105
_ = self._markAsDown.remove(member)
106106
if self._unreachable.remove(member) != nil {
107-
return .cancelTimer(key: self.timerKey(member))
107+
return .cancelTimer(member: member)
108108
}
109109

110110
return .none
111111
}
112112

113-
func timerKey(_ member: Cluster.Member) -> TimerKey {
114-
TimerKey(member.uniqueNode)
115-
}
116-
117113
func onLeaderChange(to leader: Cluster.Member?) throws -> DowningStrategyDirective {
118114
_ = try self.membership.applyLeadershipChange(to: leader)
119115

@@ -129,7 +125,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy {
129125
self._markAsDown.remove(member)
130126

131127
if self._unreachable.remove(member) != nil {
132-
return .cancelTimer(key: self.timerKey(member))
128+
return .cancelTimer(member: member)
133129
}
134130

135131
return .none

Sources/DistributedActors/Cluster/Reception/OperationLog.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the Swift Distributed Actors open source project
44
//
5-
// Copyright (c) 2020 Apple Inc. and the Swift Distributed Actors project authors
5+
// Copyright (c) 2020-2022 Apple Inc. and the Swift Distributed Actors project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -34,6 +34,7 @@ internal class OpLog<Op: OpLogStreamOp> {
3434
self.maxSeqNr = 0
3535
}
3636

37+
@discardableResult
3738
func add(_ op: Op) -> SequencedOp {
3839
self.maxSeqNr += 1
3940
let sequencedOp = SequencedOp(sequenceRange: .single(self.maxSeqNr), op: op)

0 commit comments

Comments
 (0)