Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path compareTo rr.self.path }
} foreach {
case cell: ActorCell ⇒
System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " "
System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.currentStatus + " "
+ cell.mailbox.numberOfMessages + " " + cell.mailbox.systemDrain(SystemMessageList.LNil).size)
}

Expand Down
2 changes: 1 addition & 1 deletion akka-actor/src/main/scala/akka/actor/ActorSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
case _ ⇒ Logging.simpleName(cell)
}) +
(cell match {
case real: ActorCell ⇒ " status=" + real.mailbox.status
case real: ActorCell ⇒ " status=" + real.mailbox.currentStatus
case _ ⇒ ""
}) +
" " + (cell.childrenRefs match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
clearActorFields(failedActor)
}
}
assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.status)
assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.currentStatus)
if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor)
} else {
// need to keep that suspend counter balanced
Expand Down Expand Up @@ -118,7 +118,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
* Do create the actor in response to a failure.
*/
protected def faultCreate(): Unit = {
assert(mailbox.isSuspended, "mailbox must be suspended during failed creation, status=" + mailbox.status)
assert(mailbox.isSuspended, "mailbox must be suspended during failed creation, status=" + mailbox.currentStatus)
assert(perpetrator == self)

setReceiveTimeout(Duration.Undefined)
Expand Down
17 changes: 14 additions & 3 deletions akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import akka.event.{ BusLogging, EventStream }
import com.typesafe.config.{ ConfigFactory, Config }
import akka.util.{ Unsafe, Index }
import scala.annotation.tailrec
import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool }
import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool, ForkJoinWorkerThread }
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContextExecutor
Expand Down Expand Up @@ -377,8 +377,16 @@ object ForkJoinExecutorConfigurator {
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler)
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics {
override def execute(r: Runnable): Unit =
if (r eq null) throw new NullPointerException else super.execute(new AkkaForkJoinTask(r))
override def execute(r: Runnable): Unit = {
if (r eq null) throw new NullPointerException("The Runnable must not be null")
val task =
if (r.isInstanceOf[ForkJoinTask[_]]) r.asInstanceOf[ForkJoinTask[Any]]
else new AkkaForkJoinTask(r)
Thread.currentThread match {
case worker: ForkJoinWorkerThread if worker.getPool eq this ⇒ task.fork()
case _ ⇒ super.execute(task)
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More efficient lines for Java FJP:

if (ForkJoinTask.getPool eq p) t.fork()
else p.execute(t)

and for Scala's FJP:

package scala.concurrent.forkjoin

Thread.currentThread() match {
  case wt: ForkJoinWorkerThread if wt.getPool eq p => wt.workQueue.push(t)
  case _ => p.externalPush(t)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't want to go in and rely on package-protected details, but:

if (ForkJoinTask.getPool eq this) task.fork()
else super.execute(task)

Looks slightly cleaner.

}

def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
}
Expand All @@ -391,6 +399,9 @@ object ForkJoinExecutorConfigurator {
override def getRawResult(): Unit = ()
override def setRawResult(unit: Unit): Unit = ()
final override def exec(): Boolean = try { runnable.run(); true } catch {
case ie: InterruptedException ⇒
Thread.currentThread.interrupt()
false
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an open question whether this should be true or false, I tend to think false since it was Interrupted.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. This should be false since interruption isn't normal completion.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't return false always? It will save from redundant signalling of normal completion of the internal task.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's a great point, Andriy.
Nobody will be referring to the FJT anyway.
On Nov 1, 2014 5:26 PM, "Andriy Plokhotnyuk" [email protected]
wrote:

In akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala:

@@ -391,6 +392,9 @@ object ForkJoinExecutorConfigurator {
override def getRawResult(): Unit = ()
override def setRawResult(unit: Unit): Unit = ()
final override def exec(): Boolean = try { runnable.run(); true } catch {

  •  case ie: InterruptedException ⇒
    
  •    Thread.currentThread.interrupt()
    
  •    false
    

Why don't return false always? It will save from redundant signalling of
normal completion of the internal task.


Reply to this email directly or view it on GitHub
https://github.com/akka/akka/pull/16152/files#r19705216.

case anything: Throwable ⇒
val t = Thread.currentThread
t.getUncaughtExceptionHandler match {
Expand Down
44 changes: 30 additions & 14 deletions akka-actor/src/main/scala/akka/dispatch/Mailbox.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import akka.event.Logging.Error
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.annotation.tailrec
import scala.concurrent.forkjoin.ForkJoinTask
import scala.util.control.NonFatal
import com.typesafe.config.Config
/**
Expand Down Expand Up @@ -50,7 +51,7 @@ private[akka] object Mailbox {
* INTERNAL API
*/
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
extends SystemMessageQueue with Runnable {
extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {

import Mailbox._

Expand Down Expand Up @@ -104,22 +105,22 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
protected var _systemQueueDoNotCallMeDirectly: SystemMessage = _ //null by default

@inline
final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)
final def currentStatus: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)

@inline
final def shouldProcessMessage: Boolean = (status & shouldNotProcessMask) == 0
final def shouldProcessMessage: Boolean = (currentStatus & shouldNotProcessMask) == 0

@inline
final def suspendCount: Int = status / suspendUnit
final def suspendCount: Int = currentStatus / suspendUnit

@inline
final def isSuspended: Boolean = (status & suspendMask) != 0
final def isSuspended: Boolean = (currentStatus & suspendMask) != 0

@inline
final def isClosed: Boolean = status == Closed
final def isClosed: Boolean = currentStatus == Closed

@inline
final def isScheduled: Boolean = (status & Scheduled) != 0
final def isScheduled: Boolean = (currentStatus & Scheduled) != 0

@inline
protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean =
Expand All @@ -136,7 +137,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
* @return true if the suspend count reached zero
*/
@tailrec
final def resume(): Boolean = status match {
final def resume(): Boolean = currentStatus match {
case Closed ⇒
setStatus(Closed); false
case s ⇒
Expand All @@ -152,7 +153,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
* @return true if the previous suspend count was zero
*/
@tailrec
final def suspend(): Boolean = status match {
final def suspend(): Boolean = currentStatus match {
case Closed ⇒
setStatus(Closed); false
case s ⇒
Expand All @@ -165,7 +166,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
* status was Scheduled or not.
*/
@tailrec
final def becomeClosed(): Boolean = status match {
final def becomeClosed(): Boolean = currentStatus match {
case Closed ⇒
setStatus(Closed); false
case s ⇒ updateStatus(s, Closed) || becomeClosed()
Expand All @@ -176,7 +177,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
*/
@tailrec
final def setAsScheduled(): Boolean = {
val s = status
val s = currentStatus
/*
* Only try to add Scheduled bit if pure Open/Suspended, not Closed or with
* Scheduled bit already set.
Expand All @@ -190,7 +191,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
*/
@tailrec
final def setAsIdle(): Boolean = {
val s = status
val s = currentStatus
updateStatus(s, s & ~Scheduled) || setAsIdle()
}
/*
Expand All @@ -207,13 +208,13 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
// Without calling .head the parameters would be boxed in SystemMessageList wrapper.
Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old.head, _new.head)

final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = currentStatus match {
case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
case Closed ⇒ false
case _ ⇒ hasSystemMessageHint || hasSystemMessages
}

final def run = {
override final def run(): Unit = {
try {
if (!isClosed) { //Volatile read, needed here
processAllSystemMessages() //First, deal with any system messages
Expand All @@ -225,6 +226,21 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
}
}

override final def getRawResult(): Unit = ()
override final def setRawResult(unit: Unit): Unit = ()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

final override def exec(): Boolean = try { run(); false } catch {
case ie: InterruptedException ⇒
Thread.currentThread.interrupt()
false
case anything: Throwable ⇒
val t = Thread.currentThread
t.getUncaughtExceptionHandler match {
case null ⇒
case some ⇒ some.uncaughtException(t, anything)
}
throw anything
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this imply that FJP does not catch things and also does not tell the uncaughtException handler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly so

}

/**
* Process the messages in the mailbox
*/
Expand Down
126 changes: 126 additions & 0 deletions akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._

import scala.concurrent.duration._

import java.util.concurrent.TimeUnit

@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(1)
@Threads(1)
@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1)
@Measurement(iterations = 20)
class ForkJoinActorBenchmark {
import ForkJoinActorBenchmark._

@Param(Array("1", "5"))
var tpt = 0

@Param(Array("1", "4"))
var threads = ""

implicit var system: ActorSystem = _

@Setup(Level.Trial)
def setup() {
system = ActorSystem("ForkJoinActorBenchmark", ConfigFactory.parseString(
s"""| akka {
| log-dead-letters = off
| actor {
| default-dispatcher {
| executor = "fork-join-executor"
| fork-join-executor {
| parallelism-min = 1
| parallelism-factor = $threads
| parallelism-max = 64
| }
| throughput = $tpt
| }
| }
| }
""".stripMargin))
}

@TearDown(Level.Trial)
def shutdown() {
system.shutdown()
system.awaitTermination()
}

@Benchmark
@Measurement(timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(messages)
def pingPong = {
val ping = system.actorOf(Props[ForkJoinActorBenchmark.PingPong])
val pong = system.actorOf(Props[ForkJoinActorBenchmark.PingPong])

ping.tell(message, pong)

val p = TestProbe()
p.watch(ping)
p.expectTerminated(ping, timeout)
p.watch(pong)
p.expectTerminated(pong, timeout)
}

@Benchmark
@Measurement(timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(messages)
def floodPipe = {

val end = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], None))
val middle = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(end)))
val penultimate = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(middle)))
val beginning = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(penultimate)))

val p = TestProbe()
p.watch(end)

def send(left: Int): Unit =
if (left > 0) {
beginning ! message
send(left - 1)
}

send(messages / 4) // we have 4 actors in the pipeline

beginning ! stop

p.expectTerminated(end, timeout)
}
}

object ForkJoinActorBenchmark {
final val stop = "stop"
final val message = "message"
final val timeout = 15.seconds
final val messages = 400000
class Pipe(next: Option[ActorRef]) extends Actor {
def receive = {
case m @ `message` =>
if(next.isDefined) next.get forward m
case s @ `stop` =>
context stop self
if(next.isDefined) next.get forward s
}
}
class PingPong extends Actor {
var left = messages / 2
def receive = {
case `message` =>

if (left <= 1)
context stop self

sender() ! message
left -= 1
}
}
}
5 changes: 4 additions & 1 deletion project/AkkaBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,10 @@ object AkkaBuild extends Build {

// Change to internal API to fix #15991
ProblemFilters.exclude[MissingClassProblem]("akka.io.TcpConnection$UpdatePendingWrite$"),
ProblemFilters.exclude[MissingClassProblem]("akka.io.TcpConnection$UpdatePendingWrite")
ProblemFilters.exclude[MissingClassProblem]("akka.io.TcpConnection$UpdatePendingWrite"),

// Change to optimize use of ForkJoin with Akka's Mailbox
ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.Mailbox.status")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this trips someone over then they get what they deserve ;-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dungeon strikes back!

)
}

Expand Down