Skip to content

Commit 7e1ff7c

Browse files
authored
+system allow parking() when clustering, to be refactored when cluster (#459)
becomes a Transport()
1 parent 499771b commit 7e1ff7c

File tree

5 files changed

+27
-29
lines changed

5 files changed

+27
-29
lines changed

IntegrationTests/tests_03_xpc_actorable/it_XPCActorable_echo_service/main.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,5 @@ try! _file.append("service booted...\n")
3636
let service = try XPCActorableService(system, XPCEchoService.init)
3737

3838
service.park()
39-
//system.park()
39+
system.park() // TODO system park should invoke the service park, we only need to park once for XPC to kickoff dispatch_main
4040
// unreachable, park never exits

Samples/Sources/SampleCluster/main.swift

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,5 +93,4 @@ if system.cluster.node.port == 7337 { // <2>
9393
}
9494
// end::cluster-sample-actors-discover-and-chat[]
9595

96-
system.park()
97-
Thread.sleep(.seconds(60)) // TODO: make park halt execution here
96+
system.park(atMost: .seconds(60))

Samples/Sources/SampleDiningPhilosophers/DiningPhilosophers.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,6 @@ struct DiningPhilosophers {
3232
let _: Philosopher.Ref = try system.spawn("Cory", Philosopher(left: fork3, right: fork4).behavior)
3333
let _: Philosopher.Ref = try system.spawn("Norman", Philosopher(left: fork4, right: fork5).behavior)
3434

35-
Thread.sleep(time)
35+
system.park(atMost: time)
3636
}
3737
}

Samples/Sources/SampleDiningPhilosophers/DistributedDiningPhilosophers.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,6 @@ struct DistributedDiningPhilosophers {
7373
_ = try systemC.spawn("Cory", Philosopher(left: fork3, right: fork4).behavior)
7474
_ = try systemC.spawn("Norman", Philosopher(left: fork4, right: fork5).behavior)
7575

76-
Thread.sleep(time)
76+
systemA.park(atMost: time)
7777
}
7878
}

Sources/DistributedActors/ActorSystem.swift

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ public final class ActorSystem {
103103

104104
// ==== ----------------------------------------------------------------------------------------------------------------
105105
// MARK: Shutdown
106-
private var shutdownReceptacle: BlockingReceptacle<Void>?
107-
private let shutdownLock: Lock = Lock()
106+
private var shutdownReceptacle = BlockingReceptacle<Void>()
107+
private let shutdownLock = Lock()
108108

109109
/// Greater than 0 shutdown has been initiated / is in progress.
110110
private let shutdownFlag = Atomic(value: 0)
@@ -277,14 +277,26 @@ public final class ActorSystem {
277277
self.init("ActorSystem")
278278
}
279279

280-
/// Some transports require being able to take over the applications thread.
281-
/// E.g. this may mean invoking `dispatchMain()` or TODO: `ProcessIsolated.park()`
282-
public func park() {
283-
self.log.info("Parking actor system...")
280+
/// Parks the current thread (usually "main thread") until the system is terminated,
281+
/// of the optional timeout is exceeded.
282+
///
283+
/// This call is also offered to underlying transports which may have to perform the blocking wait themselves
284+
/// (most notably, `ProcessIsolated` does so). Please refer to your configured transports documentation,
285+
/// to learn about exact semantics of parking a system while using them.
286+
public func park(atMost parkTimeout: TimeAmount? = nil) {
287+
let howLongParkingMsg = parkTimeout == nil ? "indefinitely" : "for \(parkTimeout!.prettyDescription)"
288+
self.log.info("Parking actor system \(howLongParkingMsg)...")
289+
284290
for transport in self.settings.transports {
285291
self.log.info("Offering transport [\(transport.protocolName)] chance to park the thread...")
286292
transport.onActorSystemPark()
287293
}
294+
295+
if let maxParkingTime = parkTimeout {
296+
self.shutdownReceptacle.wait(atMost: maxParkingTime)
297+
} else {
298+
self.shutdownReceptacle.wait()
299+
}
288300
}
289301

290302
#if SACT_TESTS_LEAKS
@@ -317,24 +329,11 @@ public final class ActorSystem {
317329
/// - Returns: A `Shutdown` value that can be waited upon until the system has completed the shutdown.
318330
@discardableResult
319331
public func shutdown() -> Shutdown {
320-
let (shutdownAlreadyRunning, receptacle) = self.shutdownLock.withLock { () -> (Bool, BlockingReceptacle<Void>) in
321-
if let receptacle = self.shutdownReceptacle {
322-
return (true, receptacle)
323-
} else {
324-
let receptacle = BlockingReceptacle<Void>()
325-
self.shutdownReceptacle = receptacle
326-
return (false, receptacle)
327-
}
332+
guard self.shutdownFlag.add(1) == 0 else {
333+
// shutdown already kicked off by someone else
334+
return Shutdown(receptacle: self.shutdownReceptacle)
328335
}
329336

330-
// if the shutdown has previously been initiated, we just return the
331-
// existing receptacle
332-
if shutdownAlreadyRunning {
333-
return Shutdown(receptacle: receptacle)
334-
}
335-
336-
_ = self.shutdownFlag.add(1)
337-
338337
self.serialization = nil
339338
self._cluster = nil
340339

@@ -352,10 +351,10 @@ public final class ActorSystem {
352351
self.dispatcher.shutdown()
353352
try! self._eventLoopGroup.syncShutdownGracefully()
354353
self._receptionist = self.deadLetters.adapted()
355-
receptacle.offerOnce(())
354+
self.shutdownReceptacle.offerOnce(())
356355
}
357356

358-
return Shutdown(receptacle: receptacle)
357+
return Shutdown(receptacle: self.shutdownReceptacle)
359358
}
360359

361360
public var cluster: ClusterControl {

0 commit comments

Comments
 (0)