Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .swiftformat
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
--disable redundantGet
--disable redundantReturn

# enum namespaces does not understand some instances of code where an instance
# IS necessary because of how dynamic member lookup is used; or we even create
# instances manually, since we must have *member methods* for dynamic lookup
# to work.
--disable enumNamespaces

# we want to have fine grained control over extensions by marking each function
# explicitly, rather than it being forced onto the extension entirely.
--extensionacl on-declarations
4 changes: 2 additions & 2 deletions Sources/ActorSingletonPlugin/ActorSingletonProxy.swift
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ internal class ActorSingletonProxy<Message: Codable> {
}

private func updateRef(_ context: _ActorContext<Message>, _ newRef: _ActorRef<Message>?) {
context.log.debug("Updating ref from [\(optional: self.ref)] to [\(optional: newRef)], flushing \(self.buffer.count) messages")
context.log.debug("Updating ref from [\(String(describing: self.ref))] to [\(String(describing: newRef))], flushing \(self.buffer.count) messages")
self.ref = newRef

// Unstash messages if we have the singleton
Expand Down Expand Up @@ -222,7 +222,7 @@ extension ActorSingletonProxy {
"singleton/buffer": "\(self.buffer.count)/\(self.settings.bufferCapacity)",
]

metadata["targetNode"] = "\(optional: self.targetNode?.debugDescription)"
metadata["targetNode"] = "\(String(describing: self.targetNode?.debugDescription))"
if let ref = self.ref {
metadata["ref"] = "\(ref.id)"
}
Expand Down
165 changes: 113 additions & 52 deletions Sources/DistributedActors/ActorID.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,61 @@ import Distributed
/// Convenience alias for ``ClusterSystem/ActorID``.
public typealias ActorID = ClusterSystem.ActorID

extension DistributedActor where ActorSystem == ClusterSystem {
public nonisolated var metadata: ActorMetadata {
self.id.metadata
}
}

extension ClusterSystem.ActorID {
@propertyWrapper
public struct Metadata<Value: Sendable & Codable> {
let keyType: Any.Type
let id: String
// public init(_: Key.Type) {
// // no initial value; it must be set during initialization
// }

public init(_ keyPath: KeyPath<ActorMetadataKeys, ActorMetadataKey<Value>>) {
let key = ActorMetadataKeys()[keyPath: keyPath]
self.id = key.id
self.keyType = type(of: key)
}

public var wrappedValue: Value {
get { fatalError("called wrappedValue getter") }
set { fatalError("called wrappedValue setter") }
}

public var projectedValue: Value {
get { fatalError("called projectedValue getter") }
set { fatalError("called projectedValue setter") }
}

public static subscript<EnclosingSelf: DistributedActor, FinalValue>(
_enclosingInstance myself: EnclosingSelf,
wrapped wrappedKeyPath: ReferenceWritableKeyPath<EnclosingSelf, FinalValue>,
storage storageKeyPath: ReferenceWritableKeyPath<EnclosingSelf, Self>
) -> Value where EnclosingSelf.ActorSystem == ClusterSystem, EnclosingSelf.ID == ClusterSystem.ActorID {
get {
let key = myself[keyPath: storageKeyPath]
guard let value = myself.id.metadata[key.id] else {
fatalError("ActorID Metadata for key \(key.id):\(key.keyType) was not assigned initial value, assign one in the distributed actor's initializer.")
}
return value as! Value
}
set {
let metadata = myself.id.metadata
let key = myself[keyPath: storageKeyPath]
if let value = metadata[key.id] {
fatalError("Attempted to override ActorID Metadata for key \(key.id):\(key.keyType) which already had value: \(value); with new value: \(String(describing: newValue))")
}
metadata[key.id] = newValue
}
}
}
}

extension ClusterSystem {
/// Uniquely identifies a DistributedActor within the cluster.
///
Expand All @@ -40,7 +95,7 @@ extension ClusterSystem {
/// The location of the distributed actor (i.e. it being "remote" or "local") is not taken into account
/// during comparison of IDs, however does matter that it is on the same actual unique node.
///
/// Additional information can be attached to them using ``ActorTags`` however those do not impact
/// Additional information can be attached to them using ``ActorMetadata`` however those do not impact
/// the identity or equality of the ID, or distributed actors identified by those IDs.
///
/// ## Lifecycle
Expand All @@ -64,7 +119,7 @@ extension ClusterSystem {
/// Some tags may be carried to remote peers, while others are intended only for local use,
/// e.g. to inform the actor system to resolve the actor identity using some special treatment etc.
///
/// Please refer to ``ActorTags`` for an in depth discussion about tagging.
/// Please refer to ``ActorMetadata`` for an in depth discussion about tagging.
///
/// ## Serialization
///
Expand Down Expand Up @@ -96,8 +151,10 @@ extension ClusterSystem {
/// Tags MAY be transferred to other peers as the identity is replicated, however they are not necessary to uniquely identify the actor.
/// Tags can carry additional information such as the type of the actor identified by this identity, or any other user defined "roles" or similar tags.
///
/// - SeeAlso: `ActorTags` for a detailed discussion of some frequently used tags.
public var tags: ActorTags
/// - SeeAlso: ``ActorMetadata`` for a detailed discussion of some frequently used tags.
public var metadata: ActorMetadata {
self.context.metadata
}

/// Internal "actor context" which is used as storage for additional cluster actor features, such as watching.
internal var context: DistributedActorContext
Expand All @@ -106,13 +163,13 @@ extension ClusterSystem {
// FIXME(distributed): make optional
public var path: ActorPath {
get {
guard let path = tags[ActorTags.path] else {
fatalError("FIXME: ActorTags.path was not set on \(self)! NOTE THAT PATHS ARE TO BECOME OPTIONAL!!!") // FIXME(distributed): must be removed
guard let path = metadata.path else {
fatalError("FIXME: ActorTags.path was not set on \(self.incarnation)! NOTE THAT PATHS ARE TO BECOME OPTIONAL!!!") // FIXME(distributed): must be removed
}
return path
}
set {
self.tags[ActorTags.path] = newValue
self.metadata[ActorMetadataKeys().path.id] = newValue
}
}

Expand All @@ -135,10 +192,9 @@ extension ClusterSystem {
init(local node: UniqueNode, path: ActorPath?, incarnation: ActorIncarnation) {
self.context = .init(lifecycle: nil)
self._location = .local(node)
self.tags = ActorTags()
self.incarnation = incarnation
if let path {
self.tags[ActorTags.path] = path
self.context.metadata[ActorMetadataKeys().path.id] = path
}
traceLog_DeathWatch("Made ID: \(self)")
}
Expand All @@ -153,9 +209,8 @@ extension ClusterSystem {
self.context = .init(lifecycle: nil)
self._location = .remote(node)
self.incarnation = incarnation
self.tags = ActorTags()
if let path {
self.tags[ActorTags.path] = path
self.context.metadata[ActorMetadataKeys().path.id] = path
}
traceLog_DeathWatch("Made ID: \(self)")
}
Expand All @@ -166,10 +221,8 @@ extension ClusterSystem {
self.context = .init(lifecycle: nil)
self._location = .remote(node)
self.incarnation = incarnation
self.tags = ActorTags()
// TODO: avoid mangling names on every spawn?
if let mangledName = _mangledTypeName(type) {
self.tags[ActorTags.type] = .init(mangledName: mangledName)
if let mangledName = _mangledTypeName(type) { // TODO: avoid mangling names on every spawn?
self.context.metadata.type = .init(mangledName: mangledName)
}
traceLog_DeathWatch("Made ID: \(self)")
}
Expand All @@ -180,12 +233,9 @@ extension ClusterSystem {
{
self.context = context
self._location = .local(node)
self.tags = ActorTags()
self.incarnation = incarnation
self.tags = ActorTags()
// TODO: avoid mangling names on every spawn?
if let mangledName = _mangledTypeName(type) {
self.tags[ActorTags.type] = .init(mangledName: mangledName)
if let mangledName = _mangledTypeName(type) { // TODO: avoid mangling names on every spawn?
self.context.metadata.type = .init(mangledName: mangledName)
}
traceLog_DeathWatch("Made ID: \(self)")
}
Expand All @@ -197,17 +247,27 @@ extension ClusterSystem {
self.context = context
self._location = .remote(node)
self.incarnation = incarnation
self.tags = ActorTags()
// TODO: avoid mangling names on every spawn?
if let mangledName = _mangledTypeName(type) {
self.tags[ActorTags.type] = .init(mangledName: mangledName)
if let mangledName = _mangledTypeName(type) { // TODO: avoid mangling names on every spawn?
self.context.metadata.type = .init(mangledName: mangledName)
}
traceLog_DeathWatch("Made ID: \(self)")
}

internal var withoutContext: Self {
internal var withoutLifecycle: Self {
var copy = self
copy.context = .init(lifecycle: nil)
copy.context = .init(
lifecycle: nil,
metadata: self.metadata
)
return copy
}

public var withoutMetadata: Self {
var copy = self
copy.context = .init(
lifecycle: self.context.lifecycle,
metadata: nil
)
return copy
}
}
Expand Down Expand Up @@ -857,30 +917,31 @@ extension UniqueNodeID {

extension ActorID: Codable {
public func encode(to encoder: Encoder) throws {
let tagSettings = encoder.actorSerializationContext?.system.settings.tags
let encodeCustomTags: (ActorID, inout KeyedEncodingContainer<ActorCoding.TagKeys>) throws -> Void =
tagSettings?.encodeCustomTags ?? ({ _, _ in () })
let metadataSettings = encoder.actorSerializationContext?.system.settings.actorMetadata
let encodeCustomMetadata =
metadataSettings?.encodeCustomMetadata ?? ({ _, _ in () })

var container = encoder.container(keyedBy: ActorCoding.CodingKeys.self)
try container.encode(self.uniqueNode, forKey: ActorCoding.CodingKeys.node)
try container.encode(self.path, forKey: ActorCoding.CodingKeys.path) // TODO: remove as we remove the tree
try container.encode(self.incarnation, forKey: ActorCoding.CodingKeys.incarnation)

if !self.tags.isEmpty {
var tagsContainer = container.nestedContainer(keyedBy: ActorCoding.TagKeys.self, forKey: ActorCoding.CodingKeys.tags)
if !self.metadata.isEmpty {
var metadataContainer = container.nestedContainer(keyedBy: ActorCoding.MetadataKeys.self, forKey: ActorCoding.CodingKeys.metadata)

if (tagSettings == nil || tagSettings!.propagateTags.contains(AnyActorTagKey(ActorTags.path))),
let value = self.tags[ActorTags.path]
let keys = ActorMetadataKeys()
if (metadataSettings == nil || metadataSettings!.propagateMetadata.contains(keys.path.id)),
let value = self.metadata.path
{
try tagsContainer.encode(value, forKey: ActorCoding.TagKeys.path)
try metadataContainer.encode(value, forKey: ActorCoding.MetadataKeys.path)
}
if (tagSettings == nil || tagSettings!.propagateTags.contains(AnyActorTagKey(ActorTags.type))),
let value = self.tags[ActorTags.type]
if (metadataSettings == nil || metadataSettings!.propagateMetadata.contains(keys.type.id)),
let value = self.metadata.type
{
try tagsContainer.encode(value, forKey: ActorCoding.TagKeys.type)
try metadataContainer.encode(value, forKey: ActorCoding.MetadataKeys.type)
}

try encodeCustomTags(self, &tagsContainer)
try encodeCustomMetadata(self.metadata, &metadataContainer)
}
}

Expand All @@ -893,23 +954,23 @@ extension ActorID: Codable {
self.init(remote: node, path: path, incarnation: ActorIncarnation(incarnation))

// Decode any tags:
if let tagsContainer = try? container.nestedContainer(keyedBy: ActorCoding.TagKeys.self, forKey: ActorCoding.CodingKeys.tags) {
if let metadataContainer = try? container.nestedContainer(keyedBy: ActorCoding.MetadataKeys.self, forKey: ActorCoding.CodingKeys.metadata) {
// tags container found, try to decode all known tags:
if let path = try tagsContainer.decodeIfPresent(ActorPath.self, forKey: .path) {
self.tags[ActorTags.path] = path
}

// FIXME: implement decoding tags/metadata in general

if let context = decoder.actorSerializationContext {
let decodeCustomTags = context.system.settings.tags.decodeCustomTags

for tag in try decodeCustomTags(tagsContainer) {
func store<K: ActorTagKey>(_: K.Type) {
if let value = tag.value as? K.Value {
self.tags[K.self] = value
}
}
_openExistential(tag.keyType as any ActorTagKey.Type, do: store) // the `as` here is required, because: inferred result type 'any ActorTagKey.Type' requires explicit coercion due to loss of generic requirements
}
let decodeCustomMetadata = context.system.settings.actorMetadata.decodeCustomMetadata
try decodeCustomMetadata(metadataContainer, self.metadata)

// for (key, value) in try decodeCustomMetadata(metadataContainer) {
// func store(_: K.Type) {
// if let value = tag.value as? K.Value {
// self.metadata[K.self] = value
// }
// }
// _openExistential(key, do: store) // the `as` here is required, because: inferred result type 'any ActorTagKey.Type' requires explicit coercion due to loss of generic requirements
// }
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,39 @@ import NIOSSL
import ServiceDiscovery
import SWIM

/// Configures default actor tagging behavior, as well as handling of tags on actors.
public struct ActorTagSettings {
public static var `default`: ActorTagSettings {
/// Configures default actor id metadta behavior, like which metadata should be propagated cross process and which not.
internal struct ActorIDMetadataSettings {
public static var `default`: ActorIDMetadataSettings {
return .init()
}

public struct TagOnInit {
internal enum _TagOnInit {
/// Configures metadata which should be
public struct AutoIDMetadata {
internal enum _AutoIDMetadata: Hashable {
case typeName
}

internal var underlying: _TagOnInit
internal var underlying: _AutoIDMetadata

/// Tag every actor with an additional human-readable type name
// TODO: expose this eventually
internal static let typeName = Self(underlying: .typeName)
}

// TODO: expose this eventually
internal var tagOnInit: [TagOnInit] = []
/// List of metadata which the system should automatically include in an `ActorID` for types it manages.
internal var autoIncludedMetadata: [AutoIDMetadata] = []

/// What type of tags, known and defined by the cluster system itself, should be automatically propagated.
/// Other types of tags, such as user-defined tags, must be propagated by declaring apropriate functions for `encodeCustomTags` and `decodeCustomTags`.
internal var propagateTags: Set<AnyActorTagKey> = [
.init(ActorTags.path),
.init(ActorTags.type),
/// Other types of tags, such as user-defined tags, must be propagated by declaring apropriate functions for ``encodeCustomMetadata`` and ``decodeCustomMetadata``.
internal var propagateMetadata: Set<String> = [
ActorMetadataKeys().path.id,
ActorMetadataKeys().type.id,
]

// TODO: expose this eventually
internal var encodeCustomTags: (ActorID, inout KeyedEncodingContainer<ActorCoding.TagKeys>) throws -> Void = { _, _ in () }
internal var encodeCustomMetadata: (ActorMetadata, inout KeyedEncodingContainer<ActorCoding.MetadataKeys>) throws -> Void =
{ _, _ in () }

// TODO: expose this eventually
internal var decodeCustomTags: ((KeyedDecodingContainer<ActorCoding.TagKeys>) throws -> [any ActorTag]) = { _ in [] }
internal var decodeCustomMetadata: ((KeyedDecodingContainer<ActorCoding.MetadataKeys>, ActorMetadata) throws -> Void) =
{ _, _ in () }
}
Loading