Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Sources/DistributedActors/ActorSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ extension ActorSystem: ActorRefFactory {
dispatcher = self.dispatcher
case .callingThread:
dispatcher = CallingThreadDispatcher()
case .dispatchQueue(let queue):
dispatcher = DispatchQueueDispatcher(queue: queue)
case .nio(let group):
dispatcher = NIOEventLoopGroupDispatcher(group)
default:
Expand Down
24 changes: 24 additions & 0 deletions Sources/DistributedActors/Dispatchers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import Dispatch
import NIO

// TODO: Consider renaming to "ActorScheduler" perhaps?
Copy link
Member

@yim-lee yim-lee Jun 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does it do though? *Executor (comment right below) might be more fitting?

Copy link
Member Author

@ktoso ktoso Jun 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good point... The word "scheduler" is sadly very overloaded -- could be "at some point in time scheduler" or those "just thread pool manager" – in libs like Rx or Combine those are called schedulers and technically it's the right word...

Akka wording was always "dispatcher" though I guess it's not necessarily the best... There the other words on the JVM are executors indeed. WDYT @drexin ? I'm somehow not in love with "dispatcher" the more i looked at it; "It schedules the actors execution" is kind of the right phrase, but "it executes the actor" also sounds correct I guess (though a bit macabre 😆 though actor internals often sound like that 😉).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note... we also have Scheduler:

internal protocol Scheduler {
    func scheduleOnce(delay: TimeAmount, _ f: @escaping () -> Void) -> Cancelable

    func scheduleOnce<Message>(delay: TimeAmount, receiver: ActorRef<Message>, message: Message) -> Cancelable

    func schedule(initialDelay: TimeAmount, interval: TimeAmount, _ f: @escaping () -> Void) -> Cancelable

    func schedule<Message>(initialDelay: TimeAmount, interval: TimeAmount, receiver: ActorRef<Message>, message: Message) -> Cancelable
}

which is the time related one, unlike MessageDispatcher today which is the "run the actor" one:

public protocol MessageDispatcher {
    // TODO: we should make it dedicated to dispatch() rather than raw executing perhaps? This way it can take care of fairness things

    var name: String { get }

    /// - Returns: `true` iff the mailbox status indicated that the mailbox should be run (still contains pending messages)
    // func registerForExecution(_ mailbox: Mailbox, status: MailboxStatus, hasMessageHint: Bool, hasSystemMessageHint: Bool) -> Bool

    func execute(_ f: @escaping () -> Void)
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TODO there deciphered: // TODO: we should make it dedicated to dispatch() rather than raw executing perhaps? This way it can take care of fairness things

What I mean is scheduler.execute(actorShell) rather than scheduler.execute(mailbox.run), since then it knows which actor it's running and we can then in the future to some "locality" and "prefer the same thread as last time" etc tricks...


/// An `Executor` is a low building block that is able to take blocks and schedule them for running
public protocol MessageDispatcher {
// TODO: we should make it dedicated to dispatch() rather than raw executing perhaps? This way it can take care of fairness things
Expand Down Expand Up @@ -65,6 +67,7 @@ internal struct CallingThreadDispatcher: MessageDispatcher {
}
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: NIO Dispatcher only for internal use

internal struct NIOEventLoopGroupDispatcher: MessageDispatcher {
Expand All @@ -88,3 +91,24 @@ extension NIOEventLoopGroupDispatcher: InternalMessageDispatcher {
self.group.shutdownGracefully(queue: DispatchQueue.global()) { _ in () }
}
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: DispatchQueue Dispatcher

internal struct DispatchQueueDispatcher: MessageDispatcher {
let queue: DispatchQueue

init(queue: DispatchQueue) {
self.queue = queue
}

public var name: String {
"dispatchQueue:\(self.queue)"
}

func execute(_ f: @escaping () -> Void) {
self.queue.async {
f()
}
}
}
4 changes: 4 additions & 0 deletions Sources/DistributedActors/Props.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//
//===----------------------------------------------------------------------===//

import Dispatch
import NIO

// ==== ----------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -83,6 +84,8 @@ public enum DispatcherProps {
// I'd rather implement such style, as it actually is build "for" actors, and not accidentally running them well...
// case OurOwnFancyActorSpecificDispatcher

case dispatchQueue(DispatchQueue)

/// WARNING: Use with Caution!
///
/// This dispatcher will keep a real dedicated Thread for this actor. This is very rarely something you want,
Expand Down Expand Up @@ -110,6 +113,7 @@ public enum DispatcherProps {
case .default: return "default"
case .pinnedThread: return "pinnedThread"
case .nio: return "nioEventLoopGroup"
case .dispatchQueue: return "dispatchQueue"
case .callingThread: return "callingThread"
}
}
Expand Down
40 changes: 40 additions & 0 deletions Tests/DistributedActorsTests/DispatcherTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,44 @@ final class DispatcherTests: ActorSystemXCTestCase {
let dispatcher: String = try p.expectMessage()
dispatcher.dropFirst("Dispatcher: ".count).shouldStartWith(prefix: "nio:")
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Grand Central Dispatch

func test_runOn_dispatchQueue() throws {
let p = self.testKit.spawnTestProbe(expecting: String.self)
let behavior: Behavior<String> = .receive { context, message in
context.log.info("HELLO")
p.tell("\(message)")
p.tell("\((context as! ActorShell<String>)._dispatcher.name)")
return .same
}

let global: DispatchQueue = .global()
let w = try system.spawn(.anonymous, props: .dispatcher(.dispatchQueue(global)), behavior)
w.tell("Hello")
w.tell("World")

func expectWasOnDispatchQueue(p: ActorTestProbe<String>) throws {
#if os(Linux)
try p.expectMessage().shouldContain("Dispatch.DispatchQueue")
#else
try p.expectMessage().shouldContain("OS_dispatch_queue_global:")
#endif
}

try p.expectMessage("Hello")
try expectWasOnDispatchQueue(p: p)

try p.expectMessage("World")
try expectWasOnDispatchQueue(p: p)

for i in 1 ... 100 {
w.tell("\(i)")
}
for i in 1 ... 100 {
try p.expectMessage("\(i)")
try expectWasOnDispatchQueue(p: p)
}
}
}