From 4e0bd6d66178b2d676c7ec1a011a0559f904b636 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 29 Oct 2014 11:24:40 +0100 Subject: [PATCH] =act #16152 - Elides the need to allocate an AkkaForkJoinTask for Mailbox-submission to registerForExecution, by having Mailbox extend ForkJoinTask and using the fact that ForkJoinTask.exec when returning false does not set completion on the task, so it is free to be resubmitted to the ForkJoinPool without reinitialization. Also adds the ability to use fork() when the currentThread is a worker thread of the pool that we want to execute on. Adds a JMH benchmark for both the ping-pong performance and pipelined throughput. --- .../akka/actor/dispatch/ActorModelSpec.scala | 2 +- .../main/scala/akka/actor/ActorSystem.scala | 2 +- .../akka/actor/dungeon/FaultHandling.scala | 4 +- .../akka/dispatch/AbstractDispatcher.scala | 17 ++- .../main/scala/akka/dispatch/Mailbox.scala | 44 ++++-- .../akka/actor/ForkJoinActorBenchmark.scala | 126 ++++++++++++++++++ project/AkkaBuild.scala | 5 +- 7 files changed, 178 insertions(+), 22 deletions(-) create mode 100644 akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index fa48059e5c2..caefab1941f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -398,7 +398,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path compareTo rr.self.path } } foreach { case cell: ActorCell ⇒ - System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.currentStatus + " " + cell.mailbox.numberOfMessages + " " + cell.mailbox.systemDrain(SystemMessageList.LNil).size) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index d3bd19d88d2..1928b88db17 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -760,7 +760,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, case _ ⇒ Logging.simpleName(cell) }) + (cell match { - case real: ActorCell ⇒ " status=" + real.mailbox.status + case real: ActorCell ⇒ " status=" + real.mailbox.currentStatus case _ ⇒ "" }) + " " + (cell.childrenRefs match { diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala index 26b7c2e626a..e2f9e0f4d24 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala @@ -72,7 +72,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ clearActorFields(failedActor) } } - assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.status) + assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.currentStatus) if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor) } else { // need to keep that suspend counter balanced @@ -118,7 +118,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ * Do create the actor in response to a failure. */ protected def faultCreate(): Unit = { - assert(mailbox.isSuspended, "mailbox must be suspended during failed creation, status=" + mailbox.status) + assert(mailbox.isSuspended, "mailbox must be suspended during failed creation, status=" + mailbox.currentStatus) assert(perpetrator == self) setReceiveTimeout(Duration.Undefined) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index ca6c73ec73a..214b183c8c7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -12,7 +12,7 @@ import akka.event.{ BusLogging, EventStream } import com.typesafe.config.{ ConfigFactory, Config } import akka.util.{ Unsafe, Index } import scala.annotation.tailrec -import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool } +import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool, ForkJoinWorkerThread } import scala.concurrent.duration.Duration import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContextExecutor @@ -377,8 +377,16 @@ object ForkJoinExecutorConfigurator { threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, unhandledExceptionHandler: Thread.UncaughtExceptionHandler) extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics { - override def execute(r: Runnable): Unit = - if (r eq null) throw new NullPointerException else super.execute(new AkkaForkJoinTask(r)) + override def execute(r: Runnable): Unit = { + if (r eq null) throw new NullPointerException("The Runnable must not be null") + val task = + if (r.isInstanceOf[ForkJoinTask[_]]) r.asInstanceOf[ForkJoinTask[Any]] + else new AkkaForkJoinTask(r) + Thread.currentThread match { + case worker: ForkJoinWorkerThread if worker.getPool eq this ⇒ task.fork() + case _ ⇒ super.execute(task) + } + } def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() } @@ -391,6 +399,9 @@ object ForkJoinExecutorConfigurator { override def getRawResult(): Unit = () override def setRawResult(unit: Unit): Unit = () final override def exec(): Boolean = try { runnable.run(); true } catch { + case ie: InterruptedException ⇒ + Thread.currentThread.interrupt() + false case anything: Throwable ⇒ val t = Thread.currentThread t.getUncaughtExceptionHandler match { diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 5832bb24fb0..322ffd2bac3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -14,6 +14,7 @@ import akka.event.Logging.Error import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import scala.annotation.tailrec +import scala.concurrent.forkjoin.ForkJoinTask import scala.util.control.NonFatal import com.typesafe.config.Config /** @@ -50,7 +51,7 @@ private[akka] object Mailbox { * INTERNAL API */ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) - extends SystemMessageQueue with Runnable { + extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable { import Mailbox._ @@ -104,22 +105,22 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) protected var _systemQueueDoNotCallMeDirectly: SystemMessage = _ //null by default @inline - final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset) + final def currentStatus: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset) @inline - final def shouldProcessMessage: Boolean = (status & shouldNotProcessMask) == 0 + final def shouldProcessMessage: Boolean = (currentStatus & shouldNotProcessMask) == 0 @inline - final def suspendCount: Int = status / suspendUnit + final def suspendCount: Int = currentStatus / suspendUnit @inline - final def isSuspended: Boolean = (status & suspendMask) != 0 + final def isSuspended: Boolean = (currentStatus & suspendMask) != 0 @inline - final def isClosed: Boolean = status == Closed + final def isClosed: Boolean = currentStatus == Closed @inline - final def isScheduled: Boolean = (status & Scheduled) != 0 + final def isScheduled: Boolean = (currentStatus & Scheduled) != 0 @inline protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean = @@ -136,7 +137,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) * @return true if the suspend count reached zero */ @tailrec - final def resume(): Boolean = status match { + final def resume(): Boolean = currentStatus match { case Closed ⇒ setStatus(Closed); false case s ⇒ @@ -152,7 +153,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) * @return true if the previous suspend count was zero */ @tailrec - final def suspend(): Boolean = status match { + final def suspend(): Boolean = currentStatus match { case Closed ⇒ setStatus(Closed); false case s ⇒ @@ -165,7 +166,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) * status was Scheduled or not. */ @tailrec - final def becomeClosed(): Boolean = status match { + final def becomeClosed(): Boolean = currentStatus match { case Closed ⇒ setStatus(Closed); false case s ⇒ updateStatus(s, Closed) || becomeClosed() @@ -176,7 +177,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) */ @tailrec final def setAsScheduled(): Boolean = { - val s = status + val s = currentStatus /* * Only try to add Scheduled bit if pure Open/Suspended, not Closed or with * Scheduled bit already set. @@ -190,7 +191,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) */ @tailrec final def setAsIdle(): Boolean = { - val s = status + val s = currentStatus updateStatus(s, s & ~Scheduled) || setAsIdle() } /* @@ -207,13 +208,13 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) // Without calling .head the parameters would be boxed in SystemMessageList wrapper. Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old.head, _new.head) - final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { + final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = currentStatus match { case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages case Closed ⇒ false case _ ⇒ hasSystemMessageHint || hasSystemMessages } - final def run = { + override final def run(): Unit = { try { if (!isClosed) { //Volatile read, needed here processAllSystemMessages() //First, deal with any system messages @@ -225,6 +226,21 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) } } + override final def getRawResult(): Unit = () + override final def setRawResult(unit: Unit): Unit = () + final override def exec(): Boolean = try { run(); false } catch { + case ie: InterruptedException ⇒ + Thread.currentThread.interrupt() + false + case anything: Throwable ⇒ + val t = Thread.currentThread + t.getUncaughtExceptionHandler match { + case null ⇒ + case some ⇒ some.uncaughtException(t, anything) + } + throw anything + } + /** * Process the messages in the mailbox */ diff --git a/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala new file mode 100644 index 00000000000..dcc62a2b9c5 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala @@ -0,0 +1,126 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.actor + +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory +import org.openjdk.jmh.annotations._ + +import scala.concurrent.duration._ + +import java.util.concurrent.TimeUnit + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +@Fork(1) +@Threads(1) +@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1) +@Measurement(iterations = 20) +class ForkJoinActorBenchmark { + import ForkJoinActorBenchmark._ + + @Param(Array("1", "5")) + var tpt = 0 + + @Param(Array("1", "4")) + var threads = "" + + implicit var system: ActorSystem = _ + + @Setup(Level.Trial) + def setup() { + system = ActorSystem("ForkJoinActorBenchmark", ConfigFactory.parseString( + s"""| akka { + | log-dead-letters = off + | actor { + | default-dispatcher { + | executor = "fork-join-executor" + | fork-join-executor { + | parallelism-min = 1 + | parallelism-factor = $threads + | parallelism-max = 64 + | } + | throughput = $tpt + | } + | } + | } + """.stripMargin)) + } + + @TearDown(Level.Trial) + def shutdown() { + system.shutdown() + system.awaitTermination() + } + + @Benchmark + @Measurement(timeUnit = TimeUnit.MILLISECONDS) + @OperationsPerInvocation(messages) + def pingPong = { + val ping = system.actorOf(Props[ForkJoinActorBenchmark.PingPong]) + val pong = system.actorOf(Props[ForkJoinActorBenchmark.PingPong]) + + ping.tell(message, pong) + + val p = TestProbe() + p.watch(ping) + p.expectTerminated(ping, timeout) + p.watch(pong) + p.expectTerminated(pong, timeout) + } + + @Benchmark + @Measurement(timeUnit = TimeUnit.MILLISECONDS) + @OperationsPerInvocation(messages) + def floodPipe = { + + val end = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], None)) + val middle = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(end))) + val penultimate = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(middle))) + val beginning = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(penultimate))) + + val p = TestProbe() + p.watch(end) + + def send(left: Int): Unit = + if (left > 0) { + beginning ! message + send(left - 1) + } + + send(messages / 4) // we have 4 actors in the pipeline + + beginning ! stop + + p.expectTerminated(end, timeout) + } +} + +object ForkJoinActorBenchmark { + final val stop = "stop" + final val message = "message" + final val timeout = 15.seconds + final val messages = 400000 + class Pipe(next: Option[ActorRef]) extends Actor { + def receive = { + case m @ `message` => + if(next.isDefined) next.get forward m + case s @ `stop` => + context stop self + if(next.isDefined) next.get forward s + } + } + class PingPong extends Actor { + var left = messages / 2 + def receive = { + case `message` => + + if (left <= 1) + context stop self + + sender() ! message + left -= 1 + } + } +} \ No newline at end of file diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 37c54d38e7e..65d1c521695 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -1065,7 +1065,10 @@ object AkkaBuild extends Build { // Change to internal API to fix #15991 ProblemFilters.exclude[MissingClassProblem]("akka.io.TcpConnection$UpdatePendingWrite$"), - ProblemFilters.exclude[MissingClassProblem]("akka.io.TcpConnection$UpdatePendingWrite") + ProblemFilters.exclude[MissingClassProblem]("akka.io.TcpConnection$UpdatePendingWrite"), + + // Change to optimize use of ForkJoin with Akka's Mailbox + ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.Mailbox.status") ) }