[SPARK-23153][K8s] Support client dependencies with a Hadoop Compatible File System #23546
Conversation
Test build #101225 has finished for PR 23546 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Force-pushed 20603a4 to 7597e03
Test build #101228 has finished for PR 23546 at commit
Force-pushed 7597e03 to bf5f3b1
Integration tests failed with:
This is most likely due to DNS on the test node. I saw this with Ubuntu 18.04, where the contents of /etc/resolv.conf caused issues. The workaround was to pass an option to minikube start:
@erikerlandson, @liyinan926, @felixcheung please review; if the approach is approved I can add integration tests etc.
Test build #101229 has finished for PR 23546 at commit
jenkins test this please
Kubernetes integration test starting
Kubernetes integration test status failure
Integration tests still fail for the same issue.
Review threads (outdated; resolved) on:
...ce-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
...etes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
Test build #101254 has finished for PR 23546 at commit
Test build #101255 has finished for PR 23546 at commit
One Python test is failing, which is unrelated:
@@ -330,6 +347,7 @@ private[spark] class SparkSubmit extends Logging {
    }
  }

// Fill in all spark properties of SparkConf
I call this out here because it should probably be a few lines above, but I am not sure whether it matters. The issue is that a few lines above I have to use args.sparkProperties instead. The other problem is that at line 802 we re-fill the properties if they are missing; I am not sure why.
I'd just move this initialization to the top of the function; then the code can be consistent in always using sparkConf.
And yes, this code is very confusing and probably does a lot of redundant things, which is why I asked you to start with the plugin idea in the other PR, in the hope that we can clean this up little by little.
I agree we should clean it up.
confKey = "spark.jars.repositories"),
OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.ivy"),
OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES,
Note: resolving packages in containers may be slow if your network is slow, since the Ivy cache will be empty. In practice, users should build their dependencies into the image or use a pre-populated cache.
This point might be a good addition to the docs
I will add it.
@@ -400,7 +415,6 @@ There are several Spark on Kubernetes features that are currently being worked on.
Some of these include:

* Dynamic Resource Allocation and External Shuffle Service
* Local File Dependency Management
Does the fact that this is a Hadoop (compatible) FS based solution imply there are use cases for local deps that aren't served by this PR?
Yes, there might be cases like the RSS server where users want to upload to a file server within the cluster. I am covering the cases mentioned in the design document, which provides an API to use out of the box. The RSS implementation AFAIK needs improvements, so it is open for now, but we can work on it next. I could add a note there instead of removing that part of the doc, saying "partially done", but since this is a working solution I thought I could remove it from future work.
Out of curiosity, does any S3 object store fit the HCFS category?
I'd say that's broadly applicable enough to call it "done"
We have environments where there is nothing remotely HDFS-like available, and the systems are typically air-gapped, so using external services like S3 isn't an option either. Primary storage is usually a high-performance parallel file system (Lustre or IBM Spectrum Scale), which is just a POSIX-compliant file system mounted on all nodes over the system interconnect.
Using hostPath volume mounts isn't a realistic option either, because these environments have strict security requirements.
In my opinion we need both options: (a) upload to some DFS/object store service, and (b) a file server.
We cannot possibly support all systems out there if they have their own client libs.
Looks like a good first step toward better support for client dependencies; thanks for the hard work @skonto
docs/running-on-kubernetes.md
Outdated
The app jar file will be uploaded to S3 and then, when the driver is launched, it will be downloaded to the driver pod and added to its classpath.

The client scheme is supported for the application jar, and dependencies specified by proeprties `spark.jars` and `spark.files`.
Typo: proeprties -> properties
--conf spark.hadoop.fs.s3a.fast.upload=true
--conf spark.hadoop.fs.s3a.secret.key=....
client:///full/path/to/app.jar
```
How does the submission client know that the user's intention is to upload to S3 instead of, say, an HDFS cluster? I don't think this can be determined 100% reliably based only on the presence of those s3a options.
I saw you have spark.kubernetes.file.upload.path below, which should also be added here as an example.
The code is agnostic of the protocol; I am just using S3 as an example in the docs. If they don't set the properties, submit will fail.
ok
@@ -289,6 +289,12 @@ private[spark] object Config extends Logging {
    .booleanConf
    .createWithDefault(true)

  val KUBERNETES_FILE_UPLOAD_PATH =
    ConfigBuilder("spark.kubernetes.file.upload.path")
      .doc("HCFS path to upload files to, using the client scheme:// in cluster mode.")
HCFS path where files with the client:// scheme will be uploaded to in cluster mode.
if (fileScheme == "client") {
  if (conf.get(KUBERNETES_FILE_UPLOAD_PATH).isDefined) {
    val uploadPath = conf.get(KUBERNETES_FILE_UPLOAD_PATH).get
    s"${uploadPath}/${fileUri.getPath.split("/").last}"
So a file client://path/to/app1.jar will be uploaded to ${uploadPath}/app1.jar? What if two client-local files at different local paths have the same file name?
I am currently not supporting that; I did think about it. People should make sure they don't create a conflict, otherwise I would have to create random paths.
Please document this clearly, i.e., all client-side dependencies will be uploaded to the given path with a flat directory structure.
Will do, thanks.
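For illustration only, here is a minimal sketch (plain Scala, not the PR's actual code) of how a randomized subdirectory under the configured upload path sidesteps the filename-collision concern; `resolveUploadPath` is a hypothetical helper:

```scala
import java.net.URI
import java.util.UUID

object UploadPathSketch {
  // Hypothetical helper: put each client-local file under a unique random
  // subdirectory of the configured upload path, so two files with the same
  // name (e.g. /a/app.jar and /b/app.jar) cannot collide.
  def resolveUploadPath(uploadPath: String, fileUri: URI): String = {
    val fileName = fileUri.getPath.split("/").last
    val randomDir = s"spark-upload-${UUID.randomUUID()}"
    s"$uploadPath/$randomDir/$fileName"
  }
}
```

With this scheme, `client:///a/app.jar` and `client:///b/app.jar` land in distinct directories even though both are named `app.jar`, which is roughly the approach the PR later adopted with random dirs.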
val uploadPath = sConf.get(KUBERNETES_FILE_UPLOAD_PATH).get
val fs = getHadoopFileSystem(Utils.resolveURI(uploadPath), hadoopConf)
val storePath = new Path(s"${uploadPath}/${fileUri.getPath.split("/").last}")
log.info(s"Uploading file: ${fileUri.getPath}...")
We should also mention the destination path in the log message.
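A hedged sketch of that suggestion, logging both source and destination. It uses `java.nio.file` as a self-contained stand-in for the Hadoop `FileSystem` API; `uploadFile` is a hypothetical helper, not the PR's code:

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

object UploadLogSketch {
  // Sketch only: a local copy stands in for the HCFS upload.
  def uploadFile(src: Path, destDir: Path): Path = {
    val dest = destDir.resolve(src.getFileName)
    // Log both the source and the destination, as the review suggests.
    println(s"Uploading file: $src to: $dest ...")
    Files.createDirectories(destDir)
    Files.copy(src, dest, StandardCopyOption.REPLACE_EXISTING)
  }
}
```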
def resolveFileUri(uri: String): String = {
/**
 * Get the final path for a client file, if not return the uri as is.
 *
This empty comment line can be removed.
docs/running-on-kubernetes.md
Outdated
@@ -1010,6 +1024,15 @@ See the below table for the full list of pod specifications that will be overwritten
Spark will add additional labels specified by the spark configuration.
  </td>
</tr>
<tr>
This should be added to the table under the section Spark Properties.
@@ -373,6 +407,49 @@ private[spark] class SparkSubmit extends Logging {
    localPyFiles = Option(args.pyFiles).map {
      downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
    }.orNull

    if (isKubernetesClient &&
        sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)) {
What's the purpose of checking spark.kubernetes.submitInDriver, which AFAIK is used to indicate cluster mode?
I need to make sure I run this only in cluster mode, at the second submit time when we are using client mode.
Got it.
See my previous comment about these names not being optimal in the context of how k8s works.
Force-pushed bf5f3b1 to c885132
Test build #102043 has finished for PR 23546 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Force-pushed c885132 to 40d89fa
Force-pushed 29ecdeb to ec2d66c
jenkins test this please
@erikerlandson @srowen I added support for random dirs; please let me know if this is OK now to be merged. I updated the doc to indicate that users will not see conflicts when Spark apps are run in parallel. Hope that helps.
Kubernetes integration test starting
Kubernetes integration test status success
Test build #105450 has finished for PR 23546 at commit
Just a few nits here; I'd defer to @vanzin for comment on directory handling
if (isLocalAndResolvable(resource)) {
  SparkLauncher.NO_RESOURCE
} else {
  resource
Nit: indent
  case e: Exception =>
    throw new SparkException(s"Uploading file ${fileUri.getPath} failed...", e)
  }
}
Nit: pull the else up to this line
I resolved my previous comments that were out of date
Force-pushed ec2d66c to 3c58f7b
@erikerlandson @srowen I fixed the two pending comments; please resolve the review.
Kubernetes integration test starting
Kubernetes integration test status success
Test build #105503 has finished for PR 23546 at commit
@srowen @erikerlandson gentle ping
@erikerlandson @vanzin are you ok with the current status of things? Should it be merged?
If there are no further requests I will merge.
@erikerlandson from what I see there is no more activity, could you merge please?
If I add a local file on the client with `--file somefile`, I expect it to be on the classpath for the driver (on the cluster, using cluster deploy-mode), or at least I expect it to be in the working directory for the driver. But this does not seem to be the case; am I doing something wrong?
…tainer`

### What changes were proposed in this pull request?
This PR aims to simplify the steps to re-write the primary resource in a k8s Spark application.

### Why are the changes needed?
Re-writing the primary resource uses `renameMainAppResource` twice.
* The first `renameMainAppResource` in `baseDriverContainer` is introduced by #23546
* #25870 refactors `renameMainAppResource` and introduces `renameMainAppResource` in `configureForJava`. The refactoring and the `renameMainAppResource` in `configureForJava` make the `renameMainAppResource` in `baseDriverContainer` useless.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
* Pass the GA.
* Pass k8s IT.
```
$ build/sbt -Pkubernetes -Pkubernetes-integration-tests -Dtest.exclude.tags=r -Dspark.kubernetes.test.imageRepo=kubespark "kubernetes-integration-tests/test"
[info] KubernetesSuite:
[info] - Run SparkPi with no resources (17 seconds, 443 milliseconds)
[info] - Run SparkPi with no resources & statefulset allocation (17 seconds, 858 milliseconds)
[info] - Run SparkPi with a very long application name. (30 seconds, 450 milliseconds)
[info] - Use SparkLauncher.NO_RESOURCE (18 seconds, 596 milliseconds)
[info] - Run SparkPi with a master URL without a scheme. (18 seconds, 534 milliseconds)
[info] - Run SparkPi with an argument. (21 seconds, 853 milliseconds)
[info] - Run SparkPi with custom labels, annotations, and environment variables. (14 seconds, 285 milliseconds)
[info] - All pods have the same service account by default (13 seconds, 800 milliseconds)
[info] - Run extraJVMOptions check on driver (7 seconds, 825 milliseconds)
[info] - Run SparkRemoteFileTest using a remote data file (15 seconds, 242 milliseconds)
[info] - Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j2.properties (15 seconds, 491 milliseconds)
[info] - Run SparkPi with env and mount secrets. (26 seconds, 967 milliseconds)
[info] - Run PySpark on simple pi.py example (20 seconds, 318 milliseconds)
[info] - Run PySpark to test a pyfiles example (25 seconds, 659 milliseconds)
[info] - Run PySpark with memory customization (25 seconds, 608 milliseconds)
[info] - Run in client mode. (14 seconds, 620 milliseconds)
[info] - Start pod creation from template (19 seconds, 916 milliseconds)
[info] - SPARK-38398: Schedule pod creation from template (19 seconds, 966 milliseconds)
[info] - PVs with local hostpath storage on statefulsets (22 seconds, 380 milliseconds)
[info] - PVs with local hostpath and storageClass on statefulsets (26 seconds, 935 milliseconds)
[info] - PVs with local storage (30 seconds, 75 milliseconds)
[info] - Launcher client dependencies (2 minutes, 48 seconds)
[info] - SPARK-33615: Launcher client archives (1 minute, 26 seconds)
[info] - SPARK-33748: Launcher python client respecting PYSPARK_PYTHON (1 minute, 47 seconds)
[info] - SPARK-33748: Launcher python client respecting spark.pyspark.python and spark.pyspark.driver.python (1 minute, 51 seconds)
[info] - Launcher python client dependencies using a zip file (1 minute, 51 seconds)
[info] - Test basic decommissioning (59 seconds, 765 milliseconds)
[info] - Test basic decommissioning with shuffle cleanup (1 minute, 3 seconds)
[info] - Test decommissioning with dynamic allocation & shuffle cleanups (2 minutes, 58 seconds)
[info] - Test decommissioning timeouts (58 seconds, 754 milliseconds)
[info] - SPARK-37576: Rolling decommissioning (1 minute, 15 seconds)
[info] Run completed in 29 minutes, 15 seconds.
[info] Total number of tests run: 31
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 31, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 2020 s (33:40), completed 2022-4-2 12:35:52
```
PS. #23546 introduced the deleted code and `DepsTestsSuite`. `DepsTestsSuite` can check re-writing of the primary resource. This PR passes `DepsTestsSuite`, which shows that deleting `renameMainAppResource` in `baseDriverContainer` does not affect the re-writing of the primary resource.

Closes #36044 from dcoliversun/SPARK-38770.

Authored-by: Qian.Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
It works as follows:
Spark submit uploads the deps to the HCFS. Then the driver serves the deps via the Spark file server.
No HCFS URIs are propagated.
The related design document is here. The next option to add is the RSS, but it has to be improved given the past discussion about it (Spark 2.3).
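The flow above can be sketched as a scheme dispatch: only `client://` URIs are rewritten to their uploaded HCFS locations, and every other scheme passes through untouched. This is a hedged illustration in plain Scala; `rewriteUri` and the `uploaded` map are hypothetical, not the PR's actual code:

```scala
import java.net.URI

object DepResolutionSketch {
  // uploaded: client-local URI -> HCFS location written by spark-submit.
  def rewriteUri(uri: String, uploaded: Map[String, String]): String = {
    val scheme = Option(new URI(uri).getScheme).getOrElse("file")
    if (scheme == "client") {
      // client:// deps must have been uploaded; otherwise submission fails.
      uploaded.getOrElse(uri, sys.error(s"client:// dependency was not uploaded: $uri"))
    } else {
      uri // http://, hdfs://, s3a://, local:// etc. pass through unchanged
    }
  }
}
```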
How was this patch tested?
Added integration tests based on Ceph Nano, which looks very active.
Unfortunately, MinIO needs Hadoop >= 2.8.