Skip to content

Commit c6c137f

Browse files
committed
=cluster,membership compressed gossip messages by node ID in seen tables
1 parent 4a2c1aa commit c6c137f

28 files changed

+498
-99
lines changed

Protos/Clocks/VersionVector.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ message VersionReplicaID {
2727
oneof value {
2828
ActorAddress actorAddress = 1;
2929
UniqueNode uniqueNode = 2;
30+
uint32 uniqueNodeID = 3;
3031
}
3132
}
3233

Protos/Cluster/Membership.proto

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ option optimize_for = SPEED;
1919
option swift_prefix = "Proto";
2020

2121
import "ActorAddress.proto";
22+
import "Clocks/VersionVector.proto";
2223

2324
message ClusterMembership {
2425
repeated ClusterMember members = 1;
@@ -47,8 +48,24 @@ enum ClusterMemberStatus {
4748
CLUSTER_MEMBER_STATUS_REMOVED = 5;
4849
}
4950

51+
// ==== Membership Gossip ----------------------------------------------------------------------------------------------
52+
5053
message ClusterMembershipGossip {
51-
// origin of the gossip
52-
UniqueNode from = 1;
53-
repeated ClusterMember members = 2; // TODO: Something else, "membership diff"?
54+
// Membership contains full UniqueNode renderings, and the owner and seen table refer to them by UniqueNode.ID
55+
// this saves us space (by avoiding to render the unique node explicitly many times for each member/seen-entry).
56+
ClusterMembership membership = 1;
57+
58+
// The following fields will use compressed UniqueNode encoding and ONLY serialize them as their uniqueNodeID.
59+
// During deserialization the fields can be resolved against the membership to obtain full UniqueNode values if necessary.
60+
uint32 ownerUniqueNodeID = 2;
61+
ClusterMembershipSeenTable seenTable = 3;
5462
}
63+
64+
message ClusterMembershipSeenTable {
65+
repeated ClusterMembershipSeenTableRow rows = 1;
66+
}
67+
68+
message ClusterMembershipSeenTableRow {
69+
uint32 uniqueNodeID = 1;
70+
VersionVector version = 2;
71+
}

Sources/DistributedActors/ActorAddress.swift

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -612,20 +612,22 @@ extension Node: Comparable {
612612
/// Once the remote node accepts our handshake, it offers the other node its unique address.
613613
/// Only once this address has been obtained can a node communicate with actors located on the remote node.
614614
public struct UniqueNode: Hashable {
615+
public typealias ID = UniqueNodeID
616+
615617
public var node: Node
616-
public let nid: NodeID
618+
public let nid: UniqueNodeID
617619

618-
public init(node: Node, nid: NodeID) {
620+
public init(node: Node, nid: UniqueNodeID) {
619621
precondition(node.port > 0, "port MUST be > 0")
620622
self.node = node
621623
self.nid = nid
622624
}
623625

624-
public init(protocol: String, systemName: String, host: String, port: Int, nid: NodeID) {
626+
public init(protocol: String, systemName: String, host: String, port: Int, nid: UniqueNodeID) {
625627
self.init(node: Node(protocol: `protocol`, systemName: systemName, host: host, port: port), nid: nid)
626628
}
627629

628-
public init(systemName: String, host: String, port: Int, nid: NodeID) {
630+
public init(systemName: String, host: String, port: Int, nid: UniqueNodeID) {
629631
self.init(protocol: "sact", systemName: systemName, host: host, port: port, nid: nid)
630632
}
631633

@@ -676,29 +678,29 @@ extension UniqueNode: Comparable {
676678
}
677679
}
678680

679-
public struct NodeID: Hashable {
681+
public struct UniqueNodeID: Hashable {
680682
let value: UInt32 // TODO: redesign / reconsider exact size
681683

682684
public init(_ value: UInt32) {
683685
self.value = value
684686
}
685687
}
686688

687-
extension NodeID: Comparable {
688-
public static func < (lhs: NodeID, rhs: NodeID) -> Bool {
689+
extension UniqueNodeID: Comparable {
690+
public static func < (lhs: UniqueNodeID, rhs: UniqueNodeID) -> Bool {
689691
lhs.value < rhs.value
690692
}
691693
}
692694

693-
extension NodeID: CustomStringConvertible {
695+
extension UniqueNodeID: CustomStringConvertible {
694696
public var description: String {
695697
"\(self.value)"
696698
}
697699
}
698700

699-
public extension NodeID {
700-
static func random() -> NodeID {
701-
NodeID(UInt32.random(in: 1 ... .max))
701+
public extension UniqueNodeID {
702+
static func random() -> UniqueNodeID {
703+
UniqueNodeID(UInt32.random(in: 1 ... .max))
702704
}
703705
}
704706

Sources/DistributedActors/Clocks/Protobuf/VersionVector+Serialization.swift

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ extension ReplicaID: ProtobufRepresentable {
2525
proto.actorAddress = try actorAddress.toProto(context: context)
2626
case .uniqueNode(let node):
2727
proto.uniqueNode = try node.toProto(context: context)
28+
case .uniqueNodeID(let nid):
29+
proto.uniqueNodeID = nid.value
2830
}
2931
return proto
3032
}
@@ -41,6 +43,8 @@ extension ReplicaID: ProtobufRepresentable {
4143
case .uniqueNode(let protoNode):
4244
let node = try UniqueNode(fromProto: protoNode, context: context)
4345
self = .uniqueNode(node)
46+
case .uniqueNodeID(let nid):
47+
self = .uniqueNodeID(nid)
4448
}
4549
}
4650
}
@@ -65,6 +69,29 @@ extension VersionVector: ProtobufRepresentable {
6569
return proto
6670
}
6771

72+
/// Serialize using uniqueNodeID specifically (or crash);
73+
/// Used in situations where an enclosing message already has the unique nodes serialized and we can save space by avoiding to serialize them again.
74+
public func toCompactReplicaNodeIDProto(context: Serialization.Context) throws -> ProtoVersionVector {
75+
var proto = ProtoVersionVector()
76+
77+
let replicaVersions: [ProtoReplicaVersion] = try self.state.map { replicaID, version in
78+
var replicaVersion = ProtoReplicaVersion()
79+
switch replicaID.storage {
80+
case .uniqueNode(let node):
81+
replicaVersion.replicaID.uniqueNodeID = node.nid.value
82+
case .uniqueNodeID(let nid):
83+
replicaVersion.replicaID.uniqueNodeID = nid.value
84+
case .actorAddress:
85+
throw SerializationError.unableToSerialize(hint: "Can't serialize using actor address as replica id! Was: \(replicaID)")
86+
}
87+
replicaVersion.version = UInt64(version)
88+
return replicaVersion
89+
}
90+
proto.state = replicaVersions
91+
92+
return proto
93+
}
94+
6895
public init(fromProto proto: ProtoVersionVector, context: Serialization.Context) throws {
6996
// `state` defaults to [:]
7097
self.state.reserveCapacity(proto.state.count)

Sources/DistributedActors/Clocks/Protobuf/VersionVector.pb.swift

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,27 @@ public struct ProtoVersionReplicaID {
5959
set {_uniqueStorage()._value = .uniqueNode(newValue)}
6060
}
6161

62+
public var uniqueNodeID: UInt32 {
63+
get {
64+
if case .uniqueNodeID(let v)? = _storage._value {return v}
65+
return 0
66+
}
67+
set {_uniqueStorage()._value = .uniqueNodeID(newValue)}
68+
}
69+
6270
public var unknownFields = SwiftProtobuf.UnknownStorage()
6371

6472
public enum OneOf_Value: Equatable {
6573
case actorAddress(ProtoActorAddress)
6674
case uniqueNode(ProtoUniqueNode)
75+
case uniqueNodeID(UInt32)
6776

6877
#if !swift(>=4.1)
6978
public static func ==(lhs: ProtoVersionReplicaID.OneOf_Value, rhs: ProtoVersionReplicaID.OneOf_Value) -> Bool {
7079
switch (lhs, rhs) {
7180
case (.actorAddress(let l), .actorAddress(let r)): return l == r
7281
case (.uniqueNode(let l), .uniqueNode(let r)): return l == r
82+
case (.uniqueNodeID(let l), .uniqueNodeID(let r)): return l == r
7383
default: return false
7484
}
7585
}
@@ -190,6 +200,7 @@ extension ProtoVersionReplicaID: SwiftProtobuf.Message, SwiftProtobuf._MessageIm
190200
public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [
191201
1: .same(proto: "actorAddress"),
192202
2: .same(proto: "uniqueNode"),
203+
3: .same(proto: "uniqueNodeID"),
193204
]
194205

195206
fileprivate class _StorageClass {
@@ -232,6 +243,11 @@ extension ProtoVersionReplicaID: SwiftProtobuf.Message, SwiftProtobuf._MessageIm
232243
}
233244
try decoder.decodeSingularMessageField(value: &v)
234245
if let v = v {_storage._value = .uniqueNode(v)}
246+
case 3:
247+
if _storage._value != nil {try decoder.handleConflictingOneOf()}
248+
var v: UInt32?
249+
try decoder.decodeSingularUInt32Field(value: &v)
250+
if let v = v {_storage._value = .uniqueNodeID(v)}
235251
default: break
236252
}
237253
}
@@ -245,6 +261,8 @@ extension ProtoVersionReplicaID: SwiftProtobuf.Message, SwiftProtobuf._MessageIm
245261
try visitor.visitSingularMessageField(value: v, fieldNumber: 1)
246262
case .uniqueNode(let v)?:
247263
try visitor.visitSingularMessageField(value: v, fieldNumber: 2)
264+
case .uniqueNodeID(let v)?:
265+
try visitor.visitSingularUInt32Field(value: v, fieldNumber: 3)
248266
case nil: break
249267
}
250268
}

Sources/DistributedActors/Clocks/VersionVector.swift

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ public struct ReplicaID: Hashable {
241241
internal enum Storage: Hashable {
242242
case actorAddress(ActorAddress)
243243
case uniqueNode(UniqueNode)
244+
case uniqueNodeID(UniqueNode.ID)
244245
}
245246

246247
internal let storage: Storage
@@ -262,12 +263,24 @@ public struct ReplicaID: Hashable {
262263
.init(.uniqueNode(uniqueNode))
263264
}
264265

266+
public static func uniqueNodeID(_ uniqueNode: UniqueNode) -> ReplicaID {
267+
.init(.uniqueNodeID(uniqueNode.nid))
268+
}
269+
270+
internal static func uniqueNodeID(_ uniqueNodeID: UInt32) -> ReplicaID {
271+
.init(.uniqueNodeID(.init(uniqueNodeID)))
272+
}
273+
265274
func ensuringNode(_ node: UniqueNode) -> ReplicaID {
266275
switch self.storage {
267-
case .uniqueNode:
268-
return self
269276
case .actorAddress(let address):
270277
return .actorAddress(address.ensuringNode(node))
278+
case .uniqueNode(let existingNode):
279+
assert(existingNode.nid == node.nid, "Attempted to ensureNode with non-matching node identifier, was: \(existingNode)], attempted: \(node)")
280+
return self
281+
case .uniqueNodeID(let nid): // drops the nid
282+
assert(nid == node.nid, "Attempted to ensureNode with non-matching node identifier, was: \(nid)], attempted: \(node)")
283+
return .uniqueNode(node)
271284
}
272285
}
273286
}
@@ -279,6 +292,8 @@ extension ReplicaID: CustomStringConvertible {
279292
return "actor:\(address)"
280293
case .uniqueNode(let node):
281294
return "uniqueNode:\(node)"
295+
case .uniqueNodeID(let nid):
296+
return "uniqueNodeID:\(nid)"
282297
}
283298
}
284299
}
@@ -290,43 +305,56 @@ extension ReplicaID: Comparable {
290305
return l < r
291306
case (.uniqueNode(let l), .uniqueNode(let r)):
292307
return l < r
293-
case (.uniqueNode, _), (.actorAddress, _):
294-
return false // TODO: should we even disallow comparing them?
308+
case (.uniqueNodeID(let l), .uniqueNodeID(let r)):
309+
return l < r
310+
case (.uniqueNode, _), (.uniqueNodeID, _), (.actorAddress, _):
311+
return false
295312
}
296313
}
297314

298315
public static func == (lhs: ReplicaID, rhs: ReplicaID) -> Bool {
299316
switch (lhs.storage, rhs.storage) {
300317
case (.actorAddress(let l), .actorAddress(let r)):
301318
return l == r
319+
302320
case (.uniqueNode(let l), .uniqueNode(let r)):
303321
return l == r
304-
case (.uniqueNode, _), (.actorAddress, _):
305-
return false // TODO: should we even disallow comparing them?
322+
323+
case (.uniqueNodeID(let l), .uniqueNodeID(let r)):
324+
return l == r
325+
case (.uniqueNode(let l), .uniqueNodeID(let r)):
326+
return l.nid == r
327+
case (.uniqueNodeID(let l), .uniqueNode(let r)):
328+
return l == r.nid
329+
330+
case (.uniqueNode, _), (.uniqueNodeID, _), (.actorAddress, _):
331+
return false
306332
}
307333
}
308334
}
309335

310336
extension ReplicaID: Codable {
311337
public enum DiscriminatorKeys: String, Codable {
312-
case actorAddress
313-
case uniqueNode
338+
case actorAddress = "a"
339+
case uniqueNode = "N"
340+
case uniqueNodeID = "n"
314341
}
315342

316343
public enum CodingKeys: CodingKey {
317344
case _case
318345

319-
case actorAddress_value
320-
case uniqueNode_value
346+
case value
321347
}
322348

323349
public init(from decoder: Decoder) throws {
324350
let container = try decoder.container(keyedBy: CodingKeys.self)
325351
switch try container.decode(DiscriminatorKeys.self, forKey: ._case) {
326352
case .actorAddress:
327-
self = try .actorAddress(container.decode(ActorAddress.self, forKey: .actorAddress_value))
353+
self = try .actorAddress(container.decode(ActorAddress.self, forKey: .value))
328354
case .uniqueNode:
329-
self = try .uniqueNode(container.decode(UniqueNode.self, forKey: .uniqueNode_value))
355+
self = try .uniqueNode(container.decode(UniqueNode.self, forKey: .value))
356+
case .uniqueNodeID:
357+
self = try .uniqueNodeID(container.decode(UInt32.self, forKey: .value))
330358
}
331359
}
332360

@@ -335,10 +363,13 @@ extension ReplicaID: Codable {
335363
switch self.storage {
336364
case .actorAddress(let address):
337365
try container.encode(DiscriminatorKeys.actorAddress, forKey: ._case)
338-
try container.encode(address, forKey: .actorAddress_value)
366+
try container.encode(address, forKey: .value)
339367
case .uniqueNode(let node):
340368
try container.encode(DiscriminatorKeys.uniqueNode, forKey: ._case)
341-
try container.encode(node, forKey: .uniqueNode_value)
369+
try container.encode(node, forKey: .value)
370+
case .uniqueNodeID(let nid):
371+
try container.encode(DiscriminatorKeys.uniqueNodeID, forKey: ._case)
372+
try container.encode(nid.value, forKey: .value)
342373
}
343374
}
344375
}

0 commit comments

Comments
 (0)