[SPARK-12559][SPARK SUBMIT] fix --packages for stand-alone cluster mode #18630
Conversation
@vanzin please review.
Force-pushed from 81a5dee to 9816704.
Test build #79598 has finished for PR 18630 at commit
Test build #79599 has finished for PR 18630 at commit
Are you trying to support …
You can merge this with the mesos cluster OptionAssigner, like `OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.packages")`.
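For illustration, here is a self-contained mock showing the shape of such an entry; the case class, bit-flag constants, and values below are stand-ins, not the real SparkSubmit internals:

```scala
// Stand-in for SparkSubmit's private OptionAssigner machinery, only to show the
// shape of the suggested entry; names and constant values are illustrative.
object OptionAssignerSketch {
  case class OptionAssigner(value: String, clusterManager: Int, deployMode: Int, sysProp: String = null)

  val STANDALONE = 1 << 1  // assumed bit flags, for illustration only
  val MESOS = 1 << 2
  val CLUSTER = 1 << 1

  def main(args: Array[String]): Unit = {
    val packages = "com.example:some-lib:1.0"  // stand-in for args.packages
    val entry = OptionAssigner(packages, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.packages")
    println(entry)
  }
}
```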
ok...yes will do
The API interface has changed; this is not compilable.
OK, will fix that... probably didn't update my branch...
Here I think jars and files can only be remote resources; otherwise, how could the remote driver access resources local to SparkSubmit? So I think some defensive code may be necessary.
For the local resources: Ivy does the resolution and downloads them to the local filesystem of the node where the driver is launched (/home/$User/.ivy/...). That's the idea for jars; the jar is then added to the classpath. I verified that it works. Also, the method here, `private[deploy] def downloadFileList(`, returns the path as-is if it is local, so this works.
I was only focusing on packages, but yes, I need to write some more code to handle remote jars that are already passed; I missed that, my bad. As for the files, there is no need to download them: they are meant to be downloaded to the executor's working directory only, not the driver's. Docs: "Comma-separated list of files to be placed in the working directory of each executor. Globs are allowed."
Yes, that is my intention as stated in the description. I will proceed with the changes; thanks for the review.
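A rough sketch of the "local paths are returned as-is, remote ones are downloaded" behavior described above (method and variable names are illustrative, not the actual SparkSubmit code):

```scala
import java.io.File
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object DownloadSketch {
  // Return the path unchanged if it is already local; otherwise copy it into targetDir.
  def downloadFile(uri: String, targetDir: File, hadoopConf: Configuration): String = {
    val parsed = new URI(uri)
    Option(parsed.getScheme).getOrElse("file") match {
      case "file" | "local" =>
        uri  // already visible on the driver node's filesystem
      case _ =>
        val src = new Path(uri)
        val fs = FileSystem.get(parsed, hadoopConf)
        val dest = new File(targetDir, src.getName)
        fs.copyToLocalFile(src, new Path(dest.getAbsolutePath))
        dest.toURI.toString
    }
  }
}
```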
One thing I noticed is that if you use https as part of a URI in `spark.jars` you need to set `--conf spark.ssl.enabled=true --conf spark.ssl.protocol=TLS`, but this does not apply to the user jar. I think that is an issue.
Force-pushed from 9816704 to 1f92106.
@jerryshao @vanzin I updated the PR, please review. In standalone cluster mode, with this new update I can resolve jar paths of the form /tmp/tmp7255787727862122372/jarname.jar.
Yarn, for example, avoids this since the distributed cache is used: spark/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala, line 1184 in 4ce735e.
Test build #79669 has finished for PR 18630 at commit
Test build #79670 has finished for PR 18630 at commit
I'm wondering if we can prepare all the resources in …
@jerryshao I agree; I have already discussed with @vanzin, in another PR for Mesos, utilizing the distributed cache or whatever is available there. He agreed with that being the next step, since we don't exploit the Ivy cache. Check my last comment here: #18587. For Mesos I am also going to discuss this with the Mesosphere guys this week, to understand what the best thing to do is. Flink has one layer for all cluster environments, which makes more sense, since when we can indeed share things among environments we should do so: https://issues.apache.org/jira/browse/FLINK-6177. Bottom line, I see these as quick fixes, because we had customers who wanted this feature. For the final, proper fix I could try to refactor things and re-use the YARN approach for all environments, but as I said I plan to discuss it for Mesos at least. How do you want me to proceed?
One layer for different cluster managers seems promising, but it looks like it requires a lot of refactoring work (building Spark's own distributed cache and changing the existing way of distributing resources). Maybe a quick fix is enough (I don't think many users will use standalone cluster mode).
As with the Mesos change, I'm OK with this approach until there's some sort of distributed cache available for those cluster managers. I still need to review the code, though. @skonto please mention in the PR title that this is for cluster mode.
@vanzin thanks, done; I changed the title. I am willing to work on the distributed cache, but it needs time for sure.
vanzin left a comment:
Please write a more descriptive commit message. There's a lot going on here that gets no mention there.
I also dislike that there's a bunch of code that is being duplicated now; some re-factoring is in order here.
Pick a style; either use mutable.Foo everywhere, or import each class.
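For clarity, the two consistent styles being contrasted would look roughly like this (illustrative snippet, not the PR's actual declarations):

```scala
object StyleSketch {
  def main(args: Array[String]): Unit = {
    // Style A: qualify with the mutable package everywhere.
    import scala.collection.mutable
    val propsA = mutable.HashMap.empty[String, String]
    val jarsA = mutable.ArrayBuffer.empty[String]

    // Style B: import each class once and use the bare name.
    import scala.collection.mutable.{ArrayBuffer, HashMap}
    val propsB = HashMap.empty[String, String]
    val jarsB = ArrayBuffer.empty[String]

    println((propsA, jarsA, propsB, jarsB))
  }
}
```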
Sure...
Hmm, this is too much copy & paste for my liking. Could you instead refactor this code into a helper class or object (e.g. IvyUtils)?
It's not exactly the same as the code in SparkSubmit, but I can give it a shot. I thought about that too.
It's not clear what R and Python have to do with downloading jar files.
Just saying we don't cover them; these details might be helpful when people read the code, so they know what is pending. I can remove it.
No need for this change.
It's more the Scala style that I prefer, but anyway...
So now all the stuff in spark.jars that actually exists on the node will be automatically added to the driver's classpath, which is a change in behavior from before. It's probably ok, but please call out this kind of thing in the commit message.
OK, I will do so... thanks.
You're also implementing a separate feature that is not part of SPARK-12559. You should at the very least call this out in the commit message. I wouldn't be surprised if there's already a bug tracking this, though, so you should take a look.
I cannot omit the jars here... I skipped them earlier in SparkSubmit with the condition I set there, so I have to do this somewhere... I am just moving the place where this action takes place...
These helper methods in SparkSubmit should probably be moved to a more properly-named object if they're going to be used in other places. e.g., instead of my previous IvyUtils suggestion, you could have a DependencyUtils or something like that having all these methods.
Then you could use that method in DriverRunner instead of its own downloadUserJar.
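A rough skeleton of the kind of shared object being suggested (names and signatures here are illustrative; the actual DependencyUtils that came out of this review may differ):

```scala
import java.io.File
import org.apache.hadoop.conf.Configuration

// Illustrative skeleton only; bodies are intentionally left unimplemented.
object DependencyUtilsSketch {

  /** Resolve --packages coordinates through Ivy and return a comma-separated list of local jars. */
  def resolveMavenDependencies(
      packagesExclusions: String,
      packages: String,
      repositories: String,
      ivyRepoPath: String): String = ???

  /** Download any remote entries in a comma-separated file list, returning local paths unchanged. */
  def downloadFileList(
      fileList: String,
      targetDir: File,
      sparkProperties: Map[String, String],
      hadoopConf: Configuration): String = ???

  /** Fetch the user jar to the driver node, replacing DriverRunner's own downloadUserJar. */
  def downloadUserJar(jar: String, targetDir: File, hadoopConf: Configuration): String = ???
}
```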
Force-pushed from a5507ee to c559a08.
@vanzin I did the required refactoring, addressing the issues you mentioned, and added a better message to the description so it's available in the git log when this PR is merged. Please review.
Test build #79906 has finished for PR 18630 at commit
vanzin left a comment:
A few minor things; otherwise looks OK.
nit: add empty line before this one, remove the empty line after it.
It's also weird for this method to depend on SparkSubmitArguments and fall back to system properties. It makes more sense for it to take all those arguments explicitly, and let the caller define where they come from.
Finally there is a typo in the method name.
OK, sure, I will change it, but remember that in another comment you mentioned that this is too much copy-paste. So now, in all the places where I call this function, I will have to go back and create the values locally before I pass them to the method, which IMHO will look like copy-paste again... anyway, will fix, no problem.
This is really oddly indented.

```scala
securityProperties.map { pName =>
  sys.props.get(pName).map { pValue =>
    sparkProperties.put(pName, pValue)
  }
}
```

Also, it doesn't make a ton of sense to use map when you're not using the return value. That's what foreach is for.
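That is, something along these lines (a minimal sketch of the suggested rewrite; the two property names are example keys, not taken from the PR):

```scala
import scala.collection.mutable

val sparkProperties = mutable.HashMap.empty[String, String]
val securityProperties = Seq("spark.ssl.enabled", "spark.ssl.protocol")  // example keys

// foreach instead of map, since the result of the traversal is discarded.
securityProperties.foreach { pName =>
  sys.props.get(pName).foreach { pValue =>
    sparkProperties.put(pName, pValue)
  }
}
```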
Yes ok
```scala
if (blah) {
  // code
}
```

But really you don't need childClasspath here at all.
You mean simply split and add each jar? Yes, that should be more compact.
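For example, something like the following (an illustrative sketch; the loader class below is a stand-in for Spark's own mutable URL class loader, and the jar list is made up):

```scala
import java.io.File
import java.net.{URL, URLClassLoader}

object AddJarsSketch {
  // Minimal mutable loader for the sketch; Spark uses its own MutableURLClassLoader.
  class MutableLoader(parent: ClassLoader) extends URLClassLoader(Array.empty[URL], parent) {
    override def addURL(url: URL): Unit = super.addURL(url)
  }

  def main(args: Array[String]): Unit = {
    val loader = new MutableLoader(getClass.getClassLoader)
    val localJars = "/tmp/deps/a.jar,/tmp/deps/b.jar"  // example comma-separated list

    // Split the comma-separated list and add each jar to the loader's classpath.
    Option(localJars).foreach { jars =>
      jars.split(",").filter(_.nonEmpty).foreach { jar =>
        loader.addURL(new File(jar).toURI.toURL)
      }
    }
  }
}
```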
Do you mind moving this to a separate source file?
I think there's other stuff in SparkSubmit that should eventually be moved here, but it's ok to leave that for later.
Also, nit: no space after private.
ok
Instead of using a var I find it cleaner to define a different variable (e.g. val localJars = DependencyUtils...).
Ok no problem.
Force-pushed from c559a08 to 70649e2.
@vanzin fixed the issues. Please give it another try or merge.
Test build #80208 has finished for PR 18630 at commit
@vanzin fixed the issues. Ready for merge.
Test build #80433 has finished for PR 18630 at commit
nit: not necessary anymore
nit: no space after map
map -> foreach.
map -> foreach.
nit: remove blank line.
BryanCutler left a comment:
Overall this looks fine to me. I would prefer just keeping the common code in SparkSubmitUtils instead of adding a new object. Are there no tests that can be added to test this behavior? Also, I think some of the logic in SparkSubmit works to provide py files along with the jars. If that is also not working for standalone cluster, maybe it can easily be added with this change too?
There should be an indentation here. Maybe this instead?

```scala
val resolvedMavenCoordinates = DependencyUtils.resolveMavenDepndencies(args.packagesExclusions,
  args.packages, ivySettings, exclusions = exclusions)
```
nit: I think this could be moved to the line above?
Can we just use o.a.s.util.Utils.createTempDir? @vanzin it's slightly different, but I think it serves the same purpose; what do you think?
This code cannot call into things that initialize logging, and the Utils method initializes logging indirectly through ShutdownHookManager.
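In other words, a logging-free helper along these lines (a sketch under that constraint, not the exact code used in the PR):

```scala
import java.io.File
import java.nio.file.Files

object TempDirSketch {
  // Create a temp dir without touching Spark's Utils, which would initialize
  // logging indirectly through ShutdownHookManager.
  def createTempDir(prefix: String = "spark"): File = {
    val dir = Files.createTempDirectory(prefix).toFile
    dir.deleteOnExit()  // best effort; only removes the directory if it is empty on exit
    dir
  }

  def main(args: Array[String]): Unit = {
    println(createTempDir().getAbsolutePath)
  }
}
```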
OK, makes sense, thanks for clarifying that.
This is a huge statement. I think it would be better to break it up to be easier to follow.
What's your suggestion?
This syntax is more functional and easier to follow than a bunch of disjoint statements.
Here is a suggestion; it is not much different, but I prefer some separation between the calls to resolveGlobPaths and downloadFileList, since there is a lot going on in each of those, and checking nonEmpty vs .filterNot(_ == "").

```scala
def resolveAndDownloadJars(jars: String, userJar: String): String = {
  ...
  Option(jars).map { allJars =>
    val jarList = SparkSubmit.resolveGlobPaths(allJars, hadoopConf)
      .split(",")
      .filterNot(_.contains(userJar.split("/").last))
    if (jarList.nonEmpty) {
      SparkSubmit.downloadFileList(jarList.mkString(","), targetDir, sparkProperties, hadoopConf)
    } else {
      null
    }
  }.orNull
}
```

It's up to you though, not a huge deal. If you do leave it as is, the var jar should be removed.
Do we really need this new object? It makes the order of calls confusing. For instance, SparkSubmit calls DependencyUtils, which makes calls to SparkSubmitUtils. Can we just move this to SparkSubmitUtils?
I asked for this change, because SparkSubmitUtils is kind of a bad place to keep this code if things other than SparkSubmit start using it. The calls into SparkSubmitUtils are not optimal but those can be refactored separately.
Ok, that's fine if you are thinking the calls back to SparkSubmitUtils can be moved here eventually.
Is this same code repeated elsewhere? If so, maybe it could be put into a common function.
Force-pushed from a177992 to c0b0a7d.
@vanzin I fixed these minor issues.
@BryanCutler @vanzin from a quick look I took, DriverWrapper needs refactoring to make things testable.
I wasn't really expecting python support to be added here. I wonder if there's a bug open for that.
Sure, python support could be added at a later point; I was just thinking it might be only a small addition to what's already here, but no problem. Btw, after checking out this PR I tried spark-shell and got the error below. Not sure if it was my environment, but after switching back to master it worked fine.
@BryanCutler, you just started spark shell and it failed? How can I reproduce it?
Yeah, just by running …
@BryanCutler yes check here:
Maybe it was just something with my env, but I was running it locally; can you verify that works too? Just don't specify the …
@BryanCutler sure, check here; it works without the master, both in and out of the Spark home dir:
This is how I build things:
Ok, thanks for checking. It doesn't look like it's coming from your changes, so I'm sure it's just me.
Test build #80468 has finished for PR 18630 at commit
@vanzin do you think we are ready for a merge?
Your method name still has a typo.
ok
Force-pushed from c0b0a7d to db60b27.
@vanzin fixed.
Test build #80549 has finished for PR 18630 at commit
You forgot to address @BryanCutler's comments; I'll fix the easy ones during merge. Merging to master.
@vanzin I didn't forget them; I didn't see agreement. I was waiting for your approval for the rest of the comments. Are you referring to testing or the minor nit things? I could fix them in a new PR anyway; just let me know which need a fix.
What changes were proposed in this pull request?
Fixes the --packages flag for the standalone case in cluster mode. Adds to the driver classpath the jars that are resolved via Ivy, along with any other jars passed to `spark.jars`. Jars not resolved by Ivy are downloaded explicitly to a tmp folder on the driver node. Similar code is available in SparkSubmit, so we refactored part of it to use it in the DriverWrapper class, which is responsible for launching the driver in standalone cluster mode. Note: in standalone mode `spark.jars` contains the user jar so that it can be fetched later on at the executor side.
How was this patch tested?
Manually, by submitting a driver in cluster mode within a standalone cluster and checking whether dependencies were resolved at the driver side.
Author: Stavros Kontopoulos <[email protected]>
Closes apache#18630 from skonto/fix_packages_stand_alone_cluster