KubernetesUtils.scala
@@ -23,7 +23,7 @@ import java.util.{Collections, UUID}

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity}
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.commons.codec.binary.Hex
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -381,4 +381,34 @@ object KubernetesUtils extends Logging {
}
}
}

/**
* This function builds the EnvVar objects for each key-value env.
dongjoon-hyun (Member) commented on Aug 19, 2022:
The function description is inconsistent with the code body because we don't build an EnvVar when the value is null.

Member:
Also, in the function description, can we explicitly mention that we can use an empty string value to define a key-only EnvVar?

dcoliversun (Contributor, Author):
I think we should mention it, because it is supported when set in YAML and works normally in the container.

*/
@Since("3.4.0")
def buildEnvVars(env: Map[String, String]): Seq[EnvVar] = {
env.filter(env => env._2 != null)
.map { env =>
new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
.build()
}.toSeq
}
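A minimal sketch of the behavior discussed in the thread above, using hypothetical keys (FOO, BAR, SKIPPED): a null value drops the entry entirely, while an empty string still yields a key-only EnvVar.

// Hypothetical keys, for illustration only.
// "FOO" keeps its value, "BAR" becomes a key-only variable (empty string),
// and "SKIPPED" is dropped by the null filter above.
val vars = KubernetesUtils.buildEnvVars(
  Map("FOO" -> "foo-value", "BAR" -> "", "SKIPPED" -> null))
// vars.map(_.getName).toSet == Set("FOO", "BAR")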

/**
* This function builds the EnvVar objects for each field ref env.
*/
@Since("3.4.0")
def buildEnvVarsWithFieldRef(env: Seq[(String, String, String)]): Seq[EnvVar] = {
env.filter(env => env._2 != null && env._3 != null)
.map { env =>
new EnvVarBuilder()
.withName(env._1)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef(env._2, env._3)
.build())
.build()
}
}
}
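For illustration, a minimal sketch of how the fieldRef helper above would typically be called; the MY_POD_IP name is hypothetical, and entries whose apiVersion or fieldPath is null are filtered out.

// Hypothetical name: expose the pod IP to the container as MY_POD_IP.
val fieldRefVars = KubernetesUtils.buildEnvVarsWithFieldRef(
  Seq(("MY_POD_IP", "v1", "status.podIP")))
// The resulting EnvVar carries a valueFrom/fieldRef pointing at status.podIP
// rather than a literal value.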
BasicDriverFeatureStep.scala
@@ -83,16 +83,8 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB

override def configurePod(pod: SparkPod): SparkPod = {
val driverCustomEnvs = (Seq(
(ENV_APPLICATION_ID, conf.appId)
) ++ conf.environment)
.map { env =>
new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
.build()
}

val driverCustomEnvs = KubernetesUtils.buildEnvVars(
Map(ENV_APPLICATION_ID -> conf.appId) ++ conf.environment)
val driverCpuQuantity = new Quantity(driverCoresRequest)
val driverMemoryQuantity = new Quantity(s"${driverMemoryWithOverheadMiB}Mi")
val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
BasicExecutorFeatureStep.scala
@@ -122,68 +122,41 @@ private[spark] class BasicExecutorFeatureStep(
buildExecutorResourcesQuantities(execResources.customResources.values.toSet)

val executorEnv: Seq[EnvVar] = {
(Seq(
(ENV_DRIVER_URL, driverUrl),
(ENV_EXECUTOR_CORES, execResources.cores.get.toString),
(ENV_EXECUTOR_MEMORY, executorMemoryString),
(ENV_APPLICATION_ID, kubernetesConf.appId),
// This is to set the SPARK_CONF_DIR to be /opt/spark/conf
(ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
(ENV_EXECUTOR_ID, kubernetesConf.executorId),
(ENV_RESOURCE_PROFILE_ID, resourceProfile.id.toString)
) ++ kubernetesConf.environment).map { case (k, v) =>
new EnvVarBuilder()
.withName(k)
.withValue(v)
.build()
}
} ++ {
Seq(new EnvVarBuilder()
.withName(ENV_EXECUTOR_POD_IP)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "status.podIP")
.build())
.build())
} ++ {
Seq(new EnvVarBuilder()
.withName(ENV_EXECUTOR_POD_NAME)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "metadata.name")
.build())
.build())
} ++ {
if (kubernetesConf.get(AUTH_SECRET_FILE_EXECUTOR).isEmpty) {
Option(secMgr.getSecretKey()).map { authSecret =>
new EnvVarBuilder()
.withName(SecurityManager.ENV_AUTH_SECRET)
.withValue(authSecret)
.build()
}
} else None
} ++ {
kubernetesConf.get(EXECUTOR_CLASS_PATH).map { cp =>
new EnvVarBuilder()
.withName(ENV_CLASSPATH)
.withValue(cp)
.build()
}
} ++ {
val userOpts = kubernetesConf.get(EXECUTOR_JAVA_OPTIONS).toSeq.flatMap { opts =>
val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
kubernetesConf.executorId)
val sparkAuthSecret = if (kubernetesConf.get(AUTH_SECRET_FILE_EXECUTOR).isEmpty) {
Map(SecurityManager.ENV_AUTH_SECRET -> secMgr.getSecretKey())
} else {
Nil
}
val userOpts = kubernetesConf.get(EXECUTOR_JAVA_OPTIONS).toSeq.flatMap { opts =>
val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
kubernetesConf.executorId)
Utils.splitCommandString(subsOpts)
}

val sparkOpts = Utils.sparkJavaOpts(kubernetesConf.sparkConf,
SparkConf.isExecutorStartupConf)

(userOpts ++ sparkOpts).zipWithIndex.map { case (opt, index) =>
new EnvVarBuilder()
.withName(s"$ENV_JAVA_OPT_PREFIX$index")
.withValue(opt)
.build()
}
}
val sparkOpts = Utils.sparkJavaOpts(kubernetesConf.sparkConf,
SparkConf.isExecutorStartupConf)
val allOpts = (userOpts ++ sparkOpts).zipWithIndex.map { case (opt, index) =>
(s"$ENV_JAVA_OPT_PREFIX$index", opt)
}.toMap
KubernetesUtils.buildEnvVars(
Map(
ENV_DRIVER_URL -> driverUrl,
ENV_EXECUTOR_CORES -> execResources.cores.get.toString,
ENV_EXECUTOR_MEMORY -> executorMemoryString,
ENV_APPLICATION_ID -> kubernetesConf.appId,
// This is to set the SPARK_CONF_DIR to be /opt/spark/conf
ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
ENV_EXECUTOR_ID -> kubernetesConf.executorId,
ENV_RESOURCE_PROFILE_ID -> resourceProfile.id.toString)
++ kubernetesConf.environment
++ sparkAuthSecret
++ Map(ENV_CLASSPATH -> kubernetesConf.get(EXECUTOR_CLASS_PATH).orNull)
++ allOpts) ++
KubernetesUtils.buildEnvVarsWithFieldRef(
Seq(
(ENV_EXECUTOR_POD_IP, "v1", "status.podIP"),
(ENV_EXECUTOR_POD_NAME, "v1", "metadata.name")
))
Member:
Technically, the new logic looks inconsistent to me in terms of the order of variable definition.
The new logic seems to have a limitation where buildEnvVars-defined variables are always overwritten by buildEnvVarsWithFieldRef-defined ones. So, do we assume variable uniqueness here?

dcoliversun (Contributor, Author):
I think we need variable uniqueness. As you said, an envVar can be overwritten in pod.spec.containers[].env.

  • The buildEnvVarsWithFieldRef method can project Pod-level fields into the running container as environment variables; their values are not defined until the pod is scheduled or later.
  • The buildEnvVars method only puts prepared key-value pairs into the container as environment variables before submission.
    For normal usage, I believe buildEnvVarsWithFieldRef has higher priority than buildEnvVars, as sketched below.

Member:
Ack, @dcoliversun.
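A minimal sketch of the ordering discussed above, with a hypothetical name reused in both helpers: plain key-value entries come first and fieldRef-backed entries are appended after them in pod.spec.containers[].env.

// Hypothetical duplicate name, for illustration only.
val combined =
  KubernetesUtils.buildEnvVars(Map("MY_POD_IP" -> "placeholder")) ++
    KubernetesUtils.buildEnvVarsWithFieldRef(
      Seq(("MY_POD_IP", "v1", "status.podIP")))
// combined.map(_.getName) == Seq("MY_POD_IP", "MY_POD_IP"); per the discussion
// above, the later fieldRef-backed entry is the one that takes effect.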

}
executorEnv.find(_.getName == ENV_EXECUTOR_DIRS).foreach { e =>
e.setValue(e.getValue
.replaceAll(ENV_APPLICATION_ID, kubernetesConf.appId)
DriverCommandFeatureStep.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder}
import io.fabric8.kubernetes.api.model.ContainerBuilder

import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
@@ -84,25 +84,18 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
"variables instead.")
}

val pythonEnvs =
Seq(
conf.get(PYSPARK_PYTHON)
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
new EnvVarBuilder()
.withName(ENV_PYSPARK_PYTHON)
.withValue(value)
.build()
},
conf.get(PYSPARK_DRIVER_PYTHON)
.orElse(conf.get(PYSPARK_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_DRIVER_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
new EnvVarBuilder()
.withName(ENV_PYSPARK_DRIVER_PYTHON)
.withValue(value)
.build()
}
).flatten
val pythonEnvs = {
KubernetesUtils.buildEnvVars(
Map(
ENV_PYSPARK_PYTHON -> conf.get(PYSPARK_PYTHON)
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON))
.orNull,
ENV_PYSPARK_DRIVER_PYTHON -> conf.get(PYSPARK_DRIVER_PYTHON)
.orElse(conf.get(PYSPARK_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_DRIVER_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON))
.orNull))
}
Member:
Could you check what happens when both PYSPARK_PYTHON and ENV_PYSPARK_PYTHON are not defined? The new logic does not look identical to the previous one.

dcoliversun (Contributor, Author):
Let me check it.

dcoliversun (Contributor, Author) commented on Aug 15, 2022:
I believe the new logic is identical to the previous one.

Old logic

val pythonEnvs =
  Seq(
    conf.get(PYSPARK_PYTHON)
      .orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
        new EnvVarBuilder()
          .withName(ENV_PYSPARK_PYTHON)
          .withValue(value)
          .build()
      },
    conf.get(PYSPARK_DRIVER_PYTHON)
      .orElse(conf.get(PYSPARK_PYTHON))
      .orElse(environmentVariables.get(ENV_PYSPARK_DRIVER_PYTHON))
      .orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
        new EnvVarBuilder()
          .withName(ENV_PYSPARK_DRIVER_PYTHON)
          .withValue(value)
          .build()
      }
  ).flatten

With the old logic, when both PYSPARK_PYTHON and ENV_PYSPARK_PYTHON are not defined, the Spark pod gets empty environment variables whose keys are ENV_PYSPARK_PYTHON and ENV_PYSPARK_DRIVER_PYTHON and whose values are null. The corresponding pod YAML is as follows:

...
spec:
  ...
  env:
  - name: PYSPARK_PYTHON
  - name: PYSPARK_DRIVER_PYTHON

Running the echo command, the result is as follows:

$ echo $PYSPARK_PYTHON
<BLANK LINE>
$ echo $PYSPARK_DRIVER_PYTHON
<BLANK LINE>

New logic

When both PYSPARK_PYTHON and ENV_PYSPARK_PYTHON are not defined, no envVars are set on the pod. This behaves the same as an environment variable with a null value.

Existing code also illustrates this: the unit test about auth secret propagation expects that SPARK_CLASSPATH does not exist in the executor environment variables.

test("auth secret propagation") {
val conf = baseConf.clone()
.set(config.NETWORK_AUTH_ENABLED, true)
.set("spark.master", "k8s://127.0.0.1")
val secMgr = new SecurityManager(conf)
secMgr.initializeAuth()
val step = new BasicExecutorFeatureStep(KubernetesTestConf.createExecutorConf(sparkConf = conf),
secMgr, defaultProfile)
val executor = step.configurePod(SparkPod.initialPod())
checkEnv(executor, conf, Map(SecurityManager.ENV_AUTH_SECRET -> secMgr.getSecretKey()))
}

// Check that the expected environment variables are present.
private def checkEnv(
executorPod: SparkPod,
conf: SparkConf,
additionalEnvVars: Map[String, String]): Unit = {
val defaultEnvs = Map(
ENV_EXECUTOR_ID -> "1",
ENV_DRIVER_URL -> DRIVER_ADDRESS.toString,
ENV_EXECUTOR_CORES -> "1",
ENV_EXECUTOR_MEMORY -> "1024m",
ENV_APPLICATION_ID -> KubernetesTestConf.APP_ID,
ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
ENV_SPARK_USER -> Utils.getCurrentUserName(),
ENV_RESOURCE_PROFILE_ID -> "0",
// These are populated by K8s on scheduling
ENV_EXECUTOR_POD_IP -> null,
ENV_EXECUTOR_POD_NAME -> null)
val extraJavaOptsStart = additionalEnvVars.keys.count(_.startsWith(ENV_JAVA_OPT_PREFIX))
val extraJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val extraJavaOptsEnvs = extraJavaOpts.zipWithIndex.map { case (opt, ind) =>
s"$ENV_JAVA_OPT_PREFIX${ind + extraJavaOptsStart}" -> opt
}.toMap
val containerEnvs = executorPod.container.getEnv.asScala.map {
x => (x.getName, x.getValue)
}.toMap
val expectedEnvs = defaultEnvs ++ additionalEnvVars ++ extraJavaOptsEnvs
assert(containerEnvs === expectedEnvs)
}
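As a minimal sketch of this point, assuming the buildEnvVars helper from this PR: when neither the conf nor the environment variable is defined, the resolved value is null (via orNull) and no entry is emitted.

// Hypothetical call: the value resolved for PYSPARK_PYTHON is null,
// so buildEnvVars filters the entry out and the pod gets no such envVar.
val pythonEnvs = KubernetesUtils.buildEnvVars(Map("PYSPARK_PYTHON" -> null))
assert(pythonEnvs.isEmpty)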


Member:
Thank you for the in-depth analysis.


// re-write primary resource to be the remote one and upload the related file
val newResName = KubernetesUtils
KubernetesUtilsSuite.scala
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -127,4 +127,35 @@ class KubernetesUtilsSuite extends SparkFunSuite with PrivateMethodTester {
}
}
}

test("SPARK-38582: verify that envVars built with kv env as expected") {
val input = for (i <- 9 to 1 by -1) yield (s"testEnvKey.$i", s"testEnvValue.$i")
val expectedEnvVars = input.map { case(k, v) =>
new EnvVarBuilder()
.withName(k)
.withValue(v).build()
}
val outputEnvVars =
KubernetesUtils.buildEnvVars(input.toMap + ("testKeyForNull" -> null))
assert(outputEnvVars.toSet == expectedEnvVars.toSet)
}

test("SPARK-38582: verify that envVars built with field ref env as expected") {
val input = for (i <- 9 to 1 by -1) yield (s"testEnvKey.$i", s"v$i", s"testEnvValue.$i")
val expectedEnvVars = input.map { env =>
new EnvVarBuilder()
.withName(env._1)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef(env._2, env._3)
.build())
.build()
}
val outputEnvVars =
KubernetesUtils.buildEnvVarsWithFieldRef(
input ++ Seq(
("testKey1", null, "testValue1"),
("testKey2", "v1", null),
("testKey3", null, null)))
assert(outputEnvVars == expectedEnvVars)
}
}