Skip to content

Commit e47c16c

Browse files
committed
=receptionist use OrderedSet to avoid ordering issues ordering bugs/hangs
1 parent ab01b7e commit e47c16c

File tree

6 files changed

+147
-29
lines changed

6 files changed

+147
-29
lines changed

Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -300,10 +300,13 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
300300
_ guest: Guest,
301301
with key: DistributedReception.Key<Guest>
302302
) async where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem {
303-
self.log.warning("distributed receptionist: checkIn(\(guest), with: \(key)")
303+
let id = guest.id
304304
let key = key.asAnyKey
305305

306-
let id = guest.id
306+
self.log.debug("Receptionist checkIn [\(guest.id)] with key [\(key)]", metadata: [
307+
"receptionist/key": "\(key)",
308+
"receptionist/guest": "\(guest.id)",
309+
])
307310

308311
guard id._isLocal || (id.uniqueNode == actorSystem.cluster.uniqueNode) else {
309312
self.log.warning("""
@@ -327,7 +330,9 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
327330
"receptionist/key": "\(key)",
328331
"receptionist/guest": "\(id)",
329332
"receptionist/opLog/maxSeqNr": "\(self.ops.maxSeqNr)",
330-
"receptionist/opLog": "\(self.ops.ops)",
333+
"receptionist/opLog": Logger.MetadataValue.array(
334+
self.ops.ops.map { Logger.MetadataValue.string("\($0)") }
335+
),
331336
]
332337
)
333338

@@ -368,7 +373,11 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
368373
// We immediately flush all already-known registrations;
369374
// as new ones come in, they will be reported to this subscription later on
370375
for alreadyRegisteredAtSubscriptionTime in self.storage.registrations(forKey: subscription.key) ?? [] {
371-
subscription.tryOffer(registration: alreadyRegisteredAtSubscriptionTime)
376+
if subscription.tryOffer(registration: alreadyRegisteredAtSubscriptionTime) {
377+
self.log.notice("OFFERED \(alreadyRegisteredAtSubscriptionTime.actorID) TO \(subscription)")
378+
} else {
379+
self.log.notice("DROPPED \(alreadyRegisteredAtSubscriptionTime.actorID) TO \(subscription)")
380+
}
372381
}
373382
}
374383
}
@@ -438,7 +447,7 @@ extension OpLogDistributedReceptionist {
438447

439448
let timerTaskKey = key.hashValue
440449
guard self.flushTimerTasks[timerTaskKey] == nil else {
441-
self.log.debug("timer exists")
450+
self.log.trace("Delayed listing flush timer task already exists (key: \(key))")
442451
return // timer exists nothing to do
443452
}
444453

@@ -460,7 +469,6 @@ extension OpLogDistributedReceptionist {
460469

461470
func onDelayedListingFlushTick(key: AnyDistributedReceptionKey) {
462471
self.log.trace("Run delayed listing flush, key: \(key)")
463-
464472
self.notifySubscribers(of: key)
465473
}
466474

@@ -474,12 +482,28 @@ extension OpLogDistributedReceptionist {
474482
// Sort registrations by version from oldest to newest so that they are processed in order.
475483
// Otherwise, if we process a newer version (i.e., with bigger sequence number) first, older
476484
// versions will be dropped because they are considered "seen".
477-
let registrations = (self.storage.registrations(forKey: key) ?? []).sorted { l, r in l.version < r.version }
485+
let registrations = (self.storage.registrations(forKey: key) ?? []).sorted { l, r in l.version < r.version } // FIXME: use ordered set or Deque now that we have them
486+
self.log.notice(
487+
"Registrations to flush: \(registrations.count)",
488+
metadata: [
489+
"registrations": Logger.MetadataValue.array(
490+
registrations.map { Logger.MetadataValue.string("\($0)") }
491+
),
492+
]
493+
)
478494

479495
// self.instrumentation.listingPublished(key: key, subscribers: subscriptions.count, registrations: registrations.count) // TODO(distributed): make the instrumentation calls compatible with distributed actor based types
480496
for subscription in subscriptions {
497+
self.log.notice("Offering registrations to subscription: \(subscription))", metadata: [
498+
"registrations": "\(subscription.seenActorRegistrations)",
499+
])
500+
481501
for registration in registrations {
482-
subscription.tryOffer(registration: registration)
502+
if subscription.tryOffer(registration: registration) {
503+
self.log.notice("OFFERED \(registration.actorID) TO \(subscription)")
504+
} else {
505+
self.log.notice("DROPPED \(registration.actorID) TO \(subscription)")
506+
}
483507
}
484508
}
485509
}
@@ -643,15 +667,8 @@ extension OpLogDistributedReceptionist {
643667
// we effectively initiate the "first pull"
644668
let latestAppliedSeqNrFromPeer = self.appliedSequenceNrs[.actorID(receptionistID)]
645669

646-
let ack = AckOps(
647-
appliedUntil: latestAppliedSeqNrFromPeer,
648-
observedSeqNrs: self.observedSequenceNrs,
649-
peer: self
650-
)
651-
652670
Task { [weak self] in
653671
do {
654-
// assert(self.id.path.description.contains("/system/receptionist"), "Receptionist path did not include /system/receptionist, was: \(self.id.fullDescription)")
655672
guard let __secretlyKnownToBeLocal = self else { return } // FIXME: we need `local`
656673
try await peerReceptionistRef.ackOps(until: latestAppliedSeqNrFromPeer, by: __secretlyKnownToBeLocal)
657674
} catch {
@@ -702,7 +719,11 @@ extension OpLogDistributedReceptionist {
702719

703720
for subscription in subscriptions {
704721
for registration in registrations {
705-
subscription.tryOffer(registration: registration)
722+
if subscription.tryOffer(registration: registration) {
723+
self.log.notice("OFFERED \(registration.actorID) TO \(subscription)")
724+
} else {
725+
self.log.notice("DROPPED \(registration.actorID) TO \(subscription)")
726+
}
706727
}
707728
}
708729
}

Sources/DistributedActors/Receptionist/DistributedReceptionist.swift

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import Distributed
1616
import Logging
17+
import OrderedCollections
1718

1819
/// A receptionist is a system actor that allows users to register actors under
1920
/// a key to make them available to other parts of the system, without having to
@@ -180,7 +181,7 @@ internal struct VersionedRegistration: Hashable {
180181
internal final class DistributedReceptionistStorage {
181182
typealias ReceptionistOp = OpLogDistributedReceptionist.ReceptionistOp
182183

183-
internal var _registrations: [AnyDistributedReceptionKey: Set<VersionedRegistration>] = [:]
184+
internal var _registrations: [AnyDistributedReceptionKey: OrderedSet<VersionedRegistration>] = [:]
184185
internal var _subscriptions: [AnyDistributedReceptionKey: Set<AnyDistributedReceptionListingSubscription>] = [:]
185186

186187
/// Per (receptionist) node mapping of which keys are presently known to this receptionist on the given node.
@@ -218,7 +219,7 @@ internal final class DistributedReceptionistStorage {
218219
return self.addTo(dict: &self._registrations, key: key, value: versionedRegistration)
219220
}
220221

221-
func removeRegistration<Guest>(key: AnyDistributedReceptionKey, guest: Guest) -> Set<VersionedRegistration>?
222+
func removeRegistration<Guest>(key: AnyDistributedReceptionKey, guest: Guest) -> OrderedSet<VersionedRegistration>?
222223
where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem
223224
{
224225
let address = guest.id
@@ -232,7 +233,7 @@ internal final class DistributedReceptionistStorage {
232233
return self.removeFrom(dict: &self._registrations, key: key, value: versionedRegistration)
233234
}
234235

235-
func registrations(forKey key: AnyDistributedReceptionKey) -> Set<VersionedRegistration>? {
236+
func registrations(forKey key: AnyDistributedReceptionKey) -> OrderedSet<VersionedRegistration>? {
236237
self._registrations[key]
237238
}
238239

@@ -304,7 +305,7 @@ internal final class DistributedReceptionistStorage {
304305
for key in keys {
305306
// 1) we remove any registrations that it hosted
306307
let registrations = self._registrations.removeValue(forKey: key) ?? []
307-
let remainingRegistrations = registrations.filter { registration in
308+
let remainingRegistrations = registrations._filter { registration in // FIXME(collections): missing type preserving filter on OrderedSet https://github.com/apple/swift-collections/pull/159
308309
registration.actorID.uniqueNode != node
309310
}
310311
if !remainingRegistrations.isEmpty {
@@ -327,6 +328,16 @@ internal final class DistributedReceptionistStorage {
327328
}
328329
}
329330

331+
/// - returns: `true` if the value was a newly inserted value, `false` otherwise
332+
private func addTo<Value: Hashable>(dict: inout [AnyDistributedReceptionKey: OrderedSet<Value>], key: AnyDistributedReceptionKey, value: Value) -> Bool {
333+
guard !(dict[key]?.contains(value) ?? false) else {
334+
return false
335+
}
336+
337+
dict[key, default: []].append(value)
338+
return true
339+
}
340+
330341
/// - returns: `true` if the value was a newly inserted value, `false` otherwise
331342
private func addTo<Value: Hashable>(dict: inout [AnyDistributedReceptionKey: Set<Value>], key: AnyDistributedReceptionKey, value: Value) -> Bool {
332343
guard !(dict[key]?.contains(value) ?? false) else {
@@ -337,6 +348,14 @@ internal final class DistributedReceptionistStorage {
337348
return true
338349
}
339350

351+
private func removeFrom<Value: Hashable>(dict: inout [AnyDistributedReceptionKey: OrderedSet<Value>], key: AnyDistributedReceptionKey, value: Value) -> OrderedSet<Value>? {
352+
if dict[key]?.remove(value) != nil, dict[key]?.isEmpty ?? false {
353+
dict.removeValue(forKey: key)
354+
}
355+
356+
return dict[key]
357+
}
358+
340359
private func removeFrom<Value: Hashable>(dict: inout [AnyDistributedReceptionKey: Set<Value>], key: AnyDistributedReceptionKey, value: Value) -> Set<Value>? {
341360
if dict[key]?.remove(value) != nil, dict[key]?.isEmpty ?? false {
342361
dict.removeValue(forKey: key)
@@ -354,7 +373,7 @@ internal final class DistributedReceptionistStorage {
354373
// ==== ----------------------------------------------------------------------------------------------------------------
355374

356375
/// Represents a local subscription (for `receptionist.subscribe`) for a specific key.
357-
internal final class AnyDistributedReceptionListingSubscription: Hashable, @unchecked Sendable {
376+
internal final class AnyDistributedReceptionListingSubscription: Hashable, @unchecked Sendable, CustomStringConvertible {
358377
let subscriptionID: ObjectIdentifier
359378
let key: AnyDistributedReceptionKey
360379

@@ -368,7 +387,7 @@ internal final class AnyDistributedReceptionListingSubscription: Hashable, @unch
368387

369388
/// We very carefully only modify this from the owning actor (receptionist).
370389
// TODO: It would be lovely to be able to express this in the type system as "actor owned" or "actor local" to some actor instance.
371-
private var seenActorRegistrations: VersionVector
390+
var seenActorRegistrations: VersionVector
372391

373392
init(
374393
subscriptionID: ObjectIdentifier,
@@ -399,19 +418,22 @@ internal final class AnyDistributedReceptionListingSubscription: Hashable, @unch
399418
/// version vector, to see if it "advanced it" - if so, it must be a new registration and we have
400419
/// to emit the value. If it didn't advance the local "seen" version vector, it means we've already
401420
/// seen this actor in this specific stream, and don't need to emit it again.
402-
func tryOffer(registration: VersionedRegistration) {
421+
///
422+
/// - Returns: true if the value was successfully offered
423+
func tryOffer(registration: VersionedRegistration) -> Bool {
403424
let oldSeenRegistrations = self.seenActorRegistrations
404425
self.seenActorRegistrations.merge(other: registration.version)
405426

406427
switch self.seenActorRegistrations.compareTo(oldSeenRegistrations) {
407428
case .same:
408429
// the seen vector was unaffected by the merge, which means that the
409430
// incoming registration version was already seen, and thus we don't need to emit it again
410-
return
431+
return false
411432
case .happenedAfter:
412433
// the incoming registration has not yet been seen before,
413434
// which means that we should emit the actor to the stream.
414435
self.onNext(registration.actorID)
436+
return true
415437
case .concurrent:
416438
fatalError("""
417439
It should not be possible for a version vector to be concurrent with a PAST version of itself before the merge
@@ -435,4 +457,13 @@ internal final class AnyDistributedReceptionListingSubscription: Hashable, @unch
435457
hasher.combine(self.subscriptionID)
436458
hasher.combine(self.key)
437459
}
460+
461+
var description: String {
462+
var string = "AnyDistributedReceptionListingSubscription(subscriptionID: \(subscriptionID), key: \(key), , seenActorRegistrations: \(seenActorRegistrations)"
463+
#if DEBUG
464+
string += ", at: \(self.file):\(self.line)"
465+
#endif
466+
string += ")"
467+
return string
468+
}
438469
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Distributed Actors open source project
4+
//
5+
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import OrderedCollections
16+
17+
extension OrderedSet {
18+
// FIXME(collections): implemented for real in https://github.com/apple/swift-collections/pull/159
19+
@inlinable
20+
internal func _filter(
21+
_ isIncluded: (Element) throws -> Bool
22+
) rethrows -> Self {
23+
var result: OrderedSet = Self()
24+
for element in self where try isIncluded(element) {
25+
result.append(element)
26+
}
27+
return result
28+
}
29+
}

Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,22 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
@testable import DistributedActors
16+
import DistributedActorsConcurrencyHelpers
17+
import Foundation
1618
import XCTest
1719

1820
/// Convenience class for building multi-node (yet same-process) tests with many actor systems involved.
1921
///
2022
/// Systems started using `setUpNode` are automatically terminated upon test completion, and logs are automatically
2123
/// captured and only printed when a test failure occurs.
2224
open class ClusteredActorSystemsXCTestCase: XCTestCase {
25+
internal let lock = DistributedActorsConcurrencyHelpers.Lock()
2326
public private(set) var _nodes: [ClusterSystem] = []
2427
public private(set) var _testKits: [ActorTestKit] = []
2528
public private(set) var _logCaptures: [LogCapture] = []
2629

30+
private var stuckTestDumpLogsTask: Task<Void, Error>?
31+
2732
// ==== ------------------------------------------------------------------------------------------------------------
2833
// MARK: Actor leak detection
2934

@@ -54,6 +59,12 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
5459
true
5560
}
5661

62+
/// Unconditionally dump logs if the test has been running longer than this duration.
63+
/// This is to help diagnose stuck tests.
64+
open var dumpLogsAfter: Duration {
65+
.seconds(60)
66+
}
67+
5768
/// Enables logging all captured logs, even if the test passed successfully.
5869
/// - Default: `false`
5970
open var alwaysPrintCaptureLogs: Bool {
@@ -84,6 +95,19 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
8495
if self.inspectDetectActorLeaks {
8596
self.actorStatsBefore = try InspectKit.actorStats()
8697
}
98+
99+
self.stuckTestDumpLogsTask = Task.detached {
100+
try await Task.sleep(until: .now + self.dumpLogsAfter, clock: .continuous)
101+
guard !Task.isCancelled else {
102+
return
103+
}
104+
105+
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
106+
print("!!!!!!!!!! TEST SEEMS STUCK - DUMPING LOGS !!!!!!!!!!")
107+
print("!!!!!!!!!! PID: \(ProcessInfo.processInfo.processIdentifier) !!!!!!!!!!")
108+
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
109+
self.printAllCapturedLogs()
110+
}
87111
try await super.setUp()
88112
}
89113

@@ -101,7 +125,9 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
101125
settings.logging.baseLogger = capture.logger(label: name)
102126
settings.swim.logger = settings.logging.baseLogger
103127

104-
self._logCaptures.append(capture)
128+
self.lock.withLockVoid {
129+
self._logCaptures.append(capture)
130+
}
105131
}
106132

107133
settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 2)
@@ -129,6 +155,9 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
129155
}
130156

