docs/running-on-kubernetes.md (9 additions & 1 deletion)
@@ -285,7 +285,15 @@ The configuration properties for mounting volumes into the executor pods use pre

## Local Storage

Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately.
Spark supports using volumes to spill data during shuffles and other operations. To use a volume as local storage, the volume's name should start with `spark-local-dir-`, for example:

```
--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.path=<mount path>
--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.readOnly=false
```
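
For illustration, a concrete form of the template above might back the local directory with a `hostPath` volume (the volume type, host path, and mount path below are hypothetical examples, not values taken from this change):

```
--conf spark.kubernetes.driver.volumes.hostPath.spark-local-dir-1.options.path=/mnt/fast-disk
--conf spark.kubernetes.driver.volumes.hostPath.spark-local-dir-1.mount.path=/tmp/spark-local
--conf spark.kubernetes.driver.volumes.hostPath.spark-local-dir-1.mount.readOnly=false
```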


If no volume is set as local storage, Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately.
Contributor

"If no volume is set..."

Should mention the config (spark.local.dir) which is preferred over the env variable.


`emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod.

LocalDirsFeatureStep.scala
@@ -18,7 +18,8 @@ package org.apache.spark.deploy.k8s.features

import java.util.UUID

import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
import collection.JavaConverters._
Contributor

import scala....

import io.fabric8.kubernetes.api.model._

import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
@@ -40,24 +41,33 @@ private[spark] class LocalDirsFeatureStep(
private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)

override def configurePod(pod: SparkPod): SparkPod = {
val localDirVolumes = resolvedLocalDirs
.zipWithIndex
.map { case (localDir, index) =>
new VolumeBuilder()
.withName(s"spark-local-dir-${index + 1}")
.withNewEmptyDir()
.withMedium(if (useLocalDirTmpFs) "Memory" else null)
.endEmptyDir()
.build()
}
val localDirVolumeMounts = localDirVolumes
.zip(resolvedLocalDirs)
.map { case (localDirVolume, localDirPath) =>
new VolumeMountBuilder()
.withName(localDirVolume.getName)
.withMountPath(localDirPath)
.build()
}
var localDirs = findLocalDirVolumeMount(pod)
var localDirVolumes : Seq[Volume] = Seq()
var localDirVolumeMounts : Seq[VolumeMount] = Seq()

if (localDirs.isEmpty) {
localDirs = resolvedLocalDirs.toSeq
Contributor

Get rid of resolvedLocalDirs and move that statement here instead.

localDirVolumes = resolvedLocalDirs
.zipWithIndex
.map { case (_, index) =>
new VolumeBuilder()
.withName(s"spark-local-dir-${index + 1}")
.withNewEmptyDir()
.withMedium(if (useLocalDirTmpFs) "Memory" else null)
.endEmptyDir()
.build()
}

localDirVolumeMounts = localDirVolumes
.zip(resolvedLocalDirs)
.map { case (localDirVolume, localDirPath) =>
new VolumeMountBuilder()
.withName(localDirVolume.getName)
.withMountPath(localDirPath)
.build()
}
}

val podWithLocalDirVolumes = new PodBuilder(pod.pod)
.editSpec()
.addToVolumes(localDirVolumes: _*)
@@ -66,10 +76,22 @@ private[spark] class LocalDirsFeatureStep(
val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
.addNewEnv()
.withName("SPARK_LOCAL_DIRS")
.withValue(resolvedLocalDirs.mkString(","))
.withValue(localDirs.mkString(","))
.endEnv()
.addToVolumeMounts(localDirVolumeMounts: _*)
.build()
SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
}

def findLocalDirVolumeMount(pod: SparkPod): Seq[String] = {
Contributor

Mount => Mounts

val localDirVolumes = pod.pod.getSpec.getVolumes.asScala
Contributor

Hmm... do you need to list the volumes at all? Can't you just look at the mounts (since they already have the volume name)?

.filter(v => v.getName.startsWith("spark-local-dir-"))
Contributor

nit: either:

.filter { v => ... }

or

.filter(_.getName()...)

Also in a few places below.


localDirVolumes.map { volume => pod.container.getVolumeMounts.asScala
Contributor

Move pod.container... to the next line.

.find(m => m.getName.equals(volume.getName)) match {
Contributor

You can use == for strings in scala.

case Some(m) => m.getMountPath
case _ => ""
}
}.filter(s => s.length > 0)
Contributor

You can use localDirVolumes.flatMap and avoid this filter.

}
}
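
Taken together, the review comments above point toward a smaller helper. The following is a hedged sketch of that suggested direction (renamed `findLocalDirVolumeMounts` per the "Mount => Mounts" note), not the code in this commit:

```
// Sketch of the reviewers' suggestions: look only at the container's volume mounts
// (their names already identify the backing volumes), use the shorthand lambda form,
// and drop the trailing length filter by mapping straight to the mount paths.
import scala.collection.JavaConverters._

def findLocalDirVolumeMounts(pod: SparkPod): Seq[String] = {
  pod.container.getVolumeMounts.asScala
    .filter(_.getName.startsWith("spark-local-dir-"))
    .map(_.getMountPath)
    .toSeq
}
```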
KubernetesDriverBuilder.scala
@@ -43,12 +43,12 @@ private[spark] class KubernetesDriverBuilder {
new DriverServiceFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new DriverCommandFeatureStep(conf),
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf))
new PodTemplateConfigMapStep(conf),
new LocalDirsFeatureStep(conf))
Member

why not move this right after new MountVolumesFeatureStep(conf)?

Contributor Author

I think some volume mounts may be set up in PodTemplateConfigMapStep, so I moved it to last to ensure that.

Member

Yep, since the intent is for users to provide custom local dir volumes via either mount volumes or pod templates, it needs to appear after both.

Contributor

Either is fine, just wanted to point out that the pod template is initialized before any step is executed (i.e. PodTemplateConfigMapStep is not where the template is loaded).


val spec = KubernetesDriverSpec(
initialPod,
KubernetesExecutorBuilder.scala
@@ -43,8 +43,8 @@ private[spark] class KubernetesExecutorBuilder {
new BasicExecutorFeatureStep(conf, secMgr),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf),
new MountVolumesFeatureStep(conf))
new MountVolumesFeatureStep(conf),
new LocalDirsFeatureStep(conf))

features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) }
}
LocalDirsFeatureStepSuite.scala
@@ -19,9 +19,8 @@ package org.apache.spark.deploy.k8s.features
import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
import org.apache.spark.deploy.k8s.{KubernetesHostPathVolumeConf, KubernetesTestConf, KubernetesVolumeSpec, SparkPod}
Contributor

Use the wildcard when the import line gets too long.

import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
import org.apache.spark.util.SparkConfWithEnv

class LocalDirsFeatureStepSuite extends SparkFunSuite {
@@ -116,4 +115,33 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite {
.withValue(defaultLocalDir)
.build())
}

test("local dir on mounted volume") {
val volumeConf = KubernetesVolumeSpec(
"spark-local-dir-test",
"/tmp",
"",
false,
KubernetesHostPathVolumeConf("/hostPath/tmp")
)
val mountVolumeConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val mountVolumeStep = new MountVolumesFeatureStep(mountVolumeConf)
val configuredPod = mountVolumeStep.configurePod(SparkPod.initialPod())

val sparkConf = new SparkConfWithEnv(Map())
val localDirConf = KubernetesTestConf.createDriverConf(sparkConf)
Contributor

You can use the same KubernetesConf for all steps right? That's how it works when spark-submit is run.

Contributor

That is the idea, to keep building out one conf. (There were some early proposals around making these stages "more functional" but those were considered less broadly familiar and less spark idiomatic)

val localDirStep = new LocalDirsFeatureStep(localDirConf, defaultLocalDir)
val newConfiguredPod = localDirStep.configurePod(configuredPod)

assert(newConfiguredPod.pod.getSpec.getVolumes.size() === 1)
assert(newConfiguredPod.pod.getSpec.getVolumes.get(0).getHostPath.getPath === "/hostPath/tmp")
assert(newConfiguredPod.container.getVolumeMounts.size() === 1)
assert(newConfiguredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp")
assert(newConfiguredPod.container.getVolumeMounts.get(0).getName === "spark-local-dir-test")
assert(newConfiguredPod.container.getEnv.get(0) ===
new EnvVarBuilder()
.withName("SPARK_LOCAL_DIRS")
.withValue("/tmp")
.build())
}
}
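
Following the thread above about reusing one KubernetesConf, the new test could also be written against a single driver conf. This is a hedged sketch of that reviewer suggestion, not the test in this commit:

```
test("local dir on mounted volume, single conf") {
  // One driver conf carries the spark-local-dir- volume spec, and both feature steps
  // consume it, mirroring how spark-submit threads a single KubernetesConf through
  // every step.
  val volumeConf = KubernetesVolumeSpec(
    "spark-local-dir-test",
    "/tmp",
    "",
    false,
    KubernetesHostPathVolumeConf("/hostPath/tmp"))
  val conf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))

  val podWithVolume = new MountVolumesFeatureStep(conf).configurePod(SparkPod.initialPod())
  val configuredPod = new LocalDirsFeatureStep(conf, defaultLocalDir).configurePod(podWithVolume)

  // The pre-mounted volume is reused as the local directory instead of a new emptyDir.
  assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
  assert(configuredPod.container.getVolumeMounts.size() === 1)
  assert(configuredPod.container.getVolumeMounts.get(0).getName === "spark-local-dir-test")
  assert(configuredPod.container.getEnv.get(0).getValue === "/tmp")
}
```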