@@ -296,96 +296,73 @@ final class SWIMShellClusteredTests: ClusteredActorSystemsXCTestCase {
296296 try await self . awaitStatus ( . suspect( incarnation: 0 , suspectedBy: [ first. node] ) , for: targetPeer, on: first, within: . seconds( 1 ) )
297297 }
298298
299- /*
300- func test_swim_shouldNotMarkUnreachable_whenSuspectedByNotEnoughNodes_whenMinTimeoutReached() async throws {
301- let first = await self.setUpFirst()
302- let firstNode = first.cluster.uniqueNode
303- let second = await self.setUpSecond()
304-
305- first.cluster.join(node: second.cluster.uniqueNode.node)
306- try assertAssociated(first, withExactly: second.cluster.uniqueNode)
307- try assertAssociated(second, withExactly: first.cluster.uniqueNode)
308-
309- let probeOnSecond = self.testKit(second).makeTestProbe(expecting: SWIM.Message.self)
310- let remoteMemberRef = first._resolveKnownRemote(probeOnSecond.ref, onRemoteSystem: second)
311- let maxIndependentSuspicions = 10
312- let suspicionTimeoutPeriodsMax = 1000
313- let suspicionTimeoutPeriodsMin = 1
314- let timeSource = TestTimeSource()
315-
316- let ref = try first._spawn(
317- "SWIM",
318- SWIMActorShell.swimTestBehavior(members: [remoteMemberRef], clusterRef: self.firstClusterProbe.ref) { settings in
319- settings.timeSourceNow = timeSource.now
320- settings.lifeguard.suspicionTimeoutMin = .nanoseconds(suspicionTimeoutPeriodsMin)
321- settings.lifeguard.suspicionTimeoutMax = .nanoseconds(suspicionTimeoutPeriodsMax)
322- settings.lifeguard.maxIndependentSuspicions = maxIndependentSuspicions
323- }
324- )
325- ref.tell(.local(.protocolPeriodTick))
326- try self.expectPing(on: probeOnSecond, reply: false)
327- timeSource.tick()
328- let ackProbe = self.testKit(first).makeTestProbe(expecting: SWIM.Message.self)
329- let suspectStatus: SWIM.Status = .suspect(incarnation: 0, suspectedBy: [firstNode.asSWIMNode])
330- ref.tell(.remote(.ping(pingOrigin: ackProbe.ref, payload: .membership([SWIM.Member(peer: remoteMemberRef, status: suspectStatus, protocolPeriod: 0)]), sequenceNumber: 1)))
331-
332- try self.awaitStatus(suspectStatus, for: remoteMemberRef, on: ref, within: .seconds(1))
333- timeSource.tick()
334-
335- for _ in 0 ..< suspicionTimeoutPeriodsMin {
336- ref.tell(.local(.protocolPeriodTick))
337- try self.expectPing(on: probeOnSecond, reply: false)
338- timeSource.tick()
339- }
340-
341- // We need to trigger an additional ping to advance the protocol period
342- // and have the SWIM actor mark the remote node as dead
343- ref.tell(.local(.protocolPeriodTick))
344- try self.firstClusterProbe.expectNoMessage(for: .seconds(1))
345- }
346-
347- /// Passed in `eventStreamProbe` is expected to have been subscribed to the event stream as early as possible,
348- /// as we want to expect the specific reachability event to be sent
349- private func expectReachabilityEvent(
350- _ testKit: ActorTestKit, _ eventStreamProbe: ActorTestProbe<Cluster.Event>,
351- node uniqueNode: UniqueNode, expect expected: Cluster.MemberReachability
352- ) throws {
353- let messages = try eventStreamProbe.fishFor(Cluster.ReachabilityChange.self, within: .seconds(10)) { event in
354- switch event {
355- case .reachabilityChange(let change):
356- return .catchComplete(change)
357- default:
358- return .ignore
359- }
360- }
361- messages.count.shouldEqual(1)
362- guard let change: Cluster.ReachabilityChange = messages.first else {
363- throw testKit.fail("Expected a reachability change, but did not get one on \(testKit.system.cluster.uniqueNode)")
364- }
365- change.member.uniqueNode.shouldEqual(uniqueNode)
366- change.member.reachability.shouldEqual(expected)
367- }
368-
369- private func expectReachabilityInSnapshot(_ testKit: ActorTestKit, node: UniqueNode, expect expected: Cluster.MemberReachability) throws {
370- try testKit.eventually(within: .seconds(3)) {
371- let p11 = testKit.spawnEventStreamTestProbe(subscribedTo: testKit.system.cluster.events)
372- guard case .some(Cluster.Event.snapshot(let snapshot)) = try p11.maybeExpectMessage() else {
373- throw testKit.error("Expected snapshot, was: \(String(reflecting: p11.lastMessage))")
374- }
375-
376- if let secondMember = snapshot.uniqueMember(node) {
377- if secondMember.reachability == expected {
378- return
379- } else {
380- throw testKit.error("Expected \(node) on \(testKit.system.cluster.uniqueNode) to be [\(expected)] but was: \(secondMember)")
381- }
382- } else {
383- pinfo("Unable to assert reachability of \(node) on \(testKit.system.cluster.uniqueNode) since membership did not contain it. Was: \(snapshot)")
384- () // it may have technically been removed already, so this is "fine"
385- }
386- }
387- }
388- */
299+ func test_swim_shouldNotMarkUnreachable_whenSuspectedByNotEnoughNodes_whenMinTimeoutReached( ) async throws {
300+ let maxIndependentSuspicions = 10
301+ let suspicionTimeoutPeriodsMax = 1000
302+ let suspicionTimeoutPeriodsMin = 1
303+ let timeSource = TestTimeSource ( )
304+
305+ let firstNode = await self . setUpFirst ( ) { settings in
306+ settings. swim. timeSourceNow = timeSource. now
307+ settings. swim. lifeguard. suspicionTimeoutMin = . nanoseconds( suspicionTimeoutPeriodsMin)
308+ settings. swim. lifeguard. suspicionTimeoutMax = . nanoseconds( suspicionTimeoutPeriodsMax)
309+ settings. swim. lifeguard. maxIndependentSuspicions = maxIndependentSuspicions
310+ }
311+ let secondNode = await self . setUpSecond ( )
312+ let thirdNode = await self . setUpThird ( )
313+
314+ firstNode. cluster. join ( node: secondNode. cluster. uniqueNode. node)
315+ thirdNode. cluster. join ( node: secondNode. cluster. uniqueNode. node)
316+ try assertAssociated ( firstNode, withExactly: [ secondNode. cluster. uniqueNode, thirdNode. cluster. uniqueNode] )
317+ try assertAssociated ( secondNode, withExactly: [ firstNode. cluster. uniqueNode, thirdNode. cluster. uniqueNode] )
318+
319+ guard let first = firstNode. _cluster? . _swimShell else {
320+ throw testKit ( firstNode) . fail ( " SWIM shell of [ \( firstNode) ] should not be nil " )
321+ }
322+ guard let second = secondNode. _cluster? . _swimShell else {
323+ throw testKit ( secondNode) . fail ( " SWIM shell of [ \( secondNode) ] should not be nil " )
324+ }
325+ guard let third = thirdNode. _cluster? . _swimShell else {
326+ throw testKit ( thirdNode) . fail ( " SWIM shell of [ \( thirdNode) ] should not be nil " )
327+ }
328+
329+ try await self . configureSWIM ( for: first, members: [ second, third] )
330+
331+ let originPeer = try SWIMActorShell . resolve ( id: third. id. _asRemote, using: secondNode)
332+ let targetPeer = try SWIMActorShell . resolve ( id: second. id. _asRemote, using: firstNode)
333+
334+ _ = await first. whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): rename once https://github.com/apple/swift/pull/42098 is implemented
335+ __secretlyKnownToBeLocal. handlePeriodicProtocolPeriodTick ( )
336+ }
337+ // FIXME: use a non-responsive test probe
338+ // try self.expectPing(on: probeOnSecond, reply: false)
339+ timeSource. tick ( )
340+
341+ let suspectStatus : SWIM . Status = . suspect( incarnation: 0 , suspectedBy: [ first. node] )
342+
343+ _ = try await first. ping ( origin: originPeer, payload: . membership( [ SWIM . Member ( peer: targetPeer, status: suspectStatus, protocolPeriod: 0 ) ] ) , sequenceNumber: 1 )
344+
345+ try await self . awaitStatus ( suspectStatus, for: targetPeer, on: first, within: . seconds( 1 ) )
346+ timeSource. tick ( )
347+
348+ // for _ in 0 ..< suspicionTimeoutPeriodsMin {
349+ // _ = await first.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): rename once https://github.com/apple/swift/pull/42098 is implemented
350+ // __secretlyKnownToBeLocal.handlePeriodicProtocolPeriodTick()
351+ // }
352+ // // FIXME: use a non-responsive test probe
353+ // try self.expectPing(on: probeOnSecond, reply: false)
354+ // timeSource.tick()
355+ // }
356+ //
357+ // // We need to trigger an additional ping to advance the protocol period
358+ // // and have the SWIM actor mark the remote node as dead
359+ // _ = await first.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): rename once https://github.com/apple/swift/pull/42098 is implemented
360+ // __secretlyKnownToBeLocal.handlePeriodicProtocolPeriodTick()
361+ // }
362+ //
363+ // // FIXME: would second end up with .dead status?
364+ // try await self.awaitStatus(.dead, for: targetPeer, on: first, within: .seconds(1))
365+ }
389366
390367 // ==== ----------------------------------------------------------------------------------------------------------------
391368 // MARK: Gossiping
@@ -439,78 +416,6 @@ final class SWIMShellClusteredTests: ClusteredActorSystemsXCTestCase {
439416 // ==== ------------------------------------------------------------------------------------------------------------
440417 // MARK: utility functions
441418
442- /*
443- struct ForwardedSWIMMessage: Codable {
444- let message: SWIM.Message
445- let recipient: _ActorRef<SWIM.Message>
446- }
447-
448- func forwardingSWIMBehavior(forwardTo ref: _ActorRef<ForwardedSWIMMessage>) -> _Behavior<SWIM.Message> {
449- .receive { context, message in
450- ref.tell(.init(message: message, recipient: context.myself))
451- return .same
452- }
453- }
454-
455- func expectPing(
456- on probe: ActorTestProbe<SWIM.Message>, reply: Bool, incarnation: SWIM.Incarnation = 0,
457- file: StaticString = #file, line: UInt = #line, column: UInt = #column,
458- assertPayload: (SWIM.GossipPayload) throws -> Void = { _ in
459- }
460- ) throws {
461- switch try probe.expectMessage(file: file, line: line, column: column) {
462- case .remote(.ping(let replyTo, let payload, let sequenceNumber)):
463- try assertPayload(payload)
464- if reply {
465- replyTo.tell(.remote(.pingResponse(.ack(target: probe.ref, incarnation: incarnation, payload: .none, sequenceNumber: sequenceNumber))))
466- }
467- case let message:
468- throw probe.error("Expected to receive `.ping`, received \(message) instead")
469- }
470- }
471-
472- func awaitStatus(
473- _ status: SWIM.Status, for peer: _ActorRef<SWIM.Message>,
474- on swimShell: _ActorRef<SWIM.Message>, within timeout: Duration,
475- file: StaticString = #file, line: UInt = #line, column: UInt = #column
476- ) throws {
477- let testKit = self._testKits.first!
478- let stateProbe = testKit.makeTestProbe(expecting: [SWIM.Member].self)
479-
480- try testKit.eventually(within: timeout, file: file, line: line, column: column) {
481- swimShell.tell(._testing(._getMembershipState(replyTo: stateProbe.ref)))
482- let membership = try stateProbe.expectMessage()
483-
484- let otherStatus = membership
485- .first(where: { $0.peer as! SWIM.Ref == peer })
486- .map(\.status)
487- guard otherStatus == status else {
488- throw testKit.error("Expected status [\(status)] for [\(peer)], but found \(otherStatus.debugDescription); Membership: \(membership)", file: file, line: line)
489- }
490- }
491- }
492-
493- func holdStatus(
494- _ status: SWIM.Status, for peer: _ActorRef<SWIM.Message>,
495- on swimShell: _ActorRef<SWIM.Message>, within timeout: Duration,
496- file: StaticString = #file, line: UInt = #line, column: UInt = #column
497- ) throws {
498- let testKit = self._testKits.first!
499- let stateProbe = testKit.makeTestProbe(expecting: [SWIM.Member].self)
500-
501- try testKit.assertHolds(for: timeout, file: file, line: line, column: column) {
502- swimShell.tell(._testing(._getMembershipState(replyTo: stateProbe.ref)))
503- let membership = try stateProbe.expectMessage()
504- let otherStatus = membership
505- .first(where: { $0.peer as! SWIM.Ref == peer })
506- .map(\.status)
507- guard otherStatus == status else {
508- throw testKit.error("Expected status [\(status)] for [\(peer)], but found \(otherStatus.debugDescription)")
509- }
510- }
511- }
512- */
513-
514419 private func configureSWIM( for swimShell: SWIMActorShell , members: [ SWIMActorShell ] ) async throws {
515420 var memberStatus : [ SWIMActorShell : SWIM . Status ] = [ : ]
516421 for member in members {
@@ -563,6 +468,7 @@ final class SWIMShellClusteredTests: ClusteredActorSystemsXCTestCase {
563468 let otherStatus = membership
564469 . first ( where: { $0. peer as! SWIMActorShell == peer } )
565470 . map ( \. status)
471+
566472 guard otherStatus == status else {
567473 throw testKit. error ( " Expected status [ \( status) ] for [ \( peer) ], but found \( otherStatus. debugDescription) ; Membership: \( membership) " , file: file, line: line)
568474 }
0 commit comments