From addb8fec7c7fcd1535cfdc29594d74b04981ae19 Mon Sep 17 00:00:00 2001
From: madanadit
Date: Wed, 4 Apr 2018 18:02:25 -0700
Subject: [PATCH 1/9] Spark executor volumes configuration

---
 .../org/apache/spark/deploy/k8s/Config.scala  |  7 +++++
 .../spark/deploy/k8s/KubernetesUtils.scala    | 31 ++++++++++++++++++-
 .../cluster/k8s/ExecutorPodFactory.scala      | 15 ++++++---
 3 files changed, 48 insertions(+), 5 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 471196ac0e3f..b034cdd61e47 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -98,6 +98,13 @@ private[spark] object Config extends Logging {
       .stringConf
       .createWithDefault("spark")
 
+  val KUBERNETES_EXECUTOR_VOLUMES =
+    ConfigBuilder("spark.kubernetes.executor.volumes")
+      .doc("List of volumes mounted into the executor container. The format of this property is " +
+        "a comma-separated list of mappings following the form hostPath:containerPath:name")
+      .stringConf
+      .createWithDefault("")
+
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor allocation.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 37331d8bbf9b..e1e5652cdb4c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s
 
 import java.io.File
 
-import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder}
+import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.util.Utils
@@ -43,6 +43,35 @@ private[spark] object KubernetesUtils {
     opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
   }
 
+  /**
+   * Parse a comma-delimited list of volume specs, each of which
+   * takes the form hostPath:containerPath:name and add to pod.
+   *
+   * @param pod original specification of the pod
+   * @param container original specification of the container
+   * @param volumes list of volume specs
+   * @return the pod with the init-container added to the list of InitContainers
+   */
+  def addVolumes(pod: Pod, container : Container, volumes: String): (Pod, Container) = {
+    val podBuilder = new PodBuilder(pod).editOrNewSpec()
+    val containerBuilder = new ContainerBuilder(container)
+    volumes.split(",").map(_.split(":")).map { spec =>
+      spec match {
+        case Array(hostPath, containerPath, name) =>
+          podBuilder
+            .withVolumes(new VolumeBuilder()
+              .withHostPath(new HostPathVolumeSource(hostPath)).withName(name).build())
+          containerBuilder.addToVolumeMounts(new VolumeMountBuilder()
+            .withMountPath(containerPath)
+            .withName(name)
+            .build())
+        case spec =>
+          None
+      }
+    }
+    (podBuilder.endSpec().build(), containerBuilder.build())
+  }
+
   /**
    * Append the given init-container to a pod's list of init-containers.
    *
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index 141bd2827e7c..c3ae6c936f15 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -205,21 +205,28 @@ private[spark] class ExecutorPodFactory(
       .endSpec()
       .build()
 
+    val volumes = sparkConf.get(KUBERNETES_EXECUTOR_VOLUMES)
+    val executorPodAndContainerWithVolumes =
+      KubernetesUtils.addVolumes(executorPod, executorContainer, volumes)
+    val executorPodWithVolumes = executorPodAndContainerWithVolumes._1
+    val executorContainerWithVolumes = executorPodAndContainerWithVolumes._2
+
     val containerWithLimitCores = executorLimitCores.map { limitCores =>
       val executorCpuLimitQuantity = new QuantityBuilder(false)
         .withAmount(limitCores)
         .build()
-      new ContainerBuilder(executorContainer)
+      new ContainerBuilder(executorContainerWithVolumes)
         .editResources()
           .addToLimits("cpu", executorCpuLimitQuantity)
           .endResources()
         .build()
-    }.getOrElse(executorContainer)
+    }.getOrElse(executorContainerWithVolumes)
 
     val (maybeSecretsMountedPod, maybeSecretsMountedContainer) =
       mountSecretsBootstrap.map { bootstrap =>
-        (bootstrap.addSecretVolumes(executorPod), bootstrap.mountSecrets(containerWithLimitCores))
-      }.getOrElse((executorPod, containerWithLimitCores))
+        (bootstrap.addSecretVolumes(executorPodWithVolumes),
+          bootstrap.mountSecrets(containerWithLimitCores))
+      }.getOrElse((executorPodWithVolumes, containerWithLimitCores))
 
     val (bootstrappedPod, bootstrappedContainer) =
       initContainerBootstrap.map { bootstrap =>
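For reference, a minimal sketch of how the helper introduced in this patch is meant to be called. It is not part of the patch; the pod and container values are hypothetical stand-ins built with the same fabric8 builders the series already uses:

    import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
    import org.apache.spark.deploy.k8s.KubernetesUtils

    // One volume spec of the form hostPath:containerPath:name, per the config doc above.
    val basePod = new PodBuilder().withNewMetadata().withName("exec-1").endMetadata().build()
    val baseContainer = new ContainerBuilder().withName("executor").build()
    val (pod, container) =
      KubernetesUtils.addVolumes(basePod, baseContainer, "/tmp/mount:/opt/mount:data")
    // pod.getSpec.getVolumes now holds a hostPath volume named "data", and
    // container.getVolumeMounts holds the matching mount at /opt/mount.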
From f33eaf53b914c3414a4d4a2bca9c5bc3d84aa268 Mon Sep 17 00:00:00 2001
From: madanadit
Date: Thu, 5 Apr 2018 13:05:48 -0700
Subject: [PATCH 2/9] Fix style

---
 .../spark/deploy/k8s/KubernetesUtils.scala    |   9 +-
 .../cluster/k8s/ExecutorPodFactory.scala      | 235 ++++++++++--------
 2 files changed, 138 insertions(+), 106 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index e1e5652cdb4c..af1c0fd86769 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -44,8 +44,8 @@ private[spark] object KubernetesUtils {
   }
 
   /**
-   * Parse a comma-delimited list of volume specs, each of which
-   * takes the form hostPath:containerPath:name and add to pod.
+   * Parse a comma-delimited list of volume specs, each of which takes the form
+   * hostPath:containerPath:name; and add volume to pod and volume mount to container.
    *
    * @param pod original specification of the pod
    * @param container original specification of the container
    * @param volumes list of volume specs
    * @return the pod with the init-container added to the list of InitContainers
    */
   def addVolumes(pod: Pod, container : Container, volumes: String): (Pod, Container) = {
     val podBuilder = new PodBuilder(pod).editOrNewSpec()
     val containerBuilder = new ContainerBuilder(container)
     volumes.split(",").map(_.split(":")).map { spec =>
       spec match {
         case Array(hostPath, containerPath, name) =>
           podBuilder
             .withVolumes(new VolumeBuilder()
-              .withHostPath(new HostPathVolumeSource(hostPath)).withName(name).build())
+              .withHostPath(new HostPathVolumeSource(hostPath))
+              .withName(name)
+              .build())
           containerBuilder.addToVolumeMounts(new VolumeMountBuilder()
             .withMountPath(containerPath)
             .withName(name)
             .build())
         case spec =>
+          // TODO(adit): log warning
           None
       }
     }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index c3ae6c936f15..92fce75bea41 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -21,26 +21,36 @@ import scala.collection.JavaConverters._
 import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, PodWithDetachedInitContainer}
+import org.apache.spark.deploy.k8s.{
+  InitContainerBootstrap,
+  KubernetesUtils,
+  MountSecretsBootstrap,
+  PodWithDetachedInitContainer
+}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.internal.config.{
+  EXECUTOR_CLASS_PATH,
+  EXECUTOR_JAVA_OPTIONS,
+  EXECUTOR_MEMORY,
+  EXECUTOR_MEMORY_OVERHEAD
+}
 import org.apache.spark.util.Utils
 
 /**
- * A factory class for bootstrapping and creating executor pods with the given bootstrapping
- * components.
- *
- * @param sparkConf Spark configuration
- * @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto
- *                              user-specified paths into the executor container
- * @param initContainerBootstrap an optional component for bootstrapping the executor init-container
- *                               if one is needed, i.e., when there are remote dependencies to
- *                               localize
- * @param initContainerMountSecretsBootstrap an optional component for mounting user-specified
- *                                           secrets onto user-specified paths into the executor
- *                                           init-container
- */
+  * A factory class for bootstrapping and creating executor pods with the given bootstrapping
+  * components.
+  *
+  * @param sparkConf Spark configuration
+  * @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto
+  *                              user-specified paths into the executor container
+  * @param initContainerBootstrap an optional component for bootstrapping the executor init-container
+  *                               if one is needed, i.e., when there are remote dependencies to
+  *                               localize
+  * @param initContainerMountSecretsBootstrap an optional component for mounting user-specified
+  *                                           secrets onto user-specified paths into the executor
+  *                                           init-container
+  */
 private[spark] class ExecutorPodFactory(
     sparkConf: SparkConf,
     mountSecretsBootstrap: Option[MountSecretsBootstrap],
@@ -68,44 +78,46 @@ private[spark] class ExecutorPodFactory(
     sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
 
   private val nodeSelector =
-    KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf,
-      KUBERNETES_NODE_SELECTOR_PREFIX)
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf,
+                                               KUBERNETES_NODE_SELECTOR_PREFIX)
 
   private val executorContainerImage = sparkConf
     .get(EXECUTOR_CONTAINER_IMAGE)
-    .getOrElse(throw new SparkException("Must specify the executor container image"))
+    .getOrElse(
+      throw new SparkException("Must specify the executor container image"))
   private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
   private val blockManagerPort = sparkConf
     .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
 
-  private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+  private val executorPodNamePrefix =
+    sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
 
   private val executorMemoryMiB = sparkConf.get(EXECUTOR_MEMORY)
-  private val executorMemoryString = sparkConf.get(
-    EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
+  private val executorMemoryString =
+    sparkConf.get(EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
 
   private val memoryOverheadMiB = sparkConf
     .get(EXECUTOR_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
-      MEMORY_OVERHEAD_MIN_MIB))
+    .getOrElse(
+      math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+               MEMORY_OVERHEAD_MIN_MIB))
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
 
   private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
-  private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
+  private val executorLimitCores =
+    sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
 
   private val executorJarsDownloadDir = sparkConf.get(JARS_DOWNLOAD_LOCATION)
 
   /**
-   * Configure and construct an executor pod with the given parameters.
    */
   def createExecutorPod(executorId: String,
                         applicationId: String,
                         driverUrl: String,
                         executorEnvs: Seq[(String, String)],
                         driverPod: Pod,
                         nodeToLocalTaskCount: Map[String, Int]): Pod = {
     val name = s"$executorPodNamePrefix-exec-$executorId"
 
     // hostname must be no longer than 63 characters, so take the last 63 characters of the pod
@@ -138,9 +150,13 @@ private[spark] class ExecutorPodFactory(
         val delimitedOpts = Utils.splitCommandString(opts)
         delimitedOpts.zipWithIndex.map { case (opt, index) =>
-          new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
+          new EnvVarBuilder()
+            .withName(s"$ENV_JAVA_OPT_PREFIX$index")
+            .withValue(opt)
+            .build()
         }
-    }.getOrElse(Seq.empty[EnvVar])
+    }
+      .getOrElse(Seq.empty[EnvVar])
     val executorEnv = (Seq(
       (ENV_DRIVER_URL, driverUrl),
       // Executor backend expects integral value for executor cores, so round it up to an int.
       (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
       (ENV_EXECUTOR_MEMORY, executorMemoryString),
       (ENV_APPLICATION_ID, applicationId),
       (ENV_EXECUTOR_ID, executorId),
-      (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ executorEnvs)
-      .map(env => new EnvVarBuilder()
-        .withName(env._1)
-        .withValue(env._2)
-        .build()
-      ) ++ Seq(
+      (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")
+    ) ++ executorEnvs)
+      .map(
+        env =>
+          new EnvVarBuilder()
+            .withName(env._1)
+            .withValue(env._2)
+            .build()) ++ Seq(
       new EnvVarBuilder()
         .withName(ENV_EXECUTOR_POD_IP)
         .withValueFrom(new EnvVarSourceBuilder()
          .withNewFieldRef("v1", "status.podIP")
          .build())
        .build()
     ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
-    val requiredPorts = Seq(
-      (BLOCK_MANAGER_PORT_NAME, blockManagerPort))
-      .map { case (name, port) =>
-        new ContainerPortBuilder()
-          .withName(name)
-          .withContainerPort(port)
-          .build()
+    val requiredPorts = Seq((BLOCK_MANAGER_PORT_NAME, blockManagerPort))
+      .map {
+        case (name, port) =>
+          new ContainerPortBuilder()
+            .withName(name)
+            .withContainerPort(port)
+            .build()
       }
 
     val executorContainer = new ContainerBuilder()
       .withName("executor")
       .withImage(executorContainerImage)
       .withImagePullPolicy(imagePullPolicy)
       .withNewResources()
-        .addToRequests("memory", executorMemoryQuantity)
-        .addToLimits("memory", executorMemoryLimitQuantity)
-        .addToRequests("cpu", executorCpuQuantity)
-        .endResources()
+      .addToRequests("memory", executorMemoryQuantity)
+      .addToLimits("memory", executorMemoryLimitQuantity)
+      .addToRequests("cpu", executorCpuQuantity)
+      .endResources()
       .addAllToEnv(executorEnv.asJava)
       .withPorts(requiredPorts.asJava)
       .addToArgs("executor")
       .build()
 
     val executorPod = new PodBuilder()
       .withNewMetadata()
-        .withName(name)
-        .withLabels(resolvedExecutorLabels.asJava)
-        .withAnnotations(executorAnnotations.asJava)
-        .withOwnerReferences()
-        .addNewOwnerReference()
-          .withController(true)
-          .withApiVersion(driverPod.getApiVersion)
-          .withKind(driverPod.getKind)
-          .withName(driverPod.getMetadata.getName)
-          .withUid(driverPod.getMetadata.getUid)
-        .endOwnerReference()
-      .endMetadata()
+      .withName(name)
+      .withLabels(resolvedExecutorLabels.asJava)
+      .withAnnotations(executorAnnotations.asJava)
+      .withOwnerReferences()
+      .addNewOwnerReference()
+      .withController(true)
+      .withApiVersion(driverPod.getApiVersion)
+      .withKind(driverPod.getKind)
+      .withName(driverPod.getMetadata.getName)
+      .withUid(driverPod.getMetadata.getUid)
+      .endOwnerReference()
+      .endMetadata()
       .withNewSpec()
-        .withHostname(hostname)
-        .withRestartPolicy("Never")
-        .withNodeSelector(nodeSelector.asJava)
-      .endSpec()
+      .withHostname(hostname)
+      .withRestartPolicy("Never")
+      .withNodeSelector(nodeSelector.asJava)
+      .endSpec()
       .build()
 
-    val volumes = sparkConf.get(KUBERNETES_EXECUTOR_VOLUMES)
     val executorPodAndContainerWithVolumes =
-      KubernetesUtils.addVolumes(executorPod, executorContainer, volumes)
+      KubernetesUtils.addVolumes(executorPod,
+                                 executorContainer,
+                                 sparkConf.get(KUBERNETES_EXECUTOR_VOLUMES))
     val executorPodWithVolumes = executorPodAndContainerWithVolumes._1
     val executorContainerWithVolumes = executorPodAndContainerWithVolumes._2
 
-    val containerWithLimitCores = executorLimitCores.map { limitCores =>
-      val executorCpuLimitQuantity = new QuantityBuilder(false)
-        .withAmount(limitCores)
-        .build()
-      new ContainerBuilder(executorContainerWithVolumes)
-        .editResources()
-        .addToLimits("cpu", executorCpuLimitQuantity)
-        .endResources()
-        .build()
-    }.getOrElse(executorContainerWithVolumes)
+    val containerWithLimitCores = executorLimitCores
+      .map { limitCores =>
+        val executorCpuLimitQuantity = new QuantityBuilder(false)
+          .withAmount(limitCores)
+          .build()
+        new ContainerBuilder(executorContainerWithVolumes)
+          .editResources()
+          .addToLimits("cpu", executorCpuLimitQuantity)
+          .endResources()
+          .build()
+      }
+      .getOrElse(executorContainerWithVolumes)
 
     val (maybeSecretsMountedPod, maybeSecretsMountedContainer) =
-      mountSecretsBootstrap.map { bootstrap =>
-        (bootstrap.addSecretVolumes(executorPodWithVolumes),
-          bootstrap.mountSecrets(containerWithLimitCores))
-      }.getOrElse((executorPodWithVolumes, containerWithLimitCores))
+      mountSecretsBootstrap
+        .map { bootstrap =>
+          (bootstrap.addSecretVolumes(executorPodWithVolumes),
+           bootstrap.mountSecrets(containerWithLimitCores))
+        }
+        .getOrElse((executorPodWithVolumes, containerWithLimitCores))
 
     val (bootstrappedPod, bootstrappedContainer) =
-      initContainerBootstrap.map { bootstrap =>
-        val podWithInitContainer = bootstrap.bootstrapInitContainer(
-          PodWithDetachedInitContainer(
-            maybeSecretsMountedPod,
-            new ContainerBuilder().build(),
-            maybeSecretsMountedContainer))
+      initContainerBootstrap
+        .map { bootstrap =>
+          val podWithInitContainer = bootstrap.bootstrapInitContainer(
+            PodWithDetachedInitContainer(maybeSecretsMountedPod,
+                                         new ContainerBuilder().build(),
+                                         maybeSecretsMountedContainer))
 
-        val (pod, mayBeSecretsMountedInitContainer) =
-          initContainerMountSecretsBootstrap.map { bootstrap =>
-            // Mount the secret volumes given that the volumes have already been added to the
-            // executor pod when mounting the secrets into the main executor container.
-            (podWithInitContainer.pod, bootstrap.mountSecrets(podWithInitContainer.initContainer))
-          }.getOrElse((podWithInitContainer.pod, podWithInitContainer.initContainer))
+          val (pod, mayBeSecretsMountedInitContainer) =
+            initContainerMountSecretsBootstrap
+              .map { bootstrap =>
+                // Mount the secret volumes given that the volumes have already been added to the
+                // executor pod when mounting the secrets into the main executor container.
+                (podWithInitContainer.pod,
+                 bootstrap.mountSecrets(podWithInitContainer.initContainer))
+              }
+              .getOrElse(
+                (podWithInitContainer.pod, podWithInitContainer.initContainer))
 
-        val bootstrappedPod = KubernetesUtils.appendInitContainer(
-          pod, mayBeSecretsMountedInitContainer)
+          val bootstrappedPod = KubernetesUtils.appendInitContainer(
+            pod,
+            mayBeSecretsMountedInitContainer)
 
-        (bootstrappedPod, podWithInitContainer.mainContainer)
-      }.getOrElse((maybeSecretsMountedPod, maybeSecretsMountedContainer))
+          (bootstrappedPod, podWithInitContainer.mainContainer)
+        }
+        .getOrElse((maybeSecretsMountedPod, maybeSecretsMountedContainer))
 
     new PodBuilder(bootstrappedPod)
       .editSpec()
-        .addToContainers(bootstrappedContainer)
-      .endSpec()
+      .addToContainers(bootstrappedContainer)
+      .endSpec()
       .build()
   }
 }

From 4016d6d33c296823e338f28deb7815c4be6607a1 Mon Sep 17 00:00:00 2001
From: madanadit
Date: Thu, 5 Apr 2018 13:12:28 -0700
Subject: [PATCH 3/9] Undo unintended style change

---
 .../cluster/k8s/ExecutorPodFactory.scala      | 234 ++++++++----------
 1 file changed, 103 insertions(+), 131 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index 92fce75bea41..7370d08a6bf6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -21,36 +21,26 @@ import scala.collection.JavaConverters._
 import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{
-  InitContainerBootstrap,
-  KubernetesUtils,
-  MountSecretsBootstrap,
-  PodWithDetachedInitContainer
-}
+import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, PodWithDetachedInitContainer}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.internal.config.{
-  EXECUTOR_CLASS_PATH,
-  EXECUTOR_JAVA_OPTIONS,
-  EXECUTOR_MEMORY,
-  EXECUTOR_MEMORY_OVERHEAD
-}
+import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
 import org.apache.spark.util.Utils
 
 /**
-  * A factory class for bootstrapping and creating executor pods with the given bootstrapping
-  * components.
-  *
-  * @param sparkConf Spark configuration
-  * @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto
-  *                              user-specified paths into the executor container
-  * @param initContainerBootstrap an optional component for bootstrapping the executor init-container
-  *                               if one is needed, i.e., when there are remote dependencies to
-  *                               localize
-  * @param initContainerMountSecretsBootstrap an optional component for mounting user-specified
-  *                                           secrets onto user-specified paths into the executor
-  *                                           init-container
-  */
+ * A factory class for bootstrapping and creating executor pods with the given bootstrapping
+ * components.
+ *
+ * @param sparkConf Spark configuration
+ * @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto
+ *                              user-specified paths into the executor container
+ * @param initContainerBootstrap an optional component for bootstrapping the executor init-container
+ *                               if one is needed, i.e., when there are remote dependencies to
+ *                               localize
+ * @param initContainerMountSecretsBootstrap an optional component for mounting user-specified
+ *                                           secrets onto user-specified paths into the executor
+ *                                           init-container
+ */
 private[spark] class ExecutorPodFactory(
     sparkConf: SparkConf,
     mountSecretsBootstrap: Option[MountSecretsBootstrap],
@@ -78,46 +68,44 @@ private[spark] class ExecutorPodFactory(
     sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
 
   private val nodeSelector =
-    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf,
-                                               KUBERNETES_NODE_SELECTOR_PREFIX)
+    KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf,
+      KUBERNETES_NODE_SELECTOR_PREFIX)
 
   private val executorContainerImage = sparkConf
     .get(EXECUTOR_CONTAINER_IMAGE)
-    .getOrElse(
-      throw new SparkException("Must specify the executor container image"))
+    .getOrElse(throw new SparkException("Must specify the executor container image"))
   private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
   private val blockManagerPort = sparkConf
     .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
 
-  private val executorPodNamePrefix =
-    sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+  private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
 
   private val executorMemoryMiB = sparkConf.get(EXECUTOR_MEMORY)
-  private val executorMemoryString =
-    sparkConf.get(EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
+  private val executorMemoryString = sparkConf.get(
+    EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
 
   private val memoryOverheadMiB = sparkConf
     .get(EXECUTOR_MEMORY_OVERHEAD)
-    .getOrElse(
-      math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
-               MEMORY_OVERHEAD_MIN_MIB))
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+      MEMORY_OVERHEAD_MIN_MIB))
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
 
   private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
-  private val executorLimitCores =
-    sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
+  private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
 
   private val executorJarsDownloadDir = sparkConf.get(JARS_DOWNLOAD_LOCATION)
 
   /**
-   * Configure and construct an executor pod with the given parameters.
    */
   def createExecutorPod(
       executorId: String,
       applicationId: String,
       driverUrl: String,
       executorEnvs: Seq[(String, String)],
       driverPod: Pod,
       nodeToLocalTaskCount: Map[String, Int]): Pod = {
     val name = s"$executorPodNamePrefix-exec-$executorId"
 
     // hostname must be no longer than 63 characters, so take the last 63 characters of the pod
@@ -150,13 +138,9 @@ private[spark] class ExecutorPodFactory(
         val delimitedOpts = Utils.splitCommandString(opts)
         delimitedOpts.zipWithIndex.map { case (opt, index) =>
-          new EnvVarBuilder()
-            .withName(s"$ENV_JAVA_OPT_PREFIX$index")
-            .withValue(opt)
-            .build()
+          new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
         }
-    }
-      .getOrElse(Seq.empty[EnvVar])
+    }.getOrElse(Seq.empty[EnvVar])
     val executorEnv = (Seq(
       (ENV_DRIVER_URL, driverUrl),
       // Executor backend expects integral value for executor cores, so round it up to an int.
       (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
       (ENV_EXECUTOR_MEMORY, executorMemoryString),
       (ENV_APPLICATION_ID, applicationId),
       (ENV_EXECUTOR_ID, executorId),
-      (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")
-    ) ++ executorEnvs)
-      .map(
-        env =>
-          new EnvVarBuilder()
-            .withName(env._1)
-            .withValue(env._2)
-            .build()) ++ Seq(
+      (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ executorEnvs)
+      .map(env => new EnvVarBuilder()
+        .withName(env._1)
+        .withValue(env._2)
+        .build()
+      ) ++ Seq(
       new EnvVarBuilder()
         .withName(ENV_EXECUTOR_POD_IP)
         .withValueFrom(new EnvVarSourceBuilder()
          .withNewFieldRef("v1", "status.podIP")
          .build())
        .build()
     ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
-    val requiredPorts = Seq((BLOCK_MANAGER_PORT_NAME, blockManagerPort))
-      .map {
-        case (name, port) =>
-          new ContainerPortBuilder()
-            .withName(name)
-            .withContainerPort(port)
-            .build()
+    val requiredPorts = Seq(
+      (BLOCK_MANAGER_PORT_NAME, blockManagerPort))
+      .map { case (name, port) =>
+        new ContainerPortBuilder()
+          .withName(name)
+          .withContainerPort(port)
+          .build()
       }
 
     val executorContainer = new ContainerBuilder()
       .withName("executor")
       .withImage(executorContainerImage)
       .withImagePullPolicy(imagePullPolicy)
       .withNewResources()
-      .addToRequests("memory", executorMemoryQuantity)
-      .addToLimits("memory", executorMemoryLimitQuantity)
-      .addToRequests("cpu", executorCpuQuantity)
-      .endResources()
+        .addToRequests("memory", executorMemoryQuantity)
+        .addToLimits("memory", executorMemoryLimitQuantity)
+        .addToRequests("cpu", executorCpuQuantity)
+        .endResources()
       .addAllToEnv(executorEnv.asJava)
       .withPorts(requiredPorts.asJava)
       .addToArgs("executor")
       .build()
 
     val executorPod = new PodBuilder()
       .withNewMetadata()
-      .withName(name)
-      .withLabels(resolvedExecutorLabels.asJava)
-      .withAnnotations(executorAnnotations.asJava)
-      .withOwnerReferences()
-      .addNewOwnerReference()
-      .withController(true)
-      .withApiVersion(driverPod.getApiVersion)
-      .withKind(driverPod.getKind)
-      .withName(driverPod.getMetadata.getName)
-      .withUid(driverPod.getMetadata.getUid)
-      .endOwnerReference()
-      .endMetadata()
+        .withName(name)
+        .withLabels(resolvedExecutorLabels.asJava)
+        .withAnnotations(executorAnnotations.asJava)
+        .withOwnerReferences()
+        .addNewOwnerReference()
+          .withController(true)
+          .withApiVersion(driverPod.getApiVersion)
+          .withKind(driverPod.getKind)
+          .withName(driverPod.getMetadata.getName)
+          .withUid(driverPod.getMetadata.getUid)
+        .endOwnerReference()
+      .endMetadata()
       .withNewSpec()
-      .withHostname(hostname)
-      .withRestartPolicy("Never")
-      .withNodeSelector(nodeSelector.asJava)
-      .endSpec()
+        .withHostname(hostname)
+        .withRestartPolicy("Never")
+        .withNodeSelector(nodeSelector.asJava)
+      .endSpec()
       .build()
 
     val executorPodAndContainerWithVolumes =
       KubernetesUtils.addVolumes(executorPod,
-                                 executorContainer,
-                                 sparkConf.get(KUBERNETES_EXECUTOR_VOLUMES))
+        executorContainer,
+        sparkConf.get(KUBERNETES_EXECUTOR_VOLUMES))
     val executorPodWithVolumes = executorPodAndContainerWithVolumes._1
     val executorContainerWithVolumes = executorPodAndContainerWithVolumes._2
 
-    val containerWithLimitCores = executorLimitCores
-      .map { limitCores =>
-        val executorCpuLimitQuantity = new QuantityBuilder(false)
-          .withAmount(limitCores)
-          .build()
-        new ContainerBuilder(executorContainerWithVolumes)
-          .editResources()
-          .addToLimits("cpu", executorCpuLimitQuantity)
-          .endResources()
-          .build()
-      }
-      .getOrElse(executorContainerWithVolumes)
+    val containerWithLimitCores = executorLimitCores.map { limitCores =>
+      val executorCpuLimitQuantity = new QuantityBuilder(false)
+        .withAmount(limitCores)
+        .build()
+      new ContainerBuilder(executorContainerWithVolumes)
+        .editResources()
+        .addToLimits("cpu", executorCpuLimitQuantity)
+        .endResources()
+        .build()
+    }.getOrElse(executorContainerWithVolumes)
 
     val (maybeSecretsMountedPod, maybeSecretsMountedContainer) =
-      mountSecretsBootstrap
-        .map { bootstrap =>
-          (bootstrap.addSecretVolumes(executorPodWithVolumes),
-           bootstrap.mountSecrets(containerWithLimitCores))
-        }
-        .getOrElse((executorPodWithVolumes, containerWithLimitCores))
+      mountSecretsBootstrap.map { bootstrap =>
+        (bootstrap.addSecretVolumes(executorPodWithVolumes),
+          bootstrap.mountSecrets(containerWithLimitCores))
+      }.getOrElse((executorPodWithVolumes, containerWithLimitCores))
 
     val (bootstrappedPod, bootstrappedContainer) =
-      initContainerBootstrap
-        .map { bootstrap =>
-          val podWithInitContainer = bootstrap.bootstrapInitContainer(
-            PodWithDetachedInitContainer(maybeSecretsMountedPod,
-                                         new ContainerBuilder().build(),
-                                         maybeSecretsMountedContainer))
+      initContainerBootstrap.map { bootstrap =>
+        val podWithInitContainer = bootstrap.bootstrapInitContainer(
+          PodWithDetachedInitContainer(
+            maybeSecretsMountedPod,
+            new ContainerBuilder().build(),
+            maybeSecretsMountedContainer))
 
-          val (pod, mayBeSecretsMountedInitContainer) =
-            initContainerMountSecretsBootstrap
-              .map { bootstrap =>
-                // Mount the secret volumes given that the volumes have already been added to the
-                // executor pod when mounting the secrets into the main executor container.
-                (podWithInitContainer.pod,
-                 bootstrap.mountSecrets(podWithInitContainer.initContainer))
-              }
-              .getOrElse(
-                (podWithInitContainer.pod, podWithInitContainer.initContainer))
+        val (pod, mayBeSecretsMountedInitContainer) =
+          initContainerMountSecretsBootstrap.map { bootstrap =>
+            // Mount the secret volumes given that the volumes have already been added to the
+            // executor pod when mounting the secrets into the main executor container.
+            (podWithInitContainer.pod, bootstrap.mountSecrets(podWithInitContainer.initContainer))
+          }.getOrElse((podWithInitContainer.pod, podWithInitContainer.initContainer))
 
-          val bootstrappedPod = KubernetesUtils.appendInitContainer(
-            pod,
-            mayBeSecretsMountedInitContainer)
+        val bootstrappedPod = KubernetesUtils.appendInitContainer(
+          pod, mayBeSecretsMountedInitContainer)
 
-          (bootstrappedPod, podWithInitContainer.mainContainer)
-        }
-        .getOrElse((maybeSecretsMountedPod, maybeSecretsMountedContainer))
+        (bootstrappedPod, podWithInitContainer.mainContainer)
+      }.getOrElse((maybeSecretsMountedPod, maybeSecretsMountedContainer))
 
     new PodBuilder(bootstrappedPod)
       .editSpec()
-      .addToContainers(bootstrappedContainer)
-      .endSpec()
+        .addToContainers(bootstrappedContainer)
+        .endSpec()
       .build()
   }
 }

From c5b6c0bdc0ac46e87d4ca41ab27b81be62beee6f Mon Sep 17 00:00:00 2001
From: madanadit
Date: Thu, 5 Apr 2018 13:57:45 -0700
Subject: [PATCH 4/9] Read mode for mounted volumes

---
 .../org/apache/spark/deploy/k8s/Config.scala  |  2 +-
 .../spark/deploy/k8s/KubernetesUtils.scala    | 35 ++++++++++++++++---
 2 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index b034cdd61e47..0deaabd341d7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -101,7 +101,7 @@ private[spark] object Config extends Logging {
   val KUBERNETES_EXECUTOR_VOLUMES =
     ConfigBuilder("spark.kubernetes.executor.volumes")
       .doc("List of volumes mounted into the executor container. The format of this property is " +
-        "a comma-separated list of mappings following the form hostPath:containerPath:name")
+        "a comma-separated list of mappings following the form hostPath:containerPath[:ro|rw]")
       .stringConf
       .createWithDefault("")
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index af1c0fd86769..f0bf2c39758d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -45,7 +45,7 @@ private[spark] object KubernetesUtils {
 
   /**
    * Parse a comma-delimited list of volume specs, each of which takes the form
-   * hostPath:containerPath:name; and add volume to pod and volume mount to container.
+   * hostPath:containerPath[:ro|rw]; and add volume to pod and volume mount to container.
    *
    * @param pod original specification of the pod
    * @param container original specification of the container
    * @param volumes list of volume specs
    * @return the pod with the init-container added to the list of InitContainers
    */
   def addVolumes(pod: Pod, container : Container, volumes: String): (Pod, Container) = {
     val podBuilder = new PodBuilder(pod).editOrNewSpec()
     val containerBuilder = new ContainerBuilder(container)
+    var volumeCount = 0
     volumes.split(",").map(_.split(":")).map { spec =>
       spec match {
-        case Array(hostPath, containerPath, name) =>
+        case Array(hostPath, containerPath) =>
           podBuilder
             .withVolumes(new VolumeBuilder()
               .withHostPath(new HostPathVolumeSource(hostPath))
-              .withName(name)
+              .withName(s"executor-volume-$volumeCount")
               .build())
           containerBuilder.addToVolumeMounts(new VolumeMountBuilder()
             .withMountPath(containerPath)
-            .withName(name)
+            .withName(s"executor-volume-$volumeCount")
             .build())
+          volumeCount += 1
+        case Array(hostPath, containerPath, "ro") =>
+          podBuilder
+            .withVolumes(new VolumeBuilder()
+              .withHostPath(new HostPathVolumeSource(hostPath))
+              .withName(s"executor-volume-$volumeCount")
+              .build())
+          containerBuilder.addToVolumeMounts(new VolumeMountBuilder()
+            .withMountPath(containerPath)
+            .withName(s"executor-volume-$volumeCount")
+            .withReadOnly(true)
+            .build())
+          volumeCount += 1
+        case Array(hostPath, containerPath, "rw") =>
+          podBuilder
+            .withVolumes(new VolumeBuilder()
+              .withHostPath(new HostPathVolumeSource(hostPath))
+              .withName(s"executor-volume-$volumeCount")
+              .build())
+          containerBuilder.addToVolumeMounts(new VolumeMountBuilder()
+            .withMountPath(containerPath)
+            .withName(s"executor-volume-$volumeCount")
+            .withReadOnly(false)
+            .build())
+          volumeCount += 1
         case spec =>
-          // TODO(adit): log warning
           None
       }
     }
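With the read mode in place, a single property value can mix read-only and read-write mounts. An illustrative setting (the paths here are hypothetical):

    spark.kubernetes.executor.volumes=/data/input:/opt/input:ro,/data/scratch:/opt/scratch:rw

A spec with no trailing mode, such as /tmp/mount:/opt/mount, matches the two-element case and is mounted without an explicit readOnly flag.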
From 17cfdf3afede72efd970c1d3ac92105257c1f63c Mon Sep 17 00:00:00 2001
From: madanadit
Date: Thu, 5 Apr 2018 15:05:07 -0700
Subject: [PATCH 5/9] Refactor

---
 .../spark/scheduler/cluster/k8s/ExecutorPodFactory.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index 7370d08a6bf6..2cdd5bf491cd 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -205,12 +205,10 @@ private[spark] class ExecutorPodFactory(
       .endSpec()
       .build()
 
-    val executorPodAndContainerWithVolumes =
+    val (executorPodWithVolumes, executorContainerWithVolumes) =
       KubernetesUtils.addVolumes(executorPod,
         executorContainer,
         sparkConf.get(KUBERNETES_EXECUTOR_VOLUMES))
-    val executorPodWithVolumes = executorPodAndContainerWithVolumes._1
-    val executorContainerWithVolumes = executorPodAndContainerWithVolumes._2
 
     val containerWithLimitCores = executorLimitCores.map { limitCores =>

From 45ad20d880cdd8c078f3c2afd11f2ca9e51e8b0c Mon Sep 17 00:00:00 2001
From: madanadit
Date: Fri, 6 Apr 2018 11:18:09 -0700
Subject: [PATCH 6/9] Fix style

---
 .../spark/deploy/k8s/KubernetesUtils.scala    | 77 +++++++++----------
 1 file changed, 35 insertions(+), 42 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index f0bf2c39758d..05b06387e86c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -44,57 +44,50 @@ private[spark] object KubernetesUtils {
   }
 
   /**
-   * Parse a comma-delimited list of volume specs, each of which takes the form
-   * hostPath:containerPath[:ro|rw]; and add volume to pod and volume mount to container.
-   *
-   * @param pod original specification of the pod
-   * @param container original specification of the container
-   * @param volumes list of volume specs
-   * @return the pod with the init-container added to the list of InitContainers
+    * Parse a comma-delimited list of volume specs, each of which takes the form
+    * hostPath:containerPath[:ro|rw]; and add volume to pod and volume mount to container.
+    *
+    * @param pod original specification of the pod
+    * @param container original specification of the container
+    * @param volumes list of volume specs
+    * @return the pod with the init-container added to the list of InitContainers
    */
   def addVolumes(pod: Pod, container : Container, volumes: String): (Pod, Container) = {
     val podBuilder = new PodBuilder(pod).editOrNewSpec()
     val containerBuilder = new ContainerBuilder(container)
     var volumeCount = 0
     volumes.split(",").map(_.split(":")).map { spec =>
+      var hostPath: Option[String] = None
+      var containerPath: Option[String] = None
+      var readOnly: Option[Boolean] = None
       spec match {
-        case Array(hostPath, containerPath) =>
-          podBuilder
-            .withVolumes(new VolumeBuilder()
-              .withHostPath(new HostPathVolumeSource(hostPath))
-              .withName(s"executor-volume-$volumeCount")
-              .build())
-          containerBuilder.addToVolumeMounts(new VolumeMountBuilder()
-            .withMountPath(containerPath)
-            .withName(s"executor-volume-$volumeCount")
-            .build())
-          volumeCount += 1
-        case Array(hostPath, containerPath, "ro") =>
-          podBuilder
-            .withVolumes(new VolumeBuilder()
-              .withHostPath(new HostPathVolumeSource(hostPath))
-              .withName(s"executor-volume-$volumeCount")
-              .build())
-          containerBuilder.addToVolumeMounts(new VolumeMountBuilder()
-            .withMountPath(containerPath)
-            .withName(s"executor-volume-$volumeCount")
-            .withReadOnly(true)
-            .build())
-          volumeCount += 1
-        case Array(hostPath, containerPath, "rw") =>
-          podBuilder
-            .withVolumes(new VolumeBuilder()
-              .withHostPath(new HostPathVolumeSource(hostPath))
-              .withName(s"executor-volume-$volumeCount")
-              .build())
-          containerBuilder.addToVolumeMounts(new VolumeMountBuilder()
-            .withMountPath(containerPath)
-            .withName(s"executor-volume-$volumeCount")
-            .withReadOnly(false)
-            .build())
-          volumeCount += 1
-        case spec =>
-          None
+        case Array(hostPathV, containerPathV) =>
+          hostPath = Some(hostPathV)
+          containerPath = Some(containerPathV)
+        case Array(hostPathV, containerPathV, "ro") =>
+          hostPath = Some(hostPathV)
+          containerPath = Some(containerPathV)
+          readOnly = Some(true)
+        case Array(hostPathV, containerPathV, "rw") =>
+          hostPath = Some(hostPathV)
+          containerPath = Some(containerPathV)
+          readOnly = Some(false)
+      }
+      if (hostPath.isDefined && containerPath.isDefined) {
+        podBuilder
+          .withVolumes(new VolumeBuilder()
+            .withHostPath(new HostPathVolumeSource(hostPath.get))
+            .withName(s"executor-volume-$volumeCount")
+            .build())
+        val volumeBuilder = new VolumeMountBuilder()
+          .withMountPath(containerPath.get)
+          .withName(s"executor-volume-$volumeCount")
+        if (readOnly.isDefined) {
+          containerBuilder.addToVolumeMounts(volumeBuilder.withReadOnly(readOnly.get).build())
+        } else {
+          containerBuilder.addToVolumeMounts(volumeBuilder.build())
+        }
+        volumeCount += 1
       }
     }
     (podBuilder.endSpec().build(), containerBuilder.build())
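A quick sanity check of the refactored parsing: the mode should surface on the volume mount, not on the volume itself. A minimal sketch, reusing the hypothetical basePod and baseContainer from the earlier example:

    val (pod, container) =
      KubernetesUtils.addVolumes(basePod, baseContainer, "/tmp/mount:/opt/mount:ro")
    // The readOnly flag lands on the container's mount ...
    assert(container.getVolumeMounts.get(0).getReadOnly)
    // ... while the pod side only records the host path.
    assert(pod.getSpec.getVolumes.get(0).getHostPath.getPath == "/tmp/mount")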
From 22d019a67afae119e4e65ac24c373bbaeb1ca571 Mon Sep 17 00:00:00 2001
From: madanadit
Date: Fri, 6 Apr 2018 12:57:45 -0700
Subject: [PATCH 7/9] Add unit tests

---
 .../spark/deploy/k8s/KubernetesUtils.scala    |  4 +-
 .../cluster/k8s/ExecutorPodFactorySuite.scala | 47 +++++++++++++++++++
 2 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 05b06387e86c..2fa10588f98a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -77,11 +77,11 @@ private[spark] object KubernetesUtils {
         podBuilder
           .withVolumes(new VolumeBuilder()
             .withHostPath(new HostPathVolumeSource(hostPath.get))
-            .withName(s"executor-volume-$volumeCount")
+            .withName(s"hostPath-volume-$volumeCount")
             .build())
         val volumeBuilder = new VolumeMountBuilder()
           .withMountPath(containerPath.get)
-          .withName(s"executor-volume-$volumeCount")
+          .withName(s"hostPath-volume-$volumeCount")
         if (readOnly.isDefined) {
           containerBuilder.addToVolumeMounts(volumeBuilder.withReadOnly(readOnly.get).build())
         } else {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
index a3c615be031d..f6df9af25307 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
@@ -182,6 +182,53 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
     checkOwnerReferences(executor, driverPodUid)
   }
 
+  test("single executor hostPath volume gets mounted") {
+    val conf = baseConf.clone()
+    conf.set(KUBERNETES_EXECUTOR_VOLUMES, "/tmp/mount:/opt/mount")
+    val factory = new ExecutorPodFactory(conf, None, None, None)
+
+    val executor = factory.createExecutorPod(
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+
+    assert(executor.getSpec.getContainers.size() === 1)
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1)
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName
+      === "hostPath-volume-1")
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
+      .getMountPath === "/opt/mount")
+
+    assert(executor.getSpec.getVolumes.size() === 1)
+    assert(executor.getSpec.getVolumes.get(0).getHostPath === "/tmp/mount")
+
+    checkOwnerReferences(executor, driverPodUid)
+  }
+
+  test("multiple executor hostPath volumes get mounted") {
+    val conf = baseConf.clone()
+    conf.set(KUBERNETES_EXECUTOR_VOLUMES, "/tmp/mount1:/opt/mount1,/tmp/mount2:/opt/mount2")
+    val factory = new ExecutorPodFactory(conf, None, None, None)
+
+    val executor = factory.createExecutorPod(
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+
+    assert(executor.getSpec.getContainers.size() === 1)
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 2)
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName
+      === "hostPath-volume-1")
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(1).getName
+      === "hostPath-volume-2")
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
+      .getMountPath === "/opt/mount1")
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(1)
+      .getMountPath === "/opt/mount2")
+
+    assert(executor.getSpec.getVolumes.size() === 2)
+    assert(executor.getSpec.getVolumes.get(0).getHostPath === "/tmp/mount1")
+    assert(executor.getSpec.getVolumes.get(1).getHostPath === "/tmp/mount2")
+
+    checkOwnerReferences(executor, driverPodUid)
+  }
+
   // There is always exactly one controller reference, and it points to the driver pod.
   private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
     assert(executor.getMetadata.getOwnerReferences.size() === 1)

From cb789ff821dc78b589f2ae806c963b2e1a8c2cff Mon Sep 17 00:00:00 2001
From: madanadit
Date: Fri, 6 Apr 2018 13:00:18 -0700
Subject: [PATCH 8/9] Update comment

---
 .../scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 2fa10588f98a..25eca3a1fc01 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -50,7 +50,7 @@ private[spark] object KubernetesUtils {
     * @param pod original specification of the pod
     * @param container original specification of the container
     * @param volumes list of volume specs
-    * @return the pod with the init-container added to the list of InitContainers
+    * @return a tuple of (pod with the volume(s) added, container with mount(s) added)
     */
   def addVolumes(pod: Pod, container : Container, volumes: String): (Pod, Container) = {
     val podBuilder = new PodBuilder(pod).editOrNewSpec()

From d9f46d35ba8aa4ae730fe63d81e18b2452d55d05 Mon Sep 17 00:00:00 2001
From: madanadit
Date: Fri, 6 Apr 2018 16:56:31 -0700
Subject: [PATCH 9/9] Fix unit tests

---
 .../apache/spark/deploy/k8s/KubernetesUtils.scala |  5 +++--
 .../cluster/k8s/ExecutorPodFactorySuite.scala     | 12 ++++++------
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 25eca3a1fc01..5d0117906426 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -72,10 +72,11 @@ private[spark] object KubernetesUtils {
           hostPath = Some(hostPathV)
           containerPath = Some(containerPathV)
           readOnly = Some(false)
+        case spec =>
+          None
       }
       if (hostPath.isDefined && containerPath.isDefined) {
-        podBuilder
-          .withVolumes(new VolumeBuilder()
+        podBuilder.addToVolumes(new VolumeBuilder()
           .withHostPath(new HostPathVolumeSource(hostPath.get))
           .withName(s"hostPath-volume-$volumeCount")
           .build())
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
index f6df9af25307..4875aeef41f1 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
@@ -193,12 +193,12 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
     assert(executor.getSpec.getContainers.size() === 1)
     assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1)
     assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName
-      === "hostPath-volume-1")
+      === "hostPath-volume-0")
     assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
       .getMountPath === "/opt/mount")
 
     assert(executor.getSpec.getVolumes.size() === 1)
-    assert(executor.getSpec.getVolumes.get(0).getHostPath === "/tmp/mount")
+    assert(executor.getSpec.getVolumes.get(0).getHostPath.getPath === "/tmp/mount")
 
     checkOwnerReferences(executor, driverPodUid)
   }
@@ -214,17 +214,17 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
     assert(executor.getSpec.getContainers.size() === 1)
     assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 2)
     assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName
-      === "hostPath-volume-1")
+      === "hostPath-volume-0")
     assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(1).getName
-      === "hostPath-volume-2")
+      === "hostPath-volume-1")
     assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
       .getMountPath === "/opt/mount1")
     assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(1)
       .getMountPath === "/opt/mount2")
 
     assert(executor.getSpec.getVolumes.size() === 2)
-    assert(executor.getSpec.getVolumes.get(0).getHostPath === "/tmp/mount1")
-    assert(executor.getSpec.getVolumes.get(1).getHostPath === "/tmp/mount2")
+    assert(executor.getSpec.getVolumes.get(0).getHostPath.getPath === "/tmp/mount1")
+    assert(executor.getSpec.getVolumes.get(1).getHostPath.getPath === "/tmp/mount2")
 
     checkOwnerReferences(executor, driverPodUid)
   }
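Taken together, the series allows executors to be launched with hostPath volumes straight from the submission command. A sketch of such a submission, with placeholder master URL and image name (the volume list mirrors the unit tests above):

    bin/spark-submit \
      --master k8s://https://<k8s-apiserver>:443 \
      --deploy-mode cluster \
      --conf spark.kubernetes.executor.container.image=<spark-image> \
      --conf spark.kubernetes.executor.volumes=/tmp/mount1:/opt/mount1,/tmp/mount2:/opt/mount2:ro \
      local:///opt/spark/examples/jars/spark-examples.jar

Each executor pod then carries hostPath volumes named hostPath-volume-0, hostPath-volume-1, and so on, with the second mount here read-only.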