Skip to content

Commit a2f30f0

Browse files
author
Devaraj K
committed
Updated SparkUncaughtExceptionHandler to take an argument indicating whether
to kill the process for unhandled exceptions, with the default value as true; behaviour remains the same for unhandled Error instances.
1 parent c6e71f8 commit a2f30f0

File tree

2 files changed

+21
-14
lines changed

2 files changed

+21
-14
lines changed

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -737,7 +737,7 @@ private[deploy] object Worker extends Logging {
737737
val ENDPOINT_NAME = "Worker"
738738

739739
def main(argStrings: Array[String]) {
740-
Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
740+
Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler(false))
741741
Utils.initDaemon(log)
742742
val conf = new SparkConf
743743
val args = new WorkerArguments(argStrings, conf)

core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,27 +26,34 @@ import org.apache.spark.internal.Logging
2626
*/
2727
private[spark] object SparkUncaughtExceptionHandler
2828
extends Thread.UncaughtExceptionHandler with Logging {
29+
private[this] var exitOnException = true
2930

30-
override def uncaughtException(thread: Thread, exception: Throwable) {
31-
try {
32-
// Make it explicit that uncaught exceptions are thrown when container is shutting down.
33-
// It will help users when they analyze the executor logs
34-
val inShutdownMsg = if (ShutdownHookManager.inShutdown()) "[Container in shutdown] " else ""
35-
val errMsg = "Uncaught exception in thread "
36-
logError(inShutdownMsg + errMsg + thread, exception)
31+
def apply(exitOnException: Boolean): Thread.UncaughtExceptionHandler = {
32+
this.exitOnException = exitOnException
33+
this
34+
}
3735

38-
// We may have been called from a shutdown hook. If so, we must not call System.exit().
39-
// (If we do, we will deadlock.)
40-
if (!ShutdownHookManager.inShutdown()) {
36+
override def uncaughtException(thread: Thread, exception: Throwable) {
37+
// Make it explicit that uncaught exceptions are thrown when process is shutting down.
38+
// It will help users when they analyze the executor logs
39+
val errMsg = "Uncaught exception in thread " + thread
40+
if (ShutdownHookManager.inShutdown()) {
41+
logError("[Process in shutdown] " + errMsg, exception)
42+
} else if (exception.isInstanceOf[Error] ||
43+
(!exception.isInstanceOf[Error] && exitOnException)) {
44+
try {
45+
logError(errMsg + ". Shutting down now..", exception)
4146
if (exception.isInstanceOf[OutOfMemoryError]) {
4247
System.exit(SparkExitCode.OOM)
4348
} else {
4449
System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
4550
}
51+
} catch {
52+
case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
53+
case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
4654
}
47-
} catch {
48-
case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
49-
case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
55+
} else {
56+
logError(errMsg, exception)
5057
}
5158
}
5259

0 commit comments

Comments (0)