core/src/main/scala/org/apache/spark/deploy/master/Master.scala

@@ -36,7 +36,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.serializer.{JavaSerializer, Serializer}
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
 
 private[deploy] class Master(
     override val rpcEnv: RpcEnv,
@@ -1037,6 +1037,8 @@ private[deploy] object Master extends Logging {
   val ENDPOINT_NAME = "Master"
 
   def main(argStrings: Array[String]) {
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
+      exitOnUncaughtException = false))
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new MasterArguments(argStrings, conf)
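With exitOnUncaughtException = false, an uncaught exception in one of the Master's threads is logged but no longer brings down the whole process; an OutOfMemoryError still does. A minimal sketch of that behavior (the demo object and package are illustrative, not part of this patch; the handler is private[spark], so a demo has to live under org.apache.spark). The Worker change below is the same wiring.

package org.apache.spark.demo

import org.apache.spark.util.SparkUncaughtExceptionHandler

object HandlerDemo {
  def main(args: Array[String]): Unit = {
    // Same wiring as Master.main above: log uncaught exceptions, but only
    // exit the JVM for OutOfMemoryError.
    Thread.setDefaultUncaughtExceptionHandler(
      new SparkUncaughtExceptionHandler(exitOnUncaughtException = false))

    val t = new Thread(new Runnable {
      override def run(): Unit = throw new RuntimeException("boom")
    })
    t.start()
    t.join()

    // Reached: the RuntimeException was logged by the handler and the JVM
    // stayed up. With the default flag (true) the process would have exited
    // with SparkExitCode.UNCAUGHT_EXCEPTION instead.
    println("daemon still alive after an uncaught exception")
  }
}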
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

@@ -38,7 +38,7 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
 
 private[deploy] class Worker(
     override val rpcEnv: RpcEnv,
@@ -737,6 +737,8 @@ private[deploy] object Worker extends Logging {
   val ENDPOINT_NAME = "Worker"
 
   def main(argStrings: Array[String]) {
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
+      exitOnUncaughtException = false))
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new WorkerArguments(argStrings, conf)
core/src/main/scala/org/apache/spark/executor/Executor.scala

@@ -56,7 +56,7 @@ private[spark] class Executor(
     env: SparkEnv,
     userClassPath: Seq[URL] = Nil,
     isLocal: Boolean = false,
-    uncaughtExceptionHandler: UncaughtExceptionHandler = SparkUncaughtExceptionHandler)
+    uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler)
   extends Logging {
 
   logInfo(s"Starting executor ID $executorId on host $executorHostname")
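The Executor keeps its fail-fast behavior, since the flag defaults to true, but the default value is now an instance rather than the singleton, so callers (tests in particular) can inject a handler that does not call System.exit. A sketch of such an injection; RecordingHandler and the constructor arguments shown are illustrative, not part of this patch:

// Hypothetical test double: records the failure instead of exiting the JVM.
class RecordingHandler extends Thread.UncaughtExceptionHandler {
  @volatile var last: Option[Throwable] = None
  override def uncaughtException(thread: Thread, exception: Throwable): Unit = {
    last = Some(exception)
  }
}

// In a test:
// val handler = new RecordingHandler
// val executor = new Executor("exec-1", "localhost", env,
//   uncaughtExceptionHandler = handler)
// ... trigger a task failure, then assert on handler.last ...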
core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala

@@ -20,11 +20,12 @@ package org.apache.spark.util
 import org.apache.spark.internal.Logging
 
 /**
- * The default uncaught exception handler for Executors terminates the whole process, to avoid
- * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
- * to fail fast when things go wrong.
+ * The default uncaught exception handler for Spark daemons. It always terminates the process on
+ * an uncaught OutOfMemoryError, and also on other exceptions when exitOnUncaughtException is true.
+ *
+ * @param exitOnUncaughtException Whether to exit the process on an uncaught exception.
  */
-private[spark] object SparkUncaughtExceptionHandler
+private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Boolean = true)
   extends Thread.UncaughtExceptionHandler with Logging {
 
   override def uncaughtException(thread: Thread, exception: Throwable) {
@@ -40,7 +41,7 @@ private[spark] object SparkUncaughtExceptionHandler
       if (!ShutdownHookManager.inShutdown()) {
         if (exception.isInstanceOf[OutOfMemoryError]) {
           System.exit(SparkExitCode.OOM)
-        } else {
+        } else if (exitOnUncaughtException) {
           System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
         }
       }
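Putting the two hunks together, the handler's post-patch decision logic looks like this (a condensed sketch of the method body, with the log message simplified; SparkExitCode defines UNCAUGHT_EXCEPTION = 50, UNCAUGHT_EXCEPTION_TWICE = 51, and OOM = 52):

override def uncaughtException(thread: Thread, exception: Throwable): Unit = {
  try {
    logError(s"Uncaught exception in thread $thread", exception)
    // Never call System.exit from inside a shutdown hook: it would deadlock.
    if (!ShutdownHookManager.inShutdown()) {
      if (exception.isInstanceOf[OutOfMemoryError]) {
        System.exit(SparkExitCode.OOM)                 // always fatal
      } else if (exitOnUncaughtException) {
        System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)  // fatal only when the flag is true
      }
    }
  } catch {
    // If the handler itself blows up, halt the JVM rather than recurse.
    case _: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
    case _: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
  }
}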
core/src/main/scala/org/apache/spark/util/Utils.scala

@@ -76,6 +76,8 @@ private[spark] object CallSite {
 private[spark] object Utils extends Logging {
   val random = new Random()
 
+  private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler
+
   /**
    * Define a default value for driver memory here since this value is referenced across the code
    * base and nearly all files already use Utils.scala
@@ -1265,7 +1267,7 @@ private[spark] object Utils extends Logging {
       block
     } catch {
       case e: ControlThrowable => throw e
-      case t: Throwable => SparkUncaughtExceptionHandler.uncaughtException(t)
+      case t: Throwable => sparkUncaughtExceptionHandler.uncaughtException(t)
     }
   }
 
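Utils.tryOrExit is why Utils.scala now holds its own private instance: any non-control Throwable escaping the block is routed to the handler's one-argument uncaughtException(t) overload, and since the flag defaults to true, the JVM exits just as it did before this patch. A sketch of a typical call site (the demo object and doPeriodicCheck are illustrative, not from this patch):

package org.apache.spark.demo

import org.apache.spark.util.Utils

object TryOrExitDemo {
  // Hypothetical daemon work, e.g. a periodic scheduler check.
  private def doPeriodicCheck(): Unit = {
    throw new IllegalStateException("invariant violated")
  }

  def main(args: Array[String]): Unit = {
    // The IllegalStateException escapes the block, reaches the shared handler,
    // and the process exits with SparkExitCode.UNCAUGHT_EXCEPTION (50).
    // ControlThrowables are rethrown rather than treated as fatal.
    Utils.tryOrExit {
      doPeriodicCheck()
    }
    println("never reached")
  }
}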
resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala

@@ -97,7 +97,7 @@ private[mesos] object MesosClusterDispatcher
   with CommandLineUtils {
 
   override def main(args: Array[String]) {
-    Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler)
     Utils.initDaemon(log)
    val conf = new SparkConf
     val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
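Since exitOnUncaughtException defaults to true, the no-argument constructor here behaves exactly like the removed singleton: the dispatcher still exits on any uncaught exception. Side by side, as a sketch (the demo object is illustrative, not part of the patch):

package org.apache.spark.demo

import org.apache.spark.util.SparkUncaughtExceptionHandler

object DispatcherHandlerSketch {
  // Equivalent to the old SparkUncaughtExceptionHandler singleton:
  // exits on any uncaught Throwable (OOM -> 52, otherwise -> 50).
  val failFast = new SparkUncaughtExceptionHandler()

  // Master/Worker variant: logs uncaught exceptions, exits only on OOM.
  val logOnly = new SparkUncaughtExceptionHandler(exitOnUncaughtException = false)
}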