131157
override open func tearDown() async throws {
158+
self.stuckTestDumpLogsTask?.cancel()
159+
self.stuckTestDumpLogsTask = nil
160+
132161
try await super.tearDown()
133162

134163
let testsFailed = self.testRun?.totalFailureCount ?? 0 > 0
@@ -141,9 +170,11 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
141170
try! await node.shutdown().wait()
142171
}
143172

144-
self._nodes = []
145-
self._testKits = []
146-
self._logCaptures = []
173+
self.lock.withLockVoid {
174+
self._nodes = []
175+
self._testKits = []
176+
self._logCaptures = []
177+
}
147178

148179
if self.inspectDetectActorLeaks {
149180
try await Task.sleep(until: .now + .seconds(2), clock: .continuous)
@@ -268,7 +299,9 @@ extension ClusteredActorSystemsXCTestCase {
268299
fatalError("No such node: [\(node)] in [\(self._nodes)]!")
269300
}
270301

271-
return self._logCaptures[index]
302+
return self.lock.withLock {
303+
self._logCaptures[index]
304+
}
272305
}
273306

274307
public func printCapturedLogs(of node: ClusterSystem) {

Tests/DistributedActorsTests/DistributedReceptionistTests.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,15 @@ final class DistributedReceptionistTests: SingleClusterSystemXCTestCase {
6060
let forwarderA = Forwarder(probe: probe, name: "A", actorSystem: system)
6161
let forwarderB = Forwarder(probe: probe, name: "B", actorSystem: system)
6262

63+
system.log.notice("Checking in: \(forwarderA.id)")
6364
await receptionist.checkIn(forwarderA, with: .forwarders)
65+
system.log.notice("Checking in: \(forwarderB.id)")
6466
await receptionist.checkIn(forwarderB, with: .forwarders)
6567

6668
var i = 0
69+
system.log.notice("here")
6770
for await forwarder in await receptionist.listing(of: .forwarders) {
71+
system.log.notice("here more \(i): \(forwarder.id)")
6872
i += 1
6973
try await forwarder.forward(message: "test")
7074

0 commit comments

Comments
 (0)