Skip to content

Commit 18b27fa

Browse files
committed
[Receptionist] Fix ordering bug and improve diagnostics
1 parent b1a84c4 commit 18b27fa

File tree

3 files changed

+57
-26
lines changed

3 files changed

+57
-26
lines changed

Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -344,28 +344,24 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
344344
}
345345

346346
public nonisolated func listing<Guest>(
347-
of key: DistributedReception.Key<Guest>
347+
of key: DistributedReception.Key<Guest>,
348+
file: String = #fileID, line: UInt = #line
348349
) async -> DistributedReception.GuestListing<Guest>
349350
where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem
350351
{
351-
let res = await self.whenLocal { _ in
352-
DistributedReception.GuestListing<Guest>(receptionist: self, key: key)
353-
}
354-
355-
guard let r = res else {
356-
return .init(receptionist: self, key: key)
357-
}
358-
359-
return r
352+
return DistributedReception.GuestListing<Guest>(receptionist: self, key: key, file: file, line: line)
360353
}
361354

355+
// 'local' impl for 'listing'
362356
func _listing(
363-
subscription: AnyDistributedReceptionListingSubscription
357+
subscription: AnyDistributedReceptionListingSubscription,
358+
file: String = #fileID, line: UInt = #line
364359
) {
365360
if self.storage.addSubscription(key: subscription.key, subscription: subscription) {
366361
// self.instrumentation.actorSubscribed(key: anyKey, id: self.id._unwrapActorID) // FIXME: remove the address parameter, it does not make sense anymore
367-
self.log.trace("Subscribed async sequence to \(subscription.key) actors", metadata: [
362+
self.log.trace("Subscribed async sequence to \(subscription.key)", metadata: [
368363
"subscription/key": "\(subscription.key)",
364+
"subscription/callSite": "\(file):\(line)",
369365
])
370366
}
371367
}
@@ -705,7 +701,18 @@ extension OpLogDistributedReceptionist {
705701

706702
/// Receive an Ack and potentially continue streaming ops to peer if still pending operations available.
707703
distributed func ackOps(until: UInt64, by peer: ReceptionistRef) {
708-
guard var replayer = self.peerReceptionistReplayers[peer] else {
704+
var replayer = self.peerReceptionistReplayers[peer]
705+
706+
if replayer == nil, until == 0 {
707+
self.log.debug("Received message from \(peer), but no replayer available, create one ad-hoc now", metadata: [
708+
"peer": "\(peer.id.uniqueNode)",
709+
])
710+
// TODO: Generally we should trigger a `onNewClusterMember` but seems we got a message before that triggered
711+
// Seems ordering became less strict here with DA unfortunately...?
712+
replayer = self.ops.replay(from: .beginning)
713+
}
714+
715+
guard var replayer = replayer else {
709716
self.log.trace("Received a confirmation until \(until) from \(peer) but no replayer available for it, ignoring", metadata: [
710717
"receptionist/peer/confirmed": "\(until)",
711718
"receptionist/peer": "\(peer.id)",

Sources/DistributedActors/Receptionist/DistributedReceptionist.swift

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public protocol DistributedReceptionist: DistributedActor {
4343
///
4444
/// It emits both values for already existing, checked-in before the listing was created,
4545
/// actors; as well as new actors which are checked-in while the listing was already subscribed to.
46-
func listing<Guest>(of key: DistributedReception.Key<Guest>) async -> DistributedReception.GuestListing<Guest>
46+
func listing<Guest>(of key: DistributedReception.Key<Guest>, file: String, line: UInt) async -> DistributedReception.GuestListing<Guest>
4747
where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem
4848

4949
/// Perform a *single* lookup for a distributed actor identified by the passed in `key`.
@@ -54,26 +54,49 @@ public protocol DistributedReceptionist: DistributedActor {
5454
where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem
5555
}
5656

57+
extension DistributedReceptionist {
58+
/// Returns a "listing" asynchronous sequence which will emit actor references,
59+
/// for every distributed actor that the receptionist discovers for the specific key.
60+
///
61+
/// It emits both values for already existing, checked-in before the listing was created,
62+
/// actors; as well as new actors which are checked-in while the listing was already subscribed to.
63+
func listing<Guest>(of key: DistributedReception.Key<Guest>, file: String = #file, line: UInt = #line) async -> DistributedReception.GuestListing<Guest>
64+
where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem
65+
{
66+
await self.listing(of: key, file: file, line: line)
67+
}
68+
}
69+
5770
extension DistributedReception {
5871
public struct GuestListing<Guest: DistributedActor>: AsyncSequence, Sendable where Guest.ActorSystem == ClusterSystem {
5972
public typealias Element = Guest
6073

6174
let receptionist: OpLogDistributedReceptionist
6275
let key: DistributedReception.Key<Guest>
6376

64-
init(receptionist: OpLogDistributedReceptionist, key: DistributedReception.Key<Guest>) {
77+
// Location where the subscription was created
78+
let file: String
79+
let line: UInt
80+
81+
init(receptionist: OpLogDistributedReceptionist, key: DistributedReception.Key<Guest>,
82+
file: String, line: UInt)
83+
{
6584
self.receptionist = receptionist
6685
self.key = key
86+
self.file = file
87+
self.line = line
6788
}
6889

6990
public func makeAsyncIterator() -> AsyncIterator {
70-
AsyncIterator(receptionist: self.receptionist, key: self.key)
91+
AsyncIterator(receptionist: self.receptionist, key: self.key, file: self.file, line: self.line)
7192
}
7293

7394
public class AsyncIterator: AsyncIteratorProtocol {
7495
var underlying: AsyncStream<Element>.Iterator!
7596

76-
init(receptionist __secretlyKnownToBeLocal: OpLogDistributedReceptionist, key: DistributedReception.Key<Guest>) {
97+
init(receptionist __secretlyKnownToBeLocal: OpLogDistributedReceptionist, key: DistributedReception.Key<Guest>,
98+
file: String, line: UInt)
99+
{
77100
self.underlying = AsyncStream<Element> { continuation in
78101
let anySubscribe = AnyDistributedReceptionListingSubscription(
79102
subscriptionID: ObjectIdentifier(self),
@@ -89,7 +112,7 @@ extension DistributedReception {
89112
)
90113

91114
Task {
92-
await __secretlyKnownToBeLocal._listing(subscription: anySubscribe)
115+
await __secretlyKnownToBeLocal._listing(subscription: anySubscribe, file: file, line: line)
93116
}
94117

95118
continuation.onTermination = { @Sendable termination in

Tests/DistributedActorsTests/Cluster/Reception/OpLogDistributedReceptionistClusteredTests.swift

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,28 +85,29 @@ final class OpLogDistributedReceptionistClusteredTests: ClusteredActorSystemsXCT
8585
// MARK: Sync
8686

8787
func test_shouldReplicateRegistrations() async throws {
88-
let (local, remote) = await self.setUpPair()
89-
let testKit: ActorTestKit = self.testKit(local)
90-
try await self.joinNodes(node: local, with: remote)
88+
let (first, second) = await self.setUpPair()
89+
let testKit = self.testKit(first)
90+
try await self.joinNodes(node: first, with: second)
9191

9292
let probe = testKit.makeTestProbe(expecting: String.self)
9393

94-
// Create forwarder on 'local'
95-
let forwarder = StringForwarder(probe: probe, actorSystem: local)
94+
// Create forwarder on 'first'
95+
let forwarder = StringForwarder(probe: probe, actorSystem: first)
9696

9797
// subscribe on `remote`
9898
let subscriberProbe = testKit.makeTestProbe("subscriber", expecting: StringForwarder.self)
9999
let subscriptionTask = Task {
100-
for try await forwarder in await remote.receptionist.listing(of: .stringForwarders) {
100+
for try await forwarder in await second.receptionist.listing(of: .stringForwarders) {
101101
subscriberProbe.tell(forwarder)
102102
}
103103
}
104104
defer {
105105
subscriptionTask.cancel()
106106
}
107107

108-
// checkIn on `local`
109-
await local.receptionist.checkIn(forwarder, with: .stringForwarders)
108+
// checkIn on `first`
109+
await first.receptionist.checkIn(forwarder, with: .stringForwarders)
110+
first.log.notice("Checked in: \(forwarder)")
110111

111112
try await Task {
112113
let found = try subscriberProbe.expectMessage()

0 commit comments

Comments
 (0)