diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index e598a38e7f36..94b5c37f96e3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model._ import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Constants.ENV_EXECUTOR_ID private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep { @@ -56,7 +57,13 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) new VolumeBuilder() .withHostPath(new HostPathVolumeSource(hostPath, "")) - case KubernetesPVCVolumeConf(claimName) => + case KubernetesPVCVolumeConf(claimNameTemplate) => + val claimName = conf match { + case c: KubernetesExecutorConf => + claimNameTemplate.replaceAll(ENV_EXECUTOR_ID, c.executorId) + case _ => + claimNameTemplate + } new VolumeBuilder() .withPersistentVolumeClaim( new PersistentVolumeClaimVolumeSource(claimName, spec.mountReadOnly)) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index 0d0ed50c0927..a9a1ec46a6e6 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -64,6 +64,31 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { } + test("SPARK-32713 Mounts parameterized persistentVolumeClaims in executors") { + val volumeConf = KubernetesVolumeSpec( + "testVolume", + "/tmp", + "", + true, + KubernetesPVCVolumeConf("pvc-spark-SPARK_EXECUTOR_ID") + ) + val driverConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf)) + val driverStep = new MountVolumesFeatureStep(driverConf) + val driverPod = driverStep.configurePod(SparkPod.initialPod()) + + assert(driverPod.pod.getSpec.getVolumes.size() === 1) + val driverPVC = driverPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim + assert(driverPVC.getClaimName === "pvc-spark-SPARK_EXECUTOR_ID") + + val executorConf = KubernetesTestConf.createExecutorConf(volumes = Seq(volumeConf)) + val executorStep = new MountVolumesFeatureStep(executorConf) + val executorPod = executorStep.configurePod(SparkPod.initialPod()) + + assert(executorPod.pod.getSpec.getVolumes.size() === 1) + val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim + assert(executorPVC.getClaimName === s"pvc-spark-${KubernetesTestConf.EXECUTOR_ID}") + } + test("Mounts emptyDir") { val volumeConf = KubernetesVolumeSpec( "testVolume",