From 98cc0448a4fd9e85b749c54510ae860ad8148e49 Mon Sep 17 00:00:00 2001
From: Ilan Filonenko
Date: Sun, 6 May 2018 23:41:29 -0400
Subject: [PATCH] restructured step-based pattern to resolve comments

---
 .../spark/deploy/k8s/KubernetesConf.scala     |  6 +-
 .../k8s/features/BasicDriverFeatureStep.scala | 16 +----
 .../bindings/JavaDriverFeatureStep.scala      | 42 ++++++++++++
 .../bindings/PythonDriverFeatureStep.scala    | 65 +++++++++++--------
 .../submit/KubernetesClientApplication.scala  |  2 +-
 .../k8s/submit/KubernetesDriverBuilder.scala  | 14 ++--
 .../bindings/JavaDriverFeatureStepSuite.scala | 54 +++++++++++++++
 .../PythonDriverFeatureStepSuite.scala        |  8 ++--
 .../submit/KubernetesDriverBuilderSuite.scala | 17 +++--
 .../src/main/dockerfiles/spark/entrypoint.sh  |  8 ++-
 10 files changed, 172 insertions(+), 60 deletions(-)
 create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
 create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index d97aaacefa0f..d95f1a1709cb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -67,15 +67,14 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
     .map(str => str.split(",").toSeq)
     .getOrElse(Seq.empty[String])
 
+  def getRoleConf: T = roleSpecificConf
+
   def pyFiles(): Option[String] = sparkConf
     .get(KUBERNETES_PYSPARK_PY_FILES)
 
   def pySparkMainResource(): Option[String] = sparkConf
     .get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE)
 
-  def pySparkAppArgs(): Option[String] = sparkConf
-    .get(KUBERNETES_PYSPARK_APP_ARGS)
-
   def pySparkPythonVersion(): String = sparkConf
     .get(PYSPARK_PYTHON_VERSION)
 
@@ -132,7 +131,6 @@ private[spark] object KubernetesConf {
       maybePyFiles.foreach{maybePyFiles => additionalFiles.appendAll(maybePyFiles.split(","))}
       sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
-      sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_APP_ARGS, appArgs.mkString(" "))
     }
     sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4)
   }
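
Note on the KubernetesConf change above: with KUBERNETES_PYSPARK_APP_ARGS removed, application arguments are no longer round-tripped through a SparkConf entry; they live only on the role-specific conf, which the new getRoleConf accessor exposes. A self-contained sketch of the resulting access pattern (the classes here are simplified stand-ins for the real Spark types, and org.example.Main is a made-up class name):

    // Simplified stand-ins for KubernetesConf / KubernetesDriverSpecificConf.
    case class DriverConf(mainClass: String, appArgs: Seq[String])
    case class Conf[T](roleSpecificConf: T) {
      def getRoleConf: T = roleSpecificConf
    }

    object RoleConfDemo extends App {
      val conf = Conf(DriverConf("org.example.Main", Seq("5", "7")))
      // Before: args were serialized into SparkConf and re-parsed later.
      // After: steps read them directly off the role-specific conf.
      println(conf.getRoleConf.appArgs.mkString(" "))  // prints: 5 7
    }
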
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 258d4947aae3..01693f2b5ec3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -27,7 +27,6 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
-import org.apache.spark.launcher.SparkLauncher
 
 private[spark] class BasicDriverFeatureStep(
     conf: KubernetesConf[KubernetesDriverSpecificConf])
@@ -77,7 +76,7 @@ private[spark] class BasicDriverFeatureStep(
       ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
     }
 
-    val withoutArgsDriverContainer: ContainerBuilder = new ContainerBuilder(pod.container)
+    val driverContainer = new ContainerBuilder(pod.container)
       .withName(DRIVER_CONTAINER_NAME)
       .withImage(driverContainerImage)
       .withImagePullPolicy(conf.imagePullPolicy())
@@ -97,19 +96,8 @@ private[spark] class BasicDriverFeatureStep(
       .addToArgs(driverDockerContainer)
       .addToArgs("--properties-file", SPARK_CONF_PATH)
       .addToArgs("--class", conf.roleSpecificConf.mainClass)
+      .build()
 
-    val driverContainer =
-      if (driverDockerContainer == "driver-py") {
-        withoutArgsDriverContainer
-          .build()
-      } else {
-        // The user application jar is merged into the spark.jars list and managed through that
-        // property, so there is no need to reference it explicitly here.
-        withoutArgsDriverContainer
-          .addToArgs(SparkLauncher.NO_RESOURCE)
-          .addToArgs(conf.roleSpecificConf.appArgs: _*)
-          .build()
-      }
     val driverPod = new PodBuilder(pod.pod)
       .editOrNewMetadata()
         .withName(driverPodName)
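
The net effect of the BasicDriverFeatureStep change: the basic step now always emits the same argument prefix and never branches on the Docker container type; the language-specific tail ("spark-internal" plus user args for JVM apps) moves into the bindings steps introduced below. A standalone sketch of that composition, using toy types rather than the real step interface:

    // Toy model: each feature step only appends the arguments it owns.
    case class Container(args: Seq[String])
    trait Step { def configure(c: Container): Container }

    object BasicStep extends Step {
      def configure(c: Container): Container = c.copy(args = c.args ++
        Seq("driver", "--properties-file", "spark.conf", "--class", "org.example.Main"))
    }
    object JavaBindingStep extends Step {
      // "spark-internal" mirrors SparkLauncher.NO_RESOURCE
      def configure(c: Container): Container =
        c.copy(args = c.args ++ Seq("spark-internal", "5", "7"))
    }

    object ComposeDemo extends App {
      val configured = Seq(BasicStep, JavaBindingStep)
        .foldLeft(Container(Nil))((c, s) => s.configure(c))
      println(configured.args.mkString(" "))
    }
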
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
new file mode 100644
index 000000000000..fda0fff712df
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import io.fabric8.kubernetes.api.model.ContainerBuilder
+import io.fabric8.kubernetes.api.model.HasMetadata
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
+import org.apache.spark.deploy.k8s.KubernetesDriverSpecificConf
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
+import org.apache.spark.launcher.SparkLauncher
+
+private[spark] class JavaDriverFeatureStep(
+    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val withDriverArgs = new ContainerBuilder(pod.container)
+      // The user application jar is merged into the spark.jars list and managed through that
+      // property, so there is no need to reference it explicitly here.
+      .addToArgs(SparkLauncher.NO_RESOURCE)
+      .addToArgs(kubernetesConf.roleSpecificConf.appArgs: _*)
+      .build()
+    SparkPod(pod.pod, withDriverArgs)
+  }
+  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}
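
For context on the two args the Java step adds (and on the assertion in the new suite further down): SparkLauncher.NO_RESOURCE is the literal "spark-internal", passed in place of a primary resource because the app jar already rides along via spark.jars. A tiny standalone check, with the constant value inlined as an assumption rather than imported:

    object NoResourceDemo extends App {
      // Inlined stand-in for org.apache.spark.launcher.SparkLauncher.NO_RESOURCE.
      val NO_RESOURCE = "spark-internal"
      val appArgs = Seq("5 7")
      // Mirrors configurePod: NO_RESOURCE first, then the user args.
      println((NO_RESOURCE +: appArgs).mkString(" "))  // spark-internal 5 7
    }
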
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
index 02d8fbc21c15..fb2fbb9dd423 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
@@ -16,42 +16,55 @@
  */
 package org.apache.spark.deploy.k8s.features.bindings
 
+import scala.collection.JavaConverters._
+
 import io.fabric8.kubernetes.api.model.ContainerBuilder
+import io.fabric8.kubernetes.api.model.EnvVar
+import io.fabric8.kubernetes.api.model.EnvVarBuilder
 import io.fabric8.kubernetes.api.model.HasMetadata
 
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesDriverSpecificConf
 import org.apache.spark.deploy.k8s.KubernetesUtils
 import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
 
 private[spark] class PythonDriverFeatureStep(
-  kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+  kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
   extends KubernetesFeatureConfigStep {
   override def configurePod(pod: SparkPod): SparkPod = {
-    val mainResource = kubernetesConf.pySparkMainResource()
-    require(mainResource.isDefined, "PySpark Main Resource must be defined")
-    val otherPyFiles = kubernetesConf.pyFiles().map(pyFile =>
-      KubernetesUtils.resolveFileUrisAndPath(pyFile.split(","))
-        .mkString(":")).getOrElse("")
-    val withPythonPrimaryFileContainer = new ContainerBuilder(pod.container)
-      .addNewEnv()
-        .withName(ENV_PYSPARK_ARGS)
-        .withValue(kubernetesConf.pySparkAppArgs().getOrElse(""))
-        .endEnv()
-      .addNewEnv()
-        .withName(ENV_PYSPARK_PRIMARY)
-        .withValue(KubernetesUtils.resolveFileUri(mainResource.get))
-        .endEnv()
-      .addNewEnv()
-        .withName(ENV_PYSPARK_FILES)
-        .withValue(if (otherPyFiles == "") {""} else otherPyFiles)
-        .endEnv()
-      .addNewEnv()
-        .withName(ENV_PYSPARK_PYTHON_VERSION)
-        .withValue(kubernetesConf.pySparkPythonVersion())
-        .endEnv()
-      .build()
-    SparkPod(pod.pod, withPythonPrimaryFileContainer)
+    val roleConf = kubernetesConf.roleSpecificConf
+    require(roleConf.mainAppResource.isDefined, "PySpark Main Resource must be defined")
+    val maybePythonArgs: Option[EnvVar] = Option(roleConf.appArgs).filter(_.nonEmpty).map(
+      s =>
+        new EnvVarBuilder()
+          .withName(ENV_PYSPARK_ARGS)
+          .withValue(s.mkString(","))
+          .build())
+    val maybePythonFiles: Option[EnvVar] = kubernetesConf.pyFiles().map(
+      pyFiles =>
+        new EnvVarBuilder()
+          .withName(ENV_PYSPARK_FILES)
+          .withValue(KubernetesUtils.resolveFileUrisAndPath(pyFiles.split(","))
+            .mkString(":"))
+          .build())
+    val envSeq: Seq[EnvVar] =
+      Seq(new EnvVarBuilder()
+          .withName(ENV_PYSPARK_PRIMARY)
+          .withValue(KubernetesUtils.resolveFileUri(kubernetesConf.pySparkMainResource().get))
+          .build(),
+        new EnvVarBuilder()
+          .withName(ENV_PYSPARK_PYTHON_VERSION)
+          .withValue(kubernetesConf.pySparkPythonVersion())
+          .build())
+    val pythonEnvs = envSeq ++
+      maybePythonArgs.toSeq ++
+      maybePythonFiles.toSeq
+
+    val withPythonPrimaryContainer = new ContainerBuilder(pod.container)
+      .addAllToEnv(pythonEnvs.asJava).build()
+
+    SparkPod(pod.pod, withPythonPrimaryContainer)
   }
 
   override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 30475108fd2c..eaff47205dbb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -214,7 +214,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     val kubernetesResourceNamePrefix = {
       s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
     }
-    sparkConf.set("spark.kubernetes.python.pyFiles", clientArguments.maybePyFiles.getOrElse(""))
+    sparkConf.set(KUBERNETES_PYSPARK_PY_FILES, clientArguments.maybePyFiles.getOrElse(""))
     val kubernetesConf = KubernetesConf.createDriverConf(
       sparkConf,
       appName,
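
A note on the PythonDriverFeatureStep rewrite above: ENV_PYSPARK_ARGS and ENV_PYSPARK_FILES are now emitted only when there is something to put in them, instead of always being set (previously to ""). A standalone sketch of that Option-to-Seq pattern, with env names written out as plain strings (assumed to match the Constants values that entrypoint.sh consumes later in this patch) and an example resolved path:

    object OptionalEnvDemo extends App {
      val appArgs = Seq("5 7")
      val pyFiles: Option[String] = None  // no --py-files supplied

      val maybeArgs = Option(appArgs).filter(_.nonEmpty)
        .map(a => "PYSPARK_APP_ARGS" -> a.mkString(","))
      val maybeFiles = pyFiles.map(f => "PYSPARK_FILES" -> f)

      // Absent options contribute zero entries, so no empty-valued vars.
      val envs = Seq(
        "PYSPARK_PRIMARY" -> "/opt/spark/examples/src/main/python/pi.py",  // example path
        "PYSPARK_PYTHON_VERSION" -> "2") ++ maybeArgs.toSeq ++ maybeFiles.toSeq
      envs.foreach { case (k, v) => println(s"$k=$v") }
    }
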
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index 6d7b644a2ee7..54fb5f3148d2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.k8s.submit
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
 import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, MountSecretsFeatureStep}
 import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
-import org.apache.spark.deploy.k8s.features.bindings.PythonDriverFeatureStep
+import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep}
 
 private[spark] class KubernetesDriverBuilder(
     provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
@@ -32,8 +32,12 @@ private[spark] class KubernetesDriverBuilder(
     provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
       => MountSecretsFeatureStep) =
       new MountSecretsFeatureStep(_),
+    provideJavaStep: (
+      KubernetesConf[KubernetesDriverSpecificConf]
+        => JavaDriverFeatureStep) =
+      new JavaDriverFeatureStep(_),
     providePythonStep: (
-      KubernetesConf[_ <: KubernetesRoleSpecificConf]
+      KubernetesConf[KubernetesDriverSpecificConf]
        => PythonDriverFeatureStep) =
      new PythonDriverFeatureStep(_)) {
 
@@ -45,8 +49,10 @@ private[spark] class KubernetesDriverBuilder(
       provideServiceStep(kubernetesConf))
     val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
       Some(provideSecretsStep(kubernetesConf)) } else None
-    val maybeNonJVMBindings = kubernetesConf.roleSpecificConf.mainAppResource.getOrElse(None)
+    val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.getOrElse(None)
       match {
+      case JavaMainAppResource(_) =>
+        Some(provideJavaStep(kubernetesConf))
       case PythonMainAppResource(_) =>
         Some(providePythonStep(kubernetesConf))
       case _ => None
@@ -54,7 +60,7 @@ private[spark] class KubernetesDriverBuilder(
     val allFeatures: Seq[KubernetesFeatureConfigStep] =
       baseFeatures ++
       maybeRoleSecretNamesStep.toSeq ++
-      maybeNonJVMBindings.toSeq
+      bindingsStep.toSeq
     var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
     for (feature <- allFeatures) {
       val configuredPod = feature.configurePod(spec.pod)
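
The builder now dispatches on the type of the main app resource, with the Java bindings step as the explicit JVM case rather than a trailing else inside BasicDriverFeatureStep. A self-contained sketch of that selection, with the resource types redeclared locally and step values reduced to labels:

    sealed trait MainAppResource
    case class JavaMainAppResource(primaryResource: String) extends MainAppResource
    case class PythonMainAppResource(primaryResource: String) extends MainAppResource

    object DispatchDemo extends App {
      def bindingsStep(main: Option[MainAppResource]): Option[String] = main match {
        case Some(JavaMainAppResource(_)) => Some("java-bindings")
        case Some(PythonMainAppResource(_)) => Some("pyspark-bindings")
        case None => None
      }
      println(bindingsStep(Some(PythonMainAppResource("local:///main.py"))))
      println(bindingsStep(None))
    }

Matching on the Option directly, as here, avoids the patch's .getOrElse(None) trick of matching on Any; either form selects at most one bindings step per driver.
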
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
new file mode 100644
index 000000000000..910c887d11f3
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+
+class JavaDriverFeatureStepSuite extends SparkFunSuite {
+
+
+  test("Java Step modifies container correctly") {
+    val baseDriverPod = SparkPod.initialPod()
+    val sparkConf = new SparkConf(false)
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        Some(JavaMainAppResource("local:///main.jar")),
+        "test-app",
+        "java-runner",
+        Seq("5 7")),
+      "",
+      "",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Seq.empty[String])
+
+    val step = new JavaDriverFeatureStep(kubernetesConf)
+    val driverPod = step.configurePod(baseDriverPod).pod
+    val driverContainerwithJavaStep = step.configurePod(baseDriverPod).container
+    assert(driverContainerwithJavaStep.getArgs.size === 2)
+    val args = driverContainerwithJavaStep
+      .getArgs.asScala
+    assert(args === List("spark-internal", "5 7"))
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
index 37c22b1033d1..892f3f161737 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
@@ -38,14 +38,14 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
       .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
       .set(KUBERNETES_PYSPARK_PY_FILES, pyFiles.mkString(","))
       .set("spark.files", "local:///example.py")
-      .set(KUBERNETES_PYSPARK_APP_ARGS, "5 7")
+      .set(PYSPARK_PYTHON_VERSION, "2")
     val kubernetesConf = KubernetesConf(
       sparkConf,
       KubernetesDriverSpecificConf(
         Some(PythonMainAppResource("local:///main.py")),
         "test-app",
         "python-runner",
-        Seq.empty[String]),
+        Seq("5 7")),
       "",
       "",
       Map.empty,
@@ -91,14 +91,12 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
     val step = new PythonDriverFeatureStep(kubernetesConf)
     val driverPod = step.configurePod(baseDriverPod).pod
     val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
-    assert(driverContainerwithPySpark.getEnv.size === 4)
+    assert(driverContainerwithPySpark.getEnv.size === 2)
     val envs = driverContainerwithPySpark
       .getEnv
       .asScala
       .map(env => (env.getName, env.getValue))
       .toMap
-    assert(envs(ENV_PYSPARK_FILES) === "")
-    assert(envs(ENV_PYSPARK_ARGS) === "")
     assert(envs(ENV_PYSPARK_PYTHON_VERSION) === "3")
   }
 }
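
On the changed expectations in PythonDriverFeatureStepSuite: the first test still sees all four env vars (args, files, primary, python version are all present there, so its size === 4 assertion stays active), while the second test (python version "3", no args, no py-files) now sees exactly two instead of four. A quick standalone check of that arithmetic:

    object EnvCountDemo extends App {
      val appArgs = Seq.empty[String]     // no application arguments
      val pyFiles: Option[String] = None  // no --py-files
      val always = 2  // PYSPARK_PRIMARY and PYSPARK_PYTHON_VERSION
      val total = always + (if (appArgs.nonEmpty) 1 else 0) + pyFiles.size
      println(total)  // 2
    }
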
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index f0b1b1b82b7a..89e08704c9f8 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.k8s.submit
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf}
 import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, KubernetesFeaturesTestUtils, MountSecretsFeatureStep}
-import org.apache.spark.deploy.k8s.features.bindings.PythonDriverFeatureStep
+import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep}
 
 class KubernetesDriverBuilderSuite extends SparkFunSuite {
 
@@ -27,6 +27,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
   private val CREDENTIALS_STEP_TYPE = "credentials"
   private val SERVICE_STEP_TYPE = "service"
   private val SECRETS_STEP_TYPE = "mount-secrets"
+  private val JAVA_STEP_TYPE = "java-bindings"
   private val PYSPARK_STEP_TYPE = "pyspark-bindings"
 
   private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
@@ -41,6 +42,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
   private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
 
+  private val javaStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    JAVA_STEP_TYPE, classOf[JavaDriverFeatureStep])
+
   private val pythonStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     PYSPARK_STEP_TYPE, classOf[PythonDriverFeatureStep])
 
@@ -50,13 +54,14 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
     _ => credentialsStep,
     _ => serviceStep,
     _ => secretsStep,
+    _ => javaStep,
     _ => pythonStep)
 
   test("Apply fundamental steps all the time.") {
     val conf = KubernetesConf(
       new SparkConf(false),
       KubernetesDriverSpecificConf(
-        None,
+        Some(JavaMainAppResource("example.jar")),
         "test-app",
         "main",
         Seq.empty),
@@ -71,14 +76,15 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       builderUnderTest.buildFromFeatures(conf),
       BASIC_STEP_TYPE,
       CREDENTIALS_STEP_TYPE,
-      SERVICE_STEP_TYPE)
+      SERVICE_STEP_TYPE,
+      JAVA_STEP_TYPE)
   }
 
   test("Apply secrets step if secrets are present.") {
     val conf = KubernetesConf(
       new SparkConf(false),
       KubernetesDriverSpecificConf(
-        None,
+        Some(JavaMainAppResource("example.jar")),
         "test-app",
         "main",
         Seq.empty),
@@ -94,7 +100,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       BASIC_STEP_TYPE,
       CREDENTIALS_STEP_TYPE,
       SERVICE_STEP_TYPE,
-      SECRETS_STEP_TYPE)
+      SECRETS_STEP_TYPE,
+      JAVA_STEP_TYPE)
   }
 
   test("Apply Python step if main resource is python.") {
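
Finally, the entrypoint.sh change below pairs with the Python step: the step writes env vars onto the driver container and the script reads them when SPARK_K8S_CMD is driver-py. A sketch of the assumed correspondence between the Scala constants and the shell variables (the left-hand constants live in Constants.scala, which this patch does not touch, so treat the mapping as inferred rather than quoted):

    object EnvContractDemo extends App {
      // Inferred from how entrypoint.sh consumes the vars set in
      // PythonDriverFeatureStep; not taken verbatim from Constants.scala.
      val contract = Map(
        "ENV_PYSPARK_PRIMARY" -> "PYSPARK_PRIMARY",
        "ENV_PYSPARK_ARGS" -> "PYSPARK_APP_ARGS",
        "ENV_PYSPARK_FILES" -> "PYSPARK_FILES",
        "ENV_PYSPARK_PYTHON_VERSION" -> "PYSPARK_PYTHON_VERSION")
      contract.foreach { case (c, sh) => println(s"$c -> $$$sh") }
    }
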
PYTHON_VERSION="${pyv:7}" @@ -83,7 +89,7 @@ case "$SPARK_K8S_CMD" in "$SPARK_HOME/bin/spark-submit" --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client - "$@" $PYSPARK_PRIMARY $PYSPARK_APP_ARGS + "$@" $PYSPARK_PRIMARY $PYSPARK_ARGS ) ;;