Skip to content

Commit ee8093f

Browse files
committed
API-evolution proof the DowningStrategyDirective
1 parent 9676283 commit ee8093f

33 files changed

+115
-93
lines changed

Sources/ActorSingletonPlugin/ActorSingletonManager.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ internal class ActorSingletonManager<Message: Codable> {
7171
try context.stop(child: singleton)
7272
}
7373

74-
internal enum Directive: NotActuallyCodableMessage {
74+
internal enum Directive: _NotActuallyCodableMessage {
7575
case takeOver(from: UniqueNode?, replyTo: _ActorRef<_ActorRef<Message>?>)
7676
case handOver(to: UniqueNode?)
7777
case stop

Sources/DistributedActors/ActorMessages.swift

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import struct Foundation.Data
1616
import NIO
1717

1818
/// A `Never` can never be sent as message, even more so over the wire.
19-
extension Never: NotActuallyCodableMessage {}
19+
extension Never: _NotActuallyCodableMessage {}
2020

2121
// ==== ----------------------------------------------------------------------------------------------------------------
2222
// MARK: Common utility messages
@@ -128,7 +128,7 @@ public struct BestEffortStringError: Error, Codable, Equatable, CustomStringConv
128128
}
129129

130130
/// Useful error wrapper which performs an best effort Error serialization as configured by the actor system.
131-
public struct NonTransportableAnyError: Error, NotActuallyCodableMessage {
131+
public struct NonTransportableAnyError: Error, _NotActuallyCodableMessage {
132132
public let failure: Error
133133

134134
public init<Failure: Error>(_ failure: Failure) {
@@ -150,22 +150,22 @@ public struct NonTransportableAnyError: Error, NotActuallyCodableMessage {
150150
/// No serializer is expected to be registered for such types.
151151
///
152152
/// - Warning: Attempting to send such message over the network will fail at runtime (and log an error or warning).
153-
public protocol NotActuallyCodableMessage: Codable {}
153+
public protocol _NotActuallyCodableMessage: Codable {}
154154

155-
extension NotActuallyCodableMessage {
155+
extension _NotActuallyCodableMessage {
156156
public init(from decoder: Swift.Decoder) throws {
157-
fatalError("Attempted to decode NotActuallyCodableMessage message: \(Self.self)! This should never happen.")
157+
fatalError("Attempted to decode _NotActuallyCodableMessage message: \(Self.self)! This should never happen.")
158158
}
159159

160160
public func encode(to encoder: Swift.Encoder) throws {
161-
fatalError("Attempted to encode NotActuallyCodableMessage message: \(Self.self)! This should never happen.")
161+
fatalError("Attempted to encode _NotActuallyCodableMessage message: \(Self.self)! This should never happen.")
162162
}
163163

164164
public init(context: Serialization.Context, from buffer: inout ByteBuffer, using manifest: Serialization.Manifest) throws {
165-
fatalError("Attempted to deserialize NotActuallyCodableMessage message: \(Self.self)! This should never happen.")
165+
fatalError("Attempted to deserialize _NotActuallyCodableMessage message: \(Self.self)! This should never happen.")
166166
}
167167

168168
public func serialize(context: Serialization.Context, to bytes: inout ByteBuffer) throws {
169-
fatalError("Attempted to serialize NotActuallyCodableMessage message: \(Self.self)! This should never happen.")
169+
fatalError("Attempted to serialize _NotActuallyCodableMessage message: \(Self.self)! This should never happen.")
170170
}
171171
}

Sources/DistributedActors/ActorRef+Ask.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ extension AskResponse {
238238
///
239239
// TODO: replace with a special minimal `_ActorRef` that does not require spawning or scheduling.
240240
internal enum AskActor {
241-
enum Event: NotActuallyCodableMessage {
241+
enum Event: _NotActuallyCodableMessage {
242242
case timeout
243243
}
244244

Sources/DistributedActors/Cluster/ClusterShell.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ internal class ClusterShell {
324324
}
325325

326326
// this is basically our API internally for this system
327-
enum CommandMessage: NotActuallyCodableMessage, SilentDeadLetter {
327+
enum CommandMessage: _NotActuallyCodableMessage, SilentDeadLetter {
328328
/// Connect and handshake with remote `Node`, obtaining an `UniqueNode` in the process.
329329
/// Once the handshake is completed, reply to `replyTo` with the handshake result, and also mark the unique node as `.joining`.
330330
///
@@ -343,7 +343,7 @@ internal class ClusterShell {
343343
case cleanUpAssociationTombstones
344344
}
345345

346-
enum QueryMessage: NotActuallyCodableMessage {
346+
enum QueryMessage: _NotActuallyCodableMessage {
347347
case associatedNodes(_ActorRef<Set<UniqueNode>>) // TODO: better type here
348348
case currentMembership(_ActorRef<Cluster.Membership>)
349349
}
@@ -362,7 +362,7 @@ internal class ClusterShell {
362362
}
363363

364364
// TODO: reformulate as Wire.accept / reject?
365-
internal enum HandshakeResult: Equatable, NotActuallyCodableMessage {
365+
internal enum HandshakeResult: Equatable, _NotActuallyCodableMessage {
366366
case success(UniqueNode)
367367
case failure(HandshakeStateMachine.HandshakeConnectionError)
368368
}

Sources/DistributedActors/Cluster/DiscoveryShell.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import Logging
1616
import ServiceDiscovery
1717

1818
final class DiscoveryShell {
19-
enum Message: NotActuallyCodableMessage {
19+
enum Message: _NotActuallyCodableMessage {
2020
case listing(Set<Node>)
2121
case stop(CompletionReason?)
2222
}

Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,44 @@ import Logging
2121
public protocol DowningStrategy {
2222
/// Invoked whenever the cluster emits an event.
2323
///
24-
/// - Parameter event: cluster event that just ocurred
24+
/// - Parameter event: cluster event that just occurred
2525
/// - Returns: directive, instructing the cluster to take some specific action.
2626
/// - Throws: If unable to handle the event for some reason; the failure will be logged and ignored.
2727
func onClusterEvent(event: Cluster.Event) throws -> DowningStrategyDirective
2828

2929
func onTimeout(_ member: Cluster.Member) -> DowningStrategyDirective
3030
}
3131

32-
public enum DowningStrategyDirective {
33-
case none
34-
case markAsDown(Set<Cluster.Member>)
35-
case startTimer(key: TimerKey, member: Cluster.Member, delay: TimeAmount)
36-
case cancelTimer(key: TimerKey)
32+
/// Return to instruct the downing shell how to react.
33+
public struct DowningStrategyDirective {
34+
internal let underlying: Repr
35+
internal enum Repr {
36+
case none
37+
case markAsDown(Set<Cluster.Member>)
38+
case startTimer(key: TimerKey, member: Cluster.Member, delay: TimeAmount)
39+
case cancelTimer(key: TimerKey)
40+
}
41+
42+
internal init(_ underlying: Repr) {
43+
self.underlying = underlying
44+
}
45+
46+
public static var none: Self {
47+
.init(.none)
48+
}
3749

38-
static func markAsDown(_ member: Cluster.Member) -> Self {
39-
Self.markAsDown([member])
50+
public static func startTimer(key: TimerKey, member: Cluster.Member, delay: TimeAmount) -> Self {
51+
.init(.startTimer(key: key, member: member, delay: delay))
52+
}
53+
public static func cancelTimer(key: TimerKey) -> Self {
54+
.init(.cancelTimer(key: key))
55+
}
56+
57+
public static func markAsDown(members: Set<Cluster.Member>) -> Self {
58+
.init(.markAsDown(members))
59+
}
60+
public static func markAsDown(_ member: Cluster.Member) -> Self {
61+
.init(.markAsDown([member]))
4062
}
4163
}
4264

@@ -85,7 +107,7 @@ internal distributed actor DowningStrategyShell {
85107
}
86108

87109
func interpret(directive: DowningStrategyDirective) {
88-
switch directive {
110+
switch directive.underlying {
89111
case .markAsDown(let members):
90112
self.markAsDown(members: members)
91113

Sources/DistributedActors/Cluster/Downing/TimeoutBasedDowningStrategy.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy {
6060
} else if let replaced = change.replaced {
6161
_ = self._markAsDown.remove(replaced)
6262
_ = self._unreachable.remove(replaced)
63-
return .markAsDown([replaced])
63+
return .markAsDown(members: [replaced])
6464
}
6565

6666
return .none
@@ -82,7 +82,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy {
8282
return .none // perhaps we removed it already for other reasons (e.g. node replacement)
8383
}
8484
if self.isLeader {
85-
return .markAsDown([nodeToDown])
85+
return .markAsDown(members: [nodeToDown])
8686
} else {
8787
self._markAsDown.insert(nodeToDown)
8888
return .none
@@ -119,7 +119,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy {
119119

120120
if self.isLeader, !self._markAsDown.isEmpty {
121121
defer { self._markAsDown = [] }
122-
return .markAsDown(self._markAsDown)
122+
return .markAsDown(members: self._markAsDown)
123123
} else {
124124
return .none
125125
}

Sources/DistributedActors/Cluster/NodeDeathWatcher.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ enum NodeDeathWatcherShell {
171171
/// Message protocol for interacting with the failure detector.
172172
/// By default, the `FailureDetectorShell` handles these messages by interpreting them with an underlying `FailureDetector`,
173173
/// it would be possible however to allow implementing the raw protocol by user actors if we ever see the need for it.
174-
internal enum Message: NotActuallyCodableMessage {
174+
internal enum Message: _NotActuallyCodableMessage {
175175
case remoteActorWatched(watcher: _AddressableActorRef, remoteNode: UniqueNode)
176176
case remoteDistributedActorWatched(remoteNode: UniqueNode, watcherID: ClusterSystem.ActorID, nodeTerminated: @Sendable (UniqueNode) async -> Void)
177177
case removeWatcher(watcherID: ClusterSystem.ActorID)

Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ import Logging
8585
/// number observations.
8686
///
8787
/// #### Optimization: "Blip" Registration Replication Avoidance
88-
// TODO: This is done "automatically" once we do log compaction
8988
/// We define a "blip registration" as a registration of an actor, which immediately (or very quickly) after registering
9089
/// terminates. It can be argued it is NOT useful to replicate the very existence of such short lived actor to other peers,
9190
/// as even if they'd act on the `register`, it'd be immediately followed by `remove` and/or a termination signal.
@@ -139,15 +138,16 @@ import Logging
139138
/// (Note that we simply always `ack(latest)` and if in the meantime the pusher got more updates, it'll push those to us as well.
140139
///
141140
/// - SeeAlso: [Wikipedia: Atomic broadcast](https://en.wikipedia.org/wiki/Atomic_broadcast)
142-
// TODO: compact the log whenever we know all members of the cluster have seen
143-
// TODO: Optimization: gap collapsing: [+a,+b,+c,-c] -> [+a,+b,gap(until:4)]
144-
// TODO: Optimization: head collapsing: [+a,+b,+c,-b,-a] -> [gap(until:2),+c,-b]
145-
//
146-
// TODO: slow/fast ticks: When we know there's nothing new to share with others, we use the slow tick (which should be increased to 5 seconds or less)
147-
// when we received a register() or observed an "ahead" receptionist, we should schedule a "fast tick" in order to more quickly spread this information
148-
// This should still be done on a delay, e.g. if we are receiving many registrations, we want to get the benefit of batching them up before sending after all
149-
// The fast tick could be 1s or 0.5s for example as a default.
150141
public distributed actor OpLogDistributedReceptionist: DistributedReceptionist, CustomStringConvertible {
142+
// TODO: compact the log whenever we know all members of the cluster have seen
143+
// TODO: Optimization: gap collapsing: [+a,+b,+c,-c] -> [+a,+b,gap(until:4)]
144+
// TODO: Optimization: head collapsing: [+a,+b,+c,-b,-a] -> [gap(until:2),+c,-b]
145+
//
146+
// TODO: slow/fast ticks: When we know there's nothing new to share with others, we use the slow tick (which should be increased to 5 seconds or less)
147+
// when we received a register() or observed an "ahead" receptionist, we should schedule a "fast tick" in order to more quickly spread this information
148+
// This should still be done on a delay, e.g. if we are receiving many registrations, we want to get the benefit of batching them up before sending after all
149+
// The fast tick could be 1s or 0.5s for example as a default.
150+
151151
public typealias ActorSystem = ClusterSystem
152152

153153
// TODO: remove this
@@ -1028,7 +1028,7 @@ extension OpLogDistributedReceptionist {
10281028
}
10291029
}
10301030

1031-
final class PublishLocalListingsTrigger: Receptionist.Message, NotActuallyCodableMessage, CustomStringConvertible {
1031+
final class PublishLocalListingsTrigger: Receptionist.Message, _NotActuallyCodableMessage, CustomStringConvertible {
10321032
override init() {
10331033
super.init()
10341034
}

Sources/DistributedActors/Cluster/Reception/_OperationLogClusterReceptionistBehavior.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -774,7 +774,7 @@ extension _OperationLogClusterReceptionist {
774774
}
775775
}
776776

777-
class PeriodicAckTick: Receptionist.Message, NotActuallyCodableMessage, CustomStringConvertible {
777+
class PeriodicAckTick: Receptionist.Message, _NotActuallyCodableMessage, CustomStringConvertible {
778778
override init() {
779779
super.init()
780780
}
@@ -788,7 +788,7 @@ extension _OperationLogClusterReceptionist {
788788
}
789789
}
790790

791-
class PublishLocalListingsTrigger: Receptionist.Message, NotActuallyCodableMessage, CustomStringConvertible {
791+
class PublishLocalListingsTrigger: Receptionist.Message, _NotActuallyCodableMessage, CustomStringConvertible {
792792
override init() {
793793
super.init()
794794
}

0 commit comments

Comments
 (0)