Commit af1f4da

Hemant Bhanawat authored and rxin committed
[SPARK-13904][SCHEDULER] Add support for pluggable cluster manager
## What changes were proposed in this pull request?

This commit adds support for a pluggable cluster manager and also allows a cluster manager to clean up tasks without taking the parent process down.

To plug in a new external cluster manager, the ExternalClusterManager trait should be implemented. It returns the task scheduler and scheduler backend that SparkContext will use to schedule tasks. An external cluster manager is registered through the java.util.ServiceLoader mechanism (the same mechanism used to register data sources such as parquet, json, jdbc, etc.), which allows implementations of the ExternalClusterManager interface to be auto-loaded.

Currently, when a driver fails, executors exit via System.exit. This is problematic for cluster managers that would like to reuse the parent process of an executor. Hence:
1. System.exit is moved into a function that can be overridden in subclasses of CoarseGrainedExecutorBackend.
2. Functionality is added to kill all the running tasks in an executor.

## How was this patch tested?

ExternalClusterManagerSuite.scala was added to test this patch.

Author: Hemant Bhanawat <[email protected]>

Closes #11723 from hbhanawat/pluggableScheduler.
1 parent 3394b12 commit af1f4da
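
To illustrate the mechanism described above, here is a minimal, hypothetical sketch of what a third-party cluster manager might look like after this change. The SketchClusterManager class, the sketch:// master URL scheme, and the org.apache.spark.scheduler.sketch package are made-up names for illustration; only the ExternalClusterManager trait, TaskSchedulerImpl, the ServiceLoader registration file, and SparkConf.setMaster come from Spark itself.

// Hypothetical sketch, not part of this commit. The implementation lives under the
// org.apache.spark package tree because TaskScheduler and SchedulerBackend are private[spark].
package org.apache.spark.scheduler.sketch

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

class SketchClusterManager extends ExternalClusterManager {

  // Claim master URLs of the form "sketch://host:port".
  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("sketch://")

  // Reuse Spark's standard task scheduler implementation.
  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new TaskSchedulerImpl(sc)

  // A real manager would return a backend that launches executors on its own cluster;
  // left unimplemented here because it is entirely cluster-specific.
  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = ???

  // SparkContext calls this last, once both components exist.
  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}

Such an implementation would be registered by listing its fully qualified class name in a classpath resource named META-INF/services/org.apache.spark.scheduler.ExternalClusterManager, after which new SparkConf().setMaster("sketch://...") would route SparkContext to it, exactly as ExternalClusterManagerSuite does below with its DummyExternalClusterManager.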

File tree

7 files changed (+193, -8 lines)


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 26 additions & 3 deletions
@@ -20,7 +20,7 @@ package org.apache.spark
 import java.io._
 import java.lang.reflect.Constructor
 import java.net.URI
-import java.util.{Arrays, Properties, UUID}
+import java.util.{Arrays, Properties, ServiceLoader, UUID}
 import java.util.concurrent.ConcurrentMap
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

@@ -2453,9 +2453,32 @@ object SparkContext extends Logging {
           "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
         createTaskScheduler(sc, "mesos://" + zkUrl, deployMode)

-      case _ =>
-        throw new SparkException("Could not parse Master URL: '" + master + "'")
+      case masterUrl =>
+        val cm = getClusterManager(masterUrl) match {
+          case Some(clusterMgr) => clusterMgr
+          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
+        }
+        try {
+          val scheduler = cm.createTaskScheduler(sc, masterUrl)
+          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
+          cm.initialize(scheduler, backend)
+          (backend, scheduler)
+        } catch {
+          case NonFatal(e) =>
+            throw new SparkException("External scheduler cannot be instantiated", e)
+        }
+    }
+  }
+
+  private def getClusterManager(url: String): Option[ExternalClusterManager] = {
+    val loader = Utils.getContextOrSparkClassLoader
+    val serviceLoaders =
+      ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
+    if (serviceLoaders.size > 1) {
+      throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " +
+        s"for the url $url:")
     }
+    serviceLoaders.headOption
   }
 }

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 12 additions & 5 deletions
@@ -64,7 +64,7 @@ private[spark] class CoarseGrainedExecutorBackend(
         // Always receive `true`. Just ignore it
       case Failure(e) =>
         logError(s"Cannot register with driver: $driverUrl", e)
-        System.exit(1)
+        exitExecutor()
     }(ThreadUtils.sameThread)
   }

@@ -81,12 +81,12 @@ private[spark] class CoarseGrainedExecutorBackend(

     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
-      System.exit(1)
+      exitExecutor()

     case LaunchTask(data) =>
       if (executor == null) {
         logError("Received LaunchTask command but executor was null")
-        System.exit(1)
+        exitExecutor()
       } else {
         val taskDesc = ser.deserialize[TaskDescription](data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
@@ -97,7 +97,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     case KillTask(taskId, _, interruptThread) =>
       if (executor == null) {
         logError("Received KillTask command but executor was null")
-        System.exit(1)
+        exitExecutor()
       } else {
         executor.killTask(taskId, interruptThread)
       }
@@ -127,7 +127,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       logInfo(s"Driver from $remoteAddress disconnected during shutdown")
     } else if (driver.exists(_.address == remoteAddress)) {
       logError(s"Driver $remoteAddress disassociated! Shutting down.")
-      System.exit(1)
+      exitExecutor()
     } else {
       logWarning(s"An unknown ($remoteAddress) driver disconnected.")
     }
@@ -140,6 +140,13 @@ private[spark] class CoarseGrainedExecutorBackend(
       case None => logWarning(s"Drop $msg because has not yet connected to driver")
     }
   }
+
+  /**
+   * This function can be overloaded by other child classes to handle
+   * executor exits differently. For e.g. when an executor goes down,
+   * back-end may not want to take the parent process down.
+   */
+  protected def exitExecutor(): Unit = System.exit(1)
 }

 private[spark] object CoarseGrainedExecutorBackend extends Logging {
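
The new protected exitExecutor() hook above is the extension point the commit message describes: a backend that wants to reuse the executor's parent process can override it and, for example, kill the running tasks via the Executor.killAllTasks method added in the next file instead of exiting the JVM. A schematic sketch of that pattern with made-up class names (the real subclass would extend the private[spark] CoarseGrainedExecutorBackend and pass through its full constructor):

// Schematic only: SketchBackendBase stands in for CoarseGrainedExecutorBackend, and
// killAllRunningTasks stands in for a call to Executor.killAllTasks(interruptThread).
abstract class SketchBackendBase {
  // Default behaviour, as in this patch: tear the whole process down.
  protected def exitExecutor(): Unit = System.exit(1)
}

class InProcessSketchBackend(killAllRunningTasks: Boolean => Unit) extends SketchBackendBase {
  // Keep the parent process alive: kill the executor's tasks instead of exiting the JVM.
  override protected def exitExecutor(): Unit = killAllRunningTasks(/* interruptThread = */ true)
}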

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 15 additions & 0 deletions
@@ -153,6 +153,21 @@ private[spark] class Executor(
     }
   }

+  /**
+   * Function to kill the running tasks in an executor.
+   * This can be called by executor back-ends to kill the
+   * tasks instead of taking the JVM down.
+   * @param interruptThread whether to interrupt the task thread
+   */
+  def killAllTasks(interruptThread: Boolean) : Unit = {
+    // kill all the running tasks
+    for (taskRunner <- runningTasks.values().asScala) {
+      if (taskRunner != null) {
+        taskRunner.kill(interruptThread)
+      }
+    }
+  }
+
   def stop(): Unit = {
     env.metricsSystem.report()
     heartbeater.shutdown()

core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * A cluster manager interface to plugin external scheduler.
+ */
+@DeveloperApi
+trait ExternalClusterManager {
+
+  /**
+   * Check if this cluster manager instance can create scheduler components
+   * for a certain master URL.
+   * @param masterURL the master URL
+   * @return True if the cluster manager can create scheduler backend/
+   */
+  def canCreate(masterURL: String): Boolean
+
+  /**
+   * Create a task scheduler instance for the given SparkContext
+   * @param sc SparkContext
+   * @param masterURL the master URL
+   * @return TaskScheduler that will be responsible for task handling
+   */
+  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
+
+  /**
+   * Create a scheduler backend for the given SparkContext and scheduler. This is
+   * called after task scheduler is created using [[ExternalClusterManager.createTaskScheduler()]].
+   * @param sc SparkContext
+   * @param masterURL the master URL
+   * @param scheduler TaskScheduler that will be used with the scheduler backend.
+   * @return SchedulerBackend that works with a TaskScheduler
+   */
+  def createSchedulerBackend(sc: SparkContext,
+      masterURL: String,
+      scheduler: TaskScheduler): SchedulerBackend
+
+  /**
+   * Initialize task scheduler and backend scheduler. This is called after the
+   * scheduler components are created
+   * @param scheduler TaskScheduler that will be responsible for task handling
+   * @param backend SchedulerBackend that works with a TaskScheduler
+   */
+  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
+}

core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+org.apache.spark.scheduler.DummyExternalClusterManager
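
This registration file follows the standard java.util.ServiceLoader convention: it sits on the classpath under META-INF/services/, is named after the fully qualified service interface (org.apache.spark.scheduler.ExternalClusterManager), and lists one implementation class per line, each instantiated through its no-argument constructor. Here it registers the test-only DummyExternalClusterManager so that the suite below can exercise the new SparkContext code path.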

core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala

Lines changed: 73 additions & 0 deletions
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.storage.BlockManagerId
+
+class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext
+{
+  test("launch of backend and scheduler") {
+    val conf = new SparkConf().setMaster("myclusterManager").
+        setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
+    sc = new SparkContext(conf)
+    // check if the scheduler components are created
+    assert(sc.schedulerBackend.isInstanceOf[DummySchedulerBackend])
+    assert(sc.taskScheduler.isInstanceOf[DummyTaskScheduler])
+  }
+}
+
+private class DummyExternalClusterManager extends ExternalClusterManager {
+
+  def canCreate(masterURL: String): Boolean = masterURL == "myclusterManager"
+
+  def createTaskScheduler(sc: SparkContext,
+      masterURL: String): TaskScheduler = new DummyTaskScheduler
+
+  def createSchedulerBackend(sc: SparkContext,
+      masterURL: String,
+      scheduler: TaskScheduler): SchedulerBackend = new DummySchedulerBackend()
+
+  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {}
+
+}
+
+private class DummySchedulerBackend extends SchedulerBackend {
+  def start() {}
+  def stop() {}
+  def reviveOffers() {}
+  def defaultParallelism(): Int = 1
+}
+
+private class DummyTaskScheduler extends TaskScheduler {
+  override def rootPool: Pool = null
+  override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+  override def start(): Unit = {}
+  override def stop(): Unit = {}
+  override def submitTasks(taskSet: TaskSet): Unit = {}
+  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
+  override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
+  override def defaultParallelism(): Int = 2
+  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+  override def applicationAttemptId(): Option[String] = None
+  def executorHeartbeatReceived(
+      execId: String,
+      accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+      blockManagerId: BlockManagerId): Boolean = true
+}

dev/.rat-excludes

Lines changed: 1 addition & 0 deletions
@@ -98,3 +98,4 @@ LZ4BlockInputStream.java
 spark-deps-.*
 .*csv
 .*tsv
+org.apache.spark.scheduler.ExternalClusterManager
