Skip to content

Commit 06eca13

Browse files
Hemant Bhanawat authored and ymahajan committed
[SPARK-14729][SCHEDULER] Refactored YARN scheduler creation code to use newly added ExternalClusterManager
With the addition of ExternalClusterManager(ECM) interface in PR apache#11723, any cluster manager can now be integrated with Spark. It was suggested in ExternalClusterManager PR that one of the existing cluster managers should start using the new interface to ensure that the API is correct. Ideally, all the existing cluster managers should eventually use the ECM interface but as a first step yarn will now use the ECM interface. This PR refactors YARN code from SparkContext.createTaskScheduler function into YarnClusterManager that implements ECM interface. Since this is refactoring, no new tests have been added. Existing tests have been run. Basic manual testing with YARN was done too. Author: Hemant Bhanawat <[email protected]> Closes apache#12641 from hbhanawat/yarnClusterMgr. Conflicts: core/src/main/scala/org/apache/spark/SparkContext.scala core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
1 parent 7a1ace9 commit 06eca13

File tree

4 files changed

+151
-0
lines changed

4 files changed

+151
-0
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2731,6 +2731,70 @@ object SparkContext extends Logging {
27312731
}
27322732
(backend, scheduler)
27332733

2734+
case "yarn" if deployMode == "cluster" =>
2735+
val scheduler = try {
2736+
val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
2737+
val cons = clazz.getConstructor(classOf[SparkContext])
2738+
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
2739+
} catch {
2740+
// TODO: Enumerate the exact reasons why it can fail
2741+
// But irrespective of it, it means we cannot proceed !
2742+
case e: Exception =>
2743+
throw new SparkException("YARN mode not available ?", e)
2744+
}
2745+
val backend = try {
2746+
val clazz =
2747+
Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
2748+
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
2749+
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
2750+
} catch {
2751+
case e: Exception =>
2752+
throw new SparkException("YARN mode not available ?", e)
2753+
}
2754+
scheduler.initialize(backend)
2755+
(backend, scheduler)
2756+
2757+
case "yarn" if deployMode == "client" =>
2758+
val scheduler = try {
2759+
val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
2760+
val cons = clazz.getConstructor(classOf[SparkContext])
2761+
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
2762+
2763+
} catch {
2764+
case e: Exception =>
2765+
throw new SparkException("YARN mode not available ?", e)
2766+
}
2767+
2768+
val backend = try {
2769+
val clazz =
2770+
Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
2771+
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
2772+
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
2773+
} catch {
2774+
case e: Exception =>
2775+
throw new SparkException("YARN mode not available ?", e)
2776+
}
2777+
2778+
scheduler.initialize(backend)
2779+
(backend, scheduler)
2780+
2781+
case MESOS_REGEX(mesosUrl) =>
2782+
MesosNativeLibrary.load()
2783+
val scheduler = new TaskSchedulerImpl(sc)
2784+
val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
2785+
val backend = if (coarseGrained) {
2786+
new CoarseMesosSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)
2787+
} else {
2788+
new MesosSchedulerBackend(scheduler, sc, mesosUrl)
2789+
}
2790+
scheduler.initialize(backend)
2791+
(backend, scheduler)
2792+
2793+
case zkUrl if zkUrl.startsWith("zk://") =>
2794+
logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " +
2795+
"in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
2796+
createTaskScheduler(sc, "mesos://" + zkUrl, deployMode)
2797+
27342798
case masterUrl =>
27352799
val cm = getClusterManager(masterUrl) match {
27362800
case Some(clusterMgr) => clusterMgr

core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,4 +129,34 @@ class SparkContextSchedulerCreationSuite
129129
case _ => fail()
130130
}
131131
}
132+
133+
  /**
   * Creates a task scheduler for `master` in client deploy mode with
   * `spark.mesos.coarse` set from `coarse`, and asserts that the backend is an
   * instance of `expectedClass`.
   *
   * If the native Mesos library is not installed, scheduler creation throws an
   * UnsatisfiedLinkError; that case is tolerated (with a warning) so the suite
   * can run on machines without Mesos. Any other Throwable fails the test.
   */
  def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
    val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
    try {
      val sched = createTaskScheduler(master, "client", conf)
      assert(sched.backend.getClass === expectedClass)
    } catch {
      case e: UnsatisfiedLinkError =>
        // Native library missing: verify the error is Mesos-related, then skip.
        assert(e.getMessage.contains("mesos"))
        logWarning("Mesos not available, could not test actual Mesos scheduler creation")
      case e: Throwable => fail(e)
    }
  }
145+
146+
  // spark.mesos.coarse=false selects the fine-grained MesosSchedulerBackend.
  test("mesos fine-grained") {
    testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend], coarse = false)
  }

  // spark.mesos.coarse=true (the default) selects CoarseMesosSchedulerBackend.
  test("mesos coarse-grained") {
    testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend], coarse = true)
  }

  // Multi-master Mesos cluster addressed through ZooKeeper via the
  // mesos://zk:// URL form.
  test("mesos with zookeeper") {
    testMesos("mesos://zk://localhost:1234,localhost:2345",
      classOf[MesosSchedulerBackend], coarse = false)
  }

  // Legacy bare zk:// URL: createTaskScheduler rewrites it to mesos://zk://
  // (with a deprecation warning) before dispatching.
  test("mesos with zookeeper and Master URL starting with zk://") {
    testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend], coarse = false)
  }
132162
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
org.apache.spark.scheduler.cluster.YarnClusterManager
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.scheduler.cluster
19+
20+
import org.apache.spark.{SparkContext, SparkException}
21+
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
22+
23+
/**
24+
* Cluster Manager for creation of Yarn scheduler and backend
25+
*/
26+
private[spark] class YarnClusterManager extends ExternalClusterManager {
27+
28+
override def canCreate(masterURL: String): Boolean = {
29+
masterURL == "yarn"
30+
}
31+
32+
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
33+
sc.deployMode match {
34+
case "cluster" => new YarnClusterScheduler(sc)
35+
case "client" => new YarnScheduler(sc)
36+
case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
37+
}
38+
}
39+
40+
override def createSchedulerBackend(sc: SparkContext,
41+
masterURL: String,
42+
scheduler: TaskScheduler): SchedulerBackend = {
43+
sc.deployMode match {
44+
case "cluster" =>
45+
new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
46+
case "client" =>
47+
new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
48+
case _ =>
49+
throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
50+
}
51+
}
52+
53+
override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
54+
scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
55+
}
56+
}

0 commit comments

Comments
 (0)