**Master.scala**

```diff
@@ -36,7 +36,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.serializer.{JavaSerializer, Serializer}
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}

 private[deploy] class Master(
     override val rpcEnv: RpcEnv,

@@ -1037,6 +1037,7 @@ private[deploy] object Master extends Logging {
   val ENDPOINT_NAME = "Master"

   def main(argStrings: Array[String]) {
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(false))
```
**Member:** nit: `false` => `exitOnException = false`
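The reviewer is asking for a named argument so the call site documents itself. A minimal sketch of the suggested form:

```scala
// Same call, with the boolean named so readers can tell what `false`
// controls without opening SparkUncaughtExceptionHandler.
Thread.setDefaultUncaughtExceptionHandler(
  new SparkUncaughtExceptionHandler(exitOnException = false))
```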

```diff
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new MasterArguments(argStrings, conf)
```
**Worker.scala**

```diff
@@ -38,7 +38,7 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}

 private[deploy] class Worker(
     override val rpcEnv: RpcEnv,

@@ -737,6 +737,7 @@ private[deploy] object Worker extends Logging {
   val ENDPOINT_NAME = "Worker"

   def main(argStrings: Array[String]) {
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(false))
```
**Member:** nit: `false` => `exitOnException = false`
```diff
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new WorkerArguments(argStrings, conf)
```
**Executor.scala**

```diff
@@ -56,7 +56,7 @@ private[spark] class Executor(
     env: SparkEnv,
     userClassPath: Seq[URL] = Nil,
     isLocal: Boolean = false,
-    uncaughtExceptionHandler: UncaughtExceptionHandler = SparkUncaughtExceptionHandler)
+    uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler)
   extends Logging {

   logInfo(s"Starting executor ID $executorId on host $executorHostname")
```
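Since the handler is now a class rather than a singleton object, the default value here constructs a fresh instance with `exitOnException = true`, so the executor's fail-fast behavior is unchanged; callers (tests, presumably) can still inject their own `UncaughtExceptionHandler` through this parameter as before.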
**SparkUncaughtExceptionHandler.scala**

```diff
@@ -20,29 +20,29 @@ package org.apache.spark.util
 import org.apache.spark.internal.Logging

 /**
- * The default uncaught exception handler for Executors terminates the whole process, to avoid
- * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
- * to fail fast when things go wrong.
+ * The default uncaught exception handler for Spark daemons. It terminates the whole process for
+ * any Errors, and also terminates the process for Exceptions when the exitOnException flag is true.
  */
-private[spark] object SparkUncaughtExceptionHandler
+private[spark] class SparkUncaughtExceptionHandler(val exitOnException: Boolean = true)
```
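Spelled out, the behavior the new doc comment describes (derived from the doc text and the method body below):

| Situation | `exitOnException = true` | `exitOnException = false` |
|---|---|---|
| Uncaught `Error` (incl. `OutOfMemoryError`) | log + `System.exit` | log + `System.exit` |
| Uncaught `Exception` | log + `System.exit` | log only |
| Anything thrown during shutdown | log only | log only |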
**Member:** nit: please also document `exitOnException` using `@param exitOnException ...`
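One possible shape for the requested Scaladoc (illustrative wording, not taken from the PR):

```scala
/**
 * The default uncaught exception handler for Spark daemons. It terminates the whole process
 * for any Errors, and also terminates the process for Exceptions when the exitOnException
 * flag is true.
 *
 * @param exitOnException whether an uncaught Exception (as opposed to an Error, which
 *                        always exits) should also terminate the process
 */
private[spark] class SparkUncaughtExceptionHandler(val exitOnException: Boolean = true)
  extends Thread.UncaughtExceptionHandler with Logging {
```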

```diff
   extends Thread.UncaughtExceptionHandler with Logging {

   override def uncaughtException(thread: Thread, exception: Throwable) {
     try {
-      // Make it explicit that uncaught exceptions are thrown when container is shutting down.
+      // Make it explicit that uncaught exceptions are thrown when process is shutting down.
       // It will help users when they analyze the executor logs
-      val inShutdownMsg = if (ShutdownHookManager.inShutdown()) "[Container in shutdown] " else ""
-      val errMsg = "Uncaught exception in thread "
-      logError(inShutdownMsg + errMsg + thread, exception)
-
-      // We may have been called from a shutdown hook. If so, we must not call System.exit().
-      // (If we do, we will deadlock.)
-      if (!ShutdownHookManager.inShutdown()) {
+      val errMsg = "Uncaught exception in thread " + thread
+      if (ShutdownHookManager.inShutdown()) {
+        logError("[Process in shutdown] " + errMsg, exception)
+      } else if (exception.isInstanceOf[Error] ||
+          (!exception.isInstanceOf[Error] && exitOnException)) {
+        logError(errMsg + ". Shutting down now..", exception)
         if (exception.isInstanceOf[OutOfMemoryError]) {
           System.exit(SparkExitCode.OOM)
         } else {
           System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
```
**Member:** The current changes are too much. Could you rename `exitOnException` to `exitOnUncaughtException` and just change this line to

```scala
if (exitOnUncaughtException) {
  System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
}
```
```diff
         }
+      } else {
+        logError(errMsg, exception)
       }
     } catch {
       case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
```
**Member:** Could you keep this code? It's unlikely to be hit, but since the code is there, it's better not to change it.

**Author:** @zsxwing, this code is still there, but I moved the try/catch to the block where we invoke System.exit. Do you mean moving the whole body of uncaughtException() into the try block and keeping the catch block?

```diff
+      } catch {
+        case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
+        case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
```

**Member:**

> Do you mean moving the whole body of uncaughtException() into the try block and keeping the catch block?

Yes.

**Author:** @zsxwing Thanks for the clarification.
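Combining the two review requests — the `exitOnUncaughtException` rename and wrapping the whole method body in the try/catch — the handler would plausibly end up like the following sketch (illustrative, not the merged code):

```scala
private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Boolean = true)
  extends Thread.UncaughtExceptionHandler with Logging {

  override def uncaughtException(thread: Thread, exception: Throwable) {
    try {
      // Flag shutdown-time exceptions explicitly to help users analyzing the logs.
      val errMsg = "Uncaught exception in thread " + thread
      if (ShutdownHookManager.inShutdown()) {
        // Called from a shutdown hook: System.exit() here would deadlock, so only log.
        logError("[Process in shutdown] " + errMsg, exception)
      } else if (exception.isInstanceOf[Error] || exitOnUncaughtException) {
        // Errors always terminate the process; Exceptions only when the flag says so.
        logError(errMsg + ". Shutting down now..", exception)
        if (exception.isInstanceOf[OutOfMemoryError]) {
          System.exit(SparkExitCode.OOM)
        } else {
          System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
        }
      } else {
        logError(errMsg, exception)
      }
    } catch {
      // If the handler itself blows up, halt the JVM without running shutdown hooks.
      case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
      case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
    }
  }
}
```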

**core/src/main/scala/org/apache/spark/util/Utils.scala** (3 additions, 1 deletion)

```diff
@@ -76,6 +76,8 @@ private[spark] object CallSite {
 private[spark] object Utils extends Logging {
   val random = new Random()

+  private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler
+
   /**
    * Define a default value for driver memory here since this value is referenced across the code
    * base and nearly all files already use Utils.scala

@@ -1265,7 +1267,7 @@ private[spark] object Utils extends Logging {
       block
     } catch {
       case e: ControlThrowable => throw e
-      case t: Throwable => SparkUncaughtExceptionHandler.uncaughtException(t)
+      case t: Throwable => sparkUncaughtExceptionHandler.uncaughtException(t)
     }
   }
```
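Since the handler is no longer a singleton object, Utils keeps one private instance with the default `exitOnException = true`; call sites like the helper above retain their old exit-on-any-uncaught-exception behavior without constructing a new handler on every failure.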
**MesosClusterDispatcher.scala**

```diff
@@ -97,7 +97,7 @@ private[mesos] object MesosClusterDispatcher
   with CommandLineUtils {

   override def main(args: Array[String]) {
-    Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler)
     Utils.initDaemon(log)
     val conf = new SparkConf
     val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
```
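Note the asymmetry across daemons in this PR: Master and Worker pass `exitOnException = false` (they log uncaught Exceptions and keep running, exiting only on Errors), while MesosClusterDispatcher takes the default of `true` and so keeps the previous exit-on-any-uncaught-exception behavior.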