Skip to content

Commit ead1235

Browse files
authored
Rewrite SWIMActor.sendPingRequests to use Swift concurrency (#1052)
Resolves #1051
1 parent 28fa9b8 commit ead1235

File tree

2 files changed

+63
-67
lines changed

2 files changed

+63
-67
lines changed

Sources/DistributedActors/Cluster/SWIM/SWIMActor.swift

Lines changed: 62 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -169,86 +169,82 @@ internal distributed actor SWIMActor: SWIMPeer, SWIMAddressablePeer, CustomStrin
169169
}
170170

171171
internal func sendPingRequests(_ directive: SWIMInstance.SendPingRequestDirective) async {
172-
// We are only interested in successful pings, as a single success tells us the node is
173-
// still alive. Therefore we propagate only the first success, but no failures.
174-
// The failure case is handled through the timeout of the whole operation.
175-
let eventLoop = self.actorSystem._eventLoopGroup.next()
176-
let firstSuccessful = eventLoop.makePromise(of: SWIM.PingResponse<SWIMActor, SWIMActor>.self)
177172
let pingTimeout = directive.timeout
178173
let peerToPing = directive.target
179174

180175
let startedSendingPingRequestsSentAt: DispatchTime = .now()
181176
let pingRequestResponseTimeFirstTimer = self.swim.metrics.shell.pingRequestResponseTimeFirst
182-
firstSuccessful.futureResult.whenComplete { result in
183-
switch result {
184-
case .success: pingRequestResponseTimeFirstTimer.recordInterval(since: startedSendingPingRequestsSentAt)
185-
case .failure: ()
186-
}
187-
}
188-
189-
for pingRequest in directive.requestDetails {
190-
let peerToPingRequestThrough = pingRequest.peerToPingRequestThrough
191-
let payload = pingRequest.payload
192-
let sequenceNumber = pingRequest.sequenceNumber
193-
194-
self.log.trace("Sending ping request for [\(peerToPing)] to [\(peerToPingRequestThrough)] with payload: \(payload)")
195177

196-
let pingRequestSentAt: DispatchTime = .now()
197-
self.metrics.shell.messageOutboundCount.increment()
198-
199-
do {
200-
let response = try await peerToPingRequestThrough.pingRequest(
201-
target: peerToPing,
202-
payload: payload,
203-
from: self,
204-
timeout: pingTimeout,
205-
sequenceNumber: sequenceNumber
206-
)
178+
let firstSuccessful = await withTaskGroup(
179+
of: SWIM.PingResponse<SWIMActor, SWIMActor>.self,
180+
returning: SWIM.PingResponse<SWIMActor, SWIMActor>?.self
181+
) { [log, metrics, swim] group in
182+
for pingRequest in directive.requestDetails {
183+
group.addTask {
184+
let peerToPingRequestThrough = pingRequest.peerToPingRequestThrough
185+
let payload = pingRequest.payload
186+
let sequenceNumber = pingRequest.sequenceNumber
187+
188+
log.trace("Sending ping request for [\(peerToPing)] to [\(peerToPingRequestThrough)] with payload: \(payload)")
189+
190+
let pingRequestSentAt: DispatchTime = .now()
191+
metrics.shell.messageOutboundCount.increment()
192+
193+
do {
194+
let response = try await peerToPingRequestThrough.pingRequest(
195+
target: peerToPing,
196+
payload: payload,
197+
from: self,
198+
timeout: pingTimeout,
199+
sequenceNumber: sequenceNumber
200+
)
201+
metrics.shell.pingRequestResponseTimeAll.recordInterval(since: pingRequestSentAt)
202+
return response
203+
} catch {
204+
log.debug(".pingRequest resulted in error", metadata: swim!.metadata([ // !-safe, initialized in init()
205+
"swim/pingRequest/target": "\(peerToPing)",
206+
"swim/pingRequest/peerToPingRequestThrough": "\(peerToPingRequestThrough)",
207+
"swim/pingRequest/sequenceNumber": "\(sequenceNumber)",
208+
"error": "\(error)",
209+
]))
210+
211+
// these are generally harmless thus we do not want to log them on higher levels
212+
log.trace("Failed pingRequest", metadata: [
213+
"swim/target": "\(peerToPing)",
214+
"swim/payload": "\(payload)",
215+
"swim/pingTimeout": "\(pingTimeout)",
216+
"error": "\(error)",
217+
])
218+
219+
let response = SWIM.PingResponse<SWIMActor, SWIMActor>.timeout(
220+
target: peerToPing,
221+
pingRequestOrigin: self,
222+
timeout: pingTimeout,
223+
sequenceNumber: sequenceNumber
224+
)
225+
return response
226+
}
227+
}
228+
}
207229

208-
self.metrics.shell.pingRequestResponseTimeAll.recordInterval(since: pingRequestSentAt)
230+
var firstSuccessful: SWIM.PingResponse<SWIMActor, SWIMActor>?
231+
for await response in group {
209232
self.handleEveryPingRequestResponse(response: response, pinged: peerToPing)
210233

211-
if case .ack = response {
212-
// We only cascade successful ping responses (i.e. `ack`s);
213-
//
214-
// While this has a slight timing implication on time timeout of the pings -- the node that is last
215-
// in the list that we ping, has slightly less time to fulfil the "total ping timeout"; as we set a total timeout on the entire `firstSuccess`.
216-
// In practice those timeouts will be relatively large (seconds) and the few millis here should not have a large impact on correctness.
217-
firstSuccessful.succeed(response)
234+
// We are only interested in successful ping responses (i.e. `ack`s), as a single success tells us the node is
235+
// still alive. Therefore we propagate only the first success, but no failures.
236+
// The failure case is handled through the timeout of the whole operation.
237+
if case .ack = response, firstSuccessful == nil {
238+
pingRequestResponseTimeFirstTimer.recordInterval(since: startedSendingPingRequestsSentAt)
239+
firstSuccessful = response
218240
}
219-
} catch {
220-
self.log.debug(".pingRequest resulted in error", metadata: self.swim.metadata([
221-
"swim/pingRequest/target": "\(peerToPing)",
222-
"swim/pingRequest/peerToPingRequestThrough": "\(peerToPingRequestThrough)",
223-
"swim/pingRequest/sequenceNumber": "\(sequenceNumber)",
224-
"error": "\(error)",
225-
]))
226-
self.handleEveryPingRequestResponse(
227-
response: .timeout(
228-
target: peerToPing,
229-
pingRequestOrigin: self,
230-
timeout: pingTimeout,
231-
sequenceNumber: sequenceNumber
232-
),
233-
pinged: peerToPing
234-
)
235-
// these are generally harmless thus we do not want to log them on higher levels
236-
self.log.trace("Failed pingRequest", metadata: [
237-
"swim/target": "\(peerToPing)",
238-
"swim/payload": "\(payload)",
239-
"swim/pingTimeout": "\(pingTimeout)",
240-
"error": "\(error)",
241-
])
242241
}
242+
return firstSuccessful
243243
}
244244

245-
do {
246-
let pingRequestResponse = try await firstSuccessful.futureResult.get()
245+
if let pingRequestResponse = firstSuccessful {
247246
self.handlePingRequestResponse(response: pingRequestResponse, pinged: peerToPing)
248-
} catch {
249-
self.log.debug("Failed to sendPingRequests", metadata: [
250-
"error": "\(error)",
251-
])
247+
} else {
252248
self.handlePingRequestResponse(
253249
response: .timeout(target: peerToPing, pingRequestOrigin: self, timeout: pingTimeout, sequenceNumber: 0),
254250
pinged: peerToPing

Tests/DistributedActorsTests/Cluster/SWIM/SWIMActorShellClusteredTests.swift renamed to Tests/DistributedActorsTests/Cluster/SWIM/SWIMActorClusteredTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import Foundation
2323
@testable import SWIM
2424
import XCTest
2525

26-
final class SWIMShellClusteredTests: ClusteredActorSystemsXCTestCase {
26+
final class SWIMActorClusteredTests: ClusteredActorSystemsXCTestCase {
2727
var metrics: TestMetrics! = TestMetrics()
2828

2929
override func setUp() {

0 commit comments

Comments
 (0)