diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index b776ec8f81e06..6d8960e1dcd36 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -787,6 +787,14 @@ private[spark] class SparkSubmit extends Logging { } sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toSeq) + // In cluster mode, append an -XX:OnOutOfMemoryError handler so the driver JVM is killed on OOM (spark.driver.killOnOOMError) + if (sparkConf.get(KILL_ON_OOM_ERROR) && deployMode == CLUSTER) { + val driverJavaOptions = sparkConf.getOption(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS) + .map(_ + " ") + .getOrElse("") + "-XX:OnOutOfMemoryError=\"kill -9 %p\"" + sparkConf.set(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, driverJavaOptions) + } + (childArgs, childClasspath, sparkConf, childMainClass) } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 79a6dc159e001..9595fc74089ab 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -111,6 +111,11 @@ package object config { .booleanConf .createWithDefault(false) + private[spark] val KILL_ON_OOM_ERROR = ConfigBuilder("spark.driver.killOnOOMError") + .doc("Whether to kill the driver on an OOM error in cluster mode.") + .booleanConf + .createWithDefault(true) + private[spark] val EVENT_LOG_ENABLED = ConfigBuilder("spark.eventLog.enabled") .booleanConf .createWithDefault(false) diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh index 4fe8df61ef569..feb58c470dbc2 100755 --- 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh @@ -60,11 +60,24 @@ if ! [ -z ${HADOOP_CONF_DIR+x} ]; then SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH"; fi +DRIVER_VERBOSE=${DRIVER_VERBOSE:-false} + +function get_verbose_flag() +{ + if [[ $DRIVER_VERBOSE == "true" ]]; then + echo "--verbose" + else + echo "" + fi +} + case "$1" in driver) shift 1 + VERBOSE_FLAG=$(get_verbose_flag) CMD=( "$SPARK_HOME/bin/spark-submit" + $VERBOSE_FLAG --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client "$@" diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala index 6b340f2558cca..1d8081ff6af86 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s.integrationtest import io.fabric8.kubernetes.api.model.Pod +import org.apache.spark.internal.config.KILL_ON_OOM_ERROR import org.apache.spark.launcher.SparkLauncher private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite => @@ -103,6 +104,28 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite => .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE) runSparkRemoteCheckAndVerifyCompletion(appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME)) } + + test("Run SparkPi without the default exit on OOM error flag", k8sTestTag) { + sparkAppConf + .set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar") + .set("spark.kubernetes.driverEnv.DRIVER_VERBOSE", "true") + .set(KILL_ON_OOM_ERROR.key, 
"false") + val output = Seq("Pi is roughly 3", + "(spark.driver.extraJavaOptions,-Dspark.test.foo=spark.test.bar)") + + runSparkPiAndVerifyCompletion(expectedLogOnCompletion = output) + } + + test("Run SparkPi with the default exit on OOM error flag set", k8sTestTag) { + sparkAppConf + .set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar") + .set("spark.kubernetes.driverEnv.DRIVER_VERBOSE", "true") + + val output = Seq("Pi is roughly 3", + "(spark.driver.extraJavaOptions,-Dspark.test.foo=spark.test.bar " + + "-XX:OnOutOfMemoryError=\"kill -9 %p\")") + runSparkPiAndVerifyCompletion(expectedLogOnCompletion = output) + } } private[spark] object BasicTestsSuite { @@ -114,3 +137,4 @@ private[spark] object BasicTestsSuite { "https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt" val REMOTE_PAGE_RANK_FILE_NAME = "pagerank_data.txt" } + diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 0d4fcccc35cf9..4108da8af336f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -161,13 +161,14 @@ class KubernetesSuite extends SparkFunSuite appResource: String = containerLocalSparkDistroExamplesJar, driverPodChecker: Pod => Unit = doBasicDriverPodCheck, executorPodChecker: Pod => Unit = doBasicExecutorPodCheck, + expectedLogOnCompletion: Seq[String] = Seq("Pi is roughly 3"), appArgs: Array[String] = Array.empty[String], appLocator: String = appLocator, isJVM: Boolean = true ): Unit = { runSparkApplicationAndVerifyCompletion( appResource, SPARK_PI_MAIN_CLASS, - Seq("Pi is roughly 3"), + 
expectedLogOnCompletion, appArgs, driverPodChecker, executorPodChecker,