Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions Sources/DistributedActors/ClusterSystemSettings.swift
Original file line number Diff line number Diff line change
Expand Up @@ -400,14 +400,14 @@ protocol ClusterSystemInstrumentationProvider {
/// in an attempt to form (or join) a cluster with those nodes. These typically should include a few initial contact points, but can also include
/// all the nodes of an existing cluster.
public struct ServiceDiscoverySettings {
internal let implementation: AnyServiceDiscovery
private let _subscribe: (@escaping (Result<[Node], Error>) -> Void, @escaping (CompletionReason) -> Void) -> CancellationToken
let implementation: ServiceDiscoveryImplementation
private let _subscribe: (@escaping (Result<[Node], Error>) -> Void, @escaping (CompletionReason) -> Void) -> CancellationToken?

public init<Discovery, S>(_ implementation: Discovery, service: S)
where Discovery: ServiceDiscovery, Discovery.Instance == Node,
S == Discovery.Service
{
self.implementation = AnyServiceDiscovery(implementation)
self.implementation = .dynamic(AnyServiceDiscovery(implementation))
self._subscribe = { onNext, onComplete in
implementation.subscribe(to: service, onNext: onNext, onComplete: onComplete)
}
Expand All @@ -418,15 +418,32 @@ public struct ServiceDiscoverySettings {
S == Discovery.Service
{
let mappedDiscovery: MapInstanceServiceDiscovery<Discovery, Node> = implementation.mapInstance(transformer)
self.implementation = AnyServiceDiscovery(mappedDiscovery)
self.implementation = .dynamic(AnyServiceDiscovery(mappedDiscovery))
self._subscribe = { onNext, onComplete in
mappedDiscovery.subscribe(to: service, onNext: onNext, onComplete: onComplete)
}
}

public init(static nodes: Set<Node>) {
self.implementation = .static(nodes)
self._subscribe = { onNext, _ in
// Call onNext once and never again since the list of nodes doesn't change
onNext(.success(Array(nodes)))
// Ignore onComplete because static service discovery never terminates

// No cancellation token
return nil
}
}

/// Similar to `ServiceDiscovery.subscribe` however it allows the handling of the listings to be generic and handled by the cluster system.
/// This function is only intended for internal use by the `DiscoveryShell`.
func subscribe(onNext nextResultHandler: @escaping (Result<[Node], Error>) -> Void, onComplete completionHandler: @escaping (CompletionReason) -> Void) -> CancellationToken {
func subscribe(onNext nextResultHandler: @escaping (Result<[Node], Error>) -> Void, onComplete completionHandler: @escaping (CompletionReason) -> Void) -> CancellationToken? {
self._subscribe(nextResultHandler, completionHandler)
}

enum ServiceDiscoveryImplementation {
case `static`(Set<Node>)
case dynamic(AnyServiceDiscovery)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice 👍

}
13 changes: 13 additions & 0 deletions Sources/DistributedActorsTestKit/ShouldMatchers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,19 @@ extension Equatable {
}
}

extension Hashable {
/// Asserts that the value is an element in the given set
public func shouldBeIn(_ set: Set<Self>, file: StaticString = #filePath, line: UInt = #line, column: UInt = #column) {
if set.contains(self) {
()
} else {
let callSite = CallSiteInfo(file: file, line: line, column: column, function: #function)
let error = callSite.error("Expected \(self) to be an item in \(set)")
XCTFail("\(error)", file: callSite.file, line: callSite.line)
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love me some good new should... matchers...! :)

}

extension Bool {
public func shouldBe(_ expected: Bool, file: StaticString = #filePath, line: UInt = #line, column: UInt = #column) {
let csInfo = CallSiteInfo(file: file, line: line, column: column, function: #function)
Expand Down
17 changes: 15 additions & 2 deletions Tests/DistributedActorsTests/Cluster/ClusterDiscoveryTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -22,7 +22,6 @@ import XCTest
final class ClusterDiscoveryTests: ClusterSystemXCTestCase {
let A = Cluster.Member(node: UniqueNode(node: Node(systemName: "A", host: "1.1.1.1", port: 7337), nid: .random()), status: .up)
let B = Cluster.Member(node: UniqueNode(node: Node(systemName: "B", host: "2.2.2.2", port: 8228), nid: .random()), status: .up)
let C = Cluster.Member(node: UniqueNode(node: Node(systemName: "C", host: "2.2.2.2", port: 9119), nid: .random()), status: .up)

func test_discovery_shouldInitiateJoinsToNewlyDiscoveredNodes() throws {
let discovery = TestTriggeredServiceDiscovery<String, Node>()
Expand Down Expand Up @@ -64,6 +63,20 @@ final class ClusterDiscoveryTests: ClusterSystemXCTestCase {
node3.shouldEqual(self.B.uniqueNode.node)
}

func test_discovery_shouldInitiateJoinsToStaticNodes() throws {
let nodes = Set([self.A, self.B].map(\.uniqueNode.node))
let settings = ServiceDiscoverySettings(static: Set(nodes))
let clusterProbe = testKit.makeTestProbe(expecting: ClusterShell.Message.self)
_ = try system._spawn("discovery", DiscoveryShell(settings: settings, cluster: clusterProbe.ref).behavior)

try clusterProbe.expectMessages(count: 2).forEach { message in
guard case .command(.handshakeWith(let node)) = message else {
throw testKit.fail(line: #line - 1)
}
node.shouldBeIn(nodes)
}
}

func test_discovery_shouldHandleMappingsWhenDiscoveryHasItsOwnTypes() throws {
struct ExampleK8sService: Hashable {
let name: String
Expand Down