Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,15 @@ The configuration properties for mounting volumes into the executor pods use pre

## Local Storage

Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately.
Spark supports using volumes to spill data during shuffles and other operations. To use a volume as local storage, the volume's name should starts with `spark-local-dir-`, for example:

```
--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.path=<mount path>
--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.readOnly=false
```


If no volume is set as local storage, Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `spark.local.dir` or the environment variable `SPARK_LOCAL_DIRS` . If no directories are explicitly specified then a default directory is created and configured appropriately.

`emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package org.apache.spark.deploy.k8s.features

import java.util.UUID

import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
import scala.collection.JavaConverters._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, style checker should have complained about this (scala imports should be separate from others).


import io.fabric8.kubernetes.api.model._

import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
Expand All @@ -28,36 +30,47 @@ private[spark] class LocalDirsFeatureStep(
defaultLocalDir: String = s"/var/data/spark-${UUID.randomUUID}")
extends KubernetesFeatureConfigStep {

// Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system
// property - we want to instead default to mounting an emptydir volume that doesn't already
// exist in the image.
// We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already
// a bit opinionated about YARN and Mesos.
private val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS"))
.orElse(conf.getOption("spark.local.dir"))
.getOrElse(defaultLocalDir)
.split(",")
private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)

override def configurePod(pod: SparkPod): SparkPod = {
val localDirVolumes = resolvedLocalDirs
.zipWithIndex
.map { case (localDir, index) =>
new VolumeBuilder()
.withName(s"spark-local-dir-${index + 1}")
.withNewEmptyDir()
.withMedium(if (useLocalDirTmpFs) "Memory" else null)
.endEmptyDir()
.build()
}
val localDirVolumeMounts = localDirVolumes
.zip(resolvedLocalDirs)
.map { case (localDirVolume, localDirPath) =>
new VolumeMountBuilder()
.withName(localDirVolume.getName)
.withMountPath(localDirPath)
.build()
}
var localDirs = pod.container.getVolumeMounts.asScala
.filter(_.getName.startsWith("spark-local-dir-"))
.map(_.getMountPath)
var localDirVolumes : Seq[Volume] = Seq()
var localDirVolumeMounts : Seq[VolumeMount] = Seq()

if (localDirs.isEmpty) {
// Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system
// property - we want to instead default to mounting an emptydir volume that doesn't already
// exist in the image.
// We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already
// a bit opinionated about YARN and Mesos.
val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS"))
.orElse(conf.getOption("spark.local.dir"))
.getOrElse(defaultLocalDir)
.split(",")
localDirs = resolvedLocalDirs.toBuffer
localDirVolumes = resolvedLocalDirs
.zipWithIndex
.map { case (_, index) =>
new VolumeBuilder()
.withName(s"spark-local-dir-${index + 1}")
.withNewEmptyDir()
.withMedium(if (useLocalDirTmpFs) "Memory" else null)
.endEmptyDir()
.build()
}

localDirVolumeMounts = localDirVolumes
.zip(resolvedLocalDirs)
.map { case (localDirVolume, localDirPath) =>
new VolumeMountBuilder()
.withName(localDirVolume.getName)
.withMountPath(localDirPath)
.build()
}
}

val podWithLocalDirVolumes = new PodBuilder(pod.pod)
.editSpec()
.addToVolumes(localDirVolumes: _*)
Expand All @@ -66,7 +79,7 @@ private[spark] class LocalDirsFeatureStep(
val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
.addNewEnv()
.withName("SPARK_LOCAL_DIRS")
.withValue(resolvedLocalDirs.mkString(","))
.withValue(localDirs.mkString(","))
.endEnv()
.addToVolumeMounts(localDirVolumeMounts: _*)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ private[spark] class KubernetesDriverBuilder {
new DriverServiceFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new DriverCommandFeatureStep(conf),
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf))
new PodTemplateConfigMapStep(conf),
new LocalDirsFeatureStep(conf))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not move this right after new MountVolumesFeatureStep(conf)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think some volume mounts maybe setup in PodTemplateConfigMapStep, so I move to last to ensure that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, since the intent is for users to provide custom local dir volumes via either mount volumes or pod templates it needs to appear after both

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either is fine, just wanted to point out that the pod template is initialized before any step is executed (i.e. PodTemplateConfigMapStep is not where the template is loaded).


val spec = KubernetesDriverSpec(
initialPod,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ private[spark] class KubernetesExecutorBuilder {
new BasicExecutorFeatureStep(conf, secMgr),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf),
new MountVolumesFeatureStep(conf))
new MountVolumesFeatureStep(conf),
new LocalDirsFeatureStep(conf))

features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ package org.apache.spark.deploy.k8s.features
import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
import org.apache.spark.util.SparkConfWithEnv

class LocalDirsFeatureStepSuite extends SparkFunSuite {
Expand Down Expand Up @@ -116,4 +115,30 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite {
.withValue(defaultLocalDir)
.build())
}

test("local dir on mounted volume") {
val volumeConf = KubernetesVolumeSpec(
"spark-local-dir-test",
"/tmp",
"",
false,
KubernetesHostPathVolumeConf("/hostPath/tmp")
)
val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val mountVolumeStep = new MountVolumesFeatureStep(kubernetesConf)
val configuredPod = mountVolumeStep.configurePod(SparkPod.initialPod())
val localDirStep = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
val newConfiguredPod = localDirStep.configurePod(configuredPod)

assert(newConfiguredPod.pod.getSpec.getVolumes.size() === 1)
assert(newConfiguredPod.pod.getSpec.getVolumes.get(0).getHostPath.getPath === "/hostPath/tmp")
assert(newConfiguredPod.container.getVolumeMounts.size() === 1)
assert(newConfiguredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp")
assert(newConfiguredPod.container.getVolumeMounts.get(0).getName === "spark-local-dir-test")
assert(newConfiguredPod.container.getEnv.get(0) ===
new EnvVarBuilder()
.withName("SPARK_LOCAL_DIRS")
.withValue("/tmp")
.build())
}
}