Skip to content

Commit 9c2c273

Browse files
authored
Codable improvements for ActorID tags (#915)
* [tags] introduce tags to actor identity * remove mailbox props, that won't be a thing anymore * prepare tags for paths * include actor/id in logger * cleanup * [Distributed] Allow carrying tags through serialization via config * reformat * less fragile (dependent on json impl) test * [warning] avoid sendable warning * fix path encoding, we cant remove it yet * formatting * fix latest compiler errors * formatting * disable one test * experimenting * Revert "experimenting" This reverts commit cdb99ea. * debugging * testing * cleanup
1 parent 849f621 commit 9c2c273

File tree

13 files changed

+274
-87
lines changed

13 files changed

+274
-87
lines changed

IntegrationTests/tests_01_cluster/it_Clustered_swim_suspension_reachability/main.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ guard args.count >= 1 else {
2525
fatalError("no port given")
2626
}
2727

28+
let bindPort = Int(args[0])!
29+
2830
let system = await ClusterSystem("System") { settings in
2931
settings.logging.logLevel = .info
3032

31-
settings.bindPort = Int(args[0])!
33+
settings.bindPort = bindPort
3234

3335
settings.swim.probeInterval = .milliseconds(300)
3436
settings.swim.pingTimeout = .milliseconds(100)

Sources/DistributedActors/ActorAddress.swift

Lines changed: 95 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -124,36 +124,49 @@ extension ClusterSystem {
124124
public let incarnation: ActorIncarnation
125125

126126
/// :nodoc:
127-
public init(local node: UniqueNode, path: ActorPath, incarnation: ActorIncarnation) {
127+
public init(local node: UniqueNode, path: ActorPath?, incarnation: ActorIncarnation) {
128128
self._location = .local(node)
129129
self.tags = ActorTags()
130130
self.incarnation = incarnation
131-
self.tags[ActorTags.path] = path
131+
if let path {
132+
self.tags[ActorTags.path] = path
133+
}
132134
}
133135

134136
/// :nodoc:
135-
public init(remote node: UniqueNode, path: ActorPath, incarnation: ActorIncarnation) {
137+
public init(remote node: UniqueNode, path: ActorPath?, incarnation: ActorIncarnation) {
136138
self._location = .remote(node)
137139
self.incarnation = incarnation
138140
self.tags = ActorTags()
139-
self.tags[ActorTags.path] = path
141+
if let path {
142+
self.tags[ActorTags.path] = path
143+
}
140144
}
141145

142-
// /// :nodoc:
143-
// public init(local node: UniqueNode, path: ActorPath, incarnation: ActorIncarnation) {
144-
// self._location = .local(node)
145-
// self.tags = ActorTags()
146-
// self.incarnation = incarnation
147-
// self.tags[.path] = path
148-
// }
149-
//
150-
// /// :nodoc:
151-
// public init(remote node: UniqueNode, path: ActorPath, incarnation: ActorIncarnation) {
152-
// self._location = .remote(node)
153-
// self.incarnation = incarnation
154-
// self.tags = ActorTags()
155-
// self.tags[.path] = path
156-
// }
146+
/// :nodoc:
147+
public init<Act>(local node: UniqueNode, type: Act.Type, incarnation: ActorIncarnation)
148+
where Act: DistributedActor, Act.ActorSystem == ClusterSystem
149+
{
150+
self._location = .local(node)
151+
self.tags = ActorTags()
152+
self.incarnation = incarnation
153+
self.tags = ActorTags()
154+
// TODO: avoid mangling names on every spawn?
155+
if let mangledName = _mangledTypeName(type) {
156+
self.tags[ActorTags.type] = .init(mangledName: mangledName)
157+
}
158+
}
159+
160+
/// :nodoc:
161+
public init(remote node: UniqueNode, type: (some DistributedActor).Type, incarnation: ActorIncarnation) {
162+
self._location = .remote(node)
163+
self.incarnation = incarnation
164+
self.tags = ActorTags()
165+
// TODO: avoid mangling names on every spawn?
166+
if let mangledName = _mangledTypeName(type) {
167+
self.tags[ActorTags.type] = .init(mangledName: mangledName)
168+
}
169+
}
157170
}
158171
}
159172

@@ -793,6 +806,69 @@ extension UniqueNodeID {
793806
}
794807
}
795808

809+
// ==== ----------------------------------------------------------------------------------------------------------------
810+
// MARK: Codable ActorAddress
811+
812+
extension ActorID: Codable {
813+
public func encode(to encoder: Encoder) throws {
814+
let tagSettings = encoder.actorSerializationContext?.system.settings.tags
815+
let encodeCustomTags: (ActorID, inout KeyedEncodingContainer<ActorCoding.TagKeys>) throws -> Void =
816+
tagSettings?.encodeCustomTags ?? ({ _, _ in () })
817+
818+
var container = encoder.container(keyedBy: ActorCoding.CodingKeys.self)
819+
try container.encode(self.uniqueNode, forKey: ActorCoding.CodingKeys.node)
820+
try container.encode(self.path, forKey: ActorCoding.CodingKeys.path) // TODO: remove as we remove the tree
821+
try container.encode(self.incarnation, forKey: ActorCoding.CodingKeys.incarnation)
822+
823+
if !self.tags.isEmpty {
824+
var tagsContainer = container.nestedContainer(keyedBy: ActorCoding.TagKeys.self, forKey: ActorCoding.CodingKeys.tags)
825+
826+
if (tagSettings == nil || tagSettings!.propagateTags.contains(AnyActorTagKey(ActorTags.path))),
827+
let value = self.tags[ActorTags.path]
828+
{
829+
try tagsContainer.encode(value, forKey: ActorCoding.TagKeys.path)
830+
}
831+
if (tagSettings == nil || tagSettings!.propagateTags.contains(AnyActorTagKey(ActorTags.type))),
832+
let value = self.tags[ActorTags.type]
833+
{
834+
try tagsContainer.encode(value, forKey: ActorCoding.TagKeys.type)
835+
}
836+
837+
try encodeCustomTags(self, &tagsContainer)
838+
}
839+
}
840+
841+
public init(from decoder: Decoder) throws {
842+
let container = try decoder.container(keyedBy: ActorCoding.CodingKeys.self)
843+
let node = try container.decode(UniqueNode.self, forKey: ActorCoding.CodingKeys.node)
844+
let path = try container.decodeIfPresent(ActorPath.self, forKey: ActorCoding.CodingKeys.path)
845+
let incarnation = try container.decode(UInt32.self, forKey: ActorCoding.CodingKeys.incarnation)
846+
847+
self.init(remote: node, path: path, incarnation: ActorIncarnation(incarnation))
848+
849+
// Decode any tags:
850+
if let tagsContainer = try? container.nestedContainer(keyedBy: ActorCoding.TagKeys.self, forKey: ActorCoding.CodingKeys.tags) {
851+
// tags container found, try to decode all known tags:
852+
if let path = try tagsContainer.decodeIfPresent(ActorPath.self, forKey: .path) {
853+
self.tags[ActorTags.path] = path
854+
}
855+
856+
if let context = decoder.actorSerializationContext {
857+
let decodeCustomTags = context.system.settings.tags.decodeCustomTags
858+
859+
for tag in try decodeCustomTags(tagsContainer) {
860+
func store<K: ActorTagKey>(_: K.Type) {
861+
if let value = tag.value as? K.Value {
862+
self.tags[K.self] = value
863+
}
864+
}
865+
_openExistential(tag.keyType as any ActorTagKey.Type, do: store) // the `as` here is required, because: inferred result type 'any ActorTagKey.Type' requires explicit coercion due to loss of generic requirements
866+
}
867+
}
868+
}
869+
}
870+
}
871+
796872
// ==== ----------------------------------------------------------------------------------------------------------------
797873
// MARK: Path errors
798874

Sources/DistributedActors/ActorTagSettings.swift

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,19 @@ public struct ActorTagSettings {
3737
internal static let typeName = Self(underlying: .typeName)
3838
}
3939

40-
public var tagOnInit: [TagOnInit] = []
40+
// TODO: expose this eventually
41+
internal var tagOnInit: [TagOnInit] = []
42+
43+
/// What type of tags, known and defined by the cluster system itself, should be automatically propagated.
44+
/// Other types of tags, such as user-defined tags, must be propagated by declaring apropriate functions for `encodeCustomTags` and `decodeCustomTags`.
45+
internal var propagateTags: Set<AnyActorTagKey> = [
46+
.init(ActorTags.path),
47+
.init(ActorTags.type),
48+
]
49+
50+
// TODO: expose this eventually
51+
internal var encodeCustomTags: (ActorID, inout KeyedEncodingContainer<ActorCoding.TagKeys>) throws -> Void = { _, _ in () }
4152

4253
// TODO: expose this eventually
43-
internal var propagateTags: [(any ActorTagKey).Type] = []
54+
internal var decodeCustomTags: ((KeyedDecodingContainer<ActorCoding.TagKeys>) throws -> [any ActorTag]) = { _ in [] }
4455
}

Sources/DistributedActors/ActorTags.swift

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import Distributed
2020
/// Container of tags a concrete actor identity was tagged with.
2121
public struct ActorTags {
2222
// We still might re-think how we represent the storage.
23-
private var _storage: [String: Sendable & Codable] = [:] // FIXME: fix the key as AnyActorTagKey
23+
internal var _storage: [String: Sendable & Codable] = [:] // FIXME: fix the key as AnyActorTagKey
2424

2525
public init() {
2626
// empty tags
@@ -102,14 +102,36 @@ struct AnyActorTagKey: Hashable {
102102
// MARK: Known keys
103103

104104
extension ActorTags {
105-
static let path = ActorPathTagKey.self
106-
public struct ActorPathTagKey: ActorTagKey {
107-
public static let id: String = "path"
108-
public typealias Value = ActorPath
105+
static let path = ActorPathTag.Key.self
106+
struct ActorPathTag: ActorTag {
107+
struct Key: ActorTagKey {
108+
static let id: String = "path"
109+
typealias Value = ActorPath
110+
}
111+
112+
let value: Key.Value
109113
}
114+
}
110115

111-
public struct ActorPathTag: ActorTag {
112-
public typealias Key = ActorPathTagKey
113-
public let value: Key.Value
116+
// ==== ----------------------------------------------------------------------------------------------------------------
117+
// MARK: Known tag: type
118+
119+
extension ActorTags {
120+
static let type = ActorTypeTag.Key.self
121+
struct ActorTypeTag: ActorTag {
122+
struct Key: ActorTagKey {
123+
static let id: String = "$type"
124+
typealias Value = ActorTypeTagValue
125+
}
126+
127+
let value: Key.Value
128+
}
129+
130+
// FIXME: improve representation to be more efficient
131+
struct ActorTypeTagValue: Codable {
132+
let mangledName: String
133+
var simpleName: String {
134+
_typeByName(self.mangledName).map { "\($0)" } ?? self.mangledName
135+
}
114136
}
115137
}

Sources/DistributedActors/Cluster/ClusterControl.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public struct ClusterControl {
2929
///
3030
/// This sequence begins with a snapshot of the current cluster state and continues with events representing changes
3131
/// since the snapshot.
32-
public let events: EventStream<Cluster.Event>
32+
public let events: EventStream<Cluster.Event> // FIXME: make this an AsyncSequence<Cluster.Event>
3333

3434
/// Offers a snapshot of membership, which may be used to perform ad-hoc tests against the membership.
3535
/// Note that this view may be immediately outdated after checking if, if e.g. a membership change is just being processed.

Sources/DistributedActors/ClusterSystemSettings.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public struct ClusterSystemSettings {
3636

3737
public var actor: ActorSettings = .default
3838

39+
public var tags: ActorTagSettings = .default
40+
3941
public var plugins: _PluginsSettings = .default
4042

4143
public var receptionist: ReceptionistSettings = .default

Sources/DistributedActors/Serialization/ActorRef+Serialization.swift

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,44 @@ public enum ActorCoding {
2424
public enum CodingKeys: CodingKey {
2525
case node
2626
case path
27+
case type
28+
case tags
2729
case incarnation
2830
}
31+
32+
public enum TagKeys: CodingKey {
33+
case path
34+
case type
35+
case custom(String)
36+
37+
public init?(stringValue: String) {
38+
switch stringValue {
39+
case "path": self = .path
40+
case "type": self = .type
41+
default: self = .custom(stringValue)
42+
}
43+
}
44+
45+
public var intValue: Int? {
46+
switch self {
47+
case .path: return 0
48+
case .type: return 1
49+
case .custom: return 2
50+
}
51+
}
52+
53+
public init?(intValue: Int) {
54+
return nil
55+
}
56+
57+
public var stringValue: String {
58+
switch self {
59+
case .path: return "path"
60+
case .type: return "type"
61+
case .custom(let id): return id
62+
}
63+
}
64+
}
2965
}
3066

3167
extension _ActorRef {
@@ -108,27 +144,6 @@ internal enum ReceivesSystemMessagesDecoder {
108144
}
109145
}
110146

111-
// ==== ----------------------------------------------------------------------------------------------------------------
112-
// MARK: Codable ActorID
113-
114-
extension ActorID: Codable {
115-
public func encode(to encoder: Encoder) throws {
116-
var container = encoder.container(keyedBy: ActorCoding.CodingKeys.self)
117-
try container.encode(self.uniqueNode, forKey: ActorCoding.CodingKeys.node)
118-
try container.encode(self.path, forKey: ActorCoding.CodingKeys.path)
119-
try container.encode(self.incarnation, forKey: ActorCoding.CodingKeys.incarnation)
120-
}
121-
122-
public init(from decoder: Decoder) throws {
123-
let container = try decoder.container(keyedBy: ActorCoding.CodingKeys.self)
124-
let node = try container.decode(UniqueNode.self, forKey: ActorCoding.CodingKeys.node)
125-
let path = try container.decode(ActorPath.self, forKey: ActorCoding.CodingKeys.path)
126-
let incarnation = try container.decode(UInt32.self, forKey: ActorCoding.CodingKeys.incarnation)
127-
128-
self.init(remote: node, path: path, incarnation: ActorIncarnation(incarnation))
129-
}
130-
}
131-
132147
// ==== ----------------------------------------------------------------------------------------------------------------
133148
// MARK: Codable ActorPath
134149

Sources/DistributedActors/Serialization/Serialization.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import Foundation // for Codable
3939
public class Serialization {
4040
private let log: Logger
4141
internal let settings: Serialization.Settings
42+
internal let tagSettings: ActorTagSettings
4243

4344
/// Allocator used by the serialization infrastructure.
4445
/// Public only for access by other serialization work performed e.g. by other transports.
@@ -147,6 +148,7 @@ public class Serialization {
147148
settings.register(VersionVector.self, serializerID: ._ProtobufRepresentable)
148149

149150
self.settings = settings
151+
self.tagSettings = system.settings.tags
150152
self.metrics = system.metrics
151153

152154
self.allocator = self.settings.allocator

0 commit comments

Comments
 (0)