diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 2097fb8865de9..ab12cfbe5cf33 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -60,14 +60,44 @@
 if ! [ -z ${HADOOP_CONF_DIR+x} ]; then
   SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH";
 fi
+IGNORE_DEFAULT_DRIVER_JVM_OPTIONS=${IGNORE_DEFAULT_DRIVER_JVM_OPTIONS:-false}
+DRIVER_VERBOSE=${DRIVER_VERBOSE:-false}
+
+function get_verbose_flag()
+{
+  if [[ $DRIVER_VERBOSE == "true" ]]; then
+    echo "--verbose"
+  else
+    echo ""
+  fi
+}
+
+function get_args_with_defaults()
+{
+  if [[ $IGNORE_DEFAULT_DRIVER_JVM_OPTIONS == "true" ]]; then
+    echo "$@"
+  else
+    if grep -q "spark.driver.extraJavaOptions" "/opt/spark/conf/spark.properties"; then
+      sed 's/spark.driver.extraJavaOptions=/&-XX:OnOutOfMemoryError="kill -9 %p" /g' /opt/spark/conf/spark.properties > /tmp/spark.properties
+    else
+      cp /opt/spark/conf/spark.properties /tmp/spark.properties
+      echo 'spark.driver.extraJavaOptions=-XX:OnOutOfMemoryError="kill -9 %p"' >> /tmp/spark.properties
+    fi
+    echo "$@" | sed 's|/opt/spark/conf/spark.properties |/tmp/spark.properties |g'
+  fi
+}
+
 case "$1" in
   driver)
     shift 1
+    DRIVER_ARGS=$(get_args_with_defaults "$@")
+    VERBOSE_FLAG=$(get_verbose_flag)
     CMD=(
       "$SPARK_HOME/bin/spark-submit"
+      $VERBOSE_FLAG
       --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
       --deploy-mode client
-      "$@"
+      $DRIVER_ARGS
     )
     ;;
   executor)
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index 6b340f2558cca..ba6b67f0cfa21 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -98,6 +98,28 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
       expectedJVMValue = Seq("(spark.test.foo,spark.test.bar)"))
   }
 
+  test("Run SparkPi without the default exit on OOM error flag", k8sTestTag) {
+    sparkAppConf
+      .set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
+      .set("spark.kubernetes.driverEnv.DRIVER_VERBOSE", "true")
+      .set("spark.kubernetes.driverEnv.IGNORE_DEFAULT_DRIVER_JVM_OPTIONS", "true")
+
+    val output = Seq("Pi is roughly 3",
+      "(spark.driver.extraJavaOptions,-Dspark.test.foo=spark.test.bar)")
+
+    runSparkPiAndVerifyCompletion(expectedLogOnCompletion = output)
+  }
+
+  test("Run SparkPi with the default exit on OOM error flag set", k8sTestTag) {
+    sparkAppConf
+      .set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
+      .set("spark.kubernetes.driverEnv.DRIVER_VERBOSE", "true")
+    val output = Seq("Pi is roughly 3",
+      "(spark.driver.extraJavaOptions,-XX:OnOutOfMemoryError=\"kill -9 %p\" " +
+        "-Dspark.test.foo=spark.test.bar)")
+    runSparkPiAndVerifyCompletion(expectedLogOnCompletion = output)
+  }
+
   test("Run SparkRemoteFileTest using a remote data file", k8sTestTag) {
     sparkAppConf
       .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
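Note: the behavior the two new BasicTestsSuite cases assert comes from the sed rewrite in
get_args_with_defaults above. A minimal sketch of that rewrite, runnable outside a pod
(the demo file path is an assumption for illustration and stands in for
/opt/spark/conf/spark.properties):

    # Sample properties file playing the role of /opt/spark/conf/spark.properties.
    printf 'spark.driver.extraJavaOptions=-Dspark.test.foo=spark.test.bar\n' > /tmp/demo.properties

    # Same substitution as get_args_with_defaults: '&' re-inserts the matched key,
    # so the OOM flag is prepended to the user's existing extraJavaOptions value.
    sed 's/spark.driver.extraJavaOptions=/&-XX:OnOutOfMemoryError="kill -9 %p" /g' /tmp/demo.properties
    # prints: spark.driver.extraJavaOptions=-XX:OnOutOfMemoryError="kill -9 %p" -Dspark.test.foo=spark.test.bar

This matches the expected log line in the second test; with
IGNORE_DEFAULT_DRIVER_JVM_OPTIONS=true the rewrite is skipped, and the first test expects
the user's value unmodified.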
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 80d5f239a09cc..2d65298cef96b 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -161,13 +161,14 @@ class KubernetesSuite extends SparkFunSuite
       appResource: String = containerLocalSparkDistroExamplesJar,
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
       executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
+      expectedLogOnCompletion: Seq[String] = Seq("Pi is roughly 3"),
       appArgs: Array[String] = Array.empty[String],
       appLocator: String = appLocator,
       isJVM: Boolean = true ): Unit = {
     runSparkApplicationAndVerifyCompletion(
       appResource,
       SPARK_PI_MAIN_CLASS,
-      Seq("Pi is roughly 3"),
+      expectedLogOnCompletion,
       appArgs,
       driverPodChecker,
       executorPodChecker,
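For reference, the two driver-side switches are plain container environment variables, so
a user reaches them through Spark's existing spark.kubernetes.driverEnv.* passthrough,
exactly as the integration tests do. A hedged usage sketch (master URL, image name, and
jar path are placeholders, not part of this change):

    # Hypothetical submission; only the two driverEnv lines relate to this patch.
    bin/spark-submit \
      --master k8s://https://example.com:6443 \
      --deploy-mode cluster \
      --name spark-pi \
      --class org.apache.spark.examples.SparkPi \
      --conf spark.kubernetes.container.image=spark:latest \
      --conf spark.kubernetes.driverEnv.DRIVER_VERBOSE=true \
      --conf spark.kubernetes.driverEnv.IGNORE_DEFAULT_DRIVER_JVM_OPTIONS=true \
      local:///opt/spark/examples/jars/spark-examples.jar

Leaving both variables unset keeps the new default: the driver JVM gets
-XX:OnOutOfMemoryError="kill -9 %p", so an OOM kills the driver process (and hence the
pod) instead of leaving it wedged.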