Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,17 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
with org.apache.mesos.Scheduler
with MesosSchedulerUtils {

val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
// Blacklist a slave after this many failures
private val MAX_SLAVE_FAILURES = 2

// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)

val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
// Maximum number of cores to acquire
private val maxCores = maxCoresOption.getOrElse(Int.MaxValue)

val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)

private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)

private[this] val shutdownTimeoutMS =
conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
Expand All @@ -75,10 +78,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)

// Cores we have acquired with each Mesos task ID
val coresByTaskId = new mutable.HashMap[String, Int]
val gpusByTaskId = new mutable.HashMap[String, Int]
var totalCoresAcquired = 0
var totalGpusAcquired = 0
private val coresByTaskId = new mutable.HashMap[String, Int]
private val gpusByTaskId = new mutable.HashMap[String, Int]
private var totalCoresAcquired = 0
private var totalGpusAcquired = 0

// SlaveID -> Slave
// This map accumulates entries for the duration of the job. Slaves are never deleted, because
Expand Down Expand Up @@ -108,7 +111,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// may lead to deadlocks since the superclass might also try to lock
private val stateLock = new ReentrantLock

val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
private val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)

// Offer constraints
private val slaveOfferConstraints =
Expand Down Expand Up @@ -139,7 +142,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
securityManager.isAuthenticationEnabled())
}

var nextMesosTaskId = 0
private var nextMesosTaskId = 0

@volatile var appId: String = _

Expand Down Expand Up @@ -256,7 +259,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}

override def sufficientResourcesRegistered(): Boolean = {
totalCoresAcquired >= maxCores * minRegisteredRatio
totalCoreCount.get >= maxCoresOption.getOrElse(0) * minRegisteredRatio
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only substantive change.

Copy link
Contributor

@skonto skonto Feb 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify stuff. totalCoreCount holds the total number of cores in the cluster registered by executors connected back to the scheduler. So as soon as you have the all executors connected you can start the tasks, instead of gradually registering executors and running tasks on them.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep.

}

override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.concurrent.Promise
import scala.reflect.ClassTag

import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
Expand All @@ -37,8 +35,8 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.mesos.Utils._

Expand Down Expand Up @@ -304,25 +302,29 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
}

test("weburi is set in created scheduler driver") {
setBackend()
initializeSparkConf()
sc = new SparkContext(sparkConf)

val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)

val driver = mock[SchedulerDriver]
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)

val securityManager = mock[SecurityManager]

val backend = new MesosCoarseGrainedSchedulerBackend(
taskScheduler, sc, "master", securityManager) {
taskScheduler, sc, "master", securityManager) {
override protected def createSchedulerDriver(
masterUrl: String,
scheduler: Scheduler,
sparkUser: String,
appName: String,
conf: SparkConf,
webuiUrl: Option[String] = None,
checkpoint: Option[Boolean] = None,
failoverTimeout: Option[Double] = None,
frameworkId: Option[String] = None): SchedulerDriver = {
masterUrl: String,
scheduler: Scheduler,
sparkUser: String,
appName: String,
conf: SparkConf,
webuiUrl: Option[String] = None,
checkpoint: Option[Boolean] = None,
failoverTimeout: Option[Double] = None,
frameworkId: Option[String] = None): SchedulerDriver = {
markRegistered()
assert(webuiUrl.isDefined)
assert(webuiUrl.get.equals("http://webui"))
Expand Down Expand Up @@ -419,37 +421,11 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(!dockerInfo.getForcePullImage)
}

test("Do not call removeExecutor() after backend is stopped") {
setBackend()

// launches a task on a valid offer
val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
verifyTaskLaunched(driver, "o1")

// launches a thread simulating status update
val statusUpdateThread = new Thread {
override def run(): Unit = {
while (!stopCalled) {
Thread.sleep(100)
}

val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
backend.statusUpdate(driver, status)
}
}.start

backend.stop()
// Any method of the backend involving sending messages to the driver endpoint should not
// be called after the backend is stopped.
verify(driverEndpoint, never()).askSync(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
}

test("mesos supports spark.executor.uri") {
val url = "spark.spark.spark.com"
setBackend(Map(
"spark.executor.uri" -> url
), false)
), null)

val (mem, cpu) = (backend.executorMemory(sc), 4)

Expand All @@ -465,7 +441,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
setBackend(Map(
"spark.mesos.fetcherCache.enable" -> "true",
"spark.executor.uri" -> url
), false)
), null)
val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
val launchedTasks = verifyTaskLaunched(driver, "o1")
Expand All @@ -479,7 +455,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
setBackend(Map(
"spark.mesos.fetcherCache.enable" -> "false",
"spark.executor.uri" -> url
), false)
), null)
val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
val launchedTasks = verifyTaskLaunched(driver, "o1")
Expand All @@ -504,8 +480,31 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(networkInfos.get(0).getName == "test-network-name")
}

test("supports spark.scheduler.minRegisteredResourcesRatio") {
val expectedCores = 1
setBackend(Map(
"spark.cores.max" -> expectedCores.toString,
"spark.scheduler.minRegisteredResourcesRatio" -> "1.0"))

val offers = List(Resources(backend.executorMemory(sc), expectedCores))
offerResources(offers)
val launchedTasks = verifyTaskLaunched(driver, "o1")
assert(!backend.isReady)

registerMockExecutor(launchedTasks(0).getTaskId.getValue, "s1", expectedCores)
assert(backend.isReady)
}

private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)

private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = {
val mockEndpointRef = mock[RpcEndpointRef]
val mockAddress = mock[RpcAddress]
val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty)

backend.driverEndpoint.askSync[Boolean](message)
}

private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
Expand Down Expand Up @@ -534,8 +533,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
private def createSchedulerBackend(
taskScheduler: TaskSchedulerImpl,
driver: SchedulerDriver,
shuffleClient: MesosExternalShuffleClient,
endpoint: RpcEndpointRef): MesosCoarseGrainedSchedulerBackend = {
shuffleClient: MesosExternalShuffleClient) = {
val securityManager = mock[SecurityManager]

val backend = new MesosCoarseGrainedSchedulerBackend(
Expand All @@ -553,9 +551,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient

override protected def createDriverEndpointRef(
properties: ArrayBuffer[(String, String)]): RpcEndpointRef = endpoint

// override to avoid race condition with the driver thread on `mesosDriver`
override def startScheduler(newDriver: SchedulerDriver): Unit = {
mesosDriver = newDriver
Expand All @@ -571,31 +566,35 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend
}

private def setBackend(sparkConfVars: Map[String, String] = null,
setHome: Boolean = true) {
private def initializeSparkConf(
sparkConfVars: Map[String, String] = null,
home: String = "/path"): Unit = {
sparkConf = (new SparkConf)
.setMaster("local[*]")
.setAppName("test-mesos-dynamic-alloc")
.set("spark.mesos.driver.webui.url", "http://webui")

if (setHome) {
sparkConf.setSparkHome("/path")
if (home != null) {
sparkConf.setSparkHome(home)
}

if (sparkConfVars != null) {
sparkConf.setAll(sparkConfVars)
}
}

private def setBackend(sparkConfVars: Map[String, String] = null, home: String = "/path") {
initializeSparkConf(sparkConfVars, home)
sc = new SparkContext(sparkConf)

driver = mock[SchedulerDriver]
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)

taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)

externalShuffleClient = mock[MesosExternalShuffleClient]
driverEndpoint = mock[RpcEndpointRef]
when(driverEndpoint.ask(any())(any())).thenReturn(Promise().future)

backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient, driverEndpoint)
backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient)
}
}