[SPARK-13904][Scheduler]Add support for pluggable cluster manager #11723
Conversation
This commit adds support for a pluggable cluster manager, and also allows a cluster manager to clean up tasks without taking the parent process down. To plug in a new external cluster manager, the ExternalClusterManager trait should be implemented. It returns the task scheduler and scheduler backend that will be used by SparkContext to schedule tasks. An external cluster manager is registered using the java.util.ServiceLoader mechanism (the same mechanism already used to register data sources like parquet, json, jdbc etc.), which allows implementations of the ExternalClusterManager interface to be auto-loaded. Currently, when a driver fails, executors exit using System.exit. This does not bode well for cluster managers that would like to reuse the parent process of an executor. Hence: 1. Moved System.exit to a function that can be overridden in subclasses of CoarseGrainedExecutorBackend. 2. Added functionality for killing all the running tasks in an executor.
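For illustration, here is a minimal sketch of what a pluggable implementation might look like, assuming the four-method trait discussed later in this thread (canCreate, createTaskScheduler, createSchedulerBackend, initialize). The class name MyClusterManager and the "myclustermanager" master URL are hypothetical, and the placeholder backend does none of the work a real cluster manager would.

```scala
package org.apache.spark.scheduler

import org.apache.spark.SparkContext

// Hypothetical manager handling the master URL "myclustermanager". It is discovered via
// java.util.ServiceLoader, so its fully qualified class name must be listed in a resource
// file named META-INF/services/org.apache.spark.scheduler.ExternalClusterManager.
private class MyClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = masterURL == "myclustermanager"

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new TaskSchedulerImpl(sc)

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = new SchedulerBackend {
    // Placeholder backend: a real cluster manager would launch and track executors here.
    override def start(): Unit = {}
    override def stop(): Unit = {}
    override def reviveOffers(): Unit = {}
    override def defaultParallelism(): Int = 1
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}
```

With the class registered in the services file, a SparkContext created with this master URL would pick up the plugged-in scheduler components instead of a built-in cluster manager.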
|
ok to test. |
|
Test build #53264 has finished for PR 11723 at commit
|
|
Hmm I'm not sure if it makes sense to create a public API for this at this point. This is directly exposing something that is very internal to the current implementation of Spark (SchedulerBackend), and these are by no means stable. |
|
@rxin Thanks for commenting. Spark was designed to be agnostic to the underlying cluster manager (as long as it can acquire executor processes, and these communicate with each other). Since Spark is now being used in newer and different use cases, there is a need to allow other cluster managers to manage Spark components. One such use case is embedding Spark components like the executor and driver inside another process, which may be a datastore. This allows co-location of data and processing. Another use case would be using Spark like an application server (you might have heard about spark-jobserver). Spark's current design allows handling such use cases if the cluster manager supports it. Hence, IMO, it is meaningful to allow plugging in new cluster managers. From a code perspective, I think that even the creation of TaskScheduler and SchedulerBackend for Yarn/Mesos/local mode should be done using a similar interface. |
|
@hbhanawat I understand that. The problem is not whether you can find a single legitimate use case. The introduction of every API always benefits something -- there is no argument about it. Otherwise nobody would be adding new APIs. The question is how many use cases it benefits, and how the APIs can be maintained and evolved. You are effectively taking a bunch of private APIs that were never meant to be public and making them public. This approach is not maintainable. |
|
@hbhanawat to be clear, I think we might be able to add this as a semi-private API that external resource managers can use, but with the understanding that this is tied to specific versions of Spark and might break from release to release. Based on my experience seeing how Spark has evolved in the last 5 years, exposing this as a stable public API is not going to work. |
|
@rxin ok, I get it. I will make ExternalClusterManager private[spark] and mark it as a developer API. I think that should suffice. |
|
Test build #53424 has finished for PR 11723 at commit
|
Marking ExternalClusterManager as DeveloperApi and making it private[spark].
|
Test build #53525 has finished for PR 11723 at commit
|
|
@rxin I have completed the changes. Please review. |
|
@rxin Any update? Any changes needed from my side? |
// exactly one registered manager
case head :: Nil => Some(head)
case Nil => None
case multipleMgrs => sys.error(s"Multiple Cluster Managers registered " +
Can you include the list of matching cluster managers in the message?
Done.
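For reference, a sketch of how that lookup and error message might read; getClusterManager, the variable names, and the exact wording here are illustrative rather than the merged code:

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

// Load every registered ExternalClusterManager and keep the ones claiming this master URL;
// if more than one matches, fail and name the offending implementations in the message.
private def getClusterManager(url: String): Option[ExternalClusterManager] = {
  val loader = ServiceLoader.load(classOf[ExternalClusterManager], getClass.getClassLoader)
  loader.asScala.filter(_.canCreate(url)).toList match {
    case head :: Nil => Some(head) // exactly one registered manager
    case Nil => None               // fall back to the built-in cluster managers
    case multipleMgrs => sys.error(
      s"Multiple Cluster Managers (${multipleMgrs.map(_.getClass.getName).mkString(", ")}) " +
        s"registered for the url $url")
  }
}
```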
|
@rxin: I would really like to have this PR in trunk. As things stand, anyone using their own scheduler has to maintain a patch over the open source release to have that glue in Spark. Re API breaking over Spark releases: I agree that breaking APIs is bad, but it would at least be better than the current model of dealing with this: doing a merge for every release. |
|
Is this something Facebook needs too? |
|
@rxin: Yes!! At Facebook we are using an internal scheduler to run Spark executors. Maintaining an internal patch to have that "glue" and merging it against every Spark release can be avoided by this PR. Ideally we want to get to a place where there are no patches to maintain and everything is just pluggable. If we get there, testing RCs would be easier and we can help flag issues early on (especially ones related to scalability). |
|
OK - once you are done with your own review ping me. I will take a look at it again. |
1. Fixed formatting issues 2. Added master url as part of the other functions of ExternalClusterManager
setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
sc = new SparkContext(conf)
// check if the scheduler components are created
assert(sc.schedulerBackend.isInstanceOf[FakeSchedulerBackend])
Sorry I missed this in the last review comments yesterday. I thought that FakeSchedulerBackend was in this same file and you could rename it, but now I see that it's from some other place.
While reading, it feels odd to have Fake* and then Dummy* test classes. I am not sure what's followed in the Spark codebase. Couple of options:
- rename the Dummy* classes => Fake*, and move all the Fake* classes to a common test utils file for the module.
- instead of re-using FakeSchedulerBackend from another place, create a FakeSchedulerBackend here.
I too missed it completely.
I think it wasn't a great idea in the first place to use the FakeSchedulerBackend from some other class, from a maintenance perspective. I am going ahead with your option 2.
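Following option 2, a rough sketch of how the suite could define its own fake backend locally instead of reusing one from elsewhere; the class names, master URL, and the dummy manager they rely on are illustrative, not the exact code in this PR.

```scala
package org.apache.spark.scheduler

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}

class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
  test("launch of backend and scheduler") {
    val conf = new SparkConf().setMaster("myclustermanager")
      .setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
    sc = new SparkContext(conf)
    // check that the scheduler components created by the plugged-in manager are the fakes below
    assert(sc.schedulerBackend.isInstanceOf[FakeSchedulerBackend])
  }
}

// Fake backend defined in the same test file (option 2), returned by a dummy
// ExternalClusterManager registered for the "myclustermanager" master URL.
private class FakeSchedulerBackend extends SchedulerBackend {
  override def start(): Unit = {}
  override def stop(): Unit = {}
  override def reviveOffers(): Unit = {}
  override def defaultParallelism(): Int = 1
}
```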
* @param scheduler TaskScheduler that will be used with the scheduler backend.
* @return SchedulerBackend that works with a TaskScheduler
*/
def createSchedulerBackend(sc: SparkContext,
the way we indent is

def createSchedulerBackend(
    sc: SparkContext,
    masterURL: String,
    scheduler: TaskScheduler): SchedulerBackend
|
Sorry for the delay. This looks pretty good. Just have some comments about the style to be more consistent with the rest of the Spark codebase. |
Conflicts: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
|
there are some conflicts with master - can you rebase? Thanks. |
|
The latest changes LGTM |
|
Test build #55997 has finished for PR 11723 at commit
|
|
Test build #55998 has finished for PR 11723 at commit
|
|
Test build #55994 has finished for PR 11723 at commit
|
|
test this please |
|
@rxin how do I get this retested by Jenkins? There were a few issues going on with Jenkins when I checked in my last changes, and now it is not retesting. |
|
Jenkins retest this please |
|
Jenkins add to whitelist |
|
Test build #56012 has finished for PR 11723 at commit
|
* A cluster manager interface to plugin external scheduler.
*/
@DeveloperApi
trait ExternalClusterManager {
I just realized most of the return types used in this class are private[spark], so your implementation of this interface would need to be in the spark package anyway. I'm going to add private[spark] to this when I merge.
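Putting that together, the trait would read roughly as below, with the indentation style requested earlier; treat this as a sketch of the interface discussed in this thread rather than the exact merged file.

```scala
package org.apache.spark.scheduler

import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi

/**
 * A cluster manager interface to plug in external schedulers. Kept private[spark]
 * because the return types below are themselves private[spark].
 */
@DeveloperApi
private[spark] trait ExternalClusterManager {

  /** Check if this cluster manager instance can create scheduler components for the URL. */
  def canCreate(masterURL: String): Boolean

  /** Create a task scheduler instance for the given SparkContext. */
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler

  /** Create a scheduler backend for the given SparkContext and scheduler. */
  def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend

  /** Initialize the task scheduler and scheduler backend after they have been created. */
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```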
|
Merging in master. Thanks. |
|
One thing - can you guys try to see if you can implement one of the existing cluster managers with this, and then we can make sure this is a proper API? Otherwise it is really easy to get removed because it is currently unused by anything in Spark. |
|
@rxin I will open another JIRA and a PR to do this. Thanks for the review. |
* executor exits differently. For e.g. when an executor goes down,
* back-end may not want to take the parent process down.
*/
protected def exitExecutor(): Unit = System.exit(1)
Should an exit code (int) parameter be added to this method?
Created #12457
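The actual change is tracked in #12457; the self-contained sketch below (not Spark's real classes) only illustrates the pattern under discussion: make the exit path an overridable method and give it an exit code parameter.

```scala
// Stand-in for CoarseGrainedExecutorBackend, illustrating the overridable exit hook.
abstract class ExecutorBackendSketch {
  // Default behaviour matches today's executor: terminate the JVM with the given code.
  protected def exitExecutor(code: Int): Unit = System.exit(code)

  protected def onDriverDisconnected(): Unit = {
    // ... kill the running tasks first ...
    exitExecutor(1)
  }
}

// An embedding cluster manager keeps the parent process alive by overriding the hook.
class EmbeddedExecutorBackend extends ExecutorBackendSketch {
  override protected def exitExecutor(code: Int): Unit = {
    // clean up executor state here instead of calling System.exit
  }
}
```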
## What changes were proposed in this pull request?

This commit adds support for pluggable cluster manager. And also allows a cluster manager to clean up tasks without taking the parent process down.

To plug a new external cluster manager, ExternalClusterManager trait should be implemented. It returns task scheduler and backend scheduler that will be used by SparkContext to schedule tasks. An external cluster manager is registered using the java.util.ServiceLoader mechanism (This mechanism is also being used to register data sources like parquet, json, jdbc etc.). This allows auto-loading implementations of ExternalClusterManager interface.

Currently, when a driver fails, executors exit using System.exit. This does not bode well for cluster managers that would like to reuse the parent process of an executor. Hence,
1. Moved System.exit to a function that can be overridden in subclasses of CoarseGrainedExecutorBackend.
2. Added functionality of killing all the running tasks in an executor.

## How was this patch tested?

ExternalClusterManagerSuite.scala was added to test this patch.

Author: Hemant Bhanawat <[email protected]>

Closes apache#11723 from hbhanawat/pluggableScheduler.
With the addition of the ExternalClusterManager (ECM) interface in PR apache#11723, any cluster manager can now be integrated with Spark. It was suggested in the ExternalClusterManager PR that one of the existing cluster managers should start using the new interface to ensure that the API is correct. Ideally, all the existing cluster managers should eventually use the ECM interface, but as a first step YARN will now use the ECM interface. This PR refactors YARN code from the SparkContext.createTaskScheduler function into YarnClusterManager, which implements the ECM interface.
…use newly added ExternalClusterManager

## What changes were proposed in this pull request?

With the addition of the ExternalClusterManager (ECM) interface in PR #11723, any cluster manager can now be integrated with Spark. It was suggested in the ExternalClusterManager PR that one of the existing cluster managers should start using the new interface to ensure that the API is correct. Ideally, all the existing cluster managers should eventually use the ECM interface, but as a first step YARN will now use the ECM interface. This PR refactors YARN code from the SparkContext.createTaskScheduler function into YarnClusterManager, which implements the ECM interface.

## How was this patch tested?

Since this is refactoring, no new tests have been added. Existing tests have been run. Basic manual testing with YARN was done too.

Author: Hemant Bhanawat <[email protected]>

Closes #12641 from hbhanawat/yarnClusterMgr.
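A simplified sketch of what such a YarnClusterManager looks like behind the ECM interface; the deploy-mode dispatch and class names follow the YARN scheduler classes that already exist in the yarn module, but the details may differ from the actual #12641 change.

```scala
package org.apache.spark.scheduler.cluster

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

private[spark] class YarnClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = masterURL == "yarn"

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    sc.deployMode match {
      case "cluster" => new YarnClusterScheduler(sc) // AM-side scheduler for cluster mode
      case "client" => new YarnScheduler(sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    sc.deployMode match {
      case "cluster" =>
        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case "client" =>
        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}
```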
This commit adds support for pluggable cluster manager. And also allows a cluster manager to clean up tasks without taking the parent process down. To plug a new external cluster manager, ExternalClusterManager trait should be implemented. It returns task scheduler and backend scheduler that will be used by SparkContext to schedule tasks. An external cluster manager is registered using the java.util.ServiceLoader mechanism (This mechanism is also being used to register data sources like parquet, json, jdbc etc.). This allows auto-loading implementations of ExternalClusterManager interface.

Currently, when a driver fails, executors exit using System.exit. This does not bode well for cluster managers that would like to reuse the parent process of an executor. Hence,
1. Moved System.exit to a function that can be overridden in subclasses of CoarseGrainedExecutorBackend.
2. Added functionality of killing all the running tasks in an executor.

ExternalClusterManagerSuite.scala was added to test this patch.

Author: Hemant Bhanawat <[email protected]>

Closes apache#11723 from hbhanawat/pluggableScheduler.

Conflicts:
core/src/main/scala/org/apache/spark/SparkContext.scala
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
core/src/main/scala/org/apache/spark/executor/Executor.scala
core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala
core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
dev/.rat-excludes
…use newly added ExternalClusterManager

With the addition of the ExternalClusterManager (ECM) interface in PR apache#11723, any cluster manager can now be integrated with Spark. It was suggested in the ExternalClusterManager PR that one of the existing cluster managers should start using the new interface to ensure that the API is correct. Ideally, all the existing cluster managers should eventually use the ECM interface, but as a first step YARN will now use the ECM interface. This PR refactors YARN code from the SparkContext.createTaskScheduler function into YarnClusterManager, which implements the ECM interface.

Since this is refactoring, no new tests have been added. Existing tests have been run. Basic manual testing with YARN was done too.

Author: Hemant Bhanawat <[email protected]>

Closes apache#12641 from hbhanawat/yarnClusterMgr.

Conflicts:
core/src/main/scala/org/apache/spark/SparkContext.scala
core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
What changes were proposed in this pull request?
This commit adds support for a pluggable cluster manager, and also allows a cluster manager to clean up tasks without taking the parent process down.
To plug in a new external cluster manager, the ExternalClusterManager trait should be implemented. It returns the task scheduler and scheduler backend that will be used by SparkContext to schedule tasks. An external cluster manager is registered using the java.util.ServiceLoader mechanism (this mechanism is also used to register data sources like parquet, json, jdbc etc.), which allows auto-loading implementations of the ExternalClusterManager interface.
Currently, when a driver fails, executors exit using System.exit. This does not bode well for cluster managers that would like to reuse the parent process of an executor. Hence:
1. Moved System.exit to a function that can be overridden in subclasses of CoarseGrainedExecutorBackend.
2. Added functionality of killing all the running tasks in an executor.
How was this patch tested?
ExternalClusterManagerSuite.scala was added to test this patch.