Skip to content

Commit 811d2e8

Browse files
ktoso and yim-lee authored
[docc] Soundness, docs additions and minor test cleanups for beta.2
* fix docc warnings * [docc] bring back @comment { fishy docs } enabling * [docc] validate we don't have any warnings in docc, e.g. bad links * fix validate_docc script * Showcase: Incorrect number of arguments passed to called function! musttail call swifttailcc void %4(%swift.context* swiftasync %1, %swift.error* %2), !dbg !1538 in function $s17DistributedActors10WorkerPoolCyxGAA14LifecycleWatchA2aEP10terminated5actoryAA13ClusterSystemC7ActorIDV_tYaFTW.0.61 <unknown>:0: error: fatal error encountered during compilation; please submit a bug report (https://swift.org/contributing/#reporting-bugs) and include the project <unknown>:0: note: Broken function found, compilation aborted! * fix conformance crasher * remove superflous prints and logs * =receptionist rdar://97586760 lookup() trying to cast stub actor * work on WeakActorDictionary * Update Sources/DistributedActors/DistributedActors.docc/Receptionist.md Co-authored-by: Yim Lee <[email protected]> * +test reenable testkit tests * !system cleanup shutdown() and terminated methods on actor system * formatting * rename Docs directory, add Security page * test fixes * +docs receptionist checkout docs * +docs some docs for leadership Co-authored-by: Yim Lee <[email protected]>
1 parent ec23c41 commit 811d2e8

File tree

52 files changed

+574
-383
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+574
-383
lines changed

InternalPlugins/FishyDocs/Plugins/FishyDocsPlugin/plugin.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import PackagePlugin
44
@main struct FishyDocsBuildPlugin: BuildToolPlugin {
55
func createBuildCommands(context: PluginContext, target: Target) async throws -> [Command] {
66
let genSourcesDir = context.pluginWorkDirectory
7-
let doccBasePath = "\(context.package.directory)/Sources/DistributedActors/DistributedActors.docc"
7+
let doccBasePath = "\(context.package.directory)/Sources/DistributedActors/Docs.docc"
88

99
let mdFiles = try FileManager.default
1010
.contentsOfDirectory(atPath: doccBasePath)

InternalPlugins/FishyDocs/Sources/FishyDocs/FishyDocs.swift

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@ func log(_ log: String, file: String = #fileID, line: UInt = #line) {
66
print("[fishy-docs] \(log)")
77
}
88

9+
struct Commands {
10+
static let Namespace = "fishy-docs"
11+
static let Enable = "\(Namespace):enable"
12+
static let Disable = "\(Namespace):disable"
13+
static let Skip = "\(Namespace):skip-next"
14+
}
15+
916
@main
1017
struct FishyDocs: ParsableCommand {
1118
@Option(help: "Folder containing the docc documentation to scan for fishy-docs")
@@ -41,16 +48,23 @@ struct FishyDocs: ParsableCommand {
4148
}
4249

4350
func usesFishyDocs(document: Document, url: URL) -> Bool {
44-
return (try? String(contentsOf: url).contains("fishy-docs:enable")) ?? false
45-
46-
// FIXME(docc): docc breaks when @Comment is used in symbol documentation: https://github.com/apple/swift-docc/issues/343
47-
// var detectFishyDocs = DetectFishyDocs()
48-
// detectFishyDocs.visit(document)
49-
// return detectFishyDocs.detected
51+
var detectFishyDocs = DetectFishyDocs()
52+
detectFishyDocs.visit(document)
53+
return detectFishyDocs.detected
5054
}
5155

5256
func makeDocsTestCode(document: Document, doccFileName: String) throws -> String? {
53-
var concatCodeBlocks = ConcatCodeBlocks()
57+
assert(!doccFileName.isEmpty)
58+
59+
guard var name = doccFileName.split(separator: "/").last else {
60+
return nil
61+
}
62+
guard let simpleName = name.split(separator: ".").first else {
63+
return nil
64+
}
65+
name = simpleName
66+
67+
var concatCodeBlocks = ConcatCodeBlocks(name: String(name))
5468
concatCodeBlocks.visit(document)
5569

5670
guard !concatCodeBlocks.code.isEmpty else {
@@ -79,6 +93,20 @@ struct DetectFishyDocs: MarkupWalker {
7993
struct ConcatCodeBlocks: MarkupWalker {
8094
var importBlocks: [CodeBlock] = []
8195
var codeBlocks: [CodeBlock] = []
96+
97+
/// True if capturing code blocks is enabled
98+
var includeCodeBlock: Bool = true
99+
100+
/// Allows for skipping a single code block, followed after the fishy-docs:skip-next
101+
var skipNextCodeBlock: Bool = false
102+
103+
/// Name of the file we're compile-testing
104+
let name: String
105+
106+
init(name: String) {
107+
self.name = name
108+
}
109+
82110
var code: String {
83111
var allBlocks = importBlocks
84112
allBlocks.append(contentsOf: codeBlocks)
@@ -88,7 +116,7 @@ struct ConcatCodeBlocks: MarkupWalker {
88116
codeBlockStrings.append(block.code)
89117
}
90118

91-
codeBlockStrings.append("func __test() async throws {")
119+
codeBlockStrings.append("func __compileTest_\(self.name)() async throws {")
92120
for block in codeBlocks {
93121
var s = "// ==== "
94122

@@ -115,7 +143,26 @@ struct ConcatCodeBlocks: MarkupWalker {
115143
return codeBlockStrings.joined(separator: "\n")
116144
}
117145

146+
mutating func visitParagraph(_ paragraph: Paragraph) {
147+
if paragraph.plainText.contains(Commands.Enable) {
148+
self.includeCodeBlock = true
149+
} else if paragraph.plainText.contains(Commands.Disable) {
150+
self.includeCodeBlock = false
151+
} else if paragraph.plainText.contains(Commands.Skip) {
152+
self.skipNextCodeBlock = true
153+
self.includeCodeBlock = false
154+
}
155+
}
156+
118157
mutating func visitCodeBlock(_ codeBlock: CodeBlock) {
158+
guard !self.skipNextCodeBlock else {
159+
self.skipNextCodeBlock = false
160+
return
161+
}
162+
guard self.includeCodeBlock else {
163+
return
164+
}
165+
119166
if codeBlock.code.contains("import ") {
120167
self.importBlocks.append(codeBlock)
121168
} else {

Package.swift

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,14 @@ var targets: [PackageDescription.Target] = [
5959
]
6060
),
6161

62-
// .testTarget(
63-
// name: "DistributedActorsTestKitTests",
64-
// dependencies: [
65-
// "DistributedActors",
66-
// "DistributedActorsTestKit"
67-
// ]
68-
// ),
69-
//
62+
.testTarget(
63+
name: "DistributedActorsTestKitTests",
64+
dependencies: [
65+
"DistributedActors",
66+
"DistributedActorsTestKit",
67+
]
68+
),
69+
7070
// .testTarget(
7171
// name: "CDistributedActorsMailboxTests",
7272
// dependencies: [

Sources/DistributedActors/Cluster/ClusterEventStream.swift

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@ public struct ClusterEventStream: AsyncSequence {
123123
}
124124
}
125125

126-
// FIXME(distributed): the only reason this actor is distributed is because of LifecycleWatch
127126
internal distributed actor ClusterEventStreamActor: LifecycleWatch {
128127
typealias ActorSystem = ClusterSystem
129128

@@ -210,7 +209,7 @@ internal distributed actor ClusterEventStreamActor: LifecycleWatch {
210209
}
211210
}
212211

213-
distributed func terminated(actor id: ActorID) {
212+
func terminated(actor id: ActorID) {
214213
if self.subscribers.removeValue(forKey: id) != nil {
215214
self.log.trace("Removed subscriber [\(id)], because it terminated")
216215
}

Sources/DistributedActors/Cluster/ClusterShell.swift

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -674,7 +674,7 @@ extension ClusterShell {
674674
}
675675
.receiveSpecificSignal(_Signals.Terminated.self) { context, signal in
676676
context.log.error("Cluster actor \(signal.id) terminated unexpectedly! Will initiate cluster shutdown.")
677-
context.system.shutdown()
677+
try context.system.shutdown()
678678
return .same // the system shutdown will cause downing which we may want to still handle, and then will stop us
679679
}
680680
}
@@ -1305,8 +1305,12 @@ extension ClusterShell {
13051305
let onDownAction = context.system.settings.onDownAction.make()
13061306
try onDownAction(context.system) // TODO: return a future and run with a timeout
13071307
} catch {
1308-
context.system.log.error("Failed to executed onDownAction! Shutting down system forcefully! Error: \(error)")
1309-
context.system.shutdown()
1308+
context.system.log.error("Failed to executed onDownAction! Shutting down system forcefully!", metadata: ["error": "\(error)"])
1309+
do {
1310+
try context.system.shutdown()
1311+
} catch {
1312+
context.system.log.error("Failed shutting down actor system!", metadata: ["error": "\(error)"])
1313+
}
13101314
}
13111315

13121316
state = self.interpretLeaderActions(context.system, state, state.collectLeaderActions())

Sources/DistributedActors/Cluster/Downing/DowningSettings.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public struct OnDownActionStrategySettings {
7474
guard .milliseconds(0) < shutdownDelay else {
7575
context.log.warning("This node was marked as [.down], delay is immediate. Shutting down the system immediately!")
7676
Task {
77-
system.shutdown()
77+
try system.shutdown()
7878
}
7979
return .stop
8080
}
@@ -85,7 +85,7 @@ public struct OnDownActionStrategySettings {
8585
return .receiveMessage { _ in
8686
system.log.warning("Shutting down...")
8787
Task {
88-
system.shutdown()
88+
try system.shutdown()
8989
}
9090
return .stop
9191
}

Sources/DistributedActors/Cluster/Leadership.swift

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,9 @@ extension Leadership {
223223
/// fulfilling this role whenever the minimum number of nodes exist. This may be useful when operation would
224224
/// potentially be unsafe given less than `minimumNrOfMembers` nodes.
225225
///
226-
// TODO: In situations which need strong guarantees, this leadership election scheme does NOT provide strong enough
227-
/// guarantees, and you should consider using another scheme or consensus based modes.
228226
public struct LowestReachableMember: LeaderElection {
227+
// TODO: In situations which need strong guarantees, this leadership election scheme does NOT provide strong enough
228+
// guarantees, and you should consider using another scheme or consensus based modes.
229229
let minimumNumberOfMembersToDecide: Int
230230
let loseLeadershipIfBelowMinNrOfMembers: Bool
231231

@@ -318,6 +318,7 @@ extension Leadership {
318318
// MARK: Leadership settings
319319

320320
extension ClusterSystemSettings {
321+
/// Configure leadership election using which the cluster leader should be decided.
321322
public struct LeadershipSelectionSettings {
322323
private enum _LeadershipSelectionSettings {
323324
case none

Sources/DistributedActors/Cluster/NodeDeathWatcher.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import NIO
2626
/// and individually watched actors, the watcher handles subscribing for cluster events on behalf of actors which watch
2727
/// other actors on remote nodes, and messages them `SystemMessage.nodeTerminated(node)` upon node termination (down),
2828
/// which are in turn translated by the actors locally to `SystemMessage.terminated(ref:existenceConfirmed:idTerminated:true)`
29-
///
3029
/// to any actor which watched at least one actor on a node that has been downed.
3130
///
3231
/// Actor which is notified automatically when a remote actor is `context.watch()`-ed.

Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -812,7 +812,7 @@ extension OpLogDistributedReceptionist {
812812
// MARK: Termination handling
813813

814814
extension OpLogDistributedReceptionist {
815-
public distributed func terminated(actor id: ID) {
815+
public func terminated(actor id: ID) {
816816
if id == ActorID._receptionist(on: id.uniqueNode, for: .distributedActors) {
817817
self.log.debug("Watched receptionist terminated: \(id)")
818818
self.receptionistTerminated(identity: id)

Sources/DistributedActors/ClusterSystem.swift

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
6161

6262
// Access MUST be protected with `namingLock`.
6363
private var _managedRefs: [ActorID: _ReceivesSystemMessages] = [:]
64-
private var _managedDistributedActors: WeakActorDictionary = .init()
64+
private var _managedDistributedActors: WeakAnyDistributedActorDictionary = .init()
6565
private var _reservedNames: Set<ActorID> = []
6666

6767
typealias WellKnownName = String
@@ -427,51 +427,57 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
427427
#endif
428428
}
429429

430+
/// Object that can be awaited on until the system has completed shutting down.
430431
public struct Shutdown {
431432
private let receptacle: BlockingReceptacle<Error?>
432433

433434
init(receptacle: BlockingReceptacle<Error?>) {
434435
self.receptacle = receptacle
435436
}
436437

438+
@available(*, deprecated, message: "will be replaced by distributed actor / closure version")
437439
public func wait(atMost timeout: Duration) throws {
438440
if let error = self.receptacle.wait(atMost: timeout).flatMap({ $0 }) {
439441
throw error
440442
}
441443
}
442444

445+
@available(*, deprecated, message: "will be replaced by distributed actor / closure version")
443446
public func wait() throws {
444447
if let error = self.receptacle.wait() {
445448
throw error
446449
}
447450
}
448-
}
449451

450-
/// Suspends until the ``ClusterSystem`` is terminated by a call to ``shutdown``.
451-
var terminated: Void {
452-
get async throws {
452+
/// Suspend until the system has completed its shutdown and is terminated.
453+
public func wait() async throws {
454+
// TODO: implement without blocking the internal task;
453455
try await Task.detached {
454-
try Shutdown(receptacle: self.shutdownReceptacle).wait()
456+
if let error = self.receptacle.wait() {
457+
throw error
458+
}
455459
}.value
456460
}
457461
}
458462

463+
/// Suspends until the ``ClusterSystem`` is terminated by a call to ``shutdown()``.
464+
public var terminated: Void {
465+
get async throws {
466+
try await Shutdown(receptacle: self.shutdownReceptacle).wait()
467+
}
468+
}
469+
459470
/// Forcefully stops this actor system and all actors that live within it.
460471
/// This is an asynchronous operation and will be executed on a separate thread.
461472
///
462473
/// You can use `shutdown().wait()` to synchronously await on the system's termination,
463474
/// or provide a callback to be executed after the system has completed it's shutdown.
464475
///
465-
/// - Parameters:
466-
/// - queue: allows configuring on which dispatch queue the shutdown operation will be finalized.
467-
/// - afterShutdownCompleted: optional callback to be invoked when the system has completed shutting down.
468-
/// Will be invoked on the passed in `queue` (which defaults to `DispatchQueue.global()`).
469476
/// - Returns: A `Shutdown` value that can be waited upon until the system has completed the shutdown.
470477
@discardableResult
471-
public func shutdown(queue: DispatchQueue = DispatchQueue.global(), afterShutdownCompleted: @escaping (Error?) -> Void = { _ in () }) -> Shutdown {
478+
public func shutdown() throws -> Shutdown {
472479
guard self.shutdownFlag.loadThenWrappingIncrement(by: 1, ordering: .relaxed) == 0 else {
473480
// shutdown already kicked off by someone else
474-
afterShutdownCompleted(nil)
475481
return Shutdown(receptacle: self.shutdownReceptacle)
476482
}
477483

@@ -510,7 +516,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
510516
// self._receptionistRef = self.deadLetters.adapted()
511517
} catch {
512518
self.shutdownReceptacle.offerOnce(error)
513-
afterShutdownCompleted(error)
519+
throw error
514520
}
515521

516522
/// Only once we've shutdown all dispatchers and loops, we clear cycles between the serialization and system,
@@ -523,7 +529,6 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
523529
_ = self._clusterStore.storeIfNilThenLoad(Box(nil))
524530

525531
self.shutdownReceptacle.offerOnce(nil)
526-
afterShutdownCompleted(nil)
527532

528533
return Shutdown(receptacle: self.shutdownReceptacle)
529534
}
@@ -1085,16 +1090,13 @@ extension ClusterSystem {
10851090
Res: Codable
10861091
{
10871092
if let interceptor = actor.id.context.remoteCallInterceptor {
1088-
self.log.warning("INTERCEPTOR remote call \(actor.id)...")
1089-
print("[\(self.cluster.uniqueNode)] INTERCEPTOR remote call \(actor.id)...")
10901093
return try await interceptor.interceptRemoteCall(on: actor, target: target, invocation: &invocation, throwing: throwing, returning: returning)
10911094
}
10921095

10931096
guard actor.id.uniqueNode != self.cluster.uniqueNode else {
10941097
// It actually is a remote call, so redirect it to local call-path.
10951098
// Such calls can happen when we deal with interceptors and proxies;
10961099
// To make their lives easier, we centralize the noticing when a call is local and dispatch it from here.
1097-
self.log.warning("ACTUALLY LOCAL CALL: \(target) on \(actor.id)")
10981100
return try await self.localCall(on: actor, target: target, invocation: &invocation, throwing: throwing, returning: returning)
10991101
}
11001102

@@ -1116,13 +1118,10 @@ extension ClusterSystem {
11161118
arguments: arguments
11171119
)
11181120

1119-
print("[\(self.cluster.uniqueNode)] SEND INVOCATION: \(invocation) TO \(recipient.id.fullDescription)")
1120-
log.warning("[\(self.cluster.uniqueNode)] SEND INVOCATION: \(invocation) TO \(recipient.id.fullDescription)")
11211121
recipient.sendInvocation(invocation)
11221122
}
11231123

11241124
if let error = reply.thrownError {
1125-
print("[\(self.cluster.uniqueNode)] reply error: \(error)")
11261125
throw error
11271126
}
11281127
guard let value = reply.value else {
@@ -1258,13 +1257,10 @@ extension ClusterSystem {
12581257
Err: Error,
12591258
Res: Codable
12601259
{
1261-
print("[\(self.cluster.uniqueNode)] ACT: \(actor.id.fullDescription)")
12621260
precondition(
12631261
self.cluster.uniqueNode == actor.id.uniqueNode,
12641262
"Attempted to localCall an actor whose ID was a different node: [\(actor.id)], current node: \(self.cluster.uniqueNode)"
12651263
)
1266-
// precondition(!__isRemoteActor(actor),
1267-
// "Attempted to localCall a remote actor! \(actor.id)")
12681264
self.log.trace("Execute local call", metadata: [
12691265
"actor/id": "\(actor.id.fullDescription)",
12701266
"target": "\(target)",
@@ -1302,7 +1298,7 @@ extension ClusterSystem {
13021298
"invocation": "\(invocation)",
13031299
])
13041300

1305-
guard let shell = self._cluster else {
1301+
guard self._cluster != nil else {
13061302
self.log.error("Cluster has shut down already, yet received message. Message will be dropped: \(invocation)")
13071303
return
13081304
}

0 commit comments

Comments
 (0)