diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index b9cf11110d8ab..bbabd680c1a47 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -23,7 +23,7 @@ import java.util.{Collections, UUID}
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity}
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity}
 import io.fabric8.kubernetes.client.KubernetesClient
 import org.apache.commons.codec.binary.Hex
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -381,4 +381,37 @@ object KubernetesUtils extends Logging {
       }
     }
   }
+
+  /**
+   * This function builds the EnvVar objects for each key-value env with non-null value.
+   * If value is an empty string, define a key-only environment variable.
+   */
+  @Since("3.4.0")
+  def buildEnvVars(env: Seq[(String, String)]): Seq[EnvVar] = {
+    env.filterNot(_._2 == null)
+      .map { case (k, v) =>
+        new EnvVarBuilder()
+          .withName(k)
+          .withValue(v)
+          .build()
+      }
+  }
+
+  /**
+   * This function builds the EnvVar objects for each field ref env
+   * with non-null apiVersion and fieldPath.
+   */
+  @Since("3.4.0")
+  def buildEnvVarsWithFieldRef(env: Seq[(String, String, String)]): Seq[EnvVar] = {
+    env.filterNot(_._2 == null)
+      .filterNot(_._3 == null)
+      .map { case (key, apiVersion, fieldPath) =>
+        new EnvVarBuilder()
+          .withName(key)
+          .withValueFrom(new EnvVarSourceBuilder()
+            .withNewFieldRef(apiVersion, fieldPath)
+            .build())
+          .build()
+      }
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 413f5bca9dfca..4b5eece54c8c5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -83,16 +83,8 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
   private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
 
   override def configurePod(pod: SparkPod): SparkPod = {
-    val driverCustomEnvs = (Seq(
-      (ENV_APPLICATION_ID, conf.appId)
-    ) ++ conf.environment)
-      .map { env =>
-        new EnvVarBuilder()
-          .withName(env._1)
-          .withValue(env._2)
-          .build()
-      }
-
+    val driverCustomEnvs = KubernetesUtils.buildEnvVars(
+      Seq(ENV_APPLICATION_ID -> conf.appId) ++ conf.environment)
     val driverCpuQuantity = new Quantity(driverCoresRequest)
     val driverMemoryQuantity = new Quantity(s"${driverMemoryWithOverheadMiB}Mi")
     val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index deb0efffe60a0..5ad06b2df3da1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -122,68 +122,45 @@ private[spark] class BasicExecutorFeatureStep(
       buildExecutorResourcesQuantities(execResources.customResources.values.toSet)
 
     val executorEnv: Seq[EnvVar] = {
-        (Seq(
-          (ENV_DRIVER_URL, driverUrl),
-          (ENV_EXECUTOR_CORES, execResources.cores.get.toString),
-          (ENV_EXECUTOR_MEMORY, executorMemoryString),
-          (ENV_APPLICATION_ID, kubernetesConf.appId),
-          // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
-          (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
-          (ENV_EXECUTOR_ID, kubernetesConf.executorId),
-          (ENV_RESOURCE_PROFILE_ID, resourceProfile.id.toString)
-        ) ++ kubernetesConf.environment).map { case (k, v) =>
-          new EnvVarBuilder()
-            .withName(k)
-            .withValue(v)
-            .build()
-        }
-      } ++ {
-        Seq(new EnvVarBuilder()
-          .withName(ENV_EXECUTOR_POD_IP)
-          .withValueFrom(new EnvVarSourceBuilder()
-            .withNewFieldRef("v1", "status.podIP")
-            .build())
-          .build())
-      } ++ {
-        Seq(new EnvVarBuilder()
-          .withName(ENV_EXECUTOR_POD_NAME)
-          .withValueFrom(new EnvVarSourceBuilder()
-            .withNewFieldRef("v1", "metadata.name")
-            .build())
-          .build())
-      } ++ {
-        if (kubernetesConf.get(AUTH_SECRET_FILE_EXECUTOR).isEmpty) {
-          Option(secMgr.getSecretKey()).map { authSecret =>
-            new EnvVarBuilder()
-              .withName(SecurityManager.ENV_AUTH_SECRET)
-              .withValue(authSecret)
-              .build()
-          }
-        } else None
-      } ++ {
-        kubernetesConf.get(EXECUTOR_CLASS_PATH).map { cp =>
-          new EnvVarBuilder()
-            .withName(ENV_CLASSPATH)
-            .withValue(cp)
-            .build()
-        }
-      } ++ {
-        val userOpts = kubernetesConf.get(EXECUTOR_JAVA_OPTIONS).toSeq.flatMap { opts =>
-          val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
-            kubernetesConf.executorId)
+      val sparkAuthSecret = Option(secMgr.getSecretKey()).map {
+        case authSecret: String if kubernetesConf.get(AUTH_SECRET_FILE_EXECUTOR).isEmpty =>
+          Seq(SecurityManager.ENV_AUTH_SECRET -> authSecret)
+        case _ => Nil
+      }.getOrElse(Nil)
+
+      val userOpts = kubernetesConf.get(EXECUTOR_JAVA_OPTIONS).toSeq.flatMap { opts =>
+        val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
+          kubernetesConf.executorId)
         Utils.splitCommandString(subsOpts)
-        }
+      }
 
-        val sparkOpts = Utils.sparkJavaOpts(kubernetesConf.sparkConf,
-          SparkConf.isExecutorStartupConf)
+      val sparkOpts = Utils.sparkJavaOpts(kubernetesConf.sparkConf,
+        SparkConf.isExecutorStartupConf)
 
-        (userOpts ++ sparkOpts).zipWithIndex.map { case (opt, index) =>
-          new EnvVarBuilder()
-            .withName(s"$ENV_JAVA_OPT_PREFIX$index")
-            .withValue(opt)
-            .build()
-        }
-      }
+      val allOpts = (userOpts ++ sparkOpts).zipWithIndex.map { case (opt, index) =>
+        (s"$ENV_JAVA_OPT_PREFIX$index", opt)
+      }.toMap
+
+      KubernetesUtils.buildEnvVars(
+        Seq(
+          ENV_DRIVER_URL -> driverUrl,
+          ENV_EXECUTOR_CORES -> execResources.cores.get.toString,
+          ENV_EXECUTOR_MEMORY -> executorMemoryString,
+          ENV_APPLICATION_ID -> kubernetesConf.appId,
+          // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
+          ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
+          ENV_EXECUTOR_ID -> kubernetesConf.executorId,
+          ENV_RESOURCE_PROFILE_ID -> resourceProfile.id.toString)
+          ++ kubernetesConf.environment
+          ++ sparkAuthSecret
+          ++ Seq(ENV_CLASSPATH -> kubernetesConf.get(EXECUTOR_CLASS_PATH).orNull)
+          ++ allOpts) ++
+        KubernetesUtils.buildEnvVarsWithFieldRef(
+          Seq(
+            (ENV_EXECUTOR_POD_IP, "v1", "status.podIP"),
+            (ENV_EXECUTOR_POD_NAME, "v1", "metadata.name")
+          ))
+    }
     executorEnv.find(_.getName == ENV_EXECUTOR_DIRS).foreach { e =>
       e.setValue(e.getValue
         .replaceAll(ENV_APPLICATION_ID, kubernetesConf.appId)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
index e9fbcd795cc00..455712cec1f69 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.features
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder}
+import io.fabric8.kubernetes.api.model.ContainerBuilder
 
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
@@ -84,25 +84,18 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
         "variables instead.")
     }
 
-    val pythonEnvs =
-      Seq(
-        conf.get(PYSPARK_PYTHON)
-          .orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
-          new EnvVarBuilder()
-            .withName(ENV_PYSPARK_PYTHON)
-            .withValue(value)
-            .build()
-        },
-        conf.get(PYSPARK_DRIVER_PYTHON)
-          .orElse(conf.get(PYSPARK_PYTHON))
-          .orElse(environmentVariables.get(ENV_PYSPARK_DRIVER_PYTHON))
-          .orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
-          new EnvVarBuilder()
-            .withName(ENV_PYSPARK_DRIVER_PYTHON)
-            .withValue(value)
-            .build()
-        }
-      ).flatten
+    val pythonEnvs = {
+      KubernetesUtils.buildEnvVars(
+        Seq(
+          ENV_PYSPARK_PYTHON -> conf.get(PYSPARK_PYTHON)
+            .orElse(environmentVariables.get(ENV_PYSPARK_PYTHON))
+            .orNull,
+          ENV_PYSPARK_DRIVER_PYTHON -> conf.get(PYSPARK_DRIVER_PYTHON)
+            .orElse(conf.get(PYSPARK_PYTHON))
+            .orElse(environmentVariables.get(ENV_PYSPARK_DRIVER_PYTHON))
+            .orElse(environmentVariables.get(ENV_PYSPARK_PYTHON))
+            .orNull))
+    }
 
     // re-write primary resource to be the remote one and upload the related file
     val newResName = KubernetesUtils
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
index 5498238307d1c..2259ba99e6a59 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder}
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -127,4 +127,36 @@ class KubernetesUtilsSuite extends SparkFunSuite with PrivateMethodTester {
       }
     }
   }
+
+  test("SPARK-38582: verify that envVars is built with kv env as expected") {
+    val input = for (i <- 9 to 1 by -1) yield (s"testEnvKey.$i", s"testEnvValue.$i")
+    val expectedEnvVars = (input :+ ("testKeyWithEmptyValue" -> "")).map { case(k, v) =>
+      new EnvVarBuilder()
+        .withName(k)
+        .withValue(v).build()
+    }
+    val outputEnvVars =
+      KubernetesUtils.buildEnvVars(input ++
+        Seq("testKeyWithNullValue" -> null, "testKeyWithEmptyValue" -> ""))
+    assert(outputEnvVars.toSet == expectedEnvVars.toSet)
+  }
+
+  test("SPARK-38582: verify that envVars is built with field ref env as expected") {
+    val input = for (i <- 9 to 1 by -1) yield (s"testEnvKey.$i", s"v$i", s"testEnvValue.$i")
+    val expectedEnvVars = input.map { env =>
+      new EnvVarBuilder()
+        .withName(env._1)
+        .withValueFrom(new EnvVarSourceBuilder()
+          .withNewFieldRef(env._2, env._3)
+          .build())
+        .build()
+    }
+    val outputEnvVars =
+      KubernetesUtils.buildEnvVarsWithFieldRef(
+        input ++ Seq(
+          ("testKey1", null, "testValue1"),
+          ("testKey2", "v1", null),
+          ("testKey3", null, null)))
+    assert(outputEnvVars == expectedEnvVars)
+  }
 }