Skip to content

Commit d15fc04

Browse files
committed
Convert APIs to async
Change `ping` and `pingRequest` to async and remove `onResponse` callback parameter. This change requires Swift 5.5.
1 parent 91efaf0 commit d15fc04

File tree

9 files changed

+465
-236
lines changed

9 files changed

+465
-236
lines changed

Package.swift

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// swift-tools-version:5.2
1+
// swift-tools-version:5.5
22
// The swift-tools-version declares the minimum version of Swift required to build this package.
33

44
import class Foundation.ProcessInfo
@@ -102,7 +102,7 @@ var targets: [PackageDescription.Target] = [
102102
// ==== ------------------------------------------------------------------------------------------------------------
103103
// MARK: Integration Tests - `it_` prefixed
104104

105-
.target(
105+
.executableTarget(
106106
name: "it_Clustered_swim_suspension_reachability",
107107
dependencies: [
108108
"SWIM",
@@ -116,8 +116,8 @@ var targets: [PackageDescription.Target] = [
116116
]
117117

118118
var dependencies: [Package.Dependency] = [
119-
.package(url: "https://github.com/apple/swift-nio.git", from: "2.19.0"),
120-
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
119+
.package(url: "https://github.com/apple/swift-nio.git", from: "2.19.0"),
120+
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
121121
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.5.1"),
122122

123123
// ~~~ SSWG APIs ~~~
@@ -144,6 +144,12 @@ let products: [PackageDescription.Product] = [
144144

145145
var package = Package(
146146
name: "swift-cluster-membership",
147+
platforms: [
148+
.macOS(.v10_15),
149+
.iOS(.v13),
150+
.tvOS(.v13),
151+
.watchOS(.v6),
152+
],
147153
products: products,
148154

149155
dependencies: dependencies,

Sources/SWIM/Peer.swift

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the Swift Cluster Membership open source project
44
//
5-
// Copyright (c) 2020 Apple Inc. and the Swift Cluster Membership project authors
5+
// Copyright (c) 2020-2022 Apple Inc. and the Swift Cluster Membership project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -19,7 +19,7 @@ import enum Dispatch.DispatchTimeInterval
1919
/// Any peer in the cluster, can be used used to identify a peer using its unique node that it represents.
2020
public protocol SWIMAddressablePeer {
2121
/// Node that this peer is representing.
22-
var node: ClusterMembership.Node { get }
22+
nonisolated var node: ClusterMembership.Node { get }
2323
}
2424

2525
/// SWIM A peer which originated a `ping`, should be replied to with an `ack`.
@@ -38,7 +38,7 @@ public protocol SWIMPingOriginPeer: SWIMAddressablePeer {
3838
target: SWIMPeer,
3939
incarnation: SWIM.Incarnation,
4040
payload: SWIM.GossipPayload
41-
)
41+
) async throws
4242
}
4343

4444
/// A SWIM peer which originated a `pingRequest` and thus can receive either an `ack` or `nack` from the intermediary.
@@ -56,31 +56,31 @@ public protocol SWIMPingRequestOriginPeer: SWIMPingOriginPeer {
5656
func nack(
5757
acknowledging sequenceNumber: SWIM.SequenceNumber,
5858
target: SWIMPeer
59-
)
59+
) async throws
6060
}
6161

6262
/// SWIM peer which can be initiated contact with, by sending ping or ping request messages.
6363
public protocol SWIMPeer: SWIMAddressablePeer {
6464
/// Perform a probe of this peer by sending a `ping` message.
6565
///
66-
/// We expect the reply to be an `ack`, upon which the `onResponse`
66+
/// We expect the reply to be an `ack`.
6767
///
6868
/// - parameters:
6969
/// - payload: additional gossip information to be processed by the recipient
7070
/// - origin: the origin peer that has initiated this ping message (i.e. "myself" of the sender)
7171
/// replies (`ack`s) from to this ping should be send to this peer
7272
/// - timeout: timeout during which we expect the other peer to have replied to us with a `PingResponse` about the pinged node.
7373
/// If we get no response about that peer in that time, this `ping` is considered failed, and the onResponse MUST be invoked with a `.timeout`.
74-
/// - onResponse: must be invoked when the a corresponding reply (`ack`) or `timeout` event for this ping occurs.
75-
/// No guarantees about concurrency or threading are made with regards to where/how this invocation will take place,
76-
/// so implementation shells may want to hop to the right executor or protect their state using some other way when before handling the response.
74+
///
75+
/// - Returns the corresponding reply (`ack`) or `timeout` event for this ping request occurs.
76+
///
77+
/// - Throws if the ping fails or if the reply is `nack`.
7778
func ping(
7879
payload: SWIM.GossipPayload,
7980
from origin: SWIMPingOriginPeer,
8081
timeout: DispatchTimeInterval,
81-
sequenceNumber: SWIM.SequenceNumber,
82-
onResponse: @escaping (Result<SWIM.PingResponse, Error>) -> Void
83-
)
82+
sequenceNumber: SWIM.SequenceNumber
83+
) async throws -> SWIM.PingResponse
8484

8585
/// Send a ping request to this peer, asking it to perform an "indirect ping" of the target on our behalf.
8686
///
@@ -95,15 +95,14 @@ public protocol SWIMPeer: SWIMAddressablePeer {
9595
/// replies (`ack`s) from this indirect ping should be forwarded to it.
9696
/// - timeout: timeout during which we expect the other peer to have replied to us with a `PingResponse` about the pinged node.
9797
/// If we get no response about that peer in that time, this `pingRequest` is considered failed, and the onResponse MUST be invoked with a `.timeout`.
98-
/// - onResponse: must be invoked when the a corresponding reply (ack, nack) or timeout event for this ping request occurs.
99-
/// No guarantees about concurrency or threading are made with regards to where/how this invocation will take place,
100-
/// so implementation shells may want to hop to the right executor or protect their state using some other way when before handling the response.
98+
///
99+
/// - Returns the corresponding reply (`ack`, `nack`) or `timeout` event for this ping request occurs.
100+
/// - Throws if the ping request fails
101101
func pingRequest(
102102
target: SWIMPeer,
103103
payload: SWIM.GossipPayload,
104104
from origin: SWIMPingRequestOriginPeer,
105105
timeout: DispatchTimeInterval,
106-
sequenceNumber: SWIM.SequenceNumber,
107-
onResponse: @escaping (Result<SWIM.PingResponse, Error>) -> Void
108-
)
106+
sequenceNumber: SWIM.SequenceNumber
107+
) async throws -> SWIM.PingResponse
109108
}

Sources/SWIMNIOExample/Coding.swift

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the Swift Cluster Membership open source project
44
//
5-
// Copyright (c) 2020 Apple Inc. and the Swift Cluster Membership project authors
5+
// Copyright (c) 2020-2022 Apple Inc. and the Swift Cluster Membership project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -106,20 +106,20 @@ extension SWIM.Message: Codable {
106106
}
107107

108108
extension CodingUserInfoKey {
109-
static let channelUserInfoKey: CodingUserInfoKey = CodingUserInfoKey(rawValue: "nio_peer_channel")!
109+
static let channelUserInfoKey = CodingUserInfoKey(rawValue: "nio_peer_channel")!
110110
}
111111

112112
extension SWIM.NIOPeer: Codable {
113-
public init(from decoder: Decoder) throws {
113+
public convenience init(from decoder: Decoder) throws {
114114
let container = try decoder.singleValueContainer()
115-
self.node = try container.decode(Node.self)
115+
let node = try container.decode(Node.self)
116116
guard let channel = decoder.userInfo[.channelUserInfoKey] as? Channel else {
117117
fatalError("Expected channelUserInfoKey to be present in userInfo, unable to decode SWIM.NIOPeer!")
118118
}
119-
self.channel = channel
119+
self.init(node: node, channel: channel)
120120
}
121121

122-
public func encode(to encoder: Encoder) throws {
122+
public nonisolated func encode(to encoder: Encoder) throws {
123123
var container = encoder.singleValueContainer()
124124
try container.encode(self.node)
125125
}
@@ -176,8 +176,7 @@ extension ClusterMembership.Node: Codable {
176176
atIndex = repr.index(after: atIndex)
177177

178178
let name: String?
179-
if let nameEndIndex = repr[atIndex...].firstIndex(of: "@"),
180-
nameEndIndex < repr.endIndex {
179+
if let nameEndIndex = repr[atIndex...].firstIndex(of: "@"), nameEndIndex < repr.endIndex {
181180
name = String(repr[atIndex ..< nameEndIndex])
182181
atIndex = repr.index(after: nameEndIndex)
183182
} else {

Sources/SWIMNIOExample/NIOPeer.swift

Lines changed: 55 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the Swift Cluster Membership open source project
44
//
5-
// Copyright (c) 2020 Apple Inc. and the Swift Cluster Membership project authors
5+
// Copyright (c) 2020-2022 Apple Inc. and the Swift Cluster Membership project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -19,12 +19,12 @@ import NIO
1919
import NIOConcurrencyHelpers
2020
import SWIM
2121

22-
extension SWIM {
22+
public extension SWIM {
2323
/// SWIMPeer designed to deliver messages over UDP in collaboration with the SWIMNIOHandler.
24-
public struct NIOPeer: SWIMPeer, SWIMPingOriginPeer, SWIMPingRequestOriginPeer, CustomStringConvertible {
25-
public var node: Node
24+
actor NIOPeer: SWIMPeer, SWIMPingOriginPeer, SWIMPingRequestOriginPeer, CustomStringConvertible {
25+
public let node: Node
2626

27-
internal var channel: Channel
27+
internal let channel: Channel
2828

2929
public init(node: Node, channel: Channel) {
3030
self.node = node
@@ -35,60 +35,64 @@ extension SWIM {
3535
payload: GossipPayload,
3636
from origin: SWIMPingOriginPeer,
3737
timeout: DispatchTimeInterval,
38-
sequenceNumber: SWIM.SequenceNumber,
39-
onResponse: @escaping (Result<PingResponse, Error>) -> Void
40-
) {
38+
sequenceNumber: SWIM.SequenceNumber
39+
) async throws -> PingResponse {
4140
guard let originPeer = origin as? SWIM.NIOPeer else {
4241
fatalError("Peers MUST be of type SWIM.NIOPeer, yet was: \(origin)")
4342
}
44-
let message = SWIM.Message.ping(replyTo: originPeer, payload: payload, sequenceNumber: sequenceNumber)
4543

46-
let command = SWIMNIOWriteCommand(message: message, to: self.node, replyTimeout: timeout.toNIO, replyCallback: { reply in
47-
switch reply {
48-
case .success(.response(let pingResponse)):
49-
assert(sequenceNumber == pingResponse.sequenceNumber, "callback invoked with not matching sequence number! Submitted with \(sequenceNumber) but invoked with \(pingResponse.sequenceNumber)!")
50-
onResponse(.success(pingResponse))
51-
case .failure(let error):
52-
onResponse(.failure(error))
44+
return try await withCheckedThrowingContinuation { continuation in
45+
let message = SWIM.Message.ping(replyTo: originPeer, payload: payload, sequenceNumber: sequenceNumber)
46+
let command = SWIMNIOWriteCommand(message: message, to: self.node, replyTimeout: timeout.toNIO, replyCallback: { reply in
47+
switch reply {
48+
case .success(.response(let pingResponse)):
49+
assert(sequenceNumber == pingResponse.sequenceNumber, "callback invoked with not matching sequence number! Submitted with \(sequenceNumber) but invoked with \(pingResponse.sequenceNumber)!")
50+
continuation.resume(returning: pingResponse)
5351

54-
case .success(let other):
55-
fatalError("Unexpected message, got: [\(other)]:\(reflecting: type(of: other)) while expected \(PingResponse.self)")
56-
}
57-
})
52+
case .failure(let error):
53+
continuation.resume(throwing: error)
5854

59-
self.channel.writeAndFlush(command, promise: nil)
55+
case .success(let other):
56+
continuation.resume(throwing: SWIMNIOIllegalMessageTypeError("Unexpected message, got: [\(other)]:\(reflecting: type(of: other)) while expected \(PingResponse.self)"))
57+
}
58+
})
59+
60+
self.channel.writeAndFlush(command, promise: nil)
61+
}
6062
}
6163

6264
public func pingRequest(
6365
target: SWIMPeer,
6466
payload: GossipPayload,
6567
from origin: SWIMPingRequestOriginPeer,
6668
timeout: DispatchTimeInterval,
67-
sequenceNumber: SWIM.SequenceNumber,
68-
onResponse: @escaping (Result<PingResponse, Error>) -> Void
69-
) {
69+
sequenceNumber: SWIM.SequenceNumber
70+
) async throws -> PingResponse {
7071
guard let targetPeer = target as? SWIM.NIOPeer else {
7172
fatalError("Peers MUST be of type SWIM.NIOPeer, yet was: \(target)")
7273
}
7374
guard let originPeer = origin as? SWIM.NIOPeer else {
7475
fatalError("Peers MUST be of type SWIM.NIOPeer, yet was: \(origin)")
7576
}
76-
let message = SWIM.Message.pingRequest(target: targetPeer, replyTo: originPeer, payload: payload, sequenceNumber: sequenceNumber)
7777

78-
let command = SWIMNIOWriteCommand(message: message, to: self.node, replyTimeout: timeout.toNIO, replyCallback: { reply in
79-
switch reply {
80-
case .success(.response(let pingResponse)):
81-
assert(sequenceNumber == pingResponse.sequenceNumber, "callback invoked with not matching sequence number! Submitted with \(sequenceNumber) but invoked with \(pingResponse.sequenceNumber)!")
82-
onResponse(.success(pingResponse))
83-
case .failure(let error):
84-
onResponse(.failure(error))
78+
return try await withCheckedThrowingContinuation { continuation in
79+
let message = SWIM.Message.pingRequest(target: targetPeer, replyTo: originPeer, payload: payload, sequenceNumber: sequenceNumber)
80+
let command = SWIMNIOWriteCommand(message: message, to: self.node, replyTimeout: timeout.toNIO, replyCallback: { reply in
81+
switch reply {
82+
case .success(.response(let pingResponse)):
83+
assert(sequenceNumber == pingResponse.sequenceNumber, "callback invoked with not matching sequence number! Submitted with \(sequenceNumber) but invoked with \(pingResponse.sequenceNumber)!")
84+
continuation.resume(returning: pingResponse)
8585

86-
case .success(let other):
87-
fatalError("Unexpected message, got: \(other) while expected \(PingResponse.self)")
88-
}
89-
})
86+
case .failure(let error):
87+
continuation.resume(throwing: error)
9088

91-
self.channel.writeAndFlush(command, promise: nil)
89+
case .success(let other):
90+
continuation.resume(throwing: SWIMNIOIllegalMessageTypeError("Unexpected message, got: \(other) while expected \(PingResponse.self)"))
91+
}
92+
})
93+
94+
self.channel.writeAndFlush(command, promise: nil)
95+
}
9296
}
9397

9498
public func ack(
@@ -113,14 +117,14 @@ extension SWIM {
113117
self.channel.writeAndFlush(command, promise: nil)
114118
}
115119

116-
public var description: String {
120+
public nonisolated var description: String {
117121
"NIOPeer(\(self.node))"
118122
}
119123
}
120124
}
121125

122126
extension SWIM.NIOPeer: Hashable {
123-
public func hash(into hasher: inout Hasher) {
127+
public nonisolated func hash(into hasher: inout Hasher) {
124128
self.node.hash(into: &hasher)
125129
}
126130

@@ -147,3 +151,15 @@ public struct SWIMNIOTimeoutError: Error, CustomStringConvertible {
147151
"SWIMNIOTimeoutError(timeout: \(self.timeout.prettyDescription), \(self.message))"
148152
}
149153
}
154+
155+
public struct SWIMNIOIllegalMessageTypeError: Error, CustomStringConvertible {
156+
let message: String
157+
158+
init(_ message: String) {
159+
self.message = message
160+
}
161+
162+
public var description: String {
163+
"SWIMNIOIllegalMessageTypeError(\(self.message))"
164+
}
165+
}

0 commit comments

Comments
 (0)