Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ private[spark] object Config extends Logging {
.stringConf
.createWithDefault("spark")

val KUBERNETES_EXECUTOR_VOLUMES =
ConfigBuilder("spark.kubernetes.executor.volumes")
.doc("List of volumes mounted into the executor container. The format of this property is " +
"a comma-separated list of mappings following the form hostPath:containerPath[:ro|rw]")
.stringConf
.createWithDefault("")

val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s

import java.io.File

import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder}
import io.fabric8.kubernetes.api.model._

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
Expand All @@ -43,6 +43,57 @@ private[spark] object KubernetesUtils {
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
}

/**
* Parse a comma-delimited list of volume specs, each of which takes the form
* hostPath:containerPath[:ro|rw]; and add volume to pod and volume mount to container.
*
* @param pod original specification of the pod
* @param container original specification of the container
* @param volumes list of volume specs
* @return a tuple of (pod with the volume(s) added, container with mount(s) added)
*/
def addVolumes(pod: Pod, container : Container, volumes: String): (Pod, Container) = {
val podBuilder = new PodBuilder(pod).editOrNewSpec()
val containerBuilder = new ContainerBuilder(container)
var volumeCount = 0
volumes.split(",").map(_.split(":")).map { spec =>
var hostPath: Option[String] = None
var containerPath: Option[String] = None
var readOnly: Option[Boolean] = None
spec match {
case Array(hostPathV, containerPathV) =>
hostPath = Some(hostPathV)
containerPath = Some(containerPathV)
case Array(hostPathV, containerPathV, "ro") =>
hostPath = Some(hostPathV)
containerPath = Some(containerPathV)
readOnly = Some(true)
case Array(hostPathV, containerPathV, "rw") =>
hostPath = Some(hostPathV)
containerPath = Some(containerPathV)
readOnly = Some(false)
case spec =>
None
}
if (hostPath.isDefined && containerPath.isDefined) {
podBuilder.addToVolumes(new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath.get))
.withName(s"hostPath-volume-$volumeCount")
.build())
val volumeBuilder = new VolumeMountBuilder()
.withMountPath(containerPath.get)
.withName(s"hostPath-volume-$volumeCount")
if (readOnly.isDefined) {
containerBuilder.addToVolumeMounts(volumeBuilder.withReadOnly(readOnly.get).build())
} else {
containerBuilder.addToVolumeMounts(volumeBuilder.build())
}
volumeCount += 1
}
}
(podBuilder.endSpec().build(), containerBuilder.build())
}

/**
* Append the given init-container to a pod's list of init-containers.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,21 +205,27 @@ private[spark] class ExecutorPodFactory(
.endSpec()
.build()

val (executorPodWithVolumes, executorContainerWithVolumes) =
KubernetesUtils.addVolumes(executorPod,
executorContainer,
sparkConf.get(KUBERNETES_EXECUTOR_VOLUMES))

val containerWithLimitCores = executorLimitCores.map { limitCores =>
val executorCpuLimitQuantity = new QuantityBuilder(false)
.withAmount(limitCores)
.build()
new ContainerBuilder(executorContainer)
new ContainerBuilder(executorContainerWithVolumes)
.editResources()
.addToLimits("cpu", executorCpuLimitQuantity)
.endResources()
.build()
}.getOrElse(executorContainer)
}.getOrElse(executorContainerWithVolumes)

val (maybeSecretsMountedPod, maybeSecretsMountedContainer) =
mountSecretsBootstrap.map { bootstrap =>
(bootstrap.addSecretVolumes(executorPod), bootstrap.mountSecrets(containerWithLimitCores))
}.getOrElse((executorPod, containerWithLimitCores))
(bootstrap.addSecretVolumes(executorPodWithVolumes),
bootstrap.mountSecrets(containerWithLimitCores))
}.getOrElse((executorPodWithVolumes, containerWithLimitCores))

val (bootstrappedPod, bootstrappedContainer) =
initContainerBootstrap.map { bootstrap =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,53 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
checkOwnerReferences(executor, driverPodUid)
}

test("single executor hostPath volume gets mounted") {
val conf = baseConf.clone()
conf.set(KUBERNETES_EXECUTOR_VOLUMES, "/tmp/mount:/opt/mount")
val factory = new ExecutorPodFactory(conf, None, None, None)

val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

assert(executor.getSpec.getContainers.size() === 1)
assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1)
assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName
=== "hostPath-volume-0")
assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
.getMountPath === "/opt/mount")

assert(executor.getSpec.getVolumes.size() === 1)
assert(executor.getSpec.getVolumes.get(0).getHostPath.getPath === "/tmp/mount")

checkOwnerReferences(executor, driverPodUid)
}

test("multiple executor hostPath volumes get mounted") {
val conf = baseConf.clone()
conf.set(KUBERNETES_EXECUTOR_VOLUMES, "/tmp/mount1:/opt/mount1,/tmp/mount2:/opt/mount2")
val factory = new ExecutorPodFactory(conf, None, None, None)

val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

assert(executor.getSpec.getContainers.size() === 1)
assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 2)
assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName
=== "hostPath-volume-0")
assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(1).getName
=== "hostPath-volume-1")
assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
.getMountPath === "/opt/mount1")
assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(1)
.getMountPath === "/opt/mount2")

assert(executor.getSpec.getVolumes.size() === 2)
assert(executor.getSpec.getVolumes.get(0).getHostPath.getPath === "/tmp/mount1")
assert(executor.getSpec.getVolumes.get(1).getHostPath.getPath === "/tmp/mount2")

checkOwnerReferences(executor, driverPodUid)
}

// There is always exactly one controller reference, and it points to the driver pod.
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
assert(executor.getMetadata.getOwnerReferences.size() === 1)
Expand Down