Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a325d27
+multinode Introduce better integration test infrastructure WIP
ktoso Aug 4, 2022
91a2d1e
=build verify dependencies in `main` CI
ktoso Aug 9, 2022
f7b636e
initial integration test infra
ktoso Aug 9, 2022
f804b14
enable integration in docker
ktoso Aug 9, 2022
a01900b
killall may not be available; just ignore it then (e.g. in container, so
ktoso Aug 10, 2022
9e12e49
[multi-node] working multinode tests though issue with serialization …
ktoso Aug 10, 2022
30ac915
serialization fix - don't use the specialized one for string - TRY
ktoso Aug 10, 2022
7e8421e
wip
ktoso Aug 12, 2022
c79dd52
[InspectKit] Actor leak detection, since we always leak actors now af…
ktoso Aug 12, 2022
ebb5074
disable automatic leak reporting until we're good here
ktoso Aug 12, 2022
2b4f56d
fix encodings
ktoso Aug 12, 2022
5403b95
formatting
ktoso Aug 12, 2022
dc9944c
okey progress in checkpoint fixes; local calls handled with timeout too
ktoso Aug 15, 2022
b7342ad
minor cleanup
ktoso Aug 15, 2022
0036477
regex workaround
ktoso Aug 15, 2022
7c25280
-leak stop leaking well known actors; release them during shutdown (i…
ktoso Aug 15, 2022
34db819
=cluster simplify membership holder to avoid racing with the join() i…
ktoso Aug 15, 2022
83f7701
fix leaks, some cleanups;
ktoso Aug 16, 2022
0d96f53
=test silence logging in ActorTestProbeTests
ktoso Aug 16, 2022
c1e764b
-regex can't use regex on 5.7 on linux because of rdar://98705227
ktoso Aug 16, 2022
717fdab
~rename base test class to SingleClusterSystemXCTestCase to make it m…
ktoso Aug 16, 2022
d1dad32
simplify test_shutdown_shouldCompleteReturnedHandleWhenDone
ktoso Aug 16, 2022
ed1d648
formatting
ktoso Aug 16, 2022
737a5d1
=multinode implement test filtering
ktoso Aug 16, 2022
55fab7c
=singleton harden multinode test for singleton
ktoso Aug 16, 2022
ab01b7e
=docker cant keep privileged in docker files
ktoso Aug 16, 2022
e47c16c
=receptionist use OrderedSet to avoid ordering issues ordering bugs/h…
ktoso Aug 16, 2022
3ab44e7
=multinode terminate process if it hangs
ktoso Aug 17, 2022
c0e0ea7
=multinode better handling of stuck processes
ktoso Aug 17, 2022
2d72e32
minor improvement to OrderedSet filter
ktoso Aug 17, 2022
d872d20
=regex workaround for crashes on String.starts(with:) caused by rdar:…
ktoso Aug 17, 2022
fef95e8
=test unlock dumping tests after a locked-up test is detected
ktoso Aug 17, 2022
c920b58
remove println
ktoso Aug 17, 2022
4645dfe
=soundness needs workaround for experimental regex now
ktoso Aug 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,13 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
_ guest: Guest,
with key: DistributedReception.Key<Guest>
) async where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem {
self.log.warning("distributed receptionist: checkIn(\(guest), with: \(key)")
let id = guest.id
let key = key.asAnyKey

let id = guest.id
self.log.debug("Receptionist checkIn [\(guest.id)] with key [\(key)]", metadata: [
"receptionist/key": "\(key)",
"receptionist/guest": "\(guest.id)",
])

guard id._isLocal || (id.uniqueNode == actorSystem.cluster.uniqueNode) else {
self.log.warning("""
Expand All @@ -327,7 +330,9 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
"receptionist/key": "\(key)",
"receptionist/guest": "\(id)",
"receptionist/opLog/maxSeqNr": "\(self.ops.maxSeqNr)",
"receptionist/opLog": "\(self.ops.ops)",
"receptionist/opLog": Logger.MetadataValue.array(
self.ops.ops.map { Logger.MetadataValue.string("\($0)") }
),
]
)

Expand Down Expand Up @@ -368,7 +373,11 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
// We immediately flush all already-known registrations;
// as new ones come in, they will be reported to this subscription later on
for alreadyRegisteredAtSubscriptionTime in self.storage.registrations(forKey: subscription.key) ?? [] {
subscription.tryOffer(registration: alreadyRegisteredAtSubscriptionTime)
if subscription.tryOffer(registration: alreadyRegisteredAtSubscriptionTime) {
self.log.notice("OFFERED \(alreadyRegisteredAtSubscriptionTime.actorID) TO \(subscription)")
} else {
self.log.notice("DROPPED \(alreadyRegisteredAtSubscriptionTime.actorID) TO \(subscription)")
}
}
}
}
Expand Down Expand Up @@ -438,7 +447,7 @@ extension OpLogDistributedReceptionist {

let timerTaskKey = key.hashValue
guard self.flushTimerTasks[timerTaskKey] == nil else {
self.log.debug("timer exists")
self.log.trace("Delayed listing flush timer task already exists (key: \(key))")
return // timer exists nothing to do
}

Expand All @@ -460,7 +469,6 @@ extension OpLogDistributedReceptionist {

/// Invoked when the delayed listing-flush timer for `key` fires:
/// pushes the currently known registrations for `key` out to all of its subscribers
/// via `notifySubscribers(of:)`.
func onDelayedListingFlushTick(key: AnyDistributedReceptionKey) {
    self.log.trace("Run delayed listing flush, key: \(key)")

    self.notifySubscribers(of: key)
}

Expand All @@ -474,12 +482,28 @@ extension OpLogDistributedReceptionist {
// Sort registrations by version from oldest to newest so that they are processed in order.
// Otherwise, if we process a newer version (i.e., with bigger sequence number) first, older
// versions will be dropped because they are considered "seen".
let registrations = (self.storage.registrations(forKey: key) ?? []).sorted { l, r in l.version < r.version }
let registrations = (self.storage.registrations(forKey: key) ?? []).sorted { l, r in l.version < r.version } // FIXME: use ordered set or Deque now that we have them
self.log.notice(
"Registrations to flush: \(registrations.count)",
metadata: [
"registrations": Logger.MetadataValue.array(
registrations.map { Logger.MetadataValue.string("\($0)") }
),
]
)

// self.instrumentation.listingPublished(key: key, subscribers: subscriptions.count, registrations: registrations.count) // TODO(distributed): make the instrumentation calls compatible with distributed actor based types
for subscription in subscriptions {
self.log.notice("Offering registrations to subscription: \(subscription))", metadata: [
"registrations": "\(subscription.seenActorRegistrations)",
])

for registration in registrations {
subscription.tryOffer(registration: registration)
if subscription.tryOffer(registration: registration) {
self.log.notice("OFFERED \(registration.actorID) TO \(subscription)")
} else {
self.log.notice("DROPPED \(registration.actorID) TO \(subscription)")
}
}
}
}
Expand Down Expand Up @@ -643,15 +667,8 @@ extension OpLogDistributedReceptionist {
// we effectively initiate the "first pull"
let latestAppliedSeqNrFromPeer = self.appliedSequenceNrs[.actorID(receptionistID)]

let ack = AckOps(
appliedUntil: latestAppliedSeqNrFromPeer,
observedSeqNrs: self.observedSequenceNrs,
peer: self
)

Task { [weak self] in
do {
// assert(self.id.path.description.contains("/system/receptionist"), "Receptionist path did not include /system/receptionist, was: \(self.id.fullDescription)")
guard let __secretlyKnownToBeLocal = self else { return } // FIXME: we need `local`
try await peerReceptionistRef.ackOps(until: latestAppliedSeqNrFromPeer, by: __secretlyKnownToBeLocal)
} catch {
Expand Down Expand Up @@ -702,7 +719,11 @@ extension OpLogDistributedReceptionist {

for subscription in subscriptions {
for registration in registrations {
subscription.tryOffer(registration: registration)
if subscription.tryOffer(registration: registration) {
self.log.notice("OFFERED \(registration.actorID) TO \(subscription)")
} else {
self.log.notice("DROPPED \(registration.actorID) TO \(subscription)")
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import Distributed
import Logging
import OrderedCollections

/// A receptionist is a system actor that allows users to register actors under
/// a key to make them available to other parts of the system, without having to
Expand Down Expand Up @@ -180,7 +181,7 @@ internal struct VersionedRegistration: Hashable {
internal final class DistributedReceptionistStorage {
typealias ReceptionistOp = OpLogDistributedReceptionist.ReceptionistOp

internal var _registrations: [AnyDistributedReceptionKey: Set<VersionedRegistration>] = [:]
internal var _registrations: [AnyDistributedReceptionKey: OrderedSet<VersionedRegistration>] = [:]
internal var _subscriptions: [AnyDistributedReceptionKey: Set<AnyDistributedReceptionListingSubscription>] = [:]

/// Per (receptionist) node mapping of which keys are presently known to this receptionist on the given node.
Expand Down Expand Up @@ -218,7 +219,7 @@ internal final class DistributedReceptionistStorage {
return self.addTo(dict: &self._registrations, key: key, value: versionedRegistration)
}

func removeRegistration<Guest>(key: AnyDistributedReceptionKey, guest: Guest) -> Set<VersionedRegistration>?
func removeRegistration<Guest>(key: AnyDistributedReceptionKey, guest: Guest) -> OrderedSet<VersionedRegistration>?
where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem
{
let address = guest.id
Expand All @@ -232,7 +233,7 @@ internal final class DistributedReceptionistStorage {
return self.removeFrom(dict: &self._registrations, key: key, value: versionedRegistration)
}

func registrations(forKey key: AnyDistributedReceptionKey) -> Set<VersionedRegistration>? {
func registrations(forKey key: AnyDistributedReceptionKey) -> OrderedSet<VersionedRegistration>? {
self._registrations[key]
}

Expand Down Expand Up @@ -304,7 +305,7 @@ internal final class DistributedReceptionistStorage {
for key in keys {
// 1) we remove any registrations that it hosted
let registrations = self._registrations.removeValue(forKey: key) ?? []
let remainingRegistrations = registrations.filter { registration in
let remainingRegistrations = registrations._filter { registration in // FIXME(collections): missing type preserving filter on OrderedSet https://github.com/apple/swift-collections/pull/159
registration.actorID.uniqueNode != node
}
if !remainingRegistrations.isEmpty {
Expand All @@ -327,6 +328,16 @@ internal final class DistributedReceptionistStorage {
}
}

/// - returns: `true` if the value was a newly inserted value, `false` otherwise
private func addTo<Value: Hashable>(dict: inout [AnyDistributedReceptionKey: OrderedSet<Value>], key: AnyDistributedReceptionKey, value: Value) -> Bool {
    // `OrderedSet.append` already reports whether the element was newly inserted,
    // so we can lean on its `(inserted, index)` result instead of performing a
    // separate `contains` check first; an existing element is left untouched.
    dict[key, default: []].append(value).inserted
}

/// - returns: `true` if the value was a newly inserted value, `false` otherwise
private func addTo<Value: Hashable>(dict: inout [AnyDistributedReceptionKey: Set<Value>], key: AnyDistributedReceptionKey, value: Value) -> Bool {
guard !(dict[key]?.contains(value) ?? false) else {
Expand All @@ -337,6 +348,14 @@ internal final class DistributedReceptionistStorage {
return true
}

/// Removes `value` from the set stored under `key`, dropping the key entirely
/// once its set becomes empty.
/// - returns: the remaining set stored under `key`, or `nil` if none remains
private func removeFrom<Value: Hashable>(dict: inout [AnyDistributedReceptionKey: OrderedSet<Value>], key: AnyDistributedReceptionKey, value: Value) -> OrderedSet<Value>? {
    let didRemove = dict[key]?.remove(value) != nil
    let isNowEmpty = dict[key]?.isEmpty ?? false
    if didRemove, isNowEmpty {
        dict.removeValue(forKey: key)
    }

    return dict[key]
}

private func removeFrom<Value: Hashable>(dict: inout [AnyDistributedReceptionKey: Set<Value>], key: AnyDistributedReceptionKey, value: Value) -> Set<Value>? {
if dict[key]?.remove(value) != nil, dict[key]?.isEmpty ?? false {
dict.removeValue(forKey: key)
Expand All @@ -354,7 +373,7 @@ internal final class DistributedReceptionistStorage {
// ==== ----------------------------------------------------------------------------------------------------------------

/// Represents a local subscription (for `receptionist.subscribe`) for a specific key.
internal final class AnyDistributedReceptionListingSubscription: Hashable, @unchecked Sendable {
internal final class AnyDistributedReceptionListingSubscription: Hashable, @unchecked Sendable, CustomStringConvertible {
let subscriptionID: ObjectIdentifier
let key: AnyDistributedReceptionKey

Expand All @@ -368,7 +387,7 @@ internal final class AnyDistributedReceptionListingSubscription: Hashable, @unch

/// We very carefully only modify this from the owning actor (receptionist).
// TODO: It would be lovely to be able to express this in the type system as "actor owned" or "actor local" to some actor instance.
private var seenActorRegistrations: VersionVector
var seenActorRegistrations: VersionVector

init(
subscriptionID: ObjectIdentifier,
Expand Down Expand Up @@ -399,19 +418,22 @@ internal final class AnyDistributedReceptionListingSubscription: Hashable, @unch
/// version vector, to see if it "advanced it" - if so, it must be a new registration and we have
/// to emit the value. If it didn't advance the local "seen" version vector, it means we've already
/// seen this actor in this specific stream, and don't need to emit it again.
func tryOffer(registration: VersionedRegistration) {
///
/// - Returns: true if the value was successfully offered
func tryOffer(registration: VersionedRegistration) -> Bool {
let oldSeenRegistrations = self.seenActorRegistrations
self.seenActorRegistrations.merge(other: registration.version)

switch self.seenActorRegistrations.compareTo(oldSeenRegistrations) {
case .same:
// the seen vector was unaffected by the merge, which means that the
// incoming registration version was already seen, and thus we don't need to emit it again
return
return false
case .happenedAfter:
// the incoming registration has not yet been seen before,
// which means that we should emit the actor to the stream.
self.onNext(registration.actorID)
return true
case .concurrent:
fatalError("""
It should not be possible for a version vector to be concurrent with a PAST version of itself before the merge
Expand All @@ -435,4 +457,13 @@ internal final class AnyDistributedReceptionListingSubscription: Hashable, @unch
hasher.combine(self.subscriptionID)
hasher.combine(self.key)
}

/// Human-readable rendering of this subscription, used by the receptionist's
/// debug/trace logging.
var description: String {
    // Fix: the original interpolation emitted a stray ", ," between `key`
    // and `seenActorRegistrations`.
    var string = "AnyDistributedReceptionListingSubscription(subscriptionID: \(subscriptionID), key: \(key), seenActorRegistrations: \(seenActorRegistrations)"
    #if DEBUG
    // NOTE(review): `file`/`line` presumably capture where the subscription was
    // created; only included in debug builds — confirm against the initializer.
    string += ", at: \(self.file):\(self.line)"
    #endif
    string += ")"
    return string
}
}
29 changes: 29 additions & 0 deletions Sources/DistributedActors/_OrderedSet+Extensions.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import OrderedCollections

extension OrderedSet {
    // FIXME(collections): implemented for real in https://github.com/apple/swift-collections/pull/159
    /// Type-preserving `filter`: like `Sequence.filter`, but returns an `OrderedSet`
    /// rather than an `Array`, keeping the kept elements in their original relative order.
    @inlinable
    internal func _filter(
        _ isIncluded: (Element) throws -> Bool
    ) rethrows -> Self {
        // Filtering an OrderedSet cannot introduce duplicates, so we can build the
        // result with the `uncheckedUniqueElements:` initializer and avoid re-hashing
        // every element as the result is appended to (performance hint from review).
        let members = try self.filter(isIncluded)
        return OrderedSet(uncheckedUniqueElements: members)
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@
//===----------------------------------------------------------------------===//

@testable import DistributedActors
import DistributedActorsConcurrencyHelpers
import Foundation
import XCTest

/// Convenience class for building multi-node (yet same-process) tests with many actor systems involved.
///
/// Systems started using `setUpNode` are automatically terminated upon test completion, and logs are automatically
/// captured and only printed when a test failure occurs.
open class ClusteredActorSystemsXCTestCase: XCTestCase {
internal let lock = DistributedActorsConcurrencyHelpers.Lock()
public private(set) var _nodes: [ClusterSystem] = []
public private(set) var _testKits: [ActorTestKit] = []
public private(set) var _logCaptures: [LogCapture] = []

private var stuckTestDumpLogsTask: Task<Void, Error>?

// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Actor leak detection

Expand Down Expand Up @@ -54,6 +59,12 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
true
}

/// Unconditionally dump logs if the test has been running longer than this duration.
/// This is to help diagnose stuck tests.
open var dumpLogsAfter: Duration {
.seconds(60)
}

/// Enables logging all captured logs, even if the test passed successfully.
/// - Default: `false`
open var alwaysPrintCaptureLogs: Bool {
Expand Down Expand Up @@ -84,6 +95,19 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
if self.inspectDetectActorLeaks {
self.actorStatsBefore = try InspectKit.actorStats()
}

self.stuckTestDumpLogsTask = Task.detached {
try await Task.sleep(until: .now + self.dumpLogsAfter, clock: .continuous)
guard !Task.isCancelled else {
return
}

print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("!!!!!!!!!! TEST SEEMS STUCK - DUMPING LOGS !!!!!!!!!!")
print("!!!!!!!!!! PID: \(ProcessInfo.processInfo.processIdentifier) !!!!!!!!!!")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
self.printAllCapturedLogs()
}
try await super.setUp()
}

Expand All @@ -101,7 +125,9 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
settings.logging.baseLogger = capture.logger(label: name)
settings.swim.logger = settings.logging.baseLogger

self._logCaptures.append(capture)
self.lock.withLockVoid {
self._logCaptures.append(capture)
}
}

settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 2)
Expand Down Expand Up @@ -129,6 +155,9 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
}

override open func tearDown() async throws {
self.stuckTestDumpLogsTask?.cancel()
self.stuckTestDumpLogsTask = nil

try await super.tearDown()

let testsFailed = self.testRun?.totalFailureCount ?? 0 > 0
Expand All @@ -141,9 +170,11 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
try! await node.shutdown().wait()
}

self._nodes = []
self._testKits = []
self._logCaptures = []
self.lock.withLockVoid {
self._nodes = []
self._testKits = []
self._logCaptures = []
}

if self.inspectDetectActorLeaks {
try await Task.sleep(until: .now + .seconds(2), clock: .continuous)
Expand Down Expand Up @@ -268,7 +299,9 @@ extension ClusteredActorSystemsXCTestCase {
fatalError("No such node: [\(node)] in [\(self._nodes)]!")
}

return self._logCaptures[index]
return self.lock.withLock {
self._logCaptures[index]
}
}

public func printCapturedLogs(of node: ClusterSystem) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,15 @@ final class DistributedReceptionistTests: SingleClusterSystemXCTestCase {
let forwarderA = Forwarder(probe: probe, name: "A", actorSystem: system)
let forwarderB = Forwarder(probe: probe, name: "B", actorSystem: system)

system.log.notice("Checking in: \(forwarderA.id)")
await receptionist.checkIn(forwarderA, with: .forwarders)
system.log.notice("Checking in: \(forwarderB.id)")
await receptionist.checkIn(forwarderB, with: .forwarders)

var i = 0
system.log.notice("here")
for await forwarder in await receptionist.listing(of: .forwarders) {
system.log.notice("here more \(i): \(forwarder.id)")
i += 1
try await forwarder.forward(message: "test")

Expand Down