
Commit 800834f

Author: Hemant Bhanawat (committed)
This commit adds support for a pluggable cluster manager, and also allows a cluster manager to clean up tasks without taking the parent process down.
To plug in a new external cluster manager, the ExternalClusterManager trait should be implemented. It returns the task scheduler and scheduler backend that SparkContext will use to schedule tasks. An external cluster manager is registered using the java.util.ServiceLoader mechanism (the same mechanism used to register data sources such as parquet, json, and jdbc), which allows implementations of the ExternalClusterManager interface to be loaded automatically. Currently, when a driver fails, executors exit via System.exit. This does not bode well for cluster managers that would like to reuse the parent process of an executor. Hence: 1. System.exit is moved into a function that can be overridden in subclasses of CoarseGrainedExecutorBackend. 2. Functionality is added for killing all the running tasks in an executor.
1 parent e76679a commit 800834f
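
For illustration, a minimal sketch of what plugging in such a manager could look like follows. The class name and the "example://" master URL scheme are invented; because the TaskScheduler and SchedulerBackend traits are package-private, the sketch is placed in org.apache.spark.scheduler just as this commit's test is, and the class would additionally have to be listed in a META-INF/services/org.apache.spark.scheduler.ExternalClusterManager resource for ServiceLoader to find it.

    package org.apache.spark.scheduler  // scheduler traits are private[spark], so the sketch stays inside this package

    import org.apache.spark.SparkContext

    // Hypothetical plug-in that claims master URLs of the form "example://...".
    class ExampleClusterManager extends ExternalClusterManager {

      override def canCreate(masterURL: String): Boolean = masterURL.startsWith("example://")

      // Reuse Spark's default task scheduler; a real plug-in could return its own.
      override def createTaskScheduler(sc: SparkContext): TaskScheduler = new TaskSchedulerImpl(sc)

      override def createSchedulerBackend(sc: SparkContext, scheduler: TaskScheduler): SchedulerBackend =
        new ExampleSchedulerBackend

      // Wire the two components together once both exist.
      override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
        scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
    }

    // Stub backend for illustration only; a real one would request executors from the
    // external resource manager and launch tasks on them.
    class ExampleSchedulerBackend extends SchedulerBackend {
      override def start(): Unit = {}
      override def stop(): Unit = {}
      override def reviveOffers(): Unit = {}
      override def defaultParallelism(): Int = 2
    }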

File tree

7 files changed: +171 −9 lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 29 additions & 3 deletions

@@ -20,7 +20,7 @@ package org.apache.spark
 import java.io._
 import java.lang.reflect.Constructor
 import java.net.URI
-import java.util.{Arrays, Properties, UUID}
+import java.util.{Arrays, Properties, ServiceLoader, UUID}
 import java.util.concurrent.ConcurrentMap
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

@@ -2443,8 +2443,34 @@ object SparkContext extends Logging {
           "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
         createTaskScheduler(sc, "mesos://" + zkUrl, deployMode)

-      case _ =>
-        throw new SparkException("Could not parse Master URL: '" + master + "'")
+      case masterUrl =>
+        val cm = getClusterManager(masterUrl) match {
+          case Some(clusterMgr) => clusterMgr
+          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
+        }
+        try {
+          val scheduler = cm.createTaskScheduler(sc)
+          val backend = cm.createSchedulerBackend(sc, scheduler)
+          cm.initialize(scheduler, backend)
+          (backend, scheduler)
+        } catch {
+          case e: Exception => {
+            throw new SparkException("External scheduler cannot be instantiated", e)
+          }
+        }
+    }
+  }
+
+  private def getClusterManager(url: String): Option[ExternalClusterManager] = {
+    val loader = Utils.getContextOrSparkClassLoader
+    val serviceLoader = ServiceLoader.load(classOf[ExternalClusterManager], loader)
+
+    serviceLoader.asScala.filter(_.canCreate(url)).toList match {
+      // exactly one registered manager
+      case head :: Nil => Some(head)
+      case Nil => None
+      case multipleMgrs => sys.error(s"Multiple Cluster Managers registered " +
+        s"for the url $url")
     }
   }
 }
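
The new case is the last one in the master URL match, so it is reached only after all built-in forms (local, spark://, yarn, mesos://, ...) have been tried, and getClusterManager deliberately fails if more than one registered manager claims the URL. A hedged usage sketch, assuming the hypothetical manager registered for the "example://" scheme above:

    import org.apache.spark.{SparkConf, SparkContext}

    // "example://host:7077" is a made-up master URL; it reaches the new `case masterUrl`
    // branch only because no built-in scheme matches, and it succeeds only if exactly one
    // registered ExternalClusterManager returns true from canCreate for it.
    val conf = new SparkConf()
      .setMaster("example://host:7077")
      .setAppName("external-cluster-manager-demo")
    val sc = new SparkContext(conf) // scheduler and backend now come from the plug-in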

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 7 additions & 5 deletions

@@ -64,7 +64,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       }
       case Failure(e) => {
         logError(s"Cannot register with driver: $driverUrl", e)
-        System.exit(1)
+        exitExecutor()
       }
     }(ThreadUtils.sameThread)
   }
@@ -82,12 +82,12 @@ private[spark] class CoarseGrainedExecutorBackend(

     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
-      System.exit(1)
+      exitExecutor()

     case LaunchTask(data) =>
       if (executor == null) {
         logError("Received LaunchTask command but executor was null")
-        System.exit(1)
+        exitExecutor()
       } else {
         val taskDesc = ser.deserialize[TaskDescription](data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
@@ -98,7 +98,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     case KillTask(taskId, _, interruptThread) =>
       if (executor == null) {
         logError("Received KillTask command but executor was null")
-        System.exit(1)
+        exitExecutor()
       } else {
         executor.killTask(taskId, interruptThread)
       }
@@ -122,7 +122,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       logInfo(s"Driver from $remoteAddress disconnected during shutdown")
     } else if (driver.exists(_.address == remoteAddress)) {
       logError(s"Driver $remoteAddress disassociated! Shutting down.")
-      System.exit(1)
+      exitExecutor()
     } else {
       logWarning(s"An unknown ($remoteAddress) driver disconnected.")
     }
@@ -135,6 +135,8 @@ private[spark] class CoarseGrainedExecutorBackend(
       case None => logWarning(s"Drop $msg because has not yet connected to driver")
     }
   }
+
+  def exitExecutor(): Unit = System.exit(1)
 }

 private[spark] object CoarseGrainedExecutorBackend extends Logging {
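
Turning the hard-coded System.exit(1) into an overridable exitExecutor() is what lets a cluster manager keep the executor's host process alive. A hedged sketch of how a subclass might use it, together with the killAllTasks method added to Executor in the next file, is shown below; the subclass, its package placement, and the constructor parameter list mirroring the parent class are assumptions made for illustration.

    package org.apache.spark.executor  // same package as the (private[spark]) parent class

    import java.net.URL

    import org.apache.spark.SparkEnv
    import org.apache.spark.rpc.RpcEnv

    // Hypothetical backend for a manager that embeds executors in a long-lived process.
    // The constructor simply forwards the parent's parameters (names assumed here).
    class EmbeddedExecutorBackend(
        rpcEnv: RpcEnv,
        driverUrl: String,
        executorId: String,
        host: String,
        cores: Int,
        userClassPath: Seq[URL],
        env: SparkEnv)
      extends CoarseGrainedExecutorBackend(
        rpcEnv, driverUrl, executorId, host, cores, userClassPath, env) {

      override def exitExecutor(): Unit = {
        // Instead of System.exit(1): interrupt every in-flight task and let the
        // enclosing process decide whether to re-register or shut down.
        if (executor != null) {
          executor.killAllTasks(interruptThread = true)
        }
      }
    }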

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 8 additions & 1 deletion

@@ -149,7 +149,14 @@ private[spark] class Executor(
       tr.kill(interruptThread)
     }
   }
-
+  def killAllTasks (interruptThread: Boolean) : Unit = {
+    // kill all the running tasks
+    for (taskRunner <- runningTasks.values().asScala) {
+      if (taskRunner != null) {
+        taskRunner.kill(interruptThread)
+      }
+    }
+  }
   def stop(): Unit = {
     env.metricsSystem.report()
     heartbeater.shutdown()
core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala

Lines changed: 59 additions & 0 deletions

@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkContext
+
+/**
+ * A cluster manager interface to plugin external scheduler.
+ *
+ */
+trait ExternalClusterManager {
+
+  /**
+   * Check if this cluster manager instance can create scheduler components
+   * for a certain master URL.
+   * @param masterURL the master URL
+   * @return True if the cluster manager can create scheduler backend/
+   */
+  def canCreate(masterURL : String): Boolean
+
+  /**
+   * Create a task scheduler instance for the given SparkContext
+   * @param sc SparkContext
+   * @return TaskScheduler that will be responsible for task handling
+   */
+  def createTaskScheduler (sc: SparkContext): TaskScheduler
+
+  /**
+   * Create a scheduler backend for the given SparkContext and scheduler. This is
+   * called after task scheduler is created using [[ExternalClusterManager.createTaskScheduler()]].
+   * @param sc SparkContext
+   * @param scheduler TaskScheduler that will be used with the scheduler backend.
+   * @return SchedulerBackend that works with a TaskScheduler
+   */
+  def createSchedulerBackend (sc: SparkContext, scheduler: TaskScheduler): SchedulerBackend
+
+  /**
+   * Initialize task scheduler and backend scheduler. This is called after the
+   * scheduler components are created
+   * @param scheduler TaskScheduler that will be responsible for task handling
+   * @param backend SchedulerBackend that works with a TaskScheduler
+   */
+  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
+}
core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+org.apache.spark.scheduler.CheckExternalClusterManager
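
This resource file is what lets ServiceLoader discover the test's manager: per the java.util.ServiceLoader contract, the file is named after the fully qualified name of the service interface (org.apache.spark.scheduler.ExternalClusterManager) and contains one fully qualified implementation class name per line. A third-party jar would ship its own copy listing its implementation, for example (hypothetical class name):

    org.apache.spark.scheduler.ExampleClusterManager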
core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala

Lines changed: 66 additions & 0 deletions

@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.storage.BlockManagerId
+
+class ExternalClusterManagerSuite extends SparkFunSuite
+{
+  test("launch of backend and scheduler") {
+    val conf = new SparkConf().setMaster("myclusterManager").
+        setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
+    val sc = new SparkContext(conf)
+    // check if the scheduler components are created
+    assert(sc.schedulerBackend.isInstanceOf[FakeSchedulerBackend])
+    assert(sc.taskScheduler.isInstanceOf[FakeScheduler])
+    sc.stop
+  }
+}
+
+class CheckExternalClusterManager extends ExternalClusterManager {
+
+  def canCreate(masterURL: String): Boolean = masterURL == "myclusterManager"
+
+  def createTaskScheduler(sc: SparkContext): TaskScheduler = new FakeScheduler
+
+  def createSchedulerBackend(sc: SparkContext, scheduler: TaskScheduler): SchedulerBackend =
+    new FakeSchedulerBackend()
+
+  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {}
+
+}
+
+class FakeScheduler extends TaskScheduler {
+  override def rootPool: Pool = null
+  override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+  override def start(): Unit = {}
+  override def stop(): Unit = {}
+  override def submitTasks(taskSet: TaskSet): Unit = {}
+  override def cancelTasks(stageId: Int, interruptThread: Boolean) {}
+  override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
+  override def defaultParallelism(): Int = 2
+  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+  override def applicationAttemptId(): Option[String] = None
+  def executorHeartbeatReceived(
+      execId: String,
+      accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+      blockManagerId: BlockManagerId): Boolean = true
+}

dev/.rat-excludes

Lines changed: 1 addition & 0 deletions

@@ -98,3 +98,4 @@ LZ4BlockInputStream.java
 spark-deps-.*
 .*csv
 .*tsv
+org.apache.spark.scheduler.ExternalClusterManager
