From c6e71f87b0e8adbeadf35be76072c833bdcb3331 Mon Sep 17 00:00:00 2001
From: Devaraj K
Date: Mon, 19 Jun 2017 17:39:16 -0700
Subject: [PATCH 1/4] [SPARK-21146] [CORE] Worker should handle and shutdown when any thread gets UncaughtException

---
 .../src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1198e3cb05ea..71bd79c696dc 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -38,7 +38,7 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
 
 private[deploy] class Worker(
     override val rpcEnv: RpcEnv,
@@ -737,6 +737,7 @@ private[deploy] object Worker extends Logging {
   val ENDPOINT_NAME = "Worker"
 
   def main(argStrings: Array[String]) {
+    Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new WorkerArguments(argStrings, conf)

From a2f30f07e7e72dfacb801032bfc71bb9fda1b650 Mon Sep 17 00:00:00 2001
From: Devaraj K
Date: Wed, 28 Jun 2017 16:06:39 -0700
Subject: [PATCH 2/4] Updated SparkUncaughtExceptionHandler to take an argument that controls whether to kill the process on unhandled exceptions, with the default value true; behaviour remains the same for unhandled Error instances.

---
 .../apache/spark/deploy/worker/Worker.scala   |  2 +-
 .../util/SparkUncaughtExceptionHandler.scala  | 33 +++++++++++--------
 2 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 71bd79c696dc..785dd7714886 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -737,7 +737,7 @@ private[deploy] object Worker extends Logging {
   val ENDPOINT_NAME = "Worker"
 
   def main(argStrings: Array[String]) {
-    Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
+    Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler(false))
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new WorkerArguments(argStrings, conf)

diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
index 95bf3f58bc77..569ed5770c0e 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -26,27 +26,34 @@ import org.apache.spark.internal.Logging
  */
 private[spark] object SparkUncaughtExceptionHandler
   extends Thread.UncaughtExceptionHandler with Logging {
+  private[this] var exitOnException = true
 
-  override def uncaughtException(thread: Thread, exception: Throwable) {
-    try {
-      // Make it explicit that uncaught exceptions are thrown when container is shutting down.
-      // It will help users when they analyze the executor logs
-      val inShutdownMsg = if (ShutdownHookManager.inShutdown()) "[Container in shutdown] " else ""
-      val errMsg = "Uncaught exception in thread "
-      logError(inShutdownMsg + errMsg + thread, exception)
+  def apply(exitOnException: Boolean): Thread.UncaughtExceptionHandler = {
+    this.exitOnException = exitOnException
+    this
+  }
 
-      // We may have been called from a shutdown hook. If so, we must not call System.exit().
-      // (If we do, we will deadlock.)
-      if (!ShutdownHookManager.inShutdown()) {
+  override def uncaughtException(thread: Thread, exception: Throwable) {
+    // Make it explicit that uncaught exceptions are thrown when process is shutting down.
+    // It will help users when they analyze the executor logs
+    val errMsg = "Uncaught exception in thread " + thread
+    if (ShutdownHookManager.inShutdown()) {
+      logError("[Process in shutdown] " + errMsg, exception)
+    } else if (exception.isInstanceOf[Error] ||
+        (!exception.isInstanceOf[Error] && exitOnException)) {
+      try {
+        logError(errMsg + ". Shutting down now..", exception)
         if (exception.isInstanceOf[OutOfMemoryError]) {
           System.exit(SparkExitCode.OOM)
         } else {
           System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
         }
+      } catch {
+        case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
+        case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
       }
-    } catch {
-      case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
-      case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
+    } else {
+      logError(errMsg, exception)
     }
   }

From 56c2e4d1ff167db531bec4a3e5ff15fb412fdfbc Mon Sep 17 00:00:00 2001
From: Devaraj K
Date: Fri, 30 Jun 2017 11:48:41 -0700
Subject: [PATCH 3/4] Fixed the review comments

---
 .../apache/spark/deploy/master/Master.scala   |  3 +-
 .../apache/spark/deploy/worker/Worker.scala   |  2 +-
 .../org/apache/spark/executor/Executor.scala  |  2 +-
 .../util/SparkUncaughtExceptionHandler.scala  | 39 ++++++++-----------
 .../scala/org/apache/spark/util/Utils.scala   |  4 +-
 .../deploy/mesos/MesosClusterDispatcher.scala |  2 +-
 6 files changed, 24 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f10a41286c52..afbaefcff61a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -36,7 +36,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.serializer.{JavaSerializer, Serializer}
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
 
 private[deploy] class Master(
     override val rpcEnv: RpcEnv,
@@ -1037,6 +1037,7 @@ private[deploy] object Master extends Logging {
   val ENDPOINT_NAME = "Master"
 
   def main(argStrings: Array[String]) {
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(false))
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new MasterArguments(argStrings, conf)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 785dd7714886..1b4a8add329d 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -737,7 +737,7 @@ private[deploy] object Worker extends Logging {
   val ENDPOINT_NAME = "Worker"
 
   def main(argStrings: Array[String]) {
-    Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler(false))
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(false))
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new WorkerArguments(argStrings, conf)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 19e7eb086f41..21f0db103918 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -56,7 +56,7 @@ private[spark] class Executor(
     env: SparkEnv,
     userClassPath: Seq[URL] = Nil,
     isLocal: Boolean = false,
-    uncaughtExceptionHandler: UncaughtExceptionHandler = SparkUncaughtExceptionHandler)
+    uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler)
   extends Logging {
 
   logInfo(s"Starting executor ID $executorId on host $executorHostname")

diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
index 569ed5770c0e..2a137354fc9a 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -20,40 +20,33 @@ package org.apache.spark.util
 import org.apache.spark.internal.Logging
 
 /**
- * The default uncaught exception handler for Executors terminates the whole process, to avoid
- * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
- * to fail fast when things go wrong.
+ * The default uncaught exception handler for Spark daemons. It terminates the whole process for
+ * any Errors, and also terminates the process for Exceptions when the exitOnException flag is true.
  */
-private[spark] object SparkUncaughtExceptionHandler
+private[spark] class SparkUncaughtExceptionHandler(val exitOnException: Boolean = true)
   extends Thread.UncaughtExceptionHandler with Logging {
-  private[this] var exitOnException = true
-
-  def apply(exitOnException: Boolean): Thread.UncaughtExceptionHandler = {
-    this.exitOnException = exitOnException
-    this
-  }
 
   override def uncaughtException(thread: Thread, exception: Throwable) {
-    // Make it explicit that uncaught exceptions are thrown when process is shutting down.
-    // It will help users when they analyze the executor logs
-    val errMsg = "Uncaught exception in thread " + thread
-    if (ShutdownHookManager.inShutdown()) {
-      logError("[Process in shutdown] " + errMsg, exception)
-    } else if (exception.isInstanceOf[Error] ||
-        (!exception.isInstanceOf[Error] && exitOnException)) {
-      try {
+    try {
+      // Make it explicit that uncaught exceptions are thrown when process is shutting down.
+      // It will help users when they analyze the executor logs
+      val errMsg = "Uncaught exception in thread " + thread
+      if (ShutdownHookManager.inShutdown()) {
+        logError("[Process in shutdown] " + errMsg, exception)
+      } else if (exception.isInstanceOf[Error] ||
+          (!exception.isInstanceOf[Error] && exitOnException)) {
         logError(errMsg + ". Shutting down now..", exception)
         if (exception.isInstanceOf[OutOfMemoryError]) {
           System.exit(SparkExitCode.OOM)
         } else {
           System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
         }
-      } catch {
-        case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
-        case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
+      } else {
+        logError(errMsg, exception)
       }
-    } else {
-      logError(errMsg, exception)
+    } catch {
+      case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
+      case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
     }
   }

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index bbb7999e2a14..c1a6433b7548 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -76,6 +76,8 @@ private[spark] object CallSite {
 private[spark] object Utils extends Logging {
   val random = new Random()
 
+  private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler
+
   /**
    * Define a default value for driver memory here since this value is referenced across the code
    * base and nearly all files already use Utils.scala
@@ -1265,7 +1267,7 @@ private[spark] object Utils extends Logging {
       block
     } catch {
       case e: ControlThrowable => throw e
-      case t: Throwable => SparkUncaughtExceptionHandler.uncaughtException(t)
+      case t: Throwable => sparkUncaughtExceptionHandler.uncaughtException(t)
     }
   }

diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
index 38b082ac0119..aa378c9d340f 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -97,7 +97,7 @@ private[mesos] object MesosClusterDispatcher
   with CommandLineUtils {
 
   override def main(args: Array[String]) {
-    Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler)
     Utils.initDaemon(log)
     val conf = new SparkConf
     val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)

From 3d5106b4b0ec67350e89b1b7579cd2c164bc1b4b Mon Sep 17 00:00:00 2001
From: Devaraj K
Date: Thu, 6 Jul 2017 14:00:43 -0700
Subject: [PATCH 4/4] Fixed the review comments

---
 .../apache/spark/deploy/master/Master.scala   |  3 ++-
 .../apache/spark/deploy/worker/Worker.scala   |  3 ++-
 .../util/SparkUncaughtExceptionHandler.scala  | 23 ++++++++++---------
 3 files changed, 16 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index afbaefcff61a..723737dce73d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -1037,7 +1037,8 @@ private[deploy] object Master extends Logging {
   val ENDPOINT_NAME = "Master"
 
   def main(argStrings: Array[String]) {
-    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(false))
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
+      exitOnUncaughtException = false))
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new MasterArguments(argStrings, conf)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1b4a8add329d..3c554ef727d0 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -737,7 +737,8 @@ private[deploy] object Worker extends Logging {
   val ENDPOINT_NAME = "Worker"
 
   def main(argStrings: Array[String]) {
-    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(false))
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
+      exitOnUncaughtException = false))
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new WorkerArguments(argStrings, conf)

diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
index 2a137354fc9a..e0f5af5250e7 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -22,27 +22,28 @@ import org.apache.spark.internal.Logging
 /**
  * The default uncaught exception handler for Spark daemons. It terminates the whole process for
  * any Errors, and also terminates the process for Exceptions when the exitOnException flag is true.
+ *
+ * @param exitOnUncaughtException Whether to exit the process on UncaughtException.
  */
-private[spark] class SparkUncaughtExceptionHandler(val exitOnException: Boolean = true)
+private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Boolean = true)
   extends Thread.UncaughtExceptionHandler with Logging {
 
   override def uncaughtException(thread: Thread, exception: Throwable) {
     try {
-      // Make it explicit that uncaught exceptions are thrown when process is shutting down.
+      // Make it explicit that uncaught exceptions are thrown when container is shutting down.
       // It will help users when they analyze the executor logs
-      val errMsg = "Uncaught exception in thread " + thread
-      if (ShutdownHookManager.inShutdown()) {
-        logError("[Process in shutdown] " + errMsg, exception)
-      } else if (exception.isInstanceOf[Error] ||
-          (!exception.isInstanceOf[Error] && exitOnException)) {
-        logError(errMsg + ". Shutting down now..", exception)
+      val inShutdownMsg = if (ShutdownHookManager.inShutdown()) "[Container in shutdown] " else ""
+      val errMsg = "Uncaught exception in thread "
+      logError(inShutdownMsg + errMsg + thread, exception)
+
+      // We may have been called from a shutdown hook. If so, we must not call System.exit().
+      // (If we do, we will deadlock.)
+      if (!ShutdownHookManager.inShutdown()) {
         if (exception.isInstanceOf[OutOfMemoryError]) {
           System.exit(SparkExitCode.OOM)
-        } else {
+        } else if (exitOnUncaughtException) {
           System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
         }
-      } else {
-        logError(errMsg, exception)
       }
     } catch {
       case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
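
Note: the standalone sketch below is not part of the patch series; it is a minimal, hypothetical driver illustrating the end state after PATCH 4/4. The object name UncaughtExceptionHandlerDemo and its placement in package org.apache.spark.util are illustrative only (the package is chosen because SparkUncaughtExceptionHandler is private[spark]); the constructor signature SparkUncaughtExceptionHandler(exitOnUncaughtException: Boolean = true) is the one introduced above.

package org.apache.spark.util

// Hypothetical demo driver, not part of the patch series.
object UncaughtExceptionHandlerDemo {
  def main(args: Array[String]): Unit = {
    // Daemons such as Master and Worker register the handler with
    // exitOnUncaughtException = false: an uncaught Exception is only logged,
    // and in the final version only an OutOfMemoryError still triggers
    // System.exit(SparkExitCode.OOM) on the non-shutdown path.
    Thread.setDefaultUncaughtExceptionHandler(
      new SparkUncaughtExceptionHandler(exitOnUncaughtException = false))

    val t = new Thread(new Runnable {
      override def run(): Unit = throw new IllegalStateException("boom")
    })
    t.start()
    t.join()

    // Reaching this point shows the JVM survived the uncaught exception;
    // the handler only logged it because exitOnUncaughtException is false.
    println("process is still running")
  }
}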