Skip to content

Commit b3adef3

Browse files
authored
Support seed nodes (#1005)
Resolves #318
1 parent a8b9e64 commit b3adef3

File tree

3 files changed

+50
-7
lines changed

3 files changed

+50
-7
lines changed

Sources/DistributedActors/ClusterSystemSettings.swift

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -400,14 +400,14 @@ protocol ClusterSystemInstrumentationProvider {
400400
/// in an attempt to form (or join) a cluster with those nodes. These typically should include a few initial contact points, but can also include
401401
/// all the nodes of an existing cluster.
402402
public struct ServiceDiscoverySettings {
403-
internal let implementation: AnyServiceDiscovery
404-
private let _subscribe: (@escaping (Result<[Node], Error>) -> Void, @escaping (CompletionReason) -> Void) -> CancellationToken
403+
let implementation: ServiceDiscoveryImplementation
404+
private let _subscribe: (@escaping (Result<[Node], Error>) -> Void, @escaping (CompletionReason) -> Void) -> CancellationToken?
405405

406406
public init<Discovery, S>(_ implementation: Discovery, service: S)
407407
where Discovery: ServiceDiscovery, Discovery.Instance == Node,
408408
S == Discovery.Service
409409
{
410-
self.implementation = AnyServiceDiscovery(implementation)
410+
self.implementation = .dynamic(AnyServiceDiscovery(implementation))
411411
self._subscribe = { onNext, onComplete in
412412
implementation.subscribe(to: service, onNext: onNext, onComplete: onComplete)
413413
}
@@ -418,15 +418,32 @@ public struct ServiceDiscoverySettings {
418418
S == Discovery.Service
419419
{
420420
let mappedDiscovery: MapInstanceServiceDiscovery<Discovery, Node> = implementation.mapInstance(transformer)
421-
self.implementation = AnyServiceDiscovery(mappedDiscovery)
421+
self.implementation = .dynamic(AnyServiceDiscovery(mappedDiscovery))
422422
self._subscribe = { onNext, onComplete in
423423
mappedDiscovery.subscribe(to: service, onNext: onNext, onComplete: onComplete)
424424
}
425425
}
426426

427+
public init(static nodes: Set<Node>) {
428+
self.implementation = .static(nodes)
429+
self._subscribe = { onNext, _ in
430+
// Call onNext once and never again since the list of nodes doesn't change
431+
onNext(.success(Array(nodes)))
432+
// Ignore onComplete because static service discovery never terminates
433+
434+
// No cancellation token
435+
return nil
436+
}
437+
}
438+
427439
/// Similar to `ServiceDiscovery.subscribe` however it allows the handling of the listings to be generic and handled by the cluster system.
428440
/// This function is only intended for internal use by the `DiscoveryShell`.
429-
func subscribe(onNext nextResultHandler: @escaping (Result<[Node], Error>) -> Void, onComplete completionHandler: @escaping (CompletionReason) -> Void) -> CancellationToken {
441+
func subscribe(onNext nextResultHandler: @escaping (Result<[Node], Error>) -> Void, onComplete completionHandler: @escaping (CompletionReason) -> Void) -> CancellationToken? {
430442
self._subscribe(nextResultHandler, completionHandler)
431443
}
444+
445+
enum ServiceDiscoveryImplementation {
446+
case `static`(Set<Node>)
447+
case dynamic(AnyServiceDiscovery)
448+
}
432449
}

Sources/DistributedActorsTestKit/ShouldMatchers.swift

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,19 @@ extension Equatable {
324324
}
325325
}
326326

327+
extension Hashable {
328+
/// Asserts that the value is an element in the given set
329+
public func shouldBeIn(_ set: Set<Self>, file: StaticString = #filePath, line: UInt = #line, column: UInt = #column) {
330+
if set.contains(self) {
331+
()
332+
} else {
333+
let callSite = CallSiteInfo(file: file, line: line, column: column, function: #function)
334+
let error = callSite.error("Expected \(self) to be an item in \(set)")
335+
XCTFail("\(error)", file: callSite.file, line: callSite.line)
336+
}
337+
}
338+
}
339+
327340
extension Bool {
328341
public func shouldBe(_ expected: Bool, file: StaticString = #filePath, line: UInt = #line, column: UInt = #column) {
329342
let csInfo = CallSiteInfo(file: file, line: line, column: column, function: #function)

Tests/DistributedActorsTests/Cluster/ClusterDiscoveryTests.swift

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the Swift Distributed Actors open source project
44
//
5-
// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors
5+
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -22,7 +22,6 @@ import XCTest
2222
final class ClusterDiscoveryTests: ClusterSystemXCTestCase {
2323
let A = Cluster.Member(node: UniqueNode(node: Node(systemName: "A", host: "1.1.1.1", port: 7337), nid: .random()), status: .up)
2424
let B = Cluster.Member(node: UniqueNode(node: Node(systemName: "B", host: "2.2.2.2", port: 8228), nid: .random()), status: .up)
25-
let C = Cluster.Member(node: UniqueNode(node: Node(systemName: "C", host: "2.2.2.2", port: 9119), nid: .random()), status: .up)
2625

2726
func test_discovery_shouldInitiateJoinsToNewlyDiscoveredNodes() throws {
2827
let discovery = TestTriggeredServiceDiscovery<String, Node>()
@@ -64,6 +63,20 @@ final class ClusterDiscoveryTests: ClusterSystemXCTestCase {
6463
node3.shouldEqual(self.B.uniqueNode.node)
6564
}
6665

66+
func test_discovery_shouldInitiateJoinsToStaticNodes() throws {
67+
let nodes = Set([self.A, self.B].map(\.uniqueNode.node))
68+
let settings = ServiceDiscoverySettings(static: Set(nodes))
69+
let clusterProbe = testKit.makeTestProbe(expecting: ClusterShell.Message.self)
70+
_ = try system._spawn("discovery", DiscoveryShell(settings: settings, cluster: clusterProbe.ref).behavior)
71+
72+
try clusterProbe.expectMessages(count: 2).forEach { message in
73+
guard case .command(.handshakeWith(let node)) = message else {
74+
throw testKit.fail(line: #line - 1)
75+
}
76+
node.shouldBeIn(nodes)
77+
}
78+
}
79+
6780
func test_discovery_shouldHandleMappingsWhenDiscoveryHasItsOwnTypes() throws {
6881
struct ExampleK8sService: Hashable {
6982
let name: String

0 commit comments

Comments
 (0)