[SPARK-28042][K8S] Support using volume mount as local storage #24879
Conversation
Thank you for making a PR, @chenjunjiedada !

ok to test

Test build #106542 has finished for PR 24879 at commit

Kubernetes integration test starting

...tes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala

Kubernetes integration test status success
felixcheung
left a comment
could we add some documentation?
    new KerberosConfDriverFeatureStep(conf),
    new PodTemplateConfigMapStep(conf))
    new PodTemplateConfigMapStep(conf),
    new LocalDirsFeatureStep(conf))
why not move this right after new MountVolumesFeatureStep(conf)?
I think some volume mounts may be set up in PodTemplateConfigMapStep, so I moved it to last to ensure that.
Yep, since the intent is for users to provide custom local dir volumes via either mount volumes or pod templates, it needs to appear after both.
Either is fine, just wanted to point out that the pod template is initialized before any step is executed (i.e. PodTemplateConfigMapStep is not where the template is loaded).
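For context, a minimal sketch of the step ordering this thread settles on (not the actual builder code; surrounding steps are omitted and only the relative order matters): the local-dirs step runs after both the mount-volumes step and the pod-template step, so it can see any spark-local-dir-* volume the user supplied through either channel.

    // Hedged sketch of the driver feature-step order after this change; other steps omitted.
    val features = Seq(
      new MountVolumesFeatureStep(conf),       // user-declared volume mounts
      new KerberosConfDriverFeatureStep(conf),
      new PodTemplateConfigMapStep(conf),      // ships the pod template config map
      new LocalDirsFeatureStep(conf))          // runs last: can reuse already-mounted spark-local-dir-* volumes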
    hasVolumeMount(pod, localDirVolume.getName, localDirPath) match {
      case true =>
        pod.container.getVolumeMounts().asScala
          .find(m => m.getName.equals(localDirVolume.getName)
please see other lines for indentation
ok, will check and run scalafmt.
        .endEmptyDir()
        .build()
    val name = s"spark-local-dir-${index + 1}"
    hasVolume(pod, name) match {
not a super big deal, but hasVolume is basically the same scan (pod.pod.getSpec().getVolumes().asScala.exists) as the pod.pod.getSpec.getVolumes().asScala.find below.
I think you can refactor this to avoid scanning the list multiple times.
Updated in the latest commit.
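For reference, a minimal sketch of that refactor (resolveVolume is a hypothetical helper name; the SparkPod accessors and the fabric8 VolumeBuilder usage follow the code quoted above): call find once and branch on the Option instead of an exists check followed by another scan.

    import scala.collection.JavaConverters._
    import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder}

    // Reuse an existing volume with the given name, or fall back to an emptyDir volume.
    def resolveVolume(pod: SparkPod, name: String): Volume =
      pod.pod.getSpec.getVolumes.asScala.find(_.getName == name) match {
        case Some(existing) => existing   // a user-supplied volume (hostPath, PV, ...) is kept as-is
        case None =>
          new VolumeBuilder()
            .withName(name)
            .withNewEmptyDir()
            .endEmptyDir()
            .build()
      }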
Test build #106543 has finished for PR 24879 at commit

Kubernetes integration test starting

Kubernetes integration test status success

Kubernetes integration test starting

Test build #106544 has finished for PR 24879 at commit

Kubernetes integration test status success

Hi @dongjoon-hyun and @felixcheung, I updated the code in the latest commits, could you please take a look? Thanks.
liyinan926
left a comment
LGTM
        .build()
    val name = s"spark-local-dir-${index + 1}"
    findVolume(pod, name) match {
      case Some(volume) => volume
integration tests should exercise both cases (found the volume or created it)
I agree with you. There is an existing unit test for creating a local directory via emptyDir, and this PR adds a unit test for the case where the volume is found, so both cases should already be covered. Is that ok?
So my impression was that hostPath volumes are unsafe because they break the boundary of isolation between the container and the underlying machine, and the isolation between containers. I was also under the impression that containers have to be run as root to support hostPath volumes. Because of these factors I would be hesitant to advertise supporting this as a first-class feature in the Spark on Kubernetes integration. If a user so desired, they could use the existing pod template feature to break glass, and this seems more like a break-glass feature in my estimation.

Good point @mccheah. First, I'd like to explain that this patch is not only for hostPath volumes; it also suits other volumes such as PVs. It adjusts the feature build order to adopt volumes that are already supported as first-class features. Second, a hostPath volume can be written by a non-root user if we change the file permissions according to the description here. Lastly, I think users who care about performance will want this: even though they can define volumes inside a pod template, those volumes are not used as local storage. I ran spark-sql-perf on Spark on Kubernetes at 1T scale, and with the patch most of the shuffle-bound queries improve a lot, especially q17, q25, and q29; when using 4 disks as local storage instead of emptyDir, the query time improves by more than 10X. In summary, this patch just helps users easily leverage what they have to improve performance.

@vanzin, could you please help to have a look?

@mccheah

Hi @felixcheung @mccheah, do you think it is ready? Thanks.

Retest this please.

test this please
Kubernetes integration test starting

Kubernetes integration test status failure

Test build #108131 has finished for PR 24879 at commit

Kubernetes integration test starting

Kubernetes integration test status failure

Kubernetes integration test starting

Test build #108149 has finished for PR 24879 at commit

Kubernetes integration test status success
docs/running-on-kubernetes.md
Outdated
    If none volume is set as local storage, Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately.
"If no volume is set..."
Should mention the config (spark.local.dir) which is preferred over the env variable.
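For readers of the thread, a hedged sketch of the setup this doc paragraph describes (the property names follow the spark.kubernetes.*.volumes.* scheme in running-on-kubernetes.md; the volume type and paths are made-up examples). Naming the mounted volume spark-local-dir-1 is what lets it be used for scratch space, while spark.local.dir (preferred over the SPARK_LOCAL_DIRS env variable) still determines the default directories when no such volume is mounted.

    // Hypothetical example: mount a hostPath disk on the executors as local storage.
    // The "spark-local-dir-" name prefix is what marks the volume as local storage.
    val conf = new org.apache.spark.SparkConf()
      .set("spark.kubernetes.executor.volumes.hostPath.spark-local-dir-1.mount.path", "/tmp/spark-local")
      .set("spark.kubernetes.executor.volumes.hostPath.spark-local-dir-1.options.path", "/mnt/disk1")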
    import java.util.UUID

    import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
    import collection.JavaConverters._
import scala....
    var localDirVolumeMounts : Seq[VolumeMount] = Seq()

    if (localDirs.isEmpty) {
      localDirs = resolvedLocalDirs.toSeq
Get rid of resolvedLocalDirs and move that statement here instead.
    def findLocalDirVolumeMount(pod: SparkPod): Seq[String] = {
      val localDirVolumes = pod.pod.getSpec.getVolumes.asScala
        .filter(v => v.getName.startsWith("spark-local-dir-"))
nit: either:
.filter { v => ... }
or
.filter(_.getName()...)
Also in a few places below.
    val localDirVolumes = pod.pod.getSpec.getVolumes.asScala
      .filter(v => v.getName.startsWith("spark-local-dir-"))

    localDirVolumes.map { volume => pod.container.getVolumeMounts.asScala
Move pod.container... to the next line.
      .filter(v => v.getName.startsWith("spark-local-dir-"))

    localDirVolumes.map { volume => pod.container.getVolumeMounts.asScala
      .find(m => m.getName.equals(volume.getName)) match {
You can use == for strings in scala.
        case Some(m) => m.getMountPath
        case _ => ""
      }
    }.filter(s => s.length > 0)
You can use localDirVolumes.flatMap and avoid this filter.
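A minimal sketch of that suggestion, assuming the localDirVolumes and pod values from the diff above: mapping each volume to an Option of its mount path lets flatMap drop the misses, so neither the empty-string sentinel nor the trailing filter is needed.

    val mountPaths: Seq[String] = localDirVolumes.flatMap { volume =>
      pod.container.getVolumeMounts.asScala
        .find(_.getName == volume.getName)   // == works for strings in Scala
        .map(_.getMountPath)
    }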
    val configuredPod = mountVolumeStep.configurePod(SparkPod.initialPod())

    val sparkConf = new SparkConfWithEnv(Map())
    val localDirConf = KubernetesTestConf.createDriverConf(sparkConf)
You can use the same KubernetesConf for all steps right? That's how it works when spark-submit is run.
That is the idea: keep building out one conf. (There were some early proposals around making these stages "more functional", but those were considered less broadly familiar and less Spark-idiomatic.)
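A hedged sketch of the pattern being described, with the step list abbreviated: one conf value is built once and handed to every feature step, and the steps are folded over the initial pod in order, each returning the pod it received plus its own additions.

    // Abbreviated, hypothetical step list; the point is the single shared `conf` and the fold.
    val steps = Seq(
      new MountVolumesFeatureStep(conf),
      new PodTemplateConfigMapStep(conf),
      new LocalDirsFeatureStep(conf))
    val configuredPod = steps.foldLeft(SparkPod.initialPod()) { (pod, step) =>
      step.configurePod(pod)
    }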
    import org.apache.spark.{SparkConf, SparkFunSuite}
    import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
    import org.apache.spark.deploy.k8s.{KubernetesHostPathVolumeConf, KubernetesTestConf, KubernetesVolumeSpec, SparkPod}
Use the wildcard when the import line gets too long.
Just to make sure I understand the rationale - we want the option to automatically create local directories "spark-local-dir-xxx" as a simplified UX (why mess around with a second channel of config when a working dir can be created automatically from the information in the first?)

Right. That's also more similar to other backends, where the local dirs are defined by the cluster manager themselves and users don't have to mess with the Spark configuration. (Except here they have to, but it can be encapsulated in the pod template.)

@vanzin thanks, I agree that makes sense

Kubernetes integration test starting

Test build #108190 has finished for PR 24879 at commit

Kubernetes integration test status success
vanzin
left a comment
Just some small nits.
    import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
    import io.fabric8.kubernetes.api.model._
    import scala.collection.JavaConverters._
Hmm, style checker should have complained about this (scala imports should be separate from others).
    }

    def findLocalDirVolumeMount(pod: SparkPod): Seq[String] = {
      val localDirVolumes = pod.pod.getSpec.getVolumes.asScala
Hmm... do you need to list the volumes at all? Can't you just look at the mounts (since they already have the volume name)?
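A minimal sketch of that simplification (using the plural name suggested further down): the container's volume mounts already carry the volume name, so the pod's volume list doesn't need to be scanned just to recover the mount paths.

    def findLocalDirVolumeMounts(pod: SparkPod): Seq[String] =
      pod.container.getVolumeMounts.asScala
        .filter(_.getName.startsWith("spark-local-dir-"))
        .map(_.getMountPath)
        .toSeq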
      SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
    }

    def findLocalDirVolumeMount(pod: SparkPod): Seq[String] = {
Mount => Mounts
Kubernetes integration test starting

Test build #108273 has finished for PR 24879 at commit

Kubernetes integration test status success

Merging to master.
This PR supports using hostPath/PV volume mounts as local storage. In KubernetesExecutorBuilder.scala, the LocalDirsFeatureStep is built before MountVolumesFeatureStep, which means we cannot use any volume mounted later. This PR adjusts the order of the feature building steps, moving LocalDirsFeatureStep to last so that we can check whether directories in SPARK_LOCAL_DIRS are set to mounted volumes such as hostPath, PV, or others. Unit tests. Closes apache#24879 from chenjunjiedada/SPARK-28042. Lead-authored-by: Junjie Chen <[email protected]> Co-authored-by: Junjie Chen <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]>
What changes were proposed in this pull request?
This PR supports using hostPath/PV volume mounts as local storage. In KubernetesExecutorBuilder.scala, the LocalDirsFeatureStep is built before MountVolumesFeatureStep, which means we cannot use any volume mounted later. This PR adjusts the order of the feature building steps, moving LocalDirsFeatureStep to last so that we can check whether directories in SPARK_LOCAL_DIRS are set to mounted volumes such as hostPath, PV, or others.
How was this patch tested?
Unit tests