@@ -23,7 +23,7 @@ import java.util.{Collections, UUID}

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity}
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.commons.codec.binary.Hex
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -381,4 +381,42 @@ object KubernetesUtils extends Logging {
}
}
}

/**
* This function builds the EnvVar objects for each key-value env.
dongjoon-hyun (Member), Aug 19, 2022:

The function description is inconsistent with the code body, because we don't build an EnvVar when the value is null.

Member:

Also, in the function description, can we explicitly mention that we can use an empty string value to define a key-only EnvVar?

Contributor (author):

I think we should mention it, because setting it in YAML is supported and it works normally in the container.

*/
@Since("3.4.0")
def buildEnvVarsWithKV(env: Map[String, String]): Seq[EnvVar] = {
if (env.isEmpty) {
Seq.empty
} else {
env.filter( env => env._2 != null)
.map { env =>
new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
.build()
}
}.toSeq
}
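
To make the thread above concrete, here is a minimal usage sketch (not part of the diff) of buildEnvVarsWithKV as written: an entry with a null value is filtered out entirely, while an empty string still produces an EnvVar, which is how a key-only environment variable can be expressed.

// Illustrative sketch only; assumes the KubernetesUtils object and imports above.
val kvEnvVars = KubernetesUtils.buildEnvVarsWithKV(Map(
  "KEY_WITH_VALUE" -> "some-value", // built as a regular name/value EnvVar
  "KEY_WITH_EMPTY" -> "",           // kept: resolves to an empty string in the container
  "KEY_WITH_NULL" -> null))         // dropped: no EnvVar is built for a null value

// Only the non-null entries survive.
assert(kvEnvVars.map(_.getName).toSet == Set("KEY_WITH_VALUE", "KEY_WITH_EMPTY"))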

/**
* This function builds the EnvVar objects for each field ref env.
*/
@Since("3.4.0")
def buildEnvVarsWithFieldRef(env: Seq[(String, String, String)]): Seq[EnvVar] = {
if (env.isEmpty) {
Seq.empty
} else {
env.filter( env => env._2 != null && env._3 != null)
.map { env =>
new EnvVarBuilder()
.withName(env._1)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef(env._2, env._3)
.build())
.build()
}
}
}
}
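
A similarly hedged sketch (not part of the diff) of the field-ref variant above: each tuple is (name, apiVersion, fieldPath), and any entry with a null apiVersion or fieldPath is filtered out, so only downward-API references are emitted. The names below are illustrative, not the actual Spark constants.

// Illustrative sketch only; values come from the Kubernetes downward API.
val fieldRefEnvVars = KubernetesUtils.buildEnvVarsWithFieldRef(Seq(
  ("MY_POD_IP", "v1", "status.podIP"),
  ("MY_POD_NAME", "v1", "metadata.name"),
  ("IGNORED", null, "status.podIP"))) // dropped: null apiVersion

// Each surviving EnvVar carries a valueFrom.fieldRef instead of a literal value.
assert(fieldRefEnvVars.size == 2)
assert(fieldRefEnvVars.forall(_.getValueFrom.getFieldRef != null))
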
@@ -74,16 +74,8 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB

override def configurePod(pod: SparkPod): SparkPod = {
val driverCustomEnvs = (Seq(
(ENV_APPLICATION_ID, conf.appId)
) ++ conf.environment)
.map { env =>
new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
.build()
}

val driverCustomEnvs = KubernetesUtils.buildEnvVarsWithKV(
conf.environment + (ENV_APPLICATION_ID -> conf.appId))
Member:

Ur, are you sure about this change? This looks inconsistent to me because it breaks the existing behavior: previously, conf.environment's ENV_APPLICATION_ID superseded the generated one if there was a duplicate.

dcoliversun (Contributor, author), Apr 6, 2022:

Your concern is valid. After the latest commit, I don't think this refactoring will introduce regressions.

  1. When there are two environment variables with the same key in a pod, the last value takes effect.

I applied a pod with this YAML:

apiVersion: v1
kind: Pod
metadata:
  name: static-web
  labels:
    role: myrole
spec:
  containers:
    - name: web
      image: nginx
      env:
      - name: DEMO_GREETING
        value: "Hello from the environment"
      - name: DEMO_GREETING
        value: "Such a sweet sorrow"
      ports:
        - name: web
          containerPort: 80
          protocol: TCP

Then I executed an echo command in the container, and the value of DEMO_GREETING is "Such a sweet sorrow":

$ kubectl exec -ti static-web -n default -- /bin/bash
$ echo $DEMO_GREETING
Such a sweet sorrow
  2. The ++ operation on a Scala Map is an overwriting append. As long as the order of our ++ stays the same as the previous env order, the value of the environment variable in the pod will not change. For example, suppose the user specified ENV_APPLICATION_ID as user_specified_app_id in sparkConf:
spark.kubernetes.driverEnv.ENV_APPLICATION_ID=user_specified_app_id

According to the previous logic, two envs with the same key would be generated:

env:
  - name: ENV_APPLICATION_ID
    value: spark-XXXXXXX # https://github.com/apache/spark/blob/b852645f69b3b7a0a2140a732c4c03b302f8795a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L222
  - name: ENV_APPLICATION_ID
    value: user_specified_app_id

According to the first point, the value of ENV_APPLICATION_ID is user_specified_app_id.

We replaced the Seq with a Map, which produces the following env:

env:
  - name: ENV_APPLICATION_ID
    value: user_specified_app_id

Because ++ on a Map is an overwriting operation, the value of ENV_APPLICATION_ID is overwritten, and the final value of ENV_APPLICATION_ID in the pod is still user_specified_app_id.

So I agree that this refactoring does not change the resulting env, and it removes the unnecessary duplicate entry.
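
A small sketch (not from the PR) of the Map semantics the explanation above relies on: with immutable Scala Maps, ++ and + are right-biased, so on a duplicate key the last entry wins, matching the "last env value takes effect" behavior observed in the pod.

// Right-biased merge: the right-hand operand overwrites duplicate keys.
val generated = Map("ENV_APPLICATION_ID" -> "spark-XXXXXXX")
val userSpecified = Map("ENV_APPLICATION_ID" -> "user_specified_app_id")

val merged = generated ++ userSpecified
assert(merged("ENV_APPLICATION_ID") == "user_specified_app_id")
// + with a single pair is right-biased in the same way.
assert((generated + ("ENV_APPLICATION_ID" -> "user_specified_app_id")) == merged)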

val driverCpuQuantity = new Quantity(driverCoresRequest)
val driverMemoryQuantity = new Quantity(s"${driverMemoryWithOverheadMiB}Mi")
val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
@@ -116,68 +116,41 @@ private[spark] class BasicExecutorFeatureStep(
buildExecutorResourcesQuantities(execResources.customResources.values.toSet)

val executorEnv: Seq[EnvVar] = {
(Seq(
(ENV_DRIVER_URL, driverUrl),
(ENV_EXECUTOR_CORES, execResources.cores.toString),
(ENV_EXECUTOR_MEMORY, executorMemoryString),
(ENV_APPLICATION_ID, kubernetesConf.appId),
// This is to set the SPARK_CONF_DIR to be /opt/spark/conf
(ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
(ENV_EXECUTOR_ID, kubernetesConf.executorId),
(ENV_RESOURCE_PROFILE_ID, resourceProfile.id.toString)
) ++ kubernetesConf.environment).map { case (k, v) =>
new EnvVarBuilder()
.withName(k)
.withValue(v)
.build()
}
} ++ {
Seq(new EnvVarBuilder()
.withName(ENV_EXECUTOR_POD_IP)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "status.podIP")
.build())
.build())
} ++ {
Seq(new EnvVarBuilder()
.withName(ENV_EXECUTOR_POD_NAME)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "metadata.name")
.build())
.build())
} ++ {
if (kubernetesConf.get(AUTH_SECRET_FILE_EXECUTOR).isEmpty) {
Option(secMgr.getSecretKey()).map { authSecret =>
new EnvVarBuilder()
.withName(SecurityManager.ENV_AUTH_SECRET)
.withValue(authSecret)
.build()
}
} else None
} ++ {
kubernetesConf.get(EXECUTOR_CLASS_PATH).map { cp =>
new EnvVarBuilder()
.withName(ENV_CLASSPATH)
.withValue(cp)
.build()
}
} ++ {
val userOpts = kubernetesConf.get(EXECUTOR_JAVA_OPTIONS).toSeq.flatMap { opts =>
val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
kubernetesConf.executorId)
val sparkAuthSecret = if (kubernetesConf.get(AUTH_SECRET_FILE_EXECUTOR).isEmpty) {
Map(SecurityManager.ENV_AUTH_SECRET -> secMgr.getSecretKey())
} else {
Nil
}
val userOpts = kubernetesConf.get(EXECUTOR_JAVA_OPTIONS).toSeq.flatMap { opts =>
val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
kubernetesConf.executorId)
Utils.splitCommandString(subsOpts)
}

val sparkOpts = Utils.sparkJavaOpts(kubernetesConf.sparkConf,
SparkConf.isExecutorStartupConf)

(userOpts ++ sparkOpts).zipWithIndex.map { case (opt, index) =>
new EnvVarBuilder()
.withName(s"$ENV_JAVA_OPT_PREFIX$index")
.withValue(opt)
.build()
}
}
val sparkOpts = Utils.sparkJavaOpts(kubernetesConf.sparkConf,
SparkConf.isExecutorStartupConf)
val allOpts = (userOpts ++ sparkOpts).zipWithIndex.map { case (opt, index) =>
(s"$ENV_JAVA_OPT_PREFIX$index", opt)
}.toMap
KubernetesUtils.buildEnvVarsWithKV(
Map(
ENV_DRIVER_URL -> driverUrl,
ENV_EXECUTOR_CORES -> execResources.cores.toString,
ENV_EXECUTOR_MEMORY -> executorMemoryString,
ENV_APPLICATION_ID -> kubernetesConf.appId,
// This is to set the SPARK_CONF_DIR to be /opt/spark/conf
ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
ENV_EXECUTOR_ID -> kubernetesConf.executorId,
ENV_RESOURCE_PROFILE_ID -> resourceProfile.id.toString,
ENV_CLASSPATH-> kubernetesConf.get(EXECUTOR_CLASS_PATH).orNull)
++ kubernetesConf.environment
++ sparkAuthSecret
++ allOpts) ++
KubernetesUtils.buildEnvVarsWithFieldRef(
Seq(
(ENV_EXECUTOR_POD_IP, "v1", "status.podIP"),
(ENV_EXECUTOR_POD_NAME, "v1", "metadata.name")
))
}
executorEnv.find(_.getName == ENV_EXECUTOR_DIRS).foreach { e =>
e.setValue(e.getValue
.replaceAll(ENV_APPLICATION_ID, kubernetesConf.appId)
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder}
import io.fabric8.kubernetes.api.model.ContainerBuilder

import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
@@ -84,25 +84,18 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
"variables instead.")
}

val pythonEnvs =
Seq(
conf.get(PYSPARK_PYTHON)
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
new EnvVarBuilder()
.withName(ENV_PYSPARK_PYTHON)
.withValue(value)
.build()
},
conf.get(PYSPARK_DRIVER_PYTHON)
.orElse(conf.get(PYSPARK_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_DRIVER_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
new EnvVarBuilder()
.withName(ENV_PYSPARK_DRIVER_PYTHON)
.withValue(value)
.build()
}
).flatten
val pythonEnvs = {
KubernetesUtils.buildEnvVarsWithKV(
Map(
ENV_PYSPARK_PYTHON -> conf.get(PYSPARK_PYTHON)
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON))
.orNull,
ENV_PYSPARK_DRIVER_PYTHON -> conf.get(PYSPARK_DRIVER_PYTHON)
.orElse(conf.get(PYSPARK_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_DRIVER_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON))
.orNull))
}
Member:

Could you check what happens when both PYSPARK_PYTHON and ENV_PYSPARK_PYTHON are not defined? The new logic does not look identical to the previous one.

Contributor (author):

Let me check it.

dcoliversun (Contributor, author), Aug 15, 2022:

I believe the new logic is identical to the previous one.

Old logic

val pythonEnvs =
Seq(
conf.get(PYSPARK_PYTHON)
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
new EnvVarBuilder()
.withName(ENV_PYSPARK_PYTHON)
.withValue(value)
.build()
},
conf.get(PYSPARK_DRIVER_PYTHON)
.orElse(conf.get(PYSPARK_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_DRIVER_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
new EnvVarBuilder()
.withName(ENV_PYSPARK_DRIVER_PYTHON)
.withValue(value)
.build()
}
).flatten

With the old logic, when both PYSPARK_PYTHON and ENV_PYSPARK_PYTHON are not defined, the Spark pod gets environment variables with the keys ENV_PYSPARK_PYTHON and ENV_PYSPARK_DRIVER_PYTHON but with null values. The corresponding pod YAML is as follows:

...
spec:
  ...
  env:
  - name: PYSPARK_PYTHON
  - name: PYSPARK_DRIVER_PYTHON

Executing echo in the container gives the following result:

$ echo $PYSPARK_PYTHON
<BLANK LINE>
$ echo $PYSPARK_DRIVER_PYTHON
<BLANK LINE>

New logic

When both PYSPARK_PYTHON and ENV_PYSPARK_PYTHON are not defined, no envVars are set in the pod. This behaves the same as an environment variable with a null value.

The existing code also illustrates this: the unit test for auth secret propagation expects that SPARK_CLASSPATH does not exist in the executor environment variables.

test("auth secret propagation") {
val conf = baseConf.clone()
.set(config.NETWORK_AUTH_ENABLED, true)
.set("spark.master", "k8s://127.0.0.1")
val secMgr = new SecurityManager(conf)
secMgr.initializeAuth()
val step = new BasicExecutorFeatureStep(KubernetesTestConf.createExecutorConf(sparkConf = conf),
secMgr, defaultProfile)
val executor = step.configurePod(SparkPod.initialPod())
checkEnv(executor, conf, Map(SecurityManager.ENV_AUTH_SECRET -> secMgr.getSecretKey()))
}

// Check that the expected environment variables are present.
private def checkEnv(
executorPod: SparkPod,
conf: SparkConf,
additionalEnvVars: Map[String, String]): Unit = {
val defaultEnvs = Map(
ENV_EXECUTOR_ID -> "1",
ENV_DRIVER_URL -> DRIVER_ADDRESS.toString,
ENV_EXECUTOR_CORES -> "1",
ENV_EXECUTOR_MEMORY -> "1024m",
ENV_APPLICATION_ID -> KubernetesTestConf.APP_ID,
ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
ENV_SPARK_USER -> Utils.getCurrentUserName(),
ENV_RESOURCE_PROFILE_ID -> "0",
// These are populated by K8s on scheduling
ENV_EXECUTOR_POD_IP -> null,
ENV_EXECUTOR_POD_NAME -> null)
val extraJavaOptsStart = additionalEnvVars.keys.count(_.startsWith(ENV_JAVA_OPT_PREFIX))
val extraJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val extraJavaOptsEnvs = extraJavaOpts.zipWithIndex.map { case (opt, ind) =>
s"$ENV_JAVA_OPT_PREFIX${ind + extraJavaOptsStart}" -> opt
}.toMap
val containerEnvs = executorPod.container.getEnv.asScala.map {
x => (x.getName, x.getValue)
}.toMap
val expectedEnvs = defaultEnvs ++ additionalEnvVars ++ extraJavaOptsEnvs
assert(containerEnvs === expectedEnvs)
}
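
A hedged sketch (not part of the PR) of the new behavior described above: when neither the configuration nor the submission environment provides a Python path, the orNull lookups yield null, and buildEnvVarsWithKV emits nothing for those keys.

// Both lookups miss, so both values are null and are filtered out;
// the pod spec ends up with no PYSPARK_* env entries at all.
val pysparkPython: Option[String] = None        // stands in for conf.get(PYSPARK_PYTHON)
val pysparkDriverPython: Option[String] = None  // stands in for conf.get(PYSPARK_DRIVER_PYTHON)

val pythonEnvs = KubernetesUtils.buildEnvVarsWithKV(Map(
  "PYSPARK_PYTHON" -> pysparkPython.orNull,
  "PYSPARK_DRIVER_PYTHON" -> pysparkDriverPython.orElse(pysparkPython).orNull))

assert(pythonEnvs.isEmpty)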

Member:

Thank you for the in-depth analysis.


// re-write primary resource to be the remote one and upload the related file
val newResName = KubernetesUtils
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -127,4 +127,35 @@ class KubernetesUtilsSuite extends SparkFunSuite with PrivateMethodTester {
}
}
}

test("SPARK-38582: verify that envVars built with kv env as expected") {
val input = for (i <- 9 to 1 by -1) yield (s"testEnvKey.$i", "testEnvValue123456")
val expectedEnvVars = input.map { case(k, v) =>
new EnvVarBuilder()
.withName(k)
.withValue(v).build()
}
val outputEnvVars =
KubernetesUtils.buildEnvVarsWithKV(input.toMap + ("testKeyForNull" -> null))
assert(outputEnvVars.toSet == expectedEnvVars.toSet)
}

test("SPARK-38582: verify that envVars built with field ref env as expected") {
val input = for (i <- 9 to 1 by -1) yield (s"testEnvKey.$i", s"v$i", "testEnvValue123456")
val expectedEnvVars = input.map { env =>
new EnvVarBuilder()
.withName(env._1)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef(env._2, env._3)
.build())
.build()
}
val outputEnvVars =
KubernetesUtils.buildEnvVarsWithFieldRef(
input ++ Seq(
("testKey1", null, "testValue1"),
("testKey2", "v1", null),
("testKey3", null, null)))
assert(outputEnvVars == expectedEnvVars)
}
}