From 2e29b6609dfe96632ba4480eaa88060d25e3f593 Mon Sep 17 00:00:00 2001 From: Stephen Hopper Date: Thu, 2 Jul 2020 15:02:47 -0500 Subject: [PATCH 1/5] Backport patches #22323 and #24879 to Spark 2.4 --- .../org/apache/spark/deploy/k8s/Config.scala | 9 +++ .../k8s/features/LocalDirsFeatureStep.scala | 68 +++++++++++-------- .../k8s/submit/KubernetesDriverBuilder.scala | 6 +- .../k8s/KubernetesExecutorBuilder.scala | 5 +- 4 files changed, 55 insertions(+), 33 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index c7338a721595..dea3b2886375 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -225,6 +225,15 @@ private[spark] object Config extends Logging { "Ensure that major Python version is either Python2 or Python3") .createWithDefault("2") + val KUBERNETES_LOCAL_DIRS_TMPFS = + ConfigBuilder("spark.kubernetes.local.dirs.tmpfs") + .doc("If set to true then emptyDir volumes created to back SPARK_LOCAL_DIRS will have " + + "their medium set to Memory so that they will be created as tmpfs (i.e. RAM) backed " + + "volumes. This may improve performance but scratch space usage will count towards " + + "your pods memory limit so you may wish to request more memory.") + .booleanConf + .createWithDefault(false) + val APP_RESOURCE_TYPE = ConfigBuilder("spark.kubernetes.resource.type") .doc("This sets the resource type internally") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index 70b307303d14..e42b03652d0a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -19,43 +19,55 @@ package org.apache.spark.deploy.k8s.features import java.nio.file.Paths import java.util.UUID -import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder} - import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Config._ private[spark] class LocalDirsFeatureStep( conf: KubernetesConf[_ <: KubernetesRoleSpecificConf], defaultLocalDir: String = s"/var/data/spark-${UUID.randomUUID}") extends KubernetesFeatureConfigStep { - // Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system - // property - we want to instead default to mounting an emptydir volume that doesn't already - // exist in the image. - // We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already - // a bit opinionated about YARN and Mesos. 
- private val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS")) - .orElse(conf.getOption("spark.local.dir")) - .getOrElse(defaultLocalDir) - .split(",") + private val useLocalDirTmpFs = conf.get(Config.KUBERNETES_LOCAL_DIRS_TMPFS) override def configurePod(pod: SparkPod): SparkPod = { - val localDirVolumes = resolvedLocalDirs - .zipWithIndex - .map { case (localDir, index) => - new VolumeBuilder() - .withName(s"spark-local-dir-${index + 1}") - .withNewEmptyDir() - .endEmptyDir() - .build() - } - val localDirVolumeMounts = localDirVolumes - .zip(resolvedLocalDirs) - .map { case (localDirVolume, localDirPath) => - new VolumeMountBuilder() - .withName(localDirVolume.getName) - .withMountPath(localDirPath) - .build() - } + var localDirs = pod.container.getVolumeMounts.asScala + .filter(_.getName.startsWith("spark-local-dir-")) + .map(_.getMountPath) + var localDirVolumes : Seq[Volume] = Seq() + var localDirVolumeMounts : Seq[VolumeMount] = Seq() + + if (localDirs.isEmpty) { + // Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system + // property - we want to instead default to mounting an emptydir volume that doesn't already + // exist in the image. + // We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already + // a bit opinionated about YARN and Mesos. + val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS")) + .orElse(conf.getOption("spark.local.dir")) + .getOrElse(defaultLocalDir) + .split(",") + localDirs = resolvedLocalDirs.toBuffer + localDirVolumes = resolvedLocalDirs + .zipWithIndex + .map { case (_, index) => + new VolumeBuilder() + .withName(s"spark-local-dir-${index + 1}") + .withNewEmptyDir() + .withMedium(if (useLocalDirTmpFs) "Memory" else null) + .endEmptyDir() + .build() + } + + localDirVolumeMounts = localDirVolumes + .zip(resolvedLocalDirs) + .map { case (localDirVolume, localDirPath) => + new VolumeMountBuilder() + .withName(localDirVolume.getName) + .withMountPath(localDirPath) + .build() + } + } + val podWithLocalDirVolumes = new PodBuilder(pod.pod) .editSpec() .addToVolumes(localDirVolumes: _*) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 8f3f18ffadc3..44a6f8ba9564 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -58,8 +58,7 @@ private[spark] class KubernetesDriverBuilder( val baseFeatures = Seq( provideBasicStep(kubernetesConf), provideCredentialsStep(kubernetesConf), - provideServiceStep(kubernetesConf), - provideLocalDirsStep(kubernetesConf)) + provideServiceStep(kubernetesConf)) val secretFeature = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) { Seq(provideSecretsStep(kubernetesConf)) @@ -70,6 +69,7 @@ private[spark] class KubernetesDriverBuilder( val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) { Seq(provideVolumesStep(kubernetesConf)) } else Nil + val localDirsFeature = Seq(provideLocalDirsStep(kubernetesConf)) val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map { case JavaMainAppResource(_) => @@ -81,7 +81,7 @@ private[spark] class KubernetesDriverBuilder( .getOrElse(provideJavaStep(kubernetesConf)) val allFeatures = 
(baseFeatures :+ bindingsStep) ++ - secretFeature ++ envSecretFeature ++ volumesFeature + secretFeature ++ envSecretFeature ++ volumesFeature ++ localDirsFeature var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap) for (feature <- allFeatures) { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 364b6fb36772..59df8e896a2e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -40,7 +40,7 @@ private[spark] class KubernetesExecutorBuilder( def buildFromFeatures( kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = { - val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf)) + val baseFeatures = Seq(provideBasicStep(kubernetesConf)) val secretFeature = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) { Seq(provideSecretsStep(kubernetesConf)) } else Nil @@ -50,8 +50,9 @@ private[spark] class KubernetesExecutorBuilder( val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) { Seq(provideVolumesStep(kubernetesConf)) } else Nil + val localDirsFeature = Seq(provideLocalDirsStep(kubernetesConf)) - val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature + val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature ++ localDirsFeature var executorPod = SparkPod.initialPod() for (feature <- allFeatures) { From 143fdbcf301c544e03a7fcbec056dc5e7cbbec15 Mon Sep 17 00:00:00 2001 From: Stephen Hopper Date: Thu, 2 Jul 2020 15:22:26 -0500 Subject: [PATCH 2/5] Backport docs and tests too --- docs/running-on-kubernetes.md | 28 +++++++++- .../k8s/features/LocalDirsFeatureStep.scala | 2 +- .../features/LocalDirsFeatureStepSuite.scala | 56 ++++++++++++++++++- 3 files changed, 83 insertions(+), 3 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 037e1d54661e..59766dfce500 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -232,7 +232,25 @@ For example, the claim name of a `persistentVolumeClaim` with volume name `check spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=check-point-pvc-claim ``` -The configuration properties for mounting volumes into the executor pods use prefix `spark.kubernetes.executor.` instead of `spark.kubernetes.driver.`. For a complete list of available options for each supported type of volumes, please refer to the [Spark Properties](#spark-properties) section below. +The configuration properties for mounting volumes into the executor pods use prefix `spark.kubernetes.executor.` instead of `spark.kubernetes.driver.`. For a complete list of available options for each supported type of volumes, please refer to the [Spark Properties](#spark-properties) section below. + +## Local Storage + +Spark supports using volumes to spill data during shuffles and other operations. 
To use a volume as local storage, the volume's name should start with `spark-local-dir-`, for example:
+
+```
+--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.path=<mount path>
+--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.readOnly=false
+```
+
+If no volume is set as local storage, Spark uses temporary scratch space to spill data to disk during shuffles and
+other operations. When using Kubernetes as the resource manager, the pods will be created with an
+[emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in
+`spark.local.dir` or the environment variable `SPARK_LOCAL_DIRS`. If no directories are explicitly specified, a
+default directory is created and configured appropriately.
+
+### Using RAM for local storage
+
+`emptyDir` volumes use the node's backing storage for ephemeral storage by default; this behaviour may not be
+appropriate for some compute environments. For example, if you have diskless nodes with remote storage mounted over a
+network, having lots of executors doing I/O to this remote storage may actually degrade performance.
+
+In this case it may be desirable to set `spark.kubernetes.local.dirs.tmpfs=true` in your configuration, which will
+cause the `emptyDir` volumes to be configured as `tmpfs`, i.e. RAM-backed, volumes. When configured like this, Spark's
+local storage usage will count towards your pod's memory usage, so you may wish to increase your memory requests by
+increasing the value of `spark.kubernetes.memoryOverheadFactor` as appropriate. An example invocation using these
+settings is sketched below, after the new property table entry.

 ## Introspection and Debugging

@@ -804,6 +822,14 @@ specific to Spark on Kubernetes.
     spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim.
+
+  spark.kubernetes.local.dirs.tmpfs
+  false
+
+  Configure the emptyDir volumes used to back SPARK_LOCAL_DIRS within the Spark driver and executor pods to use tmpfs
+  backing, i.e. RAM. See Local Storage earlier on this page for more discussion of this.
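
For illustration, here is a sketch of a `spark-submit` invocation that exercises the two settings documented above; the API server address, container image, and application JAR are placeholders, not values taken from this patch:

```
$ bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name tmpfs-local-dirs-example \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=2 \
    --conf spark.kubernetes.container.image=<spark-image> \
    --conf spark.kubernetes.local.dirs.tmpfs=true \
    --conf spark.kubernetes.memoryOverheadFactor=0.2 \
    local:///path/to/<examples-jar>
```

Because the scratch space is RAM-backed here, raising `spark.kubernetes.memoryOverheadFactor` above its `0.1` default gives each pod headroom for shuffle spill that would otherwise count against its memory limit.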

  spark.kubernetes.memoryOverheadFactor
  0.1

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
index e42b03652d0a..96c086f7486e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
@@ -76,7 +76,7 @@ private[spark] class LocalDirsFeatureStep(
     val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
       .addNewEnv()
         .withName("SPARK_LOCAL_DIRS")
-        .withValue(resolvedLocalDirs.mkString(","))
+        .withValue(localDirs.mkString(","))
         .endEnv()
       .addToVolumeMounts(localDirVolumeMounts: _*)
       .build()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
index a339827b819a..b01b52093acd 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
@@ -21,7 +21,7 @@ import org.mockito.Mockito
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s._
 
 class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   private val defaultLocalDir = "/var/data/default-local-dir"
@@ -111,4 +111,58 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       .withValue("/var/data/my-local-dir-1,/var/data/my-local-dir-2")
       .build())
   }
+
+  test("Use tmpfs to back default local dir") {
+    Mockito.doReturn(null).when(sparkConf).get("spark.local.dir")
+    Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS")
+    Mockito.doReturn(true).when(sparkConf).get(KUBERNETES_LOCAL_DIRS_TMPFS)
+    val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
+    val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod())
+    assert(configuredPod.pod.getSpec.getVolumes.size === 1)
+    assert(configuredPod.pod.getSpec.getVolumes.get(0) ===
+      new VolumeBuilder()
+        .withName(s"spark-local-dir-1")
+        .withNewEmptyDir()
+        .withMedium("Memory")
+        .endEmptyDir()
+        .build())
+    assert(configuredPod.container.getVolumeMounts.size === 1)
+    assert(configuredPod.container.getVolumeMounts.get(0) ===
+      new VolumeMountBuilder()
+        .withName(s"spark-local-dir-1")
+        .withMountPath(defaultLocalDir)
+        .build())
+    assert(configuredPod.container.getEnv.size === 1)
+    assert(configuredPod.container.getEnv.get(0) ===
+      new EnvVarBuilder()
+        .withName("SPARK_LOCAL_DIRS")
+        .withValue(defaultLocalDir)
+        .build())
+  }
+
+  test("local dir on mounted volume") {
+    val volumeConf = KubernetesVolumeSpec(
+      "spark-local-dir-test",
+      "/tmp",
+      "",
+      false,
+      KubernetesHostPathVolumeConf("/hostPath/tmp")
+    )
+    val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
+    val mountVolumeStep = new MountVolumesFeatureStep(kubernetesConf)
+    val configuredPod = mountVolumeStep.configurePod(SparkPod.initialPod())
+    val localDirStep = new
LocalDirsFeatureStep(kubernetesConf, defaultLocalDir) + val newConfiguredPod = localDirStep.configurePod(configuredPod) + + assert(newConfiguredPod.pod.getSpec.getVolumes.size() === 1) + assert(newConfiguredPod.pod.getSpec.getVolumes.get(0).getHostPath.getPath === "/hostPath/tmp") + assert(newConfiguredPod.container.getVolumeMounts.size() === 1) + assert(newConfiguredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") + assert(newConfiguredPod.container.getVolumeMounts.get(0).getName === "spark-local-dir-test") + assert(newConfiguredPod.container.getEnv.get(0) === + new EnvVarBuilder() + .withName("SPARK_LOCAL_DIRS") + .withValue("/tmp") + .build()) + } } From 3d8b3608e3329062f7f1a385686a0d810cbfe6c8 Mon Sep 17 00:00:00 2001 From: Stephen Hopper Date: Thu, 2 Jul 2020 15:48:41 -0500 Subject: [PATCH 3/5] Lint fix --- .../scheduler/cluster/k8s/KubernetesExecutorBuilder.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 59df8e896a2e..e02974dbd63a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -52,7 +52,8 @@ private[spark] class KubernetesExecutorBuilder( } else Nil val localDirsFeature = Seq(provideLocalDirsStep(kubernetesConf)) - val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature ++ localDirsFeature + val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature ++ + localDirsFeature var executorPod = SparkPod.initialPod() for (feature <- allFeatures) { From e93ef7d894deafaffd4e7a6652c337f28f7ce18a Mon Sep 17 00:00:00 2001 From: Stephen Hopper Date: Thu, 2 Jul 2020 16:08:26 -0500 Subject: [PATCH 4/5] Compilation fixes --- .../spark/deploy/k8s/features/LocalDirsFeatureStep.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index 96c086f7486e..bc900b62388b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -16,10 +16,13 @@ */ package org.apache.spark.deploy.k8s.features -import java.nio.file.Paths import java.util.UUID -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ private[spark] class LocalDirsFeatureStep( @@ -27,7 +30,7 @@ private[spark] class LocalDirsFeatureStep( defaultLocalDir: String = s"/var/data/spark-${UUID.randomUUID}") extends KubernetesFeatureConfigStep { - private val useLocalDirTmpFs = conf.get(Config.KUBERNETES_LOCAL_DIRS_TMPFS) + private val useLocalDirTmpFs = 
conf.get(KUBERNETES_LOCAL_DIRS_TMPFS) override def configurePod(pod: SparkPod): SparkPod = { var localDirs = pod.container.getVolumeMounts.asScala From c1ccbfbf306de82f03ba3b9f0b46ce1ad230058e Mon Sep 17 00:00:00 2001 From: Stephen Hopper Date: Thu, 2 Jul 2020 16:16:42 -0500 Subject: [PATCH 5/5] Fix tests --- .../deploy/k8s/features/LocalDirsFeatureStepSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index b01b52093acd..13b6ed2743fa 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Config._ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { private val defaultLocalDir = "/var/data/default-local-dir" @@ -144,14 +145,13 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { val volumeConf = KubernetesVolumeSpec( "spark-local-dir-test", "/tmp", - "", false, KubernetesHostPathVolumeConf("/hostPath/tmp") ) - val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf)) - val mountVolumeStep = new MountVolumesFeatureStep(kubernetesConf) + val kubernetesTestConf = kubernetesConf.copy(roleVolumes = Seq(volumeConf)) + val mountVolumeStep = new MountVolumesFeatureStep(kubernetesTestConf) val configuredPod = mountVolumeStep.configurePod(SparkPod.initialPod()) - val localDirStep = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir) + val localDirStep = new LocalDirsFeatureStep(kubernetesTestConf, defaultLocalDir) val newConfiguredPod = localDirStep.configurePod(configuredPod) assert(newConfiguredPod.pod.getSpec.getVolumes.size() === 1)
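
To summarize the behaviour this series backports, here is a standalone sketch; it is illustrative only, not part of any of the patches above. It assumes `io.fabric8:kubernetes-client` is on the classpath, and the `useTmpfs` flag stands in for `spark.kubernetes.local.dirs.tmpfs`:

```
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Container, Volume, VolumeBuilder}

object LocalDirsSketch {
  // Mirrors the precedence the patched LocalDirsFeatureStep applies: returns the
  // resolved local directories plus any volumes that still need to be created.
  def resolveLocalDirs(
      container: Container,
      defaultDirs: Seq[String],
      useTmpfs: Boolean): (Seq[String], Seq[Volume]) = {
    // Any user-mounted volume carrying the reserved prefix wins outright.
    val preMounted = container.getVolumeMounts.asScala
      .filter(_.getName.startsWith("spark-local-dir-"))
      .map(_.getMountPath)
    if (preMounted.nonEmpty) {
      // Nothing to create; SPARK_LOCAL_DIRS will point at the existing mounts.
      (preMounted, Seq.empty)
    } else {
      val volumes = defaultDirs.indices.map { i =>
        new VolumeBuilder()
          .withName(s"spark-local-dir-${i + 1}")
          .withNewEmptyDir()
            // "Memory" asks Kubernetes to back the emptyDir with tmpfs (RAM);
            // null leaves it on the node's default storage medium.
            .withMedium(if (useTmpfs) "Memory" else null)
          .endEmptyDir()
          .build()
      }
      (defaultDirs, volumes)
    }
  }
}
```

The two new tests exercise exactly these branches: "Use tmpfs to back default local dir" covers the `emptyDir`-with-`Memory` branch, and "local dir on mounted volume" covers the pre-mounted branch.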