
Actor starvation when self-messages and different dispatchers are involved #17341

@clockfly

Description

Problem Statement:

We have a benchmark test which processes 10 million messages per second (project: https://github.com/intel-hadoop/gearpump). During the test, I found that sometimes some messages could not be delivered to the target actor.

Test case description:

The test is like this:
I have 600 upstream actors and 600 downstream actors distributed across 60 JVMs. The upstream actors send data to the downstream actors as fast as they can, in a shuffle pattern. Each upstream actor relies on sending a message to itself to trigger the next round of sending.

The upstream tasks and downstream tasks share a custom dispatcher (not the default dispatcher or the remote default dispatcher).
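
For context, a custom fork-join dispatcher of this kind is configured roughly as follows. This is only a sketch: the name custom-dispatcher and the parallelism/throughput numbers are illustrative, not the exact settings used in the benchmark.

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    object DispatcherConfigSketch extends App {
      // "custom-dispatcher" and the numbers below are illustrative values only.
      val config = ConfigFactory.parseString(
        """
          |custom-dispatcher {
          |  type = Dispatcher
          |  executor = "fork-join-executor"
          |  fork-join-executor {
          |    parallelism-min = 4
          |    parallelism-factor = 2.0
          |    parallelism-max = 16
          |  }
          |  throughput = 100
          |}
        """.stripMargin).withFallback(ConfigFactory.load())

      val system = ActorSystem("benchmark", config)
      // Upstream/downstream actors are then created with
      // Props(...).withDispatcher("custom-dispatcher") so they share this pool.
    }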

Upstream Actor logic (a minimal sketch follows the list):

  1. Upstream actor A sends itself a tick message.
  2. In A's receive, it sends one message to the downstream actors; after sending, it sends itself another tick message.
  3. Repeat step 2.
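
A minimal sketch of that loop (Tick, Payload and UpstreamTask are illustrative names, not the actual gearpump classes):

    import akka.actor.{Actor, ActorRef}

    case object Tick                          // illustrative self-message
    case class Payload(seq: Long)             // illustrative downstream message

    // Sketch of the upstream actor: every Tick sends one message downstream
    // and immediately re-triggers itself by sending another Tick.
    class UpstreamTask(downstream: Vector[ActorRef]) extends Actor {
      var count = 0L

      override def preStart(): Unit = self ! Tick                          // step 1: start the loop

      def receive = {
        case Tick =>
          downstream((count % downstream.size).toInt) ! Payload(count)     // step 2: shuffle-style send
          count += 1
          self ! Tick                                                      // step 2: schedule the next round
      }
    }

    // Started e.g. with:
    //   system.actorOf(Props(classOf[UpstreamTask], refs).withDispatcher("custom-dispatcher"))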

Trouble-shooting and findings

I compiled the akka source to add log tracing, and confirmed that the lost message was already inserted into the target Mailbox queue (SingleConsumerOnlyUnboundedMailbox), and that the Mailbox (a Runnable) was already inserted into the internal task queue of the fork-join pool. The strange part is that the mailbox was NOT scheduled onto a real thread and run.

Finding 1: about akka version

I found that akka 2.3.6 does not have this problem, while 2.3.9 does.

Finding 2: about self-sending messages

For my test, the upstream actors need to repeatedly send themselves a tick message so that they keep triggering themselves to send more messages downstream. If I change the direct self-send into an indirect self-send, the problem goes away.
For example, instead of sending directly to itself (A -> A), I create an intermediate actor C and route the tick through it (A -> C -> A), as sketched below.
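
A sketch of that workaround, reusing the Tick message from the sketch above (Relay is an illustrative name):

    import akka.actor.Actor

    // Intermediate actor C: bounces every Tick back to whoever sent it,
    // turning the direct self-send A -> A into the indirect A -> C -> A.
    class Relay extends Actor {
      def receive = {
        case Tick => sender() ! Tick
      }
    }

    // In the upstream actor, replace `self ! Tick` with:
    //   val relay = context.actorOf(Props[Relay])   // created once, e.g. in preStart
    //   relay ! Tick                                // comes back to us as C's reply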

Finding 3: about changes in the latest akka version

Since 2.3.6 doesn't have this problem, I browsed through the akka history and found a suspicious change: https://github.com/akka/akka/pull/16152/files#diff-2c9d0638ce3a1f007623ac9d1ead1725R379

The intent of this PR is to make Mailbox a subclass of ForkJoinTask, so that performance can be optimized by making full use of fork-join scheduling. I suspect the changes in this PR also introduce a fairness problem.

Here is the relevant change in AkkaForkJoinPool:

     extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics {
-    override def execute(r: Runnable): Unit =
-      if (r eq null) throw new NullPointerException else super.execute(new AkkaForkJoinTask(r))
+    override def execute(r: Runnable): Unit = {
+      if (r eq null) throw new NullPointerException("The Runnable must not be null")
+      val task =
+        if (r.isInstanceOf[ForkJoinTask[_]]) r.asInstanceOf[ForkJoinTask[Any]]
+        else new AkkaForkJoinTask(r)
+      Thread.currentThread match {
+        case worker: ForkJoinWorkerThread if worker.getPool eq this ⇒ task.fork()
+        case _ ⇒ super.execute(task)
+      }
+    }

If actor A sends itself a message, the send triggers AkkaForkJoinPool.execute while the calling thread is already inside the fork-join pool:

+      Thread.currentThread match {
+        case worker: ForkJoinWorkerThread if worker.getPool eq this ⇒ task.fork()
+        case _ ⇒ super.execute(task)
+      }

So it picks the first branch, which executes task.fork().

I suspect task.fork() (which, as I understand it, pushes the task onto the current worker thread's own queue) effectively gets higher priority than the other branch, super.execute(task), so tasks submitted through the latter path may never get scheduled.
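
To make the suspicion concrete, here is a small, self-contained sketch of the two submission paths using a plain java.util.concurrent.ForkJoinPool (akka actually ships its own bundled fork-join classes, so this is only an illustration under that assumption): a task that keeps re-forking itself from inside a single-threaded asyncMode pool, versus a task submitted from outside via execute. Whether the external task is actually starved depends on the JDK's ForkJoinPool internals, so this is not a verified reproduction, just a way to observe the two code paths.

    import java.util.concurrent.{ForkJoinPool, RecursiveAction}

    object ForkVsExecuteSketch extends App {
      // Single worker, asyncMode = true, mirroring the arguments AkkaForkJoinPool
      // passes to its ForkJoinPool superclass.
      val pool = new ForkJoinPool(1, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true)

      // Path 1: like a self-messaging actor, the task keeps re-forking itself from
      // inside the pool, so each new task is pushed via fork() onto the worker's own queue.
      class SelfForking(remaining: Long) extends RecursiveAction {
        def compute(): Unit =
          if (remaining > 0) new SelfForking(remaining - 1).fork()
      }

      // Path 2: like a mailbox scheduled from a non-worker thread, submitted via
      // execute(), which goes through the pool's external submission queue.
      val external = new RecursiveAction {
        def compute(): Unit = println("externally submitted task finally ran")
      }

      pool.execute(new SelfForking(100000000L))
      Thread.sleep(100)      // give the worker time to start the re-forking loop
      pool.execute(external) // observe how long this takes relative to the forked tasks
      Thread.sleep(5000)
      pool.shutdownNow()
    }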

Thanks in advance.
Sean
