32 changes: 29 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -20,7 +20,7 @@ package org.apache.spark
import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Properties, UUID}
import java.util.{Arrays, Properties, ServiceLoader, UUID}
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

@@ -2443,8 +2443,34 @@ object SparkContext extends Logging {
"in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
createTaskScheduler(sc, "mesos://" + zkUrl, deployMode)

case _ =>
throw new SparkException("Could not parse Master URL: '" + master + "'")
case masterUrl =>
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
}
try {
val scheduler = cm.createTaskScheduler(sc)
val backend = cm.createSchedulerBackend(sc, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)
} catch {
case e: Exception => {
Contributor: catch NonFatal here, i.e. case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
}
}
}

private def getClusterManager(url: String): Option[ExternalClusterManager] = {
val loader = Utils.getContextOrSparkClassLoader
Contributor: maybe

    val loader = Utils.getContextOrSparkClassLoader
    val serviceLoaders = ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
    if (serviceLoaders.size > 1) {
      throw new SparkException("Multiple Cluster Managers ($multipleMgrs) registered for the URL $url")
    }
    serviceLoaders.headOption
val serviceLoader = ServiceLoader.load(classOf[ExternalClusterManager], loader)

serviceLoader.asScala.filter(_.canCreate(url)).toList match {
// exactly one registered manager
case head :: Nil => Some(head)
case Nil => None
case multipleMgrs => sys.error(s"Multiple Cluster Managers registered " +
Contributor: Can you include the list of matching cluster managers in the message?

Author: Done.
s"for the url $url")
}
}
}
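
A compilable sketch of the lookup and error-handling pattern above, folding in both review suggestions (catch only NonFatal, and list the matching managers in the error message). The stand-in trait, object and messages are assumptions for illustration, not the merged Spark code.

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

// Hypothetical stand-in for ExternalClusterManager so the sketch is self-contained.
trait ClusterManagerPlugin {
  def canCreate(masterURL: String): Boolean
}

object ClusterManagerLookup {
  // Mirrors getClusterManager: discover registered implementations via ServiceLoader
  // and fail with the full candidate list if more than one claims the URL.
  def forUrl(url: String, loader: ClassLoader): Option[ClusterManagerPlugin] = {
    val candidates = ServiceLoader.load(classOf[ClusterManagerPlugin], loader)
      .asScala.filter(_.canCreate(url)).toList
    candidates match {
      case single :: Nil => Some(single)
      case Nil => None
      case multiple =>
        throw new IllegalStateException(
          s"Multiple cluster managers (${multiple.mkString(", ")}) registered for the URL $url")
    }
  }

  // Mirrors the fallthrough case above: wrap instantiation failures, catching only
  // NonFatal as suggested in the review so fatal errors still propagate.
  def instantiate[T](create: () => T): T =
    try create() catch {
      case NonFatal(e) =>
        throw new RuntimeException("External scheduler cannot be instantiated", e)
    }
}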
@@ -64,7 +64,7 @@ private[spark] class CoarseGrainedExecutorBackend(
}
case Failure(e) => {
logError(s"Cannot register with driver: $driverUrl", e)
System.exit(1)
exitExecutor()
}
}(ThreadUtils.sameThread)
}
@@ -82,12 +82,12 @@

case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
System.exit(1)
exitExecutor()

case LaunchTask(data) =>
if (executor == null) {
logError("Received LaunchTask command but executor was null")
System.exit(1)
exitExecutor()
} else {
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
@@ -98,7 +98,7 @@
case KillTask(taskId, _, interruptThread) =>
if (executor == null) {
logError("Received KillTask command but executor was null")
System.exit(1)
exitExecutor()
} else {
executor.killTask(taskId, interruptThread)
}
@@ -122,7 +122,7 @@
logInfo(s"Driver from $remoteAddress disconnected during shutdown")
} else if (driver.exists(_.address == remoteAddress)) {
logError(s"Driver $remoteAddress disassociated! Shutting down.")
System.exit(1)
exitExecutor()
} else {
logWarning(s"An unknown ($remoteAddress) driver disconnected.")
}
@@ -135,6 +135,8 @@
case None => logWarning(s"Drop $msg because has not yet connected to driver")
}
}

def exitExecutor(): Unit = System.exit(1)
Contributor: can you add some documentation here saying why this is a public API - otherwise somebody might come in and remove this next time this code gets used.
}

private[spark] object CoarseGrainedExecutorBackend extends Logging {
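Regarding the comment above about documenting exitExecutor: a small sketch (not the PR's code) of the kind of documentation and override point being asked for. The method exists so that a backend whose executor runs inside a larger host process can shut the executor down without calling System.exit on the host JVM; the names below are invented for illustration.

// Hypothetical stand-in; the real CoarseGrainedExecutorBackend is not reproduced here.
trait ExecutorBackendLike {
  /**
   * Called on unrecoverable executor errors. Kept as an overridable member (instead
   * of calling System.exit inline) so that deployments which embed the executor in
   * another running process can stop the executor without killing that process.
   */
  def exitExecutor(): Unit = System.exit(1)
}

// Example override for an executor embedded in a host JVM that must keep running.
class InProcessBackend(stopExecutorOnly: () => Unit) extends ExecutorBackendLike {
  override def exitExecutor(): Unit = stopExecutorOnly()
}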
9 changes: 8 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -149,7 +149,14 @@ private[spark] class Executor(
tr.kill(interruptThread)
}
}

def killAllTasks (interruptThread: Boolean) : Unit = {
Contributor: add a space between the methods.

Contributor: I could not see this method being called from anywhere. If you don't plan to use it, please remove this.

Author:
  1. Done.
  2. We are targeting a use case wherein executors are launched inside another running process, and when an executor goes down it does not take the parent process down. In such cases, executors should kill their running tasks when they go down. Given that runningTasks is a private val, we need a method that can be called from the executor backend.

Contributor: same here -- say why this is a public thing and how it is used. Also remove the space after killAllTasks.

// kill all the running tasks
for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner != null) {
taskRunner.kill(interruptThread)
}
}
}
Contributor: add a space between the methods.

Author: Done.

def stop(): Unit = {
env.metricsSystem.report()
heartbeater.shutdown()
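A sketch of the use case the author describes above for killAllTasks: an executor hosted inside another process interrupts its running tasks on the way down instead of relying on JVM exit. Only killAllTasks(interruptThread) and stop() come from the diff; the surrounding names are assumptions for illustration.

// Hypothetical minimal view of the Executor API that an embedded backend would use.
trait TaskHost {
  def killAllTasks(interruptThread: Boolean): Unit
  def stop(): Unit
}

// Shutdown path for an executor running inside a parent process: kill any
// still-running tasks explicitly, since there is no JVM exit to clean them up.
class EmbeddedExecutorShutdown(executor: TaskHost) {
  def shutdown(): Unit = {
    executor.killAllTasks(interruptThread = true)
    executor.stop()
  }
}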
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* A cluster manager interface to plugin external scheduler.
*
Contributor: remove extra newline.

Author: Done.

*/
@DeveloperApi
private[spark] trait ExternalClusterManager {
Contributor: are you sure you want this to be private[spark]?


/**
* Check if this cluster manager instance can create scheduler components
* for a certain master URL.
* @param masterURL the master URL
* @return True if the cluster manager can create scheduler backend/task scheduler for this URL
*/
def canCreate(masterURL : String): Boolean

/**
* Create a task scheduler instance for the given SparkContext
* @param sc SparkContext
* @return TaskScheduler that will be responsible for task handling
*/
def createTaskScheduler (sc: SparkContext): TaskScheduler
Contributor: can you also include the masterURL as a param?

Author: I have added the master url. In hindsight, I think it makes sense.


/**
* Create a scheduler backend for the given SparkContext and scheduler. This is
* called after task scheduler is created using [[ExternalClusterManager.createTaskScheduler()]].
* @param sc SparkContext
* @param scheduler TaskScheduler that will be used with the scheduler backend.
* @return SchedulerBackend that works with a TaskScheduler
*/
def createSchedulerBackend (sc: SparkContext, scheduler: TaskScheduler): SchedulerBackend
Contributor: can you also include the masterURL as a param?

Author: Done.


/**
* Initialize task scheduler and backend scheduler. This is called after the
* scheduler components are created
* @param scheduler TaskScheduler that will be responsible for task handling
* @param backend SchedulerBackend that works with a TaskScheduler
*/
def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
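
For context, a sketch of what a pluggable implementation of this trait could look like, written against the signatures shown in this revision (the author later adds a masterURL parameter to the create* methods). The class name and URL scheme are invented; ServiceLoader finds the class because its fully qualified name is listed in the META-INF/services resource added below.

package org.apache.spark.scheduler  // same package as the test's dummy manager below

import org.apache.spark.SparkContext

// Illustrative only: claims master URLs of the form "example://..." and would wire up
// real scheduler components in place of the UnsupportedOperationExceptions.
class ExampleClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("example://")

  override def createTaskScheduler(sc: SparkContext): TaskScheduler =
    throw new UnsupportedOperationException("plug in a real TaskScheduler here")

  override def createSchedulerBackend(sc: SparkContext, scheduler: TaskScheduler): SchedulerBackend =
    throw new UnsupportedOperationException("plug in a real SchedulerBackend here")

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = ()
}

With that registration in place, new SparkConf().setMaster("example://host:7077") would route scheduler creation through this manager, exactly as the test below does with its "myclusterManager" URL.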
@@ -0,0 +1 @@
org.apache.spark.scheduler.CheckExternalClusterManager
Contributor: why is this file needed?

Author: To instantiate the dummy cluster manager in the test.

@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId

class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext
{
test("launch of backend and scheduler") {
val conf = new SparkConf().setMaster("myclusterManager").
setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
sc = new SparkContext(conf)
// check if the scheduler components are created
assert(sc.schedulerBackend.isInstanceOf[FakeSchedulerBackend])
Contributor: Sorry I missed this in the last review comments yesterday. I thought that FakeSchedulerBackend was in this same file and you could rename it, but now I see that it's from some other place.

While reading, it feels odd to have Fake* and then Dummy* test classes. I am not sure what's followed in the Spark codebase. A couple of options:

  • rename the Dummy* classes to Fake*, and move all the Fake* classes to a common test utils file for the module.
  • instead of re-using FakeSchedulerBackend from another place, create a FakeSchedulerBackend here.

Author: I too missed it completely. I think it wasn't a great idea in the first place to reuse FakeSchedulerBackend from some other class, from a maintenance perspective. I am going ahead with your option 2.

assert(sc.taskScheduler.isInstanceOf[FakeScheduler])
}
}

class CheckExternalClusterManager extends ExternalClusterManager {
Contributor: Rename to TestExternalClusterManager or DummyExternalClusterManager.

Author: Done.


def canCreate(masterURL: String): Boolean = masterURL == "myclusterManager"

def createTaskScheduler(sc: SparkContext): TaskScheduler = new FakeScheduler

def createSchedulerBackend(sc: SparkContext, scheduler: TaskScheduler): SchedulerBackend =
new FakeSchedulerBackend()

def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {}

}

class FakeScheduler extends TaskScheduler {
Contributor: To keep this consistent with the external CM declared above, you could rename this to TestTaskScheduler or DummyTaskScheduler.

override def rootPool: Pool = null
override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def start(): Unit = {}
override def stop(): Unit = {}
override def submitTasks(taskSet: TaskSet): Unit = {}
override def cancelTasks(stageId: Int, interruptThread: Boolean) {}
Contributor: add a Unit here for the return type, to be consistent.

override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
override def defaultParallelism(): Int = 2
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def applicationAttemptId(): Option[String] = None
def executorHeartbeatReceived(
execId: String,
accumUpdates: Array[(Long, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId): Boolean = true
}
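
On "option 2" above (a scheduler backend stub local to this suite rather than reusing FakeSchedulerBackend from elsewhere), a minimal sketch. It assumes the abstract members of SchedulerBackend at this point are start, stop, reviveOffers and defaultParallelism, which may not match the trait exactly.

// Hypothetical local stub, parallel in spirit to FakeScheduler above.
private class LocalFakeSchedulerBackend extends SchedulerBackend {
  override def start(): Unit = {}
  override def stop(): Unit = {}
  override def reviveOffers(): Unit = {}
  override def defaultParallelism(): Int = 1
}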
1 change: 1 addition & 0 deletions dev/.rat-excludes
@@ -98,3 +98,4 @@ LZ4BlockInputStream.java
spark-deps-.*
.*csv
.*tsv
org.apache.spark.scheduler.ExternalClusterManager
Contributor: why is this needed?

Author: Rat throws an error for files that do not have the Apache license header, hence excluding this file.

Contributor: The version of ExternalClusterManager in the PR has a license header in it. Can you remove this and try?

Author: I believe there is some confusion. There is a file named org.apache.spark.scheduler.ExternalClusterManager in a test folder that is used by the ServiceLoader to launch the dummy external cluster manager. That file does not (and cannot) have a header, and hence it needs to be excluded.