@@ -87,6 +87,7 @@ object Constants {
   val SPARK_CONF_VOLUME_EXEC = "spark-conf-volume-exec"
   val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf"
   val SPARK_CONF_FILE_NAME = "spark.properties"
+  val SPARK_ENV_FILE_NAME = "spark-env.sh"
   val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"
   val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
@@ -104,8 +104,9 @@ private[spark] class Client(
   def run(): Unit = {
     val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)
     val configMapName = KubernetesClientUtils.configMapNameDriver
-    val confFilesMap = KubernetesClientUtils.buildSparkConfDirFilesMap(configMapName,
+    val rawConfFilesMap = KubernetesClientUtils.buildSparkConfDirFilesMap(configMapName,
       conf.sparkConf, resolvedDriverSpec.systemProperties)
+    val confFilesMap = KubernetesClientUtils.overrideDefaultSparkEnv(conf, rawConfFilesMap)
     val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap +
       (KUBERNETES_NAMESPACE.key -> conf.namespace))

@@ -30,9 +30,9 @@ import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, KeyToPath}

 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.{DeveloperApi, Since, Unstable}
-import org.apache.spark.deploy.k8s.{Config, Constants, KubernetesUtils}
+import org.apache.spark.deploy.k8s.{Config, Constants, KubernetesDriverConf, KubernetesUtils}
 import org.apache.spark.deploy.k8s.Config.{KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH, KUBERNETES_NAMESPACE}
-import org.apache.spark.deploy.k8s.Constants.ENV_SPARK_CONF_DIR
+import org.apache.spark.deploy.k8s.Constants.{ENV_SPARK_CONF_DIR, SPARK_ENV_FILE_NAME}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.{CONFIG, PATH, PATHS}
 import org.apache.spark.util.ArrayImplicits._
@@ -116,6 +116,31 @@ object KubernetesClientUtils extends Logging {
     }.toList.sortBy(x => x.getKey) // List is sorted to make mocking based tests work
   }

+  @Since("4.2.0")
+  def overrideDefaultSparkEnv(conf: KubernetesDriverConf,
+      confFilesMap: Map[String, String]): Map[String, String] = {
Review comment (Member):

From Apache Spark 4.1.0, we need to provide a Java-friendly version together, like the following:

    def buildSparkConfDirFilesMapJava(
    def buildSparkConfDirFilesMap(
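A minimal sketch of what such a Java-friendly companion might look like under that convention; the name `overrideDefaultSparkEnvJava` and the delegation through `scala.jdk.CollectionConverters` are assumptions, not part of this PR:

```scala
import scala.jdk.CollectionConverters._

// Hypothetical Java-friendly wrapper (name and shape are assumptions, not in this PR):
// accept and return java.util.Map, delegating to the Scala overrideDefaultSparkEnv.
@Since("4.2.0")
def overrideDefaultSparkEnvJava(
    conf: KubernetesDriverConf,
    confFilesMap: java.util.Map[String, String]): java.util.Map[String, String] =
  overrideDefaultSparkEnv(conf, confFilesMap.asScala.toMap).asJava
```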

+    if (conf.environment.isEmpty) {
+      return confFilesMap
+    }
+    val customEnvs = conf.environment.map {
+      case (k, v) => s"export $k=$v"
+    }.mkString("\n", "\n", "\n")
+    val mapSize = confFilesMap.map {
+      case (k, v) => k.length + v.length
+    }.sum
+    val maxSize = conf.sparkConf.get(Config.CONFIG_MAP_MAXSIZE)
+    if (mapSize + customEnvs.length < maxSize) {
+      confFilesMap ++ Map(SPARK_ENV_FILE_NAME ->
+        s"${confFilesMap.getOrElse(SPARK_ENV_FILE_NAME, "")}$customEnvs")
+    } else {
+      logWarning(log"Skipped custom driver env due to size constraint." +
Review comment (Member):

This seems to introduce unsafe behavior to Spark. Previously, we didn't fail?

Reply (@littlexyw, author, Dec 12, 2025):

> This seems to introduce unsafe behavior to Spark. Previously, we didn't fail?

Thanks for catching this! You're absolutely right: the original behavior didn't fail when it should have. I agree this introduces unsafe behavior.

To address this, I've submitted a separate PR that adds proper validation:
#53455: Fail app on ConfigMap size over limit

Could you please review this new fix when you have a moment? Would love to hear your thoughts!
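For context, a sketch of the kind of fail-fast validation that reply points to; the helper name and placement are assumptions here, and the actual change is whatever #53455 contains:

```scala
import org.apache.spark.SparkException

// Hypothetical fail-fast check (assumed shape; the real fix is in #53455):
// abort the submission when the ConfigMap payload exceeds the configured limit,
// rather than silently dropping the custom driver env.
private def checkConfigMapSize(confFilesMap: Map[String, String], maxSize: Long): Unit = {
  val size = confFilesMap.map { case (k, v) => k.length + v.length }.sum
  if (size > maxSize) {
    throw new SparkException(
      s"ConfigMap size ($size) exceeds ${Config.CONFIG_MAP_MAXSIZE.key} ($maxSize)")
  }
}
```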

log" Please see, config: `${MDC(CONFIG, Config.CONFIG_MAP_MAXSIZE.key)}`" +
log" for more details.")
confFilesMap
}
}


/**
* Build a ConfigMap that will hold the content for environment variable SPARK_CONF_DIR
* on remote pods. (Java-friendly)
Expand Down
@@ -29,7 +29,7 @@ import io.fabric8.kubernetes.api.model.ConfigMapBuilder
 import org.scalatest.BeforeAndAfter

 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config
+import org.apache.spark.deploy.k8s.{Config, KubernetesTestConf}
 import org.apache.spark.util.Utils

 class KubernetesClientUtilsSuite extends SparkFunSuite with BeforeAndAfter {
@@ -128,4 +128,52 @@ class KubernetesClientUtilsSuite extends SparkFunSuite with BeforeAndAfter {
       .build()
     assert(outputConfigMap === expectedConfigMap)
   }
+
test("SPARK-54605: check custom driver env appended as expected when config map size is " +
"below threshold.") {
val input = Map("spark-env.sh" -> "test12345\ntest12345", "testConf.1" -> "test123456")
val sparkConf = testSetup(input.map(f => f._1 -> f._2.getBytes(StandardCharsets.UTF_8)))
.set(Config.CONFIG_MAP_MAXSIZE.key, "90")
val conf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
environment = Map("AAA" -> "value1", "BBB" -> "value2")
)
val confFileMap = KubernetesClientUtils.overrideDefaultSparkEnv(conf, input)
val expectedConfFileMap = Map(
"spark-env.sh" -> "test12345\ntest12345\nexport AAA=value1\nexport BBB=value2\n",
"testConf.1" -> "test123456")
assert(confFileMap === expectedConfFileMap)
}

test("SPARK-54605: check custom driver env not appended when config map size is " +
"larger than threshold.") {
val input = Map("spark-env.sh" -> "test12345\ntest12345", "testConf.1" -> "test123456")
val sparkConf = testSetup(input.map(f => f._1 -> f._2.getBytes(StandardCharsets.UTF_8)))
.set(Config.CONFIG_MAP_MAXSIZE.key, "87")
val conf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
environment = Map("AAA" -> "value1", "BBB" -> "value2")
)
val confFileMap = KubernetesClientUtils.overrideDefaultSparkEnv(conf, input)
val expectedConfFileMap = Map(
"spark-env.sh" -> "test12345\ntest12345",
"testConf.1" -> "test123456")
assert(confFileMap === expectedConfFileMap)
}

test("SPARK-54605: check custom driver env not appended when config map size is " +
"equal to threshold.") {
val input = Map("spark-env.sh" -> "test12345\ntest12345", "testConf.1" -> "test123456")
val sparkConf = testSetup(input.map(f => f._1 -> f._2.getBytes(StandardCharsets.UTF_8)))
.set(Config.CONFIG_MAP_MAXSIZE.key, "88")
val conf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
environment = Map("AAA" -> "value1", "BBB" -> "value2")
)
val confFileMap = KubernetesClientUtils.overrideDefaultSparkEnv(conf, input)
val expectedConfFileMap = Map(
"spark-env.sh" -> "test12345\ntest12345",
"testConf.1" -> "test123456")
assert(confFileMap === expectedConfFileMap)
}
}
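A note on the thresholds these tests use: the input map contributes 12 + 19 = 31 characters for the `spark-env.sh` entry and 10 + 10 = 20 for `testConf.1`, so `mapSize` is 51, while the rendered env block `"\nexport AAA=value1\nexport BBB=value2\n"` is 37 characters. The append happens only when 51 + 37 = 88 is strictly below `CONFIG_MAP_MAXSIZE`, which is why a limit of 90 appends while 87 and 88 do not.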