Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@
<title>List: Messages (Received)</title>
<table-ref>messages-received</table-ref>

<column>timestamp</column>
<column>actor-sender</column>
<column>actor-recipient</column>
<column>actor-message</column>
Expand All @@ -397,7 +398,6 @@
<purpose>Points in time where actor messages are told (sent)</purpose>
<icon>Network</icon>


<create-table>
<id>messages-told</id>
<schema-ref>actor-message-told</schema-ref>
Expand All @@ -421,6 +421,7 @@
<title>List: Messages (Told)</title>
<table-ref>messages-told</table-ref>

<column>timestamp</column>
<column>actor-sender</column>
<column>actor-recipient</column>
<column>actor-message</column>
Expand Down Expand Up @@ -460,6 +461,7 @@
<title>List: Messages (Asked)</title>
<table-ref>messages-asked</table-ref>

<column>start</column>
<column>duration</column>
<column>actor-sender</column>
<column>actor-recipient</column>
Expand Down Expand Up @@ -528,6 +530,7 @@
<list>
<title>Lifetimes</title>
<table-ref>actor-lifecycle-intervals</table-ref>
<column>start</column>
<column>actor-address</column>
<column>duration</column>
</list>
Expand Down
10 changes: 10 additions & 0 deletions Sources/DistributedActors/ActorAddress.swift
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ public struct ActorAddress: Equatable, Hashable {
self.path = path
self.incarnation = incarnation
}

internal func fillNodeWhenEmpty(_ node: UniqueNode) -> ActorAddress {
guard self.node == nil else {
return self
}

var withAddress = self
withAddress.node = node
return withAddress
}
}

extension ActorAddress: CustomStringConvertible, CustomDebugStringConvertible {
Expand Down
4 changes: 4 additions & 0 deletions Sources/DistributedActors/ActorContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ public class ActorContext<Message>: ActorRefFactory {
return undefined()
}

internal func unwatch(_ watchee: AddressableActorRef, file: String = #file, line: UInt = #line) {
return undefined()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are these methods actually implemented?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ActorShell which extends ActorContext, it's our way of hiding some of the API.
So the way we pass a context to anyone is basically just pass self from the shell, and expose it only as the Context

Perhaps there's a better way but so far we have this way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They're implemented on ActorShell which is "the actual actor impl" (in a way).

So ActorShell extends ActorContext and when we pass context around we actually pass self and it happens to have all the right lifecycle properties -- context is valid only as long the actual actor is etc.

}

// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Child actor management

Expand Down
13 changes: 9 additions & 4 deletions Sources/DistributedActors/ActorShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ public final class ActorShell<Message>: ActorContext<Message>, AbstractActor {

super.init()

self.instrumentation = system.settings.instrumentation.makeActorInstrumentation(self, address)
let addr = address.fillNodeWhenEmpty(system.settings.cluster.uniqueBindNode)
self.instrumentation = system.settings.instrumentation.makeActorInstrumentation(self, addr)
self.instrumentation.actorSpawned()
system.metrics.recordActorStart(self)
}
Expand Down Expand Up @@ -666,19 +667,23 @@ public final class ActorShell<Message>: ActorContext<Message>, AbstractActor {
// MARK: Death Watch API

public override func watch<M>(_ watchee: ActorRef<M>, with terminationMessage: Message? = nil, file: String = #file, line: UInt = #line) -> ActorRef<M> {
self.deathWatch.watch(watchee: watchee.asAddressable(), with: terminationMessage, myself: self.myself, parent: self._parent, file: file, line: line)
self.watch(watchee.asAddressable(), with: terminationMessage, file: file, line: line)
return watchee
}

internal override func watch(_ watchee: AddressableActorRef, with terminationMessage: Message? = nil, file: String = #file, line: UInt = #line) {
self.deathWatch.watch(watchee: watchee, with: nil, myself: self.myself, parent: self._parent, file: file, line: line)
self.deathWatch.watch(watchee: watchee, with: terminationMessage, myself: self.myself, parent: self._parent, file: file, line: line)
}

public override func unwatch<M>(_ watchee: ActorRef<M>, file: String = #file, line: UInt = #line) -> ActorRef<M> {
self.deathWatch.unwatch(watchee: watchee.asAddressable(), myself: self.myself, file: file, line: line)
self.unwatch(watchee.asAddressable(), file: file, line: line)
return watchee
}

internal override func unwatch(_ watchee: AddressableActorRef, file: String = #file, line: UInt = #line) {
self.deathWatch.unwatch(watchee: watchee, myself: self.myself, file: file, line: line)
}

// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Sub Receive

Expand Down
6 changes: 5 additions & 1 deletion Sources/DistributedActors/ActorSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,10 @@ public final class ActorSystem {
// serialization
self.serialization = Serialization(settings: settings, system: self)

let receptionistBehavior = self.settings.cluster.enabled ? ClusterReceptionist.behavior(syncInterval: settings.cluster.receptionistSyncInterval) : LocalReceptionist.behavior
// receptionist
let receptionistBehavior = self.settings.cluster.enabled ?
self.settings.cluster.receptionist.implementation.behavior(settings: self.settings.cluster.receptionist) :
LocalReceptionist.behavior
let lazyReceptionist = try! self._prepareSystemActor(Receptionist.naming, receptionistBehavior, props: ._wellKnown)
self._receptionist = lazyReceptionist.ref

Expand Down Expand Up @@ -270,6 +273,7 @@ public final class ActorSystem {
if settings.cluster.enabled {
self.log.info("Actor System Settings in effect: Cluster.autoLeaderElection: \(self.settings.cluster.autoLeaderElection)")
self.log.info("Actor System Settings in effect: Cluster.downingStrategy: \(self.settings.cluster.downingStrategy)")
self.log.info("Actor System Settings in effect: Cluster.onDownAction: \(self.settings.cluster.onDownAction)")
}
}

Expand Down
11 changes: 6 additions & 5 deletions Sources/DistributedActors/CRDT/CRDT+Replication.swift
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@ extension CRDT.Replicator.RemoteCommand.WriteError: Equatable {

extension CRDT.Replicator {
public struct Settings {
public static var `default`: Settings {
.init()
}

public var gossipInterval: TimeAmount = .seconds(2)

/// When enabled traces _all_ replicator messages.
/// All logs will be prefixed using `[tracelog:replicator]`, for easier grepping and inspecting only logs related to the replicator.
// TODO: how to make this nicely dynamically changeable during runtime
Expand All @@ -238,10 +244,5 @@ extension CRDT.Replicator {
#else
var traceLogLevel: Logger.Level?
#endif

// TODO: gossip settings
public static var `default`: Settings {
.init()
}
}
}
9 changes: 3 additions & 6 deletions Sources/DistributedActors/CRDT/CRDTReplicatorShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,9 @@ extension CRDT.Replicator {
var behavior: Behavior<Message> {
.setup { context in

if context.system.settings.cluster.enabled {
// Not getting replicators listing through receptionist to prevent potential circular dependency
context.system.cluster.events.subscribe(context.subReceive(Cluster.Event.self) { event in
self.receiveClusterEvent(context, event: event)
})
}
context.system.cluster.events.subscribe(context.subReceive(Cluster.Event.self) { event in
self.receiveClusterEvent(context, event: event)
})

return .receive { context, message in
switch message {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ extension VersionVector: ProtobufRepresentable {
throw SerializationError.missingField("replicaID", type: String(describing: ReplicaVersion.self))
}
let replicaId = try ReplicaId(fromProto: replicaVersion.replicaID, context: context)
state[replicaId] = Int(replicaVersion.version) // TODO: safety?
state[replicaId] = replicaVersion.version
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As side effect of this work I needed / wanted UInt here so this also addressed the TODOs we had laying around

}
}
}
Expand All @@ -97,6 +97,6 @@ extension VersionDot: ProtobufRepresentable {
throw SerializationError.missingField("replicaID", type: String(describing: VersionDot.self))
}
self.replicaId = try ReplicaId(fromProto: proto.replicaID, context: context)
self.version = Int(proto.version) // TODO: safety?
self.version = proto.version
}
}
27 changes: 19 additions & 8 deletions Sources/DistributedActors/Clocks/VersionVector.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@
public struct VersionVector {
// TODO: should we disallow mixing ReplicaId types somehow?

public typealias ReplicaVersion = (replicaId: ReplicaId, version: Int) // TODO: struct? // TODO: UInt?
public typealias Version = UInt64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be UInt instead of UInt64?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm kind of thinking we should be explicit with those... UInt is platform specific right? So what if we'd send between 64 and 32 nodes (e.g. some IoT thingy); On the wire they all are 64bit already in protobuf so thought here they should to.

What's the right thing to do here you think @drexin ?

public typealias ReplicaVersion = (replicaId: ReplicaId, version: Version) // TODO: struct?

// Internal state is a dictionary of replicas and their corresponding version
internal var state: [ReplicaId: Int] = [:]
internal var state: [ReplicaId: Version] = [:]

public static let empty: VersionVector = .init()

Expand All @@ -55,6 +56,10 @@ public struct VersionVector {
self.init([replicaVersion])
}

public init(_ version: Version, at replicaId: ReplicaId) {
self.init([(replicaId, version)])
}

public init(_ replicaVersions: [ReplicaVersion]) {
for rv in replicaVersions {
precondition(rv.version > 0, "Version must be greater than 0")
Expand All @@ -75,7 +80,7 @@ public struct VersionVector {
/// - Parameter replicaId: The replica whose version is to be incremented.
/// - Returns: The replica's version after the increment.
@discardableResult
public mutating func increment(at replicaId: ReplicaId) -> Int {
public mutating func increment(at replicaId: ReplicaId) -> Version {
if let current = self.state[replicaId] {
let nextVersion = current + 1
self.state[replicaId] = nextVersion
Expand All @@ -102,16 +107,21 @@ public struct VersionVector {
///
/// - Parameter replicaId: The replica whose version is being queried.
/// - Returns: The replica's version or 0 if replica is unknown.
public subscript(replicaId: ReplicaId) -> Int {
return self.state[replicaId] ?? 0
public subscript(replicaId: ReplicaId) -> Version {
self.state[replicaId] ?? 0
}

/// Lists all replica ids that this version vector contains.
public var replicaIds: Dictionary<ReplicaId, Version>.Keys {
self.state.keys
}

/// Determine if this `VersionVector` contains a specific version at the given replica.
///
/// - Parameter replicaId: The replica of interest
/// - Parameter version: The version of interest
/// - Returns: True if the replica's version in the `VersionVector` is greater than or equal to `version`. False otherwise.
public func contains(_ replicaId: ReplicaId, _ version: Int) -> Bool {
public func contains(_ replicaId: ReplicaId, _ version: Version) -> Bool {
self[replicaId] >= version
}

Expand Down Expand Up @@ -192,9 +202,10 @@ extension VersionVector: Codable {
/// to be `Hashable` we have to define a type.
public struct VersionDot {
public let replicaId: ReplicaId
public let version: Int // TODO: UInt?
public let version: Version
public typealias Version = UInt64

init(_ replicaId: ReplicaId, _ version: Int) {
init(_ replicaId: ReplicaId, _ version: Version) {
self.replicaId = replicaId
self.version = version
}
Expand Down
4 changes: 4 additions & 0 deletions Sources/DistributedActors/Cluster/Cluster+Membership.swift
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ extension Cluster {
public func isLeader(_ member: Cluster.Member) -> Bool {
self.isLeader(member.node)
}

func contains(_ uniqueNode: UniqueNode) -> Bool {
self._members[uniqueNode] != nil
}
}
}

Expand Down
Loading