29 changes: 26 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -20,7 +20,7 @@ package org.apache.spark
import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Properties, UUID}
import java.util.{Arrays, Properties, ServiceLoader, UUID}
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

@@ -2453,9 +2453,32 @@ object SparkContext extends Logging {
"in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
createTaskScheduler(sc, "mesos://" + zkUrl, deployMode)

case _ =>
throw new SparkException("Could not parse Master URL: '" + master + "'")
case masterUrl =>
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
}
try {
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)
} catch {
case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
}
}

private def getClusterManager(url: String): Option[ExternalClusterManager] = {
val loader = Utils.getContextOrSparkClassLoader

Contributor:
maybe

val loader = Utils.getContextOrSparkClassLoader
val serviceLoaders = ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
if (serviceLoaders.size > 1) {
  throw new SparkException("Multiple Cluster Managers ($multipleMgrs) registered for the URL $url")
}
serviceLoaders.headOption

val serviceLoaders =
ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
if (serviceLoaders.size > 1) {
throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " +
s"for the url $url:")
}
serviceLoaders.headOption
}
}
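
For context on how this discovery hook is meant to be consumed: getClusterManager relies on java.util.ServiceLoader, so an external cluster manager only needs to ship a provider-configuration file on the classpath naming its implementation class. Below is a minimal, hedged sketch; the plugin class, the mycluster:// URL scheme, and the demo object are hypothetical and not part of this change.

// Hypothetical registration file shipped inside the plugin jar:
//   META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
// containing one line with the implementation's fully qualified class name, e.g.:
//   org.apache.spark.mycluster.MyClusterManager

import org.apache.spark.{SparkConf, SparkContext}

object PluginDiscoveryDemo {
  def main(args: Array[String]): Unit = {
    // createTaskScheduler() above falls through to getClusterManager(), which asks each
    // registered ExternalClusterManager whether it canCreate() this master URL.
    val conf = new SparkConf()
      .setMaster("mycluster://host:7077") // URL scheme the hypothetical plugin claims
      .setAppName("plugin-discovery-demo")
    val sc = new SparkContext(conf)
    // ... run jobs against the externally managed cluster ...
    sc.stop()
  }
}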

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -64,7 +64,7 @@ private[spark] class CoarseGrainedExecutorBackend(
// Always receive `true`. Just ignore it
case Failure(e) =>
logError(s"Cannot register with driver: $driverUrl", e)
System.exit(1)
exitExecutor()
}(ThreadUtils.sameThread)
}

@@ -81,12 +81,12 @@ private[spark] class CoarseGrainedExecutorBackend(

case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
System.exit(1)
exitExecutor()

case LaunchTask(data) =>
if (executor == null) {
logError("Received LaunchTask command but executor was null")
System.exit(1)
exitExecutor()
} else {
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
Expand All @@ -97,7 +97,7 @@ private[spark] class CoarseGrainedExecutorBackend(
case KillTask(taskId, _, interruptThread) =>
if (executor == null) {
logError("Received KillTask command but executor was null")
System.exit(1)
exitExecutor()
} else {
executor.killTask(taskId, interruptThread)
}
@@ -127,7 +127,7 @@
logInfo(s"Driver from $remoteAddress disconnected during shutdown")
} else if (driver.exists(_.address == remoteAddress)) {
logError(s"Driver $remoteAddress disassociated! Shutting down.")
System.exit(1)
exitExecutor()
} else {
logWarning(s"An unknown ($remoteAddress) driver disconnected.")
}
@@ -140,6 +140,13 @@ private[spark] class CoarseGrainedExecutorBackend(
case None => logWarning(s"Drop $msg because has not yet connected to driver")
}
}

/**
* This function can be overridden by child classes to handle
* executor exits differently. For example, when an executor goes down,
* the backend may not want to take the parent process down with it.
*/
protected def exitExecutor(): Unit = System.exit(1)

Contributor:
Should a parameter of exit code (int) be added to this method ?

Contributor:
Created #12457

}

private[spark] object CoarseGrainedExecutorBackend extends Logging {
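
To illustrate the new exitExecutor() hook without reproducing the backend's full constructor, here is a hedged, self-contained sketch; ToyCoarseGrainedBackend and ToyInProcessBackend are stand-ins invented for illustration, not real Spark classes. A real integration would subclass CoarseGrainedExecutorBackend itself and override exitExecutor().

// Stand-in for the pattern above: call sites go through the protected hook instead of
// calling System.exit(1) directly, so a subclass can decide what "exit" means.
class ToyCoarseGrainedBackend {
  def onDriverDisconnected(): Unit = exitExecutor() // mirrors the call sites changed above

  // Default behaviour matches the old code path: take the JVM down.
  protected def exitExecutor(): Unit = sys.exit(1)
}

// A backend embedded in a larger process keeps the host JVM alive instead.
class ToyInProcessBackend extends ToyCoarseGrainedBackend {
  override protected def exitExecutor(): Unit = {
    println("executor exiting; keeping the parent process alive")
    // clean up executor state here (e.g. kill running tasks) rather than exiting
  }
}
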
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -153,6 +153,21 @@ private[spark] class Executor(
}
}

/**
* Kills all currently running tasks in this executor.
* Executor backends can call this to kill tasks instead of
* taking the whole JVM down.
* @param interruptThread whether to interrupt the task thread
*/
def killAllTasks(interruptThread: Boolean) : Unit = {
// kill all the running tasks
for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner != null) {
taskRunner.kill(interruptThread)
}
}
}

Contributor:
add a space between the methods

Author:
Done


def stop(): Unit = {
env.metricsSystem.report()
heartbeater.shutdown()
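
As a companion to the exitExecutor() hook above, a hedged sketch of how a backend that does not want to take the JVM down might drive the new killAllTasks helper. GracefulExecutorShutdown is a hypothetical helper, placed in the org.apache.spark.executor package only because Executor itself is private[spark] (see the review note on ExternalClusterManager below).

package org.apache.spark.executor

// Hypothetical helper: stop running work without exiting, leaving the decision about
// process lifetime to whatever embeds the executor backend.
private[spark] object GracefulExecutorShutdown {
  def stopTasksOnly(executor: Executor): Unit = {
    if (executor != null) {
      // Mirrors KillTask(..., interruptThread = true), but applied to every running task.
      executor.killAllTasks(interruptThread = true)
    }
    // Note: deliberately no System.exit(1) here.
  }
}
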
core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* A cluster manager interface to plug in external schedulers.
*/
@DeveloperApi
trait ExternalClusterManager {

Contributor:
i just realized most of the return types used in this class are private[spark], so your implementation of this interface would need to be in the spark package anyway. I'm going to add private[spark] to this when I merge.


/**
* Check if this cluster manager instance can create scheduler components
* for a certain master URL.
* @param masterURL the master URL
* @return True if the cluster manager can create scheduler backend/task scheduler for this URL
*/
def canCreate(masterURL: String): Boolean

/**
* Create a task scheduler instance for the given SparkContext
* @param sc SparkContext
* @param masterURL the master URL
* @return TaskScheduler that will be responsible for task handling
*/
def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler

/**
* Create a scheduler backend for the given SparkContext and scheduler. This is
* called after task scheduler is created using [[ExternalClusterManager.createTaskScheduler()]].
* @param sc SparkContext
* @param masterURL the master URL
* @param scheduler TaskScheduler that will be used with the scheduler backend.
* @return SchedulerBackend that works with a TaskScheduler
*/
def createSchedulerBackend(sc: SparkContext,

Contributor:
the way we indent is

def createSchedulerBackend(
    sc: SparkContext,
    masterURL: String,
    scheduler: TaskScheduler): SchedulerBackend

masterURL: String,
scheduler: TaskScheduler): SchedulerBackend

/**
* Initialize task scheduler and backend scheduler. This is called after the
* scheduler components are created
* @param scheduler TaskScheduler that will be responsible for task handling
* @param backend SchedulerBackend that works with a TaskScheduler
*/
def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -0,0 +1 @@
org.apache.spark.scheduler.DummyExternalClusterManager
core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId

class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext
{
test("launch of backend and scheduler") {
val conf = new SparkConf().setMaster("myclusterManager").
setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
sc = new SparkContext(conf)
// check if the scheduler components are created
assert(sc.schedulerBackend.isInstanceOf[DummySchedulerBackend])
assert(sc.taskScheduler.isInstanceOf[DummyTaskScheduler])
}
}

private class DummyExternalClusterManager extends ExternalClusterManager {

def canCreate(masterURL: String): Boolean = masterURL == "myclusterManager"

def createTaskScheduler(sc: SparkContext,

Contributor:
can you fix the indent for the functions?

masterURL: String): TaskScheduler = new DummyTaskScheduler

def createSchedulerBackend(sc: SparkContext,
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = new DummySchedulerBackend()

def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {}

}

private class DummySchedulerBackend extends SchedulerBackend {
def start() {}
def stop() {}
def reviveOffers() {}
def defaultParallelism(): Int = 1
}

private class DummyTaskScheduler extends TaskScheduler {
override def rootPool: Pool = null
override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def start(): Unit = {}
override def stop(): Unit = {}
override def submitTasks(taskSet: TaskSet): Unit = {}
override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
override def defaultParallelism(): Int = 2
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def applicationAttemptId(): Option[String] = None
def executorHeartbeatReceived(
execId: String,
accumUpdates: Array[(Long, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId): Boolean = true
}
1 change: 1 addition & 0 deletions dev/.rat-excludes
@@ -98,3 +98,4 @@ LZ4BlockInputStream.java
spark-deps-.*
.*csv
.*tsv
org.apache.spark.scheduler.ExternalClusterManager

Contributor:
why is this needed ?

Author:
Rat throws an error for the files that do not have apache license. Hence excluding this file.

Contributor:
The version of ExternalClusterManager in the PR has a license header in it. Can you remove this and try ?

Author:
I believe there is some confusion. There is a file named org.apache.spark.scheduler.ExternalClusterManager in a test folder that is used by the ServiceLoader to launch the dummy external cluster manager. This file does not (and cannot) have a header and hence it needs to be excluded.