28 changes: 27 additions & 1 deletion docs/running-on-kubernetes.md
@@ -232,7 +232,25 @@ For example, the claim name of a `persistentVolumeClaim` with volume name `check
spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=check-point-pvc-claim
```

The configuration properties for mounting volumes into the executor pods use prefix `spark.kubernetes.executor.` instead of `spark.kubernetes.driver.`. For a complete list of available options for each supported type of volumes, please refer to the [Spark Properties](#spark-properties) section below.
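
For example, the executor-side equivalent of the claim name setting shown above would be:

```
spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=check-point-pvc-claim
```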

## Local Storage

Spark supports using volumes to spill data during shuffles and other operations. To use a volume as local storage, the volume's name should start with `spark-local-dir-`, for example:

```
--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.path=<mount path>
--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.readOnly=false
```
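
As a concrete sketch, assuming a hypothetical `hostPath` volume named `spark-local-dir-scratch` backed by a fast local disk at `/mnt/fast-disks/scratch` on the node, the settings might look like:

```
--conf spark.kubernetes.driver.volumes.hostPath.spark-local-dir-scratch.mount.path=/scratch
--conf spark.kubernetes.driver.volumes.hostPath.spark-local-dir-scratch.mount.readOnly=false
--conf spark.kubernetes.driver.volumes.hostPath.spark-local-dir-scratch.options.path=/mnt/fast-disks/scratch
```

The same settings with the `spark.kubernetes.executor.` prefix apply the volume to the executor pods.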


If no volume is set as local storage, Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager, the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `spark.local.dir` or the environment variable `SPARK_LOCAL_DIRS`. If no directories are explicitly specified, then a default directory is created and configured appropriately.
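
For example, explicitly listing two scratch directories (the paths here are illustrative) results in one `emptyDir` volume being created and mounted for each:

```
--conf spark.local.dir=/tmp/spark-local-1,/tmp/spark-local-2
```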

### Using RAM for local storage

`emptyDir` volumes use the node's backing storage for ephemeral storage by default; this behaviour may not be appropriate for some compute environments. For example, if you have diskless nodes with remote storage mounted over a network, having many executors doing I/O to this remote storage may actually degrade performance.

In this case it may be desirable to set `spark.kubernetes.local.dirs.tmpfs=true` in your configuration, which will cause the `emptyDir` volumes to be configured as `tmpfs`, i.e. RAM-backed volumes. When configured like this, Spark's local storage usage will count towards your pod's memory usage, so you may wish to increase your memory requests by increasing the value of `spark.kubernetes.memoryOverheadFactor` as appropriate.
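
For example, a minimal sketch combining the two settings (the overhead value here is illustrative, not a recommendation):

```
--conf spark.kubernetes.local.dirs.tmpfs=true
--conf spark.kubernetes.memoryOverheadFactor=0.2
```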

## Introspection and Debugging

@@ -804,6 +822,14 @@ specific to Spark on Kubernetes.
<code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.local.dirs.tmpfs</code></td>
<td><code>false</code></td>
<td>
Configure the <code>emptyDir</code> volumes used to back <code>SPARK_LOCAL_DIRS</code> within the Spark driver and executor pods to use <code>tmpfs</code> backing, i.e. RAM. See <a href="#local-storage">Local Storage</a> earlier on this page for more discussion of this.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.memoryOverheadFactor</code></td>
<td><code>0.1</code></td>
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -225,6 +225,15 @@ private[spark] object Config extends Logging {
"Ensure that major Python version is either Python2 or Python3")
.createWithDefault("2")

val KUBERNETES_LOCAL_DIRS_TMPFS =
ConfigBuilder("spark.kubernetes.local.dirs.tmpfs")
.doc("If set to true then emptyDir volumes created to back SPARK_LOCAL_DIRS will have " +
"their medium set to Memory so that they will be created as tmpfs (i.e. RAM) backed " +
"volumes. This may improve performance but scratch space usage will count towards " +
"your pods memory limit so you may wish to request more memory.")
.booleanConf
.createWithDefault(false)

val APP_RESOURCE_TYPE =
ConfigBuilder("spark.kubernetes.resource.type")
.doc("This sets the resource type internally")
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
@@ -16,46 +16,61 @@
*/
package org.apache.spark.deploy.k8s.features

import java.nio.file.Paths
import java.util.UUID

import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
import scala.collection.JavaConverters._

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
import io.fabric8.kubernetes.api.model._

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._

private[spark] class LocalDirsFeatureStep(
conf: KubernetesConf[_ <: KubernetesRoleSpecificConf],
defaultLocalDir: String = s"/var/data/spark-${UUID.randomUUID}")
extends KubernetesFeatureConfigStep {

// Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system
// property - we want to instead default to mounting an emptydir volume that doesn't already
// exist in the image.
// We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already
// a bit opinionated about YARN and Mesos.
private val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS"))
.orElse(conf.getOption("spark.local.dir"))
.getOrElse(defaultLocalDir)
.split(",")
private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)

override def configurePod(pod: SparkPod): SparkPod = {
val localDirVolumes = resolvedLocalDirs
.zipWithIndex
.map { case (localDir, index) =>
new VolumeBuilder()
.withName(s"spark-local-dir-${index + 1}")
.withNewEmptyDir()
.endEmptyDir()
.build()
}
val localDirVolumeMounts = localDirVolumes
.zip(resolvedLocalDirs)
.map { case (localDirVolume, localDirPath) =>
new VolumeMountBuilder()
.withName(localDirVolume.getName)
.withMountPath(localDirPath)
.build()
}
var localDirs = pod.container.getVolumeMounts.asScala
.filter(_.getName.startsWith("spark-local-dir-"))
.map(_.getMountPath)
var localDirVolumes : Seq[Volume] = Seq()
var localDirVolumeMounts : Seq[VolumeMount] = Seq()

if (localDirs.isEmpty) {
// Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system
// property - we want to instead default to mounting an emptydir volume that doesn't already
// exist in the image.
// We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already
// a bit opinionated about YARN and Mesos.
val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS"))
.orElse(conf.getOption("spark.local.dir"))
.getOrElse(defaultLocalDir)
.split(",")
localDirs = resolvedLocalDirs.toBuffer
localDirVolumes = resolvedLocalDirs
.zipWithIndex
.map { case (_, index) =>
new VolumeBuilder()
.withName(s"spark-local-dir-${index + 1}")
.withNewEmptyDir()
.withMedium(if (useLocalDirTmpFs) "Memory" else null)
.endEmptyDir()
.build()
}

localDirVolumeMounts = localDirVolumes
.zip(resolvedLocalDirs)
.map { case (localDirVolume, localDirPath) =>
new VolumeMountBuilder()
.withName(localDirVolume.getName)
.withMountPath(localDirPath)
.build()
}
}

val podWithLocalDirVolumes = new PodBuilder(pod.pod)
.editSpec()
.addToVolumes(localDirVolumes: _*)
@@ -64,7 +79,7 @@ private[spark] class LocalDirsFeatureStep(
val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
.addNewEnv()
.withName("SPARK_LOCAL_DIRS")
.withValue(resolvedLocalDirs.mkString(","))
.withValue(localDirs.mkString(","))
.endEnv()
.addToVolumeMounts(localDirVolumeMounts: _*)
.build()
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -58,8 +58,7 @@ private[spark] class KubernetesDriverBuilder(
val baseFeatures = Seq(
provideBasicStep(kubernetesConf),
provideCredentialsStep(kubernetesConf),
provideServiceStep(kubernetesConf),
provideLocalDirsStep(kubernetesConf))
provideServiceStep(kubernetesConf))

val secretFeature = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
Seq(provideSecretsStep(kubernetesConf))
@@ -70,6 +69,7 @@
val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) {
Seq(provideVolumesStep(kubernetesConf))
} else Nil
val localDirsFeature = Seq(provideLocalDirsStep(kubernetesConf))

val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map {
case JavaMainAppResource(_) =>
@@ -81,7 +81,7 @@
.getOrElse(provideJavaStep(kubernetesConf))

val allFeatures = (baseFeatures :+ bindingsStep) ++
secretFeature ++ envSecretFeature ++ volumesFeature
secretFeature ++ envSecretFeature ++ volumesFeature ++ localDirsFeature

var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
for (feature <- allFeatures) {
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -40,7 +40,7 @@ private[spark] class KubernetesExecutorBuilder(
def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {

val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
val baseFeatures = Seq(provideBasicStep(kubernetesConf))
val secretFeature = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
Seq(provideSecretsStep(kubernetesConf))
} else Nil
@@ -50,8 +50,10 @@
val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) {
Seq(provideVolumesStep(kubernetesConf))
} else Nil
val localDirsFeature = Seq(provideLocalDirsStep(kubernetesConf))

val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature
val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature ++
localDirsFeature

var executorPod = SparkPod.initialPod()
for (feature <- allFeatures) {
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
@@ -21,7 +21,8 @@ import org.mockito.Mockito
import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._

class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
private val defaultLocalDir = "/var/data/default-local-dir"
@@ -111,4 +112,57 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
.withValue("/var/data/my-local-dir-1,/var/data/my-local-dir-2")
.build())
}

test("Use tmpfs to back default local dir") {
Mockito.doReturn(null).when(sparkConf).get("spark.local.dir")
Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS")
Mockito.doReturn(true).when(sparkConf).get(KUBERNETES_LOCAL_DIRS_TMPFS)
val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod())
assert(configuredPod.pod.getSpec.getVolumes.size === 1)
assert(configuredPod.pod.getSpec.getVolumes.get(0) ===
new VolumeBuilder()
.withName(s"spark-local-dir-1")
.withNewEmptyDir()
.withMedium("Memory")
.endEmptyDir()
.build())
assert(configuredPod.container.getVolumeMounts.size === 1)
assert(configuredPod.container.getVolumeMounts.get(0) ===
new VolumeMountBuilder()
.withName(s"spark-local-dir-1")
.withMountPath(defaultLocalDir)
.build())
assert(configuredPod.container.getEnv.size === 1)
assert(configuredPod.container.getEnv.get(0) ===
new EnvVarBuilder()
.withName("SPARK_LOCAL_DIRS")
.withValue(defaultLocalDir)
.build())
}

test("local dir on mounted volume") {
val volumeConf = KubernetesVolumeSpec(
"spark-local-dir-test",
"/tmp",
false,
KubernetesHostPathVolumeConf("/hostPath/tmp")
)
val kubernetesTestConf = kubernetesConf.copy(roleVolumes = Seq(volumeConf))
val mountVolumeStep = new MountVolumesFeatureStep(kubernetesTestConf)
val configuredPod = mountVolumeStep.configurePod(SparkPod.initialPod())
val localDirStep = new LocalDirsFeatureStep(kubernetesTestConf, defaultLocalDir)
val newConfiguredPod = localDirStep.configurePod(configuredPod)

assert(newConfiguredPod.pod.getSpec.getVolumes.size() === 1)
assert(newConfiguredPod.pod.getSpec.getVolumes.get(0).getHostPath.getPath === "/hostPath/tmp")
assert(newConfiguredPod.container.getVolumeMounts.size() === 1)
assert(newConfiguredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp")
assert(newConfiguredPod.container.getVolumeMounts.get(0).getName === "spark-local-dir-test")
assert(newConfiguredPod.container.getEnv.get(0) ===
new EnvVarBuilder()
.withName("SPARK_LOCAL_DIRS")
.withValue("/tmp")
.build())
}
}