Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 62 additions & 66 deletions Sources/DistributedActors/Cluster/SWIM/SWIMActor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -169,86 +169,82 @@ internal distributed actor SWIMActor: SWIMPeer, SWIMAddressablePeer, CustomStrin
}

internal func sendPingRequests(_ directive: SWIMInstance.SendPingRequestDirective) async {
// We are only interested in successful pings, as a single success tells us the node is
// still alive. Therefore we propagate only the first success, but no failures.
// The failure case is handled through the timeout of the whole operation.
let eventLoop = self.actorSystem._eventLoopGroup.next()
let firstSuccessful = eventLoop.makePromise(of: SWIM.PingResponse<SWIMActor, SWIMActor>.self)
let pingTimeout = directive.timeout
let peerToPing = directive.target

let startedSendingPingRequestsSentAt: DispatchTime = .now()
let pingRequestResponseTimeFirstTimer = self.swim.metrics.shell.pingRequestResponseTimeFirst
firstSuccessful.futureResult.whenComplete { result in
switch result {
case .success: pingRequestResponseTimeFirstTimer.recordInterval(since: startedSendingPingRequestsSentAt)
case .failure: ()
}
}

for pingRequest in directive.requestDetails {
let peerToPingRequestThrough = pingRequest.peerToPingRequestThrough
let payload = pingRequest.payload
let sequenceNumber = pingRequest.sequenceNumber

self.log.trace("Sending ping request for [\(peerToPing)] to [\(peerToPingRequestThrough)] with payload: \(payload)")

let pingRequestSentAt: DispatchTime = .now()
self.metrics.shell.messageOutboundCount.increment()

do {
let response = try await peerToPingRequestThrough.pingRequest(
target: peerToPing,
payload: payload,
from: self,
timeout: pingTimeout,
sequenceNumber: sequenceNumber
)
let firstSuccessful = await withTaskGroup(
of: SWIM.PingResponse<SWIMActor, SWIMActor>.self,
returning: SWIM.PingResponse<SWIMActor, SWIMActor>?.self
) { [log, metrics, swim] group in
for pingRequest in directive.requestDetails {
group.addTask {
let peerToPingRequestThrough = pingRequest.peerToPingRequestThrough
let payload = pingRequest.payload
let sequenceNumber = pingRequest.sequenceNumber

log.trace("Sending ping request for [\(peerToPing)] to [\(peerToPingRequestThrough)] with payload: \(payload)")

let pingRequestSentAt: DispatchTime = .now()
metrics.shell.messageOutboundCount.increment()

do {
let response = try await peerToPingRequestThrough.pingRequest(
target: peerToPing,
payload: payload,
from: self,
timeout: pingTimeout,
sequenceNumber: sequenceNumber
)
metrics.shell.pingRequestResponseTimeAll.recordInterval(since: pingRequestSentAt)
return response
} catch {
log.debug(".pingRequest resulted in error", metadata: swim!.metadata([ // !-safe, initialized in init()
"swim/pingRequest/target": "\(peerToPing)",
"swim/pingRequest/peerToPingRequestThrough": "\(peerToPingRequestThrough)",
"swim/pingRequest/sequenceNumber": "\(sequenceNumber)",
"error": "\(error)",
]))

// these are generally harmless thus we do not want to log them on higher levels
log.trace("Failed pingRequest", metadata: [
"swim/target": "\(peerToPing)",
"swim/payload": "\(payload)",
"swim/pingTimeout": "\(pingTimeout)",
"error": "\(error)",
])

let response = SWIM.PingResponse<SWIMActor, SWIMActor>.timeout(
target: peerToPing,
pingRequestOrigin: self,
timeout: pingTimeout,
sequenceNumber: sequenceNumber
)
return response
}
}
}

self.metrics.shell.pingRequestResponseTimeAll.recordInterval(since: pingRequestSentAt)
var firstSuccessful: SWIM.PingResponse<SWIMActor, SWIMActor>?
for await response in group {
self.handleEveryPingRequestResponse(response: response, pinged: peerToPing)

if case .ack = response {
// We only cascade successful ping responses (i.e. `ack`s);
//
// While this has a slight timing implication on the timeout of the pings -- the node that is last
// in the list that we ping, has slightly less time to fulfil the "total ping timeout"; as we set a total timeout on the entire `firstSuccess`.
// In practice those timeouts will be relatively large (seconds) and the few millis here should not have a large impact on correctness.
firstSuccessful.succeed(response)
// We are only interested in successful ping responses (i.e. `ack`s), as a single success tells us the node is
// still alive. Therefore we propagate only the first success, but no failures.
// The failure case is handled through the timeout of the whole operation.
if case .ack = response, firstSuccessful == nil {
pingRequestResponseTimeFirstTimer.recordInterval(since: startedSendingPingRequestsSentAt)
firstSuccessful = response
}
} catch {
self.log.debug(".pingRequest resulted in error", metadata: self.swim.metadata([
"swim/pingRequest/target": "\(peerToPing)",
"swim/pingRequest/peerToPingRequestThrough": "\(peerToPingRequestThrough)",
"swim/pingRequest/sequenceNumber": "\(sequenceNumber)",
"error": "\(error)",
]))
self.handleEveryPingRequestResponse(
response: .timeout(
target: peerToPing,
pingRequestOrigin: self,
timeout: pingTimeout,
sequenceNumber: sequenceNumber
),
pinged: peerToPing
)
// these are generally harmless thus we do not want to log them on higher levels
self.log.trace("Failed pingRequest", metadata: [
"swim/target": "\(peerToPing)",
"swim/payload": "\(payload)",
"swim/pingTimeout": "\(pingTimeout)",
"error": "\(error)",
])
}
return firstSuccessful
}

do {
let pingRequestResponse = try await firstSuccessful.futureResult.get()
if let pingRequestResponse = firstSuccessful {
self.handlePingRequestResponse(response: pingRequestResponse, pinged: peerToPing)
} catch {
self.log.debug("Failed to sendPingRequests", metadata: [
"error": "\(error)",
])
} else {
self.handlePingRequestResponse(
response: .timeout(target: peerToPing, pingRequestOrigin: self, timeout: pingTimeout, sequenceNumber: 0),
pinged: peerToPing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import Foundation
@testable import SWIM
import XCTest

final class SWIMShellClusteredTests: ClusteredActorSystemsXCTestCase {
final class SWIMActorClusteredTests: ClusteredActorSystemsXCTestCase {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch!

var metrics: TestMetrics! = TestMetrics()

override func setUp() {
Expand Down