core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (8 additions, 0 deletions)
@@ -787,6 +787,14 @@ private[spark] class SparkSubmit extends Logging {
}
sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toSeq)

// set oom error handling in cluster mode
if (sparkConf.get(KILL_ON_OOM_ERROR) && deployMode == CLUSTER) {
val driverJavaOptions = sparkConf.getOption(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS)
.map( _ + " ")
Member:
nit. .map( _ + " ") -> .map(_ + " ")

.getOrElse("") + "-XX:OnOutOfMemoryError=\"kill -9 %p\""
Member:
Since this is a general PR for SparkSubmit, does this work on Windows? ExitOnOutOfMemoryError will be a better choice, @skonto.

Member:
Maybe, I lost some context since this is the 3rd try for this.

Contributor:
I actually think we should stick with OnOutOfMemoryError, because ExitOnOutOfMemoryError was not present until Java 8u92 (discussed earlier here: #24796 (comment)). I don't think we specify a minimum version within Java 8, so we might have to stick with this.

But yeah, we probably have to make sure it doesn't do anything too weird on Windows (does Spark actually run in anything other than local mode on Windows?).

Member:
For Windows, I prefer to use the JVM option. Personally, I don't think Apache Spark 3.0.0 will be used on JDK 8u91 or older. Apache Spark 3.0.0 starts a new age of JDK 11. 😄
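
Not part of the diff: a minimal sketch of the two JVM options being compared above, for reference. The useExitOnOom switch is hypothetical and only illustrates the trade-off; it is not something this PR adds.

// Sketch only. -XX:OnOutOfMemoryError shells out to an external command, so it works on
// JDK 8u91 and older but relies on a Unix `kill`, which does not port to Windows.
// -XX:+ExitOnOutOfMemoryError is handled by the JVM itself but requires JDK 8u92+.
val useExitOnOom = false // hypothetical switch, for illustration only
val oomOption =
  if (useExitOnOom) "-XX:+ExitOnOutOfMemoryError"
  else "-XX:OnOutOfMemoryError=\"kill -9 %p\""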

sparkConf.set(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, driverJavaOptions)
}

(childArgs, childClasspath, sparkConf, childMainClass)
}

@@ -111,6 +111,10 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val KILL_ON_OOM_ERROR = ConfigBuilder("spark.driver.killOnOOMError")
.doc("Whether to kill the driver on an oom error in cluster mode.")
.booleanConf.createWithDefault(true)
Member:
Please split this into two lines like the other conf.

Member:
And this should be false by default to avoid the behavior change.

Contributor:
Actually I think we agreed to go ahead and change the default: #25229 (comment)

3.0 is a good chance to do this, and I agree about splitting it into two lines to match the style of the other confs.

Member:
Thanks. Got it. In that case, I'm +1 for true by default.

BTW, @squito, do you think we need to add a migration guide for this behavior change?

Also, cc @gatorsmile.

Contributor (Author):
Yes, we agreed to change the default; we want to fail by default.
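
Not part of the diff: combining the two review points above, the conf reformatted to the two-line style with the agreed default of true would look roughly like this in the context of the surrounding package object (a sketch of the requested change, not the committed code):

private[spark] val KILL_ON_OOM_ERROR = ConfigBuilder("spark.driver.killOnOOMError")
  .doc("Whether to kill the driver on an OOM error in cluster mode.")
  .booleanConf
  .createWithDefault(true)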


private[spark] val EVENT_LOG_ENABLED = ConfigBuilder("spark.eventLog.enabled")
.booleanConf
.createWithDefault(false)
@@ -1431,5 +1435,4 @@ package object config {
.doc("The amount of memory used per page in bytes")
.bytesConf(ByteUnit.BYTE)
.createOptional

@dongjoon-hyun (Member, Oct 24, 2019):
Let's not touch the irrelevant place.

}
@@ -60,11 +60,24 @@ if ! [ -z ${HADOOP_CONF_DIR+x} ]; then
SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH";
fi

DRIVER_VERBOSE=${DRIVER_VERBOSE:-false}

function get_verbose_flag()
{
if [[ $DRIVER_VERBOSE == "true" ]]; then
echo "--verbose"
else
echo ""
fi
}

case "$1" in
driver)
shift 1
VERBOSE_FLAG=$(get_verbose_flag)
CMD=(
"$SPARK_HOME/bin/spark-submit"
$VERBOSE_FLAG
--conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
--deploy-mode client
"$@"
@dongjoon-hyun (Member, Oct 24, 2019):
This file contains orthogonal changes. Please revert this file. You can file a new JIRA issue for this.

@skonto (Contributor, Author, Oct 27, 2019):
@dongjoon-hyun this is required by the tests, so it could be just a DEBUG mode, disabled by default (as it is now). It is not meant to be another feature and does no harm. I want the verbose output so the K8s tests can check the java option values set in the driver. Is there another way to trigger this (beyond writing a main that prints them and adding it to the Spark examples package)?

@@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s.integrationtest

import io.fabric8.kubernetes.api.model.Pod

import org.apache.spark.internal.config.KILL_ON_OOM_ERROR
import org.apache.spark.launcher.SparkLauncher

private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
@@ -103,6 +104,28 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
.set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
runSparkRemoteCheckAndVerifyCompletion(appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME))
}

test("Run SparkPi without the default exit on OOM error flag", k8sTestTag) {
Member:
If you are suggesting this PR as a bug fix, you need to add the SPARK-27900 prefix to the newly added test case names.

Contributor (Author):
Ok, I can do that.

sparkAppConf
.set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
.set("spark.kubernetes.driverEnv.DRIVER_VERBOSE", "true")
@dongjoon-hyun (Member, Oct 24, 2019):
Could you try SPARK_PRINT_LAUNCH_COMMAND instead of the new DRIVER_VERBOSE?

Contributor (Author):
Sure.
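
Not part of the diff: if that suggestion were adopted, the driver-env setting in the test setup above might look roughly like the sketch below. This assumes the spark.kubernetes.driverEnv. prefix propagates the variable to the driver pod and that a non-empty SPARK_PRINT_LAUNCH_COMMAND makes the launcher print the full java command (including the extra java options) into the driver log; it is a sketch, not the author's change.

sparkAppConf
  .set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
  .set("spark.kubernetes.driverEnv.SPARK_PRINT_LAUNCH_COMMAND", "1")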

.set(KILL_ON_OOM_ERROR.key, "false")
val output = Seq("Pi is roughly 3",
"(spark.driver.extraJavaOptions,-Dspark.test.foo=spark.test.bar)")

runSparkPiAndVerifyCompletion(expectedLogOnCompletion = output)
}

test("Run SparkPi with the default exit on OOM error flag set", k8sTestTag) {
sparkAppConf
.set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
.set("spark.kubernetes.driverEnv.DRIVER_VERBOSE", "true")

val output = Seq("Pi is roughly 3",
"(spark.driver.extraJavaOptions,-Dspark.test.foo=spark.test.bar " +
"-XX:OnOutOfMemoryError=\"kill -9 %p\")")
runSparkPiAndVerifyCompletion(expectedLogOnCompletion = output)
}
}

private[spark] object BasicTestsSuite {
@@ -114,3 +137,4 @@ private[spark] object BasicTestsSuite {
"https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
val REMOTE_PAGE_RANK_FILE_NAME = "pagerank_data.txt"
}

Member:
Let's not add this new line.

Contributor (Author):
Ok.

@@ -161,13 +161,14 @@ class KubernetesSuite extends SparkFunSuite
appResource: String = containerLocalSparkDistroExamplesJar,
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
expectedLogOnCompletion: Seq[String] = Seq("Pi is roughly 3"),
appArgs: Array[String] = Array.empty[String],
appLocator: String = appLocator,
isJVM: Boolean = true ): Unit = {
runSparkApplicationAndVerifyCompletion(
appResource,
SPARK_PI_MAIN_CLASS,
Seq("Pi is roughly 3"),
expectedLogOnCompletion,
appArgs,
driverPodChecker,
executorPodChecker,
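
Not part of the diff: a usage sketch of the parameterized helper, mirroring the new tests above. Existing callers keep the old default of Seq("Pi is roughly 3"), while the OOM tests pass extra expected log lines, for example:

runSparkPiAndVerifyCompletion(
  expectedLogOnCompletion = Seq(
    "Pi is roughly 3",
    "(spark.driver.extraJavaOptions,-Dspark.test.foo=spark.test.bar)"))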