Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var targets: [PackageDescription.Target] = [
.product(name: "Metrics", package: "swift-metrics"),
.product(name: "ServiceDiscovery", package: "swift-service-discovery"),
.product(name: "Backtrace", package: "swift-backtrace"),
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
]
),

Expand Down Expand Up @@ -179,26 +180,27 @@ var targets: [PackageDescription.Target] = [
var dependencies: [Package.Dependency] = [
.package(url: "https://github.com/apple/swift-atomics", from: "1.0.2"),

.package(url: "https://github.com/apple/swift-cluster-membership.git", from: "0.3.0"),
.package(url: "https://github.com/apple/swift-cluster-membership", from: "0.3.0"),

.package(url: "https://github.com/apple/swift-nio.git", from: "2.40.0"),
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.2.0"),
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.16.1"),
.package(url: "https://github.com/apple/swift-nio", from: "2.40.0"),
.package(url: "https://github.com/apple/swift-nio-extras", from: "1.2.0"),
.package(url: "https://github.com/apple/swift-nio-ssl", from: "2.16.1"),

.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.7.0"),
.package(url: "https://github.com/apple/swift-protobuf", from: "1.7.0"),

// ~~~ backtraces ~~~
// TODO: optimally, library should not pull swift-backtrace
.package(url: "https://github.com/swift-server/swift-backtrace.git", from: "1.1.1"),
.package(url: "https://github.com/swift-server/swift-backtrace", from: "1.1.1"),

// ~~~ Swift Collections ~~~
.package(url: "https://github.com/apple/swift-collections.git", from: "1.0.1"),
// ~~~ Swift libraries ~~~
.package(url: "https://github.com/apple/swift-async-algorithms", from: "0.0.3"),
.package(url: "https://github.com/apple/swift-collections", from: "1.0.1"),

// ~~~ Observability ~~~
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-log", from: "1.0.0"),
// swift-metrics 1.x and 2.x are almost API compatible, so most clients should use
.package(url: "https://github.com/apple/swift-metrics.git", "1.0.0" ..< "3.0.0"),
.package(url: "https://github.com/apple/swift-service-discovery.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-metrics", "1.0.0" ..< "3.0.0"),
.package(url: "https://github.com/apple/swift-service-discovery", from: "1.0.0"),

// ~~~ SwiftPM Plugins ~~~
.package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"),
Expand Down
31 changes: 13 additions & 18 deletions Samples/Sources/SampleDiningPhilosophers/Philosopher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2021 Apple Inc. and the Swift Distributed Actors project authors
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -24,8 +24,6 @@ distributed actor Philosopher: CustomStringConvertible {
private let rightFork: Fork
private var state: State = .thinking

private lazy var timers = DistributedActors.ActorTimers<Philosopher>(self)

init(name: String, leftFork: Fork, rightFork: Fork, actorSystem: ActorSystem) {
self.actorSystem = actorSystem
self.name = name
Expand Down Expand Up @@ -58,15 +56,16 @@ distributed actor Philosopher: CustomStringConvertible {
}

self.state = .thinking
self.timers.startSingle(key: .becomeHungry, delay: .seconds(1)) {
Task {
try await Task.sleep(until: .now + .seconds(1), clock: .continuous)
await self.attemptToTakeForks()
}
self.log.info("\(self.self.name) is thinking...")
self.log.info("\(self.name) is thinking...")
}

distributed func attemptToTakeForks() async {
guard self.state == .thinking else {
self.log.error("\(self.self.name) tried to take a fork but was not in the thinking state!")
self.log.error("\(self.name) tried to take a fork but was not in the thinking state!")
return
}

Expand All @@ -89,14 +88,14 @@ distributed actor Philosopher: CustomStringConvertible {
}
self.forkTaken(self.rightFork)
} catch {
self.log.info("\(self.self.name) wasn't able to take both forks!")
self.log.info("\(self.name) wasn't able to take both forks!")
self.think()
}
}

/// Message sent to oneself after a timer exceeds and we're done `eating` and can become `thinking` again.
distributed func stopEating() {
self.log.info("\(self.self.name) is done eating and replaced both forks!")
self.log.info("\(self.name) is done eating and replaced both forks!")
Task {
do {
try await self.leftFork.putBack()
Expand Down Expand Up @@ -128,10 +127,10 @@ distributed actor Philosopher: CustomStringConvertible {

switch fork {
case self.leftFork:
self.log.info("\(self.self.name) received their left fork!")
self.log.info("\(self.name) received their left fork!")
self.state = .takingForks(leftTaken: true, rightTaken: rightForkIsTaken)
case self.rightFork:
self.log.info("\(self.self.name) received their right fork!")
self.log.info("\(self.name) received their right fork!")
self.state = .takingForks(leftTaken: leftForkIsTaken, rightTaken: true)
default:
self.log.error("Received unknown fork! Got: \(fork). Known forks: \(self.leftFork), \(self.rightFork)")
Expand All @@ -144,18 +143,14 @@ distributed actor Philosopher: CustomStringConvertible {

private func becomeEating() {
self.state = .eating
self.log.notice("\(self.self.name) began eating!")
self.timers.startSingle(key: .becomeHungry, delay: .seconds(3)) {
await self.stopEating()
self.log.notice("\(self.name) began eating!")
Task {
try await Task.sleep(until: .now + .seconds(3), clock: .continuous)
self.stopEating()
}
}
}

extension TimerKey {
static let becomeHungry: Self = "become-hungry"
static let finishEating: Self = "finish-eating"
}

extension Philosopher {
private enum State: Equatable {
case thinking
Expand Down
4 changes: 2 additions & 2 deletions Sources/DistributedActors/Cluster/ClusterControl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public struct ClusterControl {
public func waitFor(_ node: UniqueNode, _ status: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member {
try await self.waitForMembershipEventually(within: within) { membership in
if status == .down || status == .removed {
if let cluster = self.cluster, let tombstone = cluster.getExistingAssociationTombstone(with: node) {
if let cluster = self.cluster, cluster.getExistingAssociationTombstone(with: node) != nil {
return Cluster.Member(node: node, status: .removed).asUnreachable
}
}
Expand Down Expand Up @@ -251,7 +251,7 @@ public struct ClusterControl {
public func waitFor(_ node: UniqueNode, atLeast atLeastStatus: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member {
try await self.waitForMembershipEventually(within: within) { membership in
if atLeastStatus == .down || atLeastStatus == .removed {
if let cluster = self.cluster, let tombstone = cluster.getExistingAssociationTombstone(with: node) {
if let cluster = self.cluster, cluster.getExistingAssociationTombstone(with: node) != nil {
return Cluster.Member(node: node, status: .removed).asUnreachable
}
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/DistributedActors/Cluster/ClusterShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ extension ClusterShell {
"handshake/retryDelay": "\(retryDelay)",
])
context.timers.startSingle(
key: TimerKey("handshake-timer-\(remoteNode)"),
key: _TimerKey("handshake-timer-\(remoteNode)"),
message: .command(.retryHandshake(initiated)),
delay: retryDelay
)
Expand Down
37 changes: 23 additions & 14 deletions Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public struct DowningStrategyDirective {
internal enum Repr {
case none
case markAsDown(Set<Cluster.Member>)
case startTimer(key: TimerKey, member: Cluster.Member, delay: Duration)
case cancelTimer(key: TimerKey)
case startTimer(member: Cluster.Member, delay: Duration)
case cancelTimer(member: Cluster.Member)
}

internal init(_ underlying: Repr) {
Expand All @@ -47,12 +47,12 @@ public struct DowningStrategyDirective {
.init(.none)
}

public static func startTimer(key: TimerKey, member: Cluster.Member, delay: Duration) -> Self {
.init(.startTimer(key: key, member: member, delay: delay))
public static func startTimer(member: Cluster.Member, delay: Duration) -> Self {
.init(.startTimer(member: member, delay: delay))
}

public static func cancelTimer(key: TimerKey) -> Self {
.init(.cancelTimer(key: key))
public static func cancelTimer(member: Cluster.Member) -> Self {
.init(.cancelTimer(member: member))
}

public static func markAsDown(members: Set<Cluster.Member>) -> Self {
Expand Down Expand Up @@ -86,8 +86,8 @@ internal distributed actor DowningStrategyShell {

/// `Task` for subscribing to cluster events.
private var eventsListeningTask: Task<Void, Error>?

private lazy var timers = ActorTimers<DowningStrategyShell>(self)
/// Timer `Task`s
private var memberTimerTasks: [Cluster.Member: Task<Void, Error>] = [:]

init(_ strategy: DowningStrategy, system: ActorSystem) async {
self.strategy = strategy
Expand All @@ -113,14 +113,23 @@ internal distributed actor DowningStrategyShell {
case .markAsDown(let members):
self.markAsDown(members: members)

case .startTimer(let key, let member, let delay):
self.log.trace("Start timer \(key), member: \(member), delay: \(delay)")
self.timers.startSingle(key: key, delay: delay) {
case .startTimer(let member, let delay):
self.log.trace("Start timer for member: \(member), delay: \(delay)")
self.memberTimerTasks[member] = Task {
defer { self.memberTimerTasks.removeValue(forKey: member) }

guard !Task.isCancelled else {
return
}

try await Task.sleep(until: .now + delay, clock: .continuous)
self.onTimeout(member: member)
}
case .cancelTimer(let key):
self.log.trace("Cancel timer \(key)")
self.timers.cancel(for: key)
case .cancelTimer(let member):
self.log.trace("Cancel timer for member: \(member)")
if let timerTask = self.memberTimerTasks.removeValue(forKey: member) {
timerTask.cancel()
}

case .none:
() // nothing to be done
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy {
// it was marked as down by someone, we don't need to track it anymore
_ = self._markAsDown.remove(change.member)
_ = self._unreachable.remove(change.member)
return .cancelTimer(key: self.timerKey(change.member))
return .cancelTimer(member: change.member)
} else if let replaced = change.replaced {
_ = self._markAsDown.remove(replaced)
_ = self._unreachable.remove(replaced)
Expand Down Expand Up @@ -95,7 +95,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy {

self._unreachable.insert(member)

return .startTimer(key: self.timerKey(member), member: member, delay: self.settings.downUnreachableMembersAfter)
return .startTimer(member: member, delay: self.settings.downUnreachableMembersAfter)
}

func onMemberReachable(_ change: Cluster.ReachabilityChange) -> DowningStrategyDirective {
Expand All @@ -104,16 +104,12 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy {

_ = self._markAsDown.remove(member)
if self._unreachable.remove(member) != nil {
return .cancelTimer(key: self.timerKey(member))
return .cancelTimer(member: member)
}

return .none
}

func timerKey(_ member: Cluster.Member) -> TimerKey {
TimerKey(member.uniqueNode)
}

func onLeaderChange(to leader: Cluster.Member?) throws -> DowningStrategyDirective {
_ = try self.membership.applyLeadershipChange(to: leader)

Expand All @@ -129,7 +125,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy {
self._markAsDown.remove(member)

if self._unreachable.remove(member) != nil {
return .cancelTimer(key: self.timerKey(member))
return .cancelTimer(member: member)
}

return .none
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2020 Apple Inc. and the Swift Distributed Actors project authors
// Copyright (c) 2020-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down Expand Up @@ -34,6 +34,7 @@ internal class OpLog<Op: OpLogStreamOp> {
self.maxSeqNr = 0
}

@discardableResult
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the warning cleanups while here 👍

func add(_ op: Op) -> SequencedOp {
self.maxSeqNr += 1
let sequencedOp = SequencedOp(sequenceRange: .single(self.maxSeqNr), op: op)
Expand Down
Loading