Skip to content

Commit 1f2e7b8

Browse files
EnricoMi authored and dongjoon-hyun committed
[SPARK-49731][K8S] Support K8s volume mount.subPathExpr and hostPath volume type
### What changes were proposed in this pull request?

Add the following config options:
- `spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.subPathExpr`
- `spark.kubernetes.executor.volumes.hostPath.[VolumeName].options.type`

### Why are the changes needed?

K8s Spec
- https://kubernetes.io/docs/concepts/storage/volumes/#hostpath-volume-types
- https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath-expanded-environment

These are natural extensions of the existing options
- `spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.subPath`
- `spark.kubernetes.executor.volumes.hostPath.[VolumeName].options.path`

### Does this PR introduce _any_ user-facing change?

Above config options.

### How was this patch tested?

Unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48181 from EnricoMi/k8s-volume-options.

Authored-by: Enrico Minack <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 983f6f4 commit 1f2e7b8

File tree

8 files changed

+144
-13
lines changed

8 files changed

+144
-13
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,8 +769,10 @@ private[spark] object Config extends Logging {
769769
val KUBERNETES_VOLUMES_NFS_TYPE = "nfs"
770770
val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path"
771771
val KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY = "mount.subPath"
772+
val KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY = "mount.subPathExpr"
772773
val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
773774
val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
775+
val KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY = "options.type"
774776
val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"
775777
val KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY = "options.storageClass"
776778
val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s
1818

1919
private[spark] sealed trait KubernetesVolumeSpecificConf
2020

21-
private[spark] case class KubernetesHostPathVolumeConf(hostPath: String)
21+
private[spark] case class KubernetesHostPathVolumeConf(hostPath: String, volumeType: String)
2222
extends KubernetesVolumeSpecificConf
2323

2424
private[spark] case class KubernetesPVCVolumeConf(
@@ -42,5 +42,6 @@ private[spark] case class KubernetesVolumeSpec(
4242
volumeName: String,
4343
mountPath: String,
4444
mountSubPath: String,
45+
mountSubPathExpr: String,
4546
mountReadOnly: Boolean,
4647
volumeConf: KubernetesVolumeSpecificConf)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ object KubernetesVolumeUtils {
4545
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
4646
val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
4747
val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
48+
val subPathExprKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY"
4849
val labelKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_LABEL_KEY"
50+
verifyMutuallyExclusiveOptionKeys(properties, subPathKey, subPathExprKey)
4951

5052
val volumeLabelsMap = properties
5153
.filter(_._1.startsWith(labelKey))
@@ -57,6 +59,7 @@ object KubernetesVolumeUtils {
5759
volumeName = volumeName,
5860
mountPath = properties(pathKey),
5961
mountSubPath = properties.getOrElse(subPathKey, ""),
62+
mountSubPathExpr = properties.getOrElse(subPathExprKey, ""),
6063
mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
6164
volumeConf = parseVolumeSpecificConf(properties,
6265
volumeType, volumeName, Option(volumeLabelsMap)))
@@ -87,8 +90,11 @@ object KubernetesVolumeUtils {
8790
volumeType match {
8891
case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
8992
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
93+
val typeKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY"
9094
verifyOptionKey(options, pathKey, KUBERNETES_VOLUMES_HOSTPATH_TYPE)
91-
KubernetesHostPathVolumeConf(options(pathKey))
95+
// "" means that no checks will be performed before mounting the hostPath volume
96+
// backward compatibility default
97+
KubernetesHostPathVolumeConf(options(pathKey), options.getOrElse(typeKey, ""))
9298

9399
case KUBERNETES_VOLUMES_PVC_TYPE =>
94100
val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY"
@@ -129,6 +135,16 @@ object KubernetesVolumeUtils {
129135
}
130136
}
131137

138+
private def verifyMutuallyExclusiveOptionKeys(
139+
options: Map[String, String],
140+
keys: String*): Unit = {
141+
val givenKeys = keys.filter(options.contains)
142+
if (givenKeys.length > 1) {
143+
throw new IllegalArgumentException("These config options are mutually exclusive: " +
144+
s"${givenKeys.mkString(", ")}")
145+
}
146+
}
147+
132148
private def verifySize(size: Option[String]): Unit = {
133149
size.foreach { v =>
134150
if (v.forall(_.isDigit) && parseLong(v) < 1024) {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,14 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
6565
.withMountPath(spec.mountPath)
6666
.withReadOnly(spec.mountReadOnly)
6767
.withSubPath(spec.mountSubPath)
68+
.withSubPathExpr(spec.mountSubPathExpr)
6869
.withName(spec.volumeName)
6970
.build()
7071

7172
val volumeBuilder = spec.volumeConf match {
72-
case KubernetesHostPathVolumeConf(hostPath) =>
73-
/* "" means that no checks will be performed before mounting the hostPath volume */
73+
case KubernetesHostPathVolumeConf(hostPath, volumeType) =>
7474
new VolumeBuilder()
75-
.withHostPath(new HostPathVolumeSource(hostPath, ""))
75+
.withHostPath(new HostPathVolumeSource(hostPath, volumeType))
7676

7777
case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, labels) =>
7878
val claimName = conf match {

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,10 @@ object KubernetesTestConf {
113113

114114
volumes.foreach { case spec =>
115115
val (vtype, configs) = spec.volumeConf match {
116-
case KubernetesHostPathVolumeConf(path) =>
117-
(KUBERNETES_VOLUMES_HOSTPATH_TYPE,
118-
Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path))
116+
case KubernetesHostPathVolumeConf(hostPath, volumeType) =>
117+
(KUBERNETES_VOLUMES_HOSTPATH_TYPE, Map(
118+
KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> hostPath,
119+
KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY -> volumeType))
119120

120121
case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, labels) =>
121122
val sconf = storageClass
@@ -145,6 +146,10 @@ object KubernetesTestConf {
145146
conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY),
146147
spec.mountSubPath)
147148
}
149+
if (spec.mountSubPathExpr.nonEmpty) {
150+
conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY),
151+
spec.mountSubPathExpr)
152+
}
148153
conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_READONLY_KEY),
149154
spec.mountReadOnly.toString)
150155
configs.foreach { case (k, v) =>

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,20 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
3030
assert(volumeSpec.mountPath === "/path")
3131
assert(volumeSpec.mountReadOnly)
3232
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesHostPathVolumeConf] ===
33-
KubernetesHostPathVolumeConf("/hostPath"))
33+
KubernetesHostPathVolumeConf("/hostPath", ""))
34+
}
35+
36+
test("Parses hostPath volume type correctly") {
37+
val sparkConf = new SparkConf(false)
38+
sparkConf.set("test.hostPath.volumeName.mount.path", "/path")
39+
sparkConf.set("test.hostPath.volumeName.options.path", "/hostPath")
40+
sparkConf.set("test.hostPath.volumeName.options.type", "Type")
41+
42+
val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
43+
assert(volumeSpec.volumeName === "volumeName")
44+
assert(volumeSpec.mountPath === "/path")
45+
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesHostPathVolumeConf] ===
46+
KubernetesHostPathVolumeConf("/hostPath", "Type"))
3447
}
3548

3649
test("Parses subPath correctly") {
@@ -43,6 +56,33 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
4356
assert(volumeSpec.volumeName === "volumeName")
4457
assert(volumeSpec.mountPath === "/path")
4558
assert(volumeSpec.mountSubPath === "subPath")
59+
assert(volumeSpec.mountSubPathExpr === "")
60+
}
61+
62+
test("Parses subPathExpr correctly") {
63+
val sparkConf = new SparkConf(false)
64+
sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
65+
sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true")
66+
sparkConf.set("test.emptyDir.volumeName.mount.subPathExpr", "subPathExpr")
67+
68+
val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
69+
assert(volumeSpec.volumeName === "volumeName")
70+
assert(volumeSpec.mountPath === "/path")
71+
assert(volumeSpec.mountSubPath === "")
72+
assert(volumeSpec.mountSubPathExpr === "subPathExpr")
73+
}
74+
75+
test("Rejects mutually exclusive subPath and subPathExpr") {
76+
val sparkConf = new SparkConf(false)
77+
sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
78+
sparkConf.set("test.emptyDir.volumeName.mount.subPath", "subPath")
79+
sparkConf.set("test.emptyDir.volumeName.mount.subPathExpr", "subPathExpr")
80+
81+
val msg = intercept[IllegalArgumentException] {
82+
KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
83+
}.getMessage
84+
assert(msg === "These config options are mutually exclusive: " +
85+
"emptyDir.volumeName.mount.subPath, emptyDir.volumeName.mount.subPathExpr")
4686
}
4787

4888
test("Parses persistentVolumeClaim volumes correctly") {

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,9 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite {
137137
"spark-local-dir-test",
138138
"/tmp",
139139
"",
140+
"",
140141
false,
141-
KubernetesHostPathVolumeConf("/hostPath/tmp")
142+
KubernetesHostPathVolumeConf("/hostPath/tmp", "")
142143
)
143144
val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
144145
val mountVolumeStep = new MountVolumesFeatureStep(kubernetesConf)

0 commit comments

Comments
 (0)