@@ -23,7 +23,7 @@ import java.util.{Collections, UUID}

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity}
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.commons.codec.binary.Hex
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -381,4 +381,37 @@ object KubernetesUtils extends Logging {
}
}
}

/**
* Builds an EnvVar object for each key-value pair whose value is non-null.
* If the value is an empty string, a key-only environment variable is defined.
*/
@Since("3.4.0")
def buildEnvVars(env: Seq[(String, String)]): Seq[EnvVar] = {
env.filterNot(_._2 == null)
.map { case (k, v) =>
new EnvVarBuilder()
.withName(k)
.withValue(v)
.build()
}
}

/**
* Builds an EnvVar object for each field-ref entry whose apiVersion
* and fieldPath are both non-null.
*/
@Since("3.4.0")
def buildEnvVarsWithFieldRef(env: Seq[(String, String, String)]): Seq[EnvVar] = {
env.filterNot(_._2 == null)
.filterNot(_._3 == null)
.map { case (key, apiVersion, fieldPath) =>
new EnvVarBuilder()
.withName(key)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef(apiVersion, fieldPath)
.build())
.build()
}
}
}
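
For reference, a minimal usage sketch of the two new helpers (hypothetical object and variable names; assumes spark-kubernetes and the fabric8 model classes are on the classpath):

import io.fabric8.kubernetes.api.model.EnvVar
import org.apache.spark.deploy.k8s.KubernetesUtils

object EnvVarSketch {
  def main(args: Array[String]): Unit = {
    // Null values are filtered out; an empty string yields a key-only variable.
    val plain: Seq[EnvVar] = KubernetesUtils.buildEnvVars(Seq(
      "SPARK_USER" -> "spark",
      "DROPPED_KEY" -> null,
      "KEY_ONLY" -> ""))
    // Entries with a null apiVersion or fieldPath are filtered out.
    val fieldRefs: Seq[EnvVar] = KubernetesUtils.buildEnvVarsWithFieldRef(Seq(
      ("MY_POD_IP", "v1", "status.podIP"),
      ("DROPPED_REF", null, "status.podIP")))
    (plain ++ fieldRefs).foreach { e =>
      println(s"${e.getName} value=${e.getValue} valueFrom=${e.getValueFrom}")
    }
  }
}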
@@ -83,16 +83,8 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB

override def configurePod(pod: SparkPod): SparkPod = {
val driverCustomEnvs = (Seq(
(ENV_APPLICATION_ID, conf.appId)
) ++ conf.environment)
.map { env =>
new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
.build()
}

val driverCustomEnvs = KubernetesUtils.buildEnvVars(
Seq(ENV_APPLICATION_ID -> conf.appId) ++ conf.environment)
val driverCpuQuantity = new Quantity(driverCoresRequest)
val driverMemoryQuantity = new Quantity(s"${driverMemoryWithOverheadMiB}Mi")
val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
@@ -122,68 +122,45 @@ private[spark] class BasicExecutorFeatureStep(
buildExecutorResourcesQuantities(execResources.customResources.values.toSet)

val executorEnv: Seq[EnvVar] = {
(Seq(
(ENV_DRIVER_URL, driverUrl),
(ENV_EXECUTOR_CORES, execResources.cores.get.toString),
(ENV_EXECUTOR_MEMORY, executorMemoryString),
(ENV_APPLICATION_ID, kubernetesConf.appId),
// This is to set the SPARK_CONF_DIR to be /opt/spark/conf
(ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
(ENV_EXECUTOR_ID, kubernetesConf.executorId),
(ENV_RESOURCE_PROFILE_ID, resourceProfile.id.toString)
) ++ kubernetesConf.environment).map { case (k, v) =>
new EnvVarBuilder()
.withName(k)
.withValue(v)
.build()
}
} ++ {
Seq(new EnvVarBuilder()
.withName(ENV_EXECUTOR_POD_IP)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "status.podIP")
.build())
.build())
} ++ {
Seq(new EnvVarBuilder()
.withName(ENV_EXECUTOR_POD_NAME)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "metadata.name")
.build())
.build())
} ++ {
if (kubernetesConf.get(AUTH_SECRET_FILE_EXECUTOR).isEmpty) {
Option(secMgr.getSecretKey()).map { authSecret =>
new EnvVarBuilder()
.withName(SecurityManager.ENV_AUTH_SECRET)
.withValue(authSecret)
.build()
}
} else None
} ++ {
kubernetesConf.get(EXECUTOR_CLASS_PATH).map { cp =>
new EnvVarBuilder()
.withName(ENV_CLASSPATH)
.withValue(cp)
.build()
}
} ++ {
val userOpts = kubernetesConf.get(EXECUTOR_JAVA_OPTIONS).toSeq.flatMap { opts =>
val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
kubernetesConf.executorId)
val sparkAuthSecret = Option(secMgr.getSecretKey()).map {
case authSecret: String if kubernetesConf.get(AUTH_SECRET_FILE_EXECUTOR).isEmpty =>
Seq(SecurityManager.ENV_AUTH_SECRET -> authSecret)
case _ => Nil
}.getOrElse(Nil)

val userOpts = kubernetesConf.get(EXECUTOR_JAVA_OPTIONS).toSeq.flatMap { opts =>
val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
kubernetesConf.executorId)
Utils.splitCommandString(subsOpts)
}
}

val sparkOpts = Utils.sparkJavaOpts(kubernetesConf.sparkConf,
SparkConf.isExecutorStartupConf)
val sparkOpts = Utils.sparkJavaOpts(kubernetesConf.sparkConf,
SparkConf.isExecutorStartupConf)

(userOpts ++ sparkOpts).zipWithIndex.map { case (opt, index) =>
new EnvVarBuilder()
.withName(s"$ENV_JAVA_OPT_PREFIX$index")
.withValue(opt)
.build()
}
}
val allOpts = (userOpts ++ sparkOpts).zipWithIndex.map { case (opt, index) =>
(s"$ENV_JAVA_OPT_PREFIX$index", opt)
}.toMap

KubernetesUtils.buildEnvVars(
Seq(
ENV_DRIVER_URL -> driverUrl,
ENV_EXECUTOR_CORES -> execResources.cores.get.toString,
ENV_EXECUTOR_MEMORY -> executorMemoryString,
ENV_APPLICATION_ID -> kubernetesConf.appId,
// This is to set the SPARK_CONF_DIR to be /opt/spark/conf
ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
ENV_EXECUTOR_ID -> kubernetesConf.executorId,
ENV_RESOURCE_PROFILE_ID -> resourceProfile.id.toString)
++ kubernetesConf.environment
++ sparkAuthSecret
++ Seq(ENV_CLASSPATH -> kubernetesConf.get(EXECUTOR_CLASS_PATH).orNull)
++ allOpts) ++
KubernetesUtils.buildEnvVarsWithFieldRef(
Seq(
(ENV_EXECUTOR_POD_IP, "v1", "status.podIP"),
(ENV_EXECUTOR_POD_NAME, "v1", "metadata.name")
))
Member:
Technically, the new logic looks inconsistent to me in terms of the order of variable definition.
The new logic seems to have a limitation where buildEnvVars-defined variables are always overwritten by buildEnvVarsWithFieldRef-defined ones. So, do we assume variable uniqueness here?

Contributor Author:
I think we need variable uniqueness. As you said, an envVar is overwritten in pod.spec.containers[].env.

  • The buildEnvVarsWithFieldRef method projects pod-level fields into the running container as environment variables; their values are not defined until the pod is scheduled or later.
  • The buildEnvVars method only puts prepared key-value pairs into the pre-submission container as environment variables.

For normal usage, I believe buildEnvVarsWithFieldRef should have higher priority than buildEnvVars.

Member:
Ack, @dcoliversun.
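
If uniqueness ever needed to be enforced in code rather than by convention, a minimal sketch (hypothetical helper, not part of this PR) could keep the last definition of each name, matching the intent that the field-ref variables appended last take priority:

import io.fabric8.kubernetes.api.model.EnvVar

def dedupeByName(envs: Seq[EnvVar]): Seq[EnvVar] = {
  // Later entries overwrite earlier ones with the same name.
  val lastByName = envs.map(e => e.getName -> e).toMap
  // Preserve the first-occurrence order of the names.
  envs.map(_.getName).distinct.map(lastByName)
}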

}
executorEnv.find(_.getName == ENV_EXECUTOR_DIRS).foreach { e =>
e.setValue(e.getValue
.replaceAll(ENV_APPLICATION_ID, kubernetesConf.appId)
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder}
import io.fabric8.kubernetes.api.model.ContainerBuilder

import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
@@ -84,25 +84,18 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
"variables instead.")
}

val pythonEnvs =
Seq(
conf.get(PYSPARK_PYTHON)
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
new EnvVarBuilder()
.withName(ENV_PYSPARK_PYTHON)
.withValue(value)
.build()
},
conf.get(PYSPARK_DRIVER_PYTHON)
.orElse(conf.get(PYSPARK_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_DRIVER_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
new EnvVarBuilder()
.withName(ENV_PYSPARK_DRIVER_PYTHON)
.withValue(value)
.build()
}
).flatten
val pythonEnvs = {
KubernetesUtils.buildEnvVars(
Seq(
ENV_PYSPARK_PYTHON -> conf.get(PYSPARK_PYTHON)
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON))
.orNull,
ENV_PYSPARK_DRIVER_PYTHON -> conf.get(PYSPARK_DRIVER_PYTHON)
.orElse(conf.get(PYSPARK_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_DRIVER_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON))
.orNull))
}
Member:
Could you check what happens when both PYSPARK_PYTHON and ENV_PYSPARK_PYTHON are not defined? The new logic does not look identical to the previous one.

Contributor Author:
Let me check it.

Contributor Author (Aug 15, 2022):
I believe the new logic is identical to the previous one.

Old logic

val pythonEnvs =
Seq(
conf.get(PYSPARK_PYTHON)
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
new EnvVarBuilder()
.withName(ENV_PYSPARK_PYTHON)
.withValue(value)
.build()
},
conf.get(PYSPARK_DRIVER_PYTHON)
.orElse(conf.get(PYSPARK_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_DRIVER_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
new EnvVarBuilder()
.withName(ENV_PYSPARK_DRIVER_PYTHON)
.withValue(value)
.build()
}
).flatten

With the old logic, when both PYSPARK_PYTHON and ENV_PYSPARK_PYTHON are not defined, the Spark pod gets empty environment variables, whose keys are ENV_PYSPARK_PYTHON and ENV_PYSPARK_DRIVER_PYTHON and whose values are null. The corresponding pod YAML is as follows:

...
spec:
  ...
  env:
  - name: PYSPARK_PYTHON
  - name: PYSPARK_DRIVER_PYTHON

Running the echo command, the result is as follows:

$ echo $PYSPARK_PYTHON
<BLANK LINE>
$ echo $PYSPARK_DRIVER_PYTHON
<BLANK LINE>

New logic

When both PYSPARK_PYTHON and ENV_PYSPARK_PYTHON are not defined, no envVars are set on the pod. This behaves the same as an environment variable with a null value.
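
To make this concrete, a quick REPL-style check (hypothetical, assuming KubernetesUtils is on the classpath):

import org.apache.spark.deploy.k8s.KubernetesUtils

// Neither the config option nor the env var is defined,
// so both values are null and both pairs are filtered out.
val pysparkPython: Option[String] = None
val pythonEnvs = KubernetesUtils.buildEnvVars(Seq(
  "PYSPARK_PYTHON" -> pysparkPython.orNull,
  "PYSPARK_DRIVER_PYTHON" -> pysparkPython.orNull))
assert(pythonEnvs.isEmpty)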

Existing code also illustrates this: the unit test for auth secret propagation expects that SPARK_CLASSPATH does not exist in the executor environment variables.

test("auth secret propagation") {
val conf = baseConf.clone()
.set(config.NETWORK_AUTH_ENABLED, true)
.set("spark.master", "k8s://127.0.0.1")
val secMgr = new SecurityManager(conf)
secMgr.initializeAuth()
val step = new BasicExecutorFeatureStep(KubernetesTestConf.createExecutorConf(sparkConf = conf),
secMgr, defaultProfile)
val executor = step.configurePod(SparkPod.initialPod())
checkEnv(executor, conf, Map(SecurityManager.ENV_AUTH_SECRET -> secMgr.getSecretKey()))
}

// Check that the expected environment variables are present.
private def checkEnv(
executorPod: SparkPod,
conf: SparkConf,
additionalEnvVars: Map[String, String]): Unit = {
val defaultEnvs = Map(
ENV_EXECUTOR_ID -> "1",
ENV_DRIVER_URL -> DRIVER_ADDRESS.toString,
ENV_EXECUTOR_CORES -> "1",
ENV_EXECUTOR_MEMORY -> "1024m",
ENV_APPLICATION_ID -> KubernetesTestConf.APP_ID,
ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
ENV_SPARK_USER -> Utils.getCurrentUserName(),
ENV_RESOURCE_PROFILE_ID -> "0",
// These are populated by K8s on scheduling
ENV_EXECUTOR_POD_IP -> null,
ENV_EXECUTOR_POD_NAME -> null)
val extraJavaOptsStart = additionalEnvVars.keys.count(_.startsWith(ENV_JAVA_OPT_PREFIX))
val extraJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val extraJavaOptsEnvs = extraJavaOpts.zipWithIndex.map { case (opt, ind) =>
s"$ENV_JAVA_OPT_PREFIX${ind + extraJavaOptsStart}" -> opt
}.toMap
val containerEnvs = executorPod.container.getEnv.asScala.map {
x => (x.getName, x.getValue)
}.toMap
val expectedEnvs = defaultEnvs ++ additionalEnvVars ++ extraJavaOptsEnvs
assert(containerEnvs === expectedEnvs)
}

Member:
Thank you for the in-depth analysis.


// re-write primary resource to be the remote one and upload the related file
val newResName = KubernetesUtils
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -127,4 +127,36 @@ class KubernetesUtilsSuite extends SparkFunSuite with PrivateMethodTester {
}
}
}

test("SPARK-38582: verify that envVars is built with kv env as expected") {
val input = for (i <- 9 to 1 by -1) yield (s"testEnvKey.$i", s"testEnvValue.$i")
val expectedEnvVars = (input :+ ("testKeyWithEmptyValue" -> "")).map { case(k, v) =>
new EnvVarBuilder()
.withName(k)
.withValue(v).build()
}
val outputEnvVars =
KubernetesUtils.buildEnvVars(input ++
Seq("testKeyWithNullValue" -> null, "testKeyWithEmptyValue" -> ""))
assert(outputEnvVars.toSet == expectedEnvVars.toSet)
}

test("SPARK-38582: verify that envVars is built with field ref env as expected") {
val input = for (i <- 9 to 1 by -1) yield (s"testEnvKey.$i", s"v$i", s"testEnvValue.$i")
val expectedEnvVars = input.map { env =>
new EnvVarBuilder()
.withName(env._1)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef(env._2, env._3)
.build())
.build()
}
val outputEnvVars =
KubernetesUtils.buildEnvVarsWithFieldRef(
input ++ Seq(
("testKey1", null, "testValue1"),
("testKey2", "v1", null),
("testKey3", null, null)))
assert(outputEnvVars == expectedEnvVars)
}
}