diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 471196ac0e3f..0deaabd341d7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -98,6 +98,13 @@ private[spark] object Config extends Logging {
       .stringConf
       .createWithDefault("spark")
 
+  val KUBERNETES_EXECUTOR_VOLUMES =
+    ConfigBuilder("spark.kubernetes.executor.volumes")
+      .doc("List of volumes to mount into the executor container, given as a " +
+        "comma-separated list of mappings of the form hostPath:containerPath[:ro|rw].")
+      .stringConf
+      .createWithDefault("")
+
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor allocation.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 37331d8bbf9b..5d0117906426 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s
 
 import java.io.File
 
-import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder}
+import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.util.Utils
@@ -43,6 +43,57 @@ private[spark] object KubernetesUtils {
     opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
   }
 
+  /**
+   * Parse a comma-delimited list of volume specs, each of the form
+   * hostPath:containerPath[:ro|rw], and add the volumes to the pod and the corresponding
+   * volume mounts to the container. Malformed specs are silently ignored.
+   *
+   * @param pod original specification of the pod
+   * @param container original specification of the container
+   * @param volumes comma-delimited list of volume specs
+   * @return a tuple of (pod with the volume(s) added, container with the mount(s) added)
+   */
+  def addVolumes(pod: Pod, container: Container, volumes: String): (Pod, Container) = {
+    val podBuilder = new PodBuilder(pod).editOrNewSpec()
+    val containerBuilder = new ContainerBuilder(container)
+    var volumeCount = 0
+    volumes.split(",").map(_.split(":")).foreach { spec =>
+      var hostPath: Option[String] = None
+      var containerPath: Option[String] = None
+      var readOnly: Option[Boolean] = None
+      spec match {
+        case Array(hostPathV, containerPathV) =>
+          hostPath = Some(hostPathV)
+          containerPath = Some(containerPathV)
+        case Array(hostPathV, containerPathV, "ro") =>
+          hostPath = Some(hostPathV)
+          containerPath = Some(containerPathV)
+          readOnly = Some(true)
+        case Array(hostPathV, containerPathV, "rw") =>
+          hostPath = Some(hostPathV)
+          containerPath = Some(containerPathV)
+          readOnly = Some(false)
+        case _ => // malformed spec; skip it
+      }
+      if (hostPath.isDefined && containerPath.isDefined) {
+        // Kubernetes volume names must be DNS-1123 labels, so keep them lower case.
+        val volumeName = s"hostpath-volume-$volumeCount"
+        podBuilder.addToVolumes(new VolumeBuilder()
+          .withHostPath(new HostPathVolumeSource(hostPath.get))
+          .withName(volumeName)
+          .build())
+        val volumeBuilder = new VolumeMountBuilder()
+          .withMountPath(containerPath.get)
+          .withName(volumeName)
+        if (readOnly.isDefined) {
+          containerBuilder.addToVolumeMounts(volumeBuilder.withReadOnly(readOnly.get).build())
+        } else {
+          containerBuilder.addToVolumeMounts(volumeBuilder.build())
+        }
+        volumeCount += 1
+      }
+    }
+    (podBuilder.endSpec().build(), containerBuilder.build())
+  }
+
   /**
    * Append the given init-container to a pod's list of init-containers.
    *
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index 141bd2827e7c..2cdd5bf491cd 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -205,21 +205,27 @@ private[spark] class ExecutorPodFactory(
       .endSpec()
       .build()
 
+    val (executorPodWithVolumes, executorContainerWithVolumes) =
+      KubernetesUtils.addVolumes(executorPod,
+        executorContainer,
+        sparkConf.get(KUBERNETES_EXECUTOR_VOLUMES))
+
     val containerWithLimitCores = executorLimitCores.map { limitCores =>
       val executorCpuLimitQuantity = new QuantityBuilder(false)
        .withAmount(limitCores)
        .build()
-      new ContainerBuilder(executorContainer)
+      new ContainerBuilder(executorContainerWithVolumes)
        .editResources()
          .addToLimits("cpu", executorCpuLimitQuantity)
          .endResources()
        .build()
-    }.getOrElse(executorContainer)
+    }.getOrElse(executorContainerWithVolumes)
 
     val (maybeSecretsMountedPod, maybeSecretsMountedContainer) =
       mountSecretsBootstrap.map { bootstrap =>
-        (bootstrap.addSecretVolumes(executorPod), bootstrap.mountSecrets(containerWithLimitCores))
-      }.getOrElse((executorPod, containerWithLimitCores))
+        (bootstrap.addSecretVolumes(executorPodWithVolumes),
+          bootstrap.mountSecrets(containerWithLimitCores))
+      }.getOrElse((executorPodWithVolumes, containerWithLimitCores))
 
     val (bootstrappedPod, bootstrappedContainer) =
       initContainerBootstrap.map { bootstrap =>
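For context, a minimal sketch of how a user would exercise the new property once this change is in. The paths here are invented for illustration; only the config key and the hostPath:containerPath[:ro|rw] format come from the patch above.

```scala
import org.apache.spark.SparkConf

// Hypothetical driver-side configuration: mount /mnt/data read-write and
// the host's CA bundle read-only into every executor, using the spec
// format parsed by addVolumes() above.
val conf = new SparkConf()
  .set("spark.kubernetes.executor.volumes",
    "/mnt/data:/opt/data,/etc/ssl/certs:/etc/ssl/certs:ro")
```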
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
index a3c615be031d..4875aeef41f1 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
@@ -182,6 +182,53 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
     checkOwnerReferences(executor, driverPodUid)
   }
 
+  test("single executor hostPath volume gets mounted") {
+    val conf = baseConf.clone()
+    conf.set(KUBERNETES_EXECUTOR_VOLUMES, "/tmp/mount:/opt/mount")
+    val factory = new ExecutorPodFactory(conf, None, None, None)
+
+    val executor = factory.createExecutorPod(
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+
+    assert(executor.getSpec.getContainers.size() === 1)
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1)
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName
+      === "hostpath-volume-0")
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
+      .getMountPath === "/opt/mount")
+
+    assert(executor.getSpec.getVolumes.size() === 1)
+    assert(executor.getSpec.getVolumes.get(0).getHostPath.getPath === "/tmp/mount")
+
+    checkOwnerReferences(executor, driverPodUid)
+  }
+
+  test("multiple executor hostPath volumes get mounted") {
+    val conf = baseConf.clone()
+    conf.set(KUBERNETES_EXECUTOR_VOLUMES, "/tmp/mount1:/opt/mount1,/tmp/mount2:/opt/mount2")
+    val factory = new ExecutorPodFactory(conf, None, None, None)
+
+    val executor = factory.createExecutorPod(
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+
+    assert(executor.getSpec.getContainers.size() === 1)
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 2)
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName
+      === "hostpath-volume-0")
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(1).getName
+      === "hostpath-volume-1")
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
+      .getMountPath === "/opt/mount1")
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(1)
+      .getMountPath === "/opt/mount2")
+
+    assert(executor.getSpec.getVolumes.size() === 2)
+    assert(executor.getSpec.getVolumes.get(0).getHostPath.getPath === "/tmp/mount1")
+    assert(executor.getSpec.getVolumes.get(1).getHostPath.getPath === "/tmp/mount2")
+
+    checkOwnerReferences(executor, driverPodUid)
+  }
+
   // There is always exactly one controller reference, and it points to the driver pod.
   private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
     assert(executor.getMetadata.getOwnerReferences.size() === 1)
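Finally, a small standalone sketch of the helper's contract, useful when reading the tests above. The pod and container stubs are invented here and are not part of the patch; only addVolumes() and the spec format come from it.

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
import org.apache.spark.deploy.k8s.KubernetesUtils

// Stub pod and container, assembled with the same fabric8 builders the
// patch uses; the names are illustrative only.
val pod = new PodBuilder()
  .withNewMetadata().withName("example-executor").endMetadata()
  .withNewSpec().endSpec()
  .build()
val container = new ContainerBuilder().withName("executor").build()

// One read-only hostPath spec, in the format documented above.
val (podWithVolumes, containerWithMounts) =
  KubernetesUtils.addVolumes(pod, container, "/tmp/mount:/opt/mount:ro")

// Per the parsing rules: the pod gains a hostPath volume named
// "hostpath-volume-0" pointing at /tmp/mount, and the container gains a
// read-only mount of that volume at /opt/mount.
```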