Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,15 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
}
}
} catch {
case ie: InterruptedException => // exit
case _: InterruptedException => // exit
case t: Throwable =>
try {
// Re-submit a MessageLoop so that Dispatcher will still work if
// UncaughtExceptionHandler decides to not kill JVM.
threadpool.execute(new MessageLoop)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should log before calling this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I usually don't log an exception that is going to be re-thrown, since that would produce duplicate log entries.

} finally {
throw t
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this will always rethrow, even when the MessageLoop is successfully re-submitted? Did you intend this to be a `catch` rather than a `finally`?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. This is going to rethrow fatal errors (non-fatal errors have already been caught and should not reach here) to the UncaughtExceptionHandler and let it decide what to do.

}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private[netty] class Inbox(
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
} catch {
case NonFatal(e) =>
case e: Throwable =>
context.sendFailure(e)
// Throw the exception -- this exception will be caught by the safelyCall function.
// The endpoint's onError function will be called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@

package org.apache.spark.rpc.netty

import java.util.concurrent.ExecutionException

import scala.concurrent.duration._

import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.mockito.MockitoSugar

import org.apache.spark._
import org.apache.spark.network.client.TransportClient
import org.apache.spark.rpc._

class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar with TimeLimits {

private implicit val signaler: Signaler = ThreadSignaler

override def createRpcEnv(
conf: SparkConf,
Expand Down Expand Up @@ -84,4 +91,48 @@ class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
msg3,
RequestMessage(nettyEnv, client, msg3.serialize(nettyEnv)))
}

test("StackOverflowError should be sent back and Dispatcher should survive") {
  // Regression test for the Dispatcher change in this PR: a fatal error
  // (StackOverflowError) thrown by an endpoint must
  //   (1) be propagated back to the caller as the failure cause, and
  //   (2) not leave the Dispatcher permanently unable to process messages.
  val numUsableCores = 2
  val conf = new SparkConf
  val config = RpcEnvConfig(
    conf,
    "test",
    "localhost",
    "localhost",
    0,  // port 0: let the OS pick a free port
    new SecurityManager(conf),
    numUsableCores,
    clientMode = false)
  // A separate RpcEnv hosting the faulty endpoint, so the suite's own `env` stays healthy.
  val anotherEnv = new NettyRpcEnvFactory().create(config)
  anotherEnv.setupEndpoint("StackOverflowError", new RpcEndpoint {
    override val rpcEnv = anotherEnv

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      // Any String message triggers a fatal error; an Int message echoes back normally.
      // scalastyle:off throwerror
      case msg: String => throw new StackOverflowError
      // scalastyle:on throwerror
      case num: Int => context.reply(num)
    }
  })

  val rpcEndpointRef = env.setupEndpointRef(anotherEnv.address, "StackOverflowError")
  try {
    // Send `numUsableCores` messages to trigger `numUsableCores` `StackOverflowError`s
    // (presumably one per MessageLoop thread, exercising the re-submit path — see the
    // Dispatcher diff above).
    for (_ <- 0 until numUsableCores) {
      val e = intercept[SparkException] {
        rpcEndpointRef.askSync[String]("hello")
      }
      // The root cause is `e.getCause.getCause` because the error is boxed by a
      // Scala Promise before being sent back to the caller.
      assert(e.getCause.isInstanceOf[ExecutionException])
      assert(e.getCause.getCause.isInstanceOf[StackOverflowError])
    }
    // The Dispatcher must still answer after the fatal errors; `failAfter` turns a
    // hang (no surviving MessageLoop) into a test failure instead of a stuck build.
    failAfter(10.seconds) {
      assert(rpcEndpointRef.askSync[Int](100) === 100)
    }
  } finally {
    // Always tear down the extra RpcEnv, even if an assertion above failed.
    anotherEnv.shutdown()
    anotherEnv.awaitTermination()
  }
}
}