Skip to content

Commit d6cd9c5

Browse files
committed
Merge pull request #16152 from akka/wip-fjp-optimize-√
Elides the need to allocate an AkkaForkJoinTask for Mailbox-submission to...
2 parents 66aab4d + 4e0bd6d commit d6cd9c5

File tree

7 files changed

+178
-22
lines changed

7 files changed

+178
-22
lines changed

akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
398398
def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ll.self.path compareTo rr.self.path }
399399
} foreach {
400400
case cell: ActorCell
401-
System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " "
401+
System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.currentStatus + " "
402402
+ cell.mailbox.numberOfMessages + " " + cell.mailbox.systemDrain(SystemMessageList.LNil).size)
403403
}
404404

akka-actor/src/main/scala/akka/actor/ActorSystem.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -760,7 +760,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
760760
case _ Logging.simpleName(cell)
761761
}) +
762762
(cell match {
763-
case real: ActorCell " status=" + real.mailbox.status
763+
case real: ActorCell " status=" + real.mailbox.currentStatus
764764
case _ ""
765765
}) +
766766
" " + (cell.childrenRefs match {

akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
7272
clearActorFields(failedActor)
7373
}
7474
}
75-
assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.status)
75+
assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.currentStatus)
7676
if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor)
7777
} else {
7878
// need to keep that suspend counter balanced
@@ -118,7 +118,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
118118
* Do create the actor in response to a failure.
119119
*/
120120
protected def faultCreate(): Unit = {
121-
assert(mailbox.isSuspended, "mailbox must be suspended during failed creation, status=" + mailbox.status)
121+
assert(mailbox.isSuspended, "mailbox must be suspended during failed creation, status=" + mailbox.currentStatus)
122122
assert(perpetrator == self)
123123

124124
setReceiveTimeout(Duration.Undefined)

akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import akka.event.{ BusLogging, EventStream }
1212
import com.typesafe.config.{ ConfigFactory, Config }
1313
import akka.util.{ Unsafe, Index }
1414
import scala.annotation.tailrec
15-
import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool }
15+
import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool, ForkJoinWorkerThread }
1616
import scala.concurrent.duration.Duration
1717
import scala.concurrent.ExecutionContext
1818
import scala.concurrent.ExecutionContextExecutor
@@ -377,8 +377,16 @@ object ForkJoinExecutorConfigurator {
377377
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
378378
unhandledExceptionHandler: Thread.UncaughtExceptionHandler)
379379
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics {
380-
override def execute(r: Runnable): Unit =
381-
if (r eq null) throw new NullPointerException else super.execute(new AkkaForkJoinTask(r))
380+
override def execute(r: Runnable): Unit = {
381+
if (r eq null) throw new NullPointerException("The Runnable must not be null")
382+
val task =
383+
if (r.isInstanceOf[ForkJoinTask[_]]) r.asInstanceOf[ForkJoinTask[Any]]
384+
else new AkkaForkJoinTask(r)
385+
Thread.currentThread match {
386+
case worker: ForkJoinWorkerThread if worker.getPool eq this task.fork()
387+
case _ super.execute(task)
388+
}
389+
}
382390

383391
def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
384392
}
@@ -391,6 +399,9 @@ object ForkJoinExecutorConfigurator {
391399
override def getRawResult(): Unit = ()
392400
override def setRawResult(unit: Unit): Unit = ()
393401
final override def exec(): Boolean = try { runnable.run(); true } catch {
402+
case ie: InterruptedException
403+
Thread.currentThread.interrupt()
404+
false
394405
case anything: Throwable
395406
val t = Thread.currentThread
396407
t.getUncaughtExceptionHandler match {

akka-actor/src/main/scala/akka/dispatch/Mailbox.scala

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import akka.event.Logging.Error
1414
import scala.concurrent.duration.Duration
1515
import scala.concurrent.duration.FiniteDuration
1616
import scala.annotation.tailrec
17+
import scala.concurrent.forkjoin.ForkJoinTask
1718
import scala.util.control.NonFatal
1819
import com.typesafe.config.Config
1920
/**
@@ -50,7 +51,7 @@ private[akka] object Mailbox {
5051
* INTERNAL API
5152
*/
5253
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
53-
extends SystemMessageQueue with Runnable {
54+
extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {
5455

5556
import Mailbox._
5657

@@ -104,22 +105,22 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
104105
protected var _systemQueueDoNotCallMeDirectly: SystemMessage = _ //null by default
105106

106107
@inline
107-
final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)
108+
final def currentStatus: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)
108109

109110
@inline
110-
final def shouldProcessMessage: Boolean = (status & shouldNotProcessMask) == 0
111+
final def shouldProcessMessage: Boolean = (currentStatus & shouldNotProcessMask) == 0
111112

112113
@inline
113-
final def suspendCount: Int = status / suspendUnit
114+
final def suspendCount: Int = currentStatus / suspendUnit
114115

115116
@inline
116-
final def isSuspended: Boolean = (status & suspendMask) != 0
117+
final def isSuspended: Boolean = (currentStatus & suspendMask) != 0
117118

118119
@inline
119-
final def isClosed: Boolean = status == Closed
120+
final def isClosed: Boolean = currentStatus == Closed
120121

121122
@inline
122-
final def isScheduled: Boolean = (status & Scheduled) != 0
123+
final def isScheduled: Boolean = (currentStatus & Scheduled) != 0
123124

124125
@inline
125126
protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean =
@@ -136,7 +137,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
136137
* @return true if the suspend count reached zero
137138
*/
138139
@tailrec
139-
final def resume(): Boolean = status match {
140+
final def resume(): Boolean = currentStatus match {
140141
case Closed
141142
setStatus(Closed); false
142143
case s
@@ -152,7 +153,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
152153
* @return true if the previous suspend count was zero
153154
*/
154155
@tailrec
155-
final def suspend(): Boolean = status match {
156+
final def suspend(): Boolean = currentStatus match {
156157
case Closed
157158
setStatus(Closed); false
158159
case s
@@ -165,7 +166,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
165166
* status was Scheduled or not.
166167
*/
167168
@tailrec
168-
final def becomeClosed(): Boolean = status match {
169+
final def becomeClosed(): Boolean = currentStatus match {
169170
case Closed
170171
setStatus(Closed); false
171172
case s updateStatus(s, Closed) || becomeClosed()
@@ -176,7 +177,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
176177
*/
177178
@tailrec
178179
final def setAsScheduled(): Boolean = {
179-
val s = status
180+
val s = currentStatus
180181
/*
181182
* Only try to add Scheduled bit if pure Open/Suspended, not Closed or with
182183
* Scheduled bit already set.
@@ -190,7 +191,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
190191
*/
191192
@tailrec
192193
final def setAsIdle(): Boolean = {
193-
val s = status
194+
val s = currentStatus
194195
updateStatus(s, s & ~Scheduled) || setAsIdle()
195196
}
196197
/*
@@ -207,13 +208,13 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
207208
// Without calling .head the parameters would be boxed in SystemMessageList wrapper.
208209
Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old.head, _new.head)
209210

210-
final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
211+
final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = currentStatus match {
211212
case Open | Scheduled hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
212213
case Closed false
213214
case _ hasSystemMessageHint || hasSystemMessages
214215
}
215216

216-
final def run = {
217+
override final def run(): Unit = {
217218
try {
218219
if (!isClosed) { //Volatile read, needed here
219220
processAllSystemMessages() //First, deal with any system messages
@@ -225,6 +226,21 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
225226
}
226227
}
227228

229+
override final def getRawResult(): Unit = ()
230+
override final def setRawResult(unit: Unit): Unit = ()
231+
final override def exec(): Boolean = try { run(); false } catch {
232+
case ie: InterruptedException
233+
Thread.currentThread.interrupt()
234+
false
235+
case anything: Throwable
236+
val t = Thread.currentThread
237+
t.getUncaughtExceptionHandler match {
238+
case null
239+
case some some.uncaughtException(t, anything)
240+
}
241+
throw anything
242+
}
243+
228244
/**
229245
* Process the messages in the mailbox
230246
*/
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/**
2+
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
3+
*/
4+
package akka.actor
5+
6+
import akka.testkit.TestProbe
7+
import com.typesafe.config.ConfigFactory
8+
import org.openjdk.jmh.annotations._
9+
10+
import scala.concurrent.duration._
11+
12+
import java.util.concurrent.TimeUnit
13+
14+
@State(Scope.Benchmark)
15+
@BenchmarkMode(Array(Mode.Throughput))
16+
@Fork(1)
17+
@Threads(1)
18+
@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1)
19+
@Measurement(iterations = 20)
20+
class ForkJoinActorBenchmark {
21+
import ForkJoinActorBenchmark._
22+
23+
@Param(Array("1", "5"))
24+
var tpt = 0
25+
26+
@Param(Array("1", "4"))
27+
var threads = ""
28+
29+
implicit var system: ActorSystem = _
30+
31+
@Setup(Level.Trial)
32+
def setup() {
33+
system = ActorSystem("ForkJoinActorBenchmark", ConfigFactory.parseString(
34+
s"""| akka {
35+
| log-dead-letters = off
36+
| actor {
37+
| default-dispatcher {
38+
| executor = "fork-join-executor"
39+
| fork-join-executor {
40+
| parallelism-min = 1
41+
| parallelism-factor = $threads
42+
| parallelism-max = 64
43+
| }
44+
| throughput = $tpt
45+
| }
46+
| }
47+
| }
48+
""".stripMargin))
49+
}
50+
51+
@TearDown(Level.Trial)
52+
def shutdown() {
53+
system.shutdown()
54+
system.awaitTermination()
55+
}
56+
57+
@Benchmark
58+
@Measurement(timeUnit = TimeUnit.MILLISECONDS)
59+
@OperationsPerInvocation(messages)
60+
def pingPong = {
61+
val ping = system.actorOf(Props[ForkJoinActorBenchmark.PingPong])
62+
val pong = system.actorOf(Props[ForkJoinActorBenchmark.PingPong])
63+
64+
ping.tell(message, pong)
65+
66+
val p = TestProbe()
67+
p.watch(ping)
68+
p.expectTerminated(ping, timeout)
69+
p.watch(pong)
70+
p.expectTerminated(pong, timeout)
71+
}
72+
73+
@Benchmark
74+
@Measurement(timeUnit = TimeUnit.MILLISECONDS)
75+
@OperationsPerInvocation(messages)
76+
def floodPipe = {
77+
78+
val end = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], None))
79+
val middle = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(end)))
80+
val penultimate = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(middle)))
81+
val beginning = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(penultimate)))
82+
83+
val p = TestProbe()
84+
p.watch(end)
85+
86+
def send(left: Int): Unit =
87+
if (left > 0) {
88+
beginning ! message
89+
send(left - 1)
90+
}
91+
92+
send(messages / 4) // we have 4 actors in the pipeline
93+
94+
beginning ! stop
95+
96+
p.expectTerminated(end, timeout)
97+
}
98+
}
99+
100+
object ForkJoinActorBenchmark {
101+
final val stop = "stop"
102+
final val message = "message"
103+
final val timeout = 15.seconds
104+
final val messages = 400000
105+
class Pipe(next: Option[ActorRef]) extends Actor {
106+
def receive = {
107+
case m @ `message` =>
108+
if(next.isDefined) next.get forward m
109+
case s @ `stop` =>
110+
context stop self
111+
if(next.isDefined) next.get forward s
112+
}
113+
}
114+
class PingPong extends Actor {
115+
var left = messages / 2
116+
def receive = {
117+
case `message` =>
118+
119+
if (left <= 1)
120+
context stop self
121+
122+
sender() ! message
123+
left -= 1
124+
}
125+
}
126+
}

project/AkkaBuild.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1063,7 +1063,10 @@ object AkkaBuild extends Build {
10631063

10641064
// Change to internal API to fix #15991
10651065
ProblemFilters.exclude[MissingClassProblem]("akka.io.TcpConnection$UpdatePendingWrite$"),
1066-
ProblemFilters.exclude[MissingClassProblem]("akka.io.TcpConnection$UpdatePendingWrite")
1066+
ProblemFilters.exclude[MissingClassProblem]("akka.io.TcpConnection$UpdatePendingWrite"),
1067+
1068+
// Change to optimize use of ForkJoin with Akka's Mailbox
1069+
ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.Mailbox.status")
10671070
)
10681071
}
10691072

0 commit comments

Comments
 (0)