diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 8f5b97ccb1f8..0282ff0c5df9 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -72,7 +72,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( } private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false) - + private val executorGpus = conf.getInt("spark.mesos.executor.gpus", 0) private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0) private val taskLabels = conf.get("spark.mesos.task.labels", "") @@ -328,6 +328,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( val offerAttributes = toAttributeMap(offer.getAttributesList) val offerMem = getResource(offer.getResourcesList, "mem") val offerCpus = getResource(offer.getResourcesList, "cpus") + val offerGpus = getResource(offer.getResourcesList, "gpus") val offerPorts = getRangeResource(offer.getResourcesList, "ports") val id = offer.getId.getValue @@ -335,17 +336,18 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( val offerTasks = tasks(offer.getId) logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + - s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." + + s"mem: $offerMem cpu: $offerCpus gpu: $offerGpus ports: $offerPorts." + s" Launching ${offerTasks.size} Mesos tasks.") for (task <- offerTasks) { val taskId = task.getTaskId val mem = getResource(task.getResourcesList, "mem") val cpus = getResource(task.getResourcesList, "cpus") + val gpus = getResource(task.getResourcesList, "gpus") val ports = getRangeResource(task.getResourcesList, "ports").mkString(",") logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" + - s" ports: $ports") + s" gpu: $gpus ports: $ports") } driver.launchTasks( @@ -398,9 +400,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( launchTasks = true val taskId = newMesosTaskId() val offerCPUs = getResource(resources, "cpus").toInt - val taskGPUs = Math.min( - Math.max(0, maxGpus - totalGpusAcquired), getResource(resources, "gpus").toInt) - + val offerGPUs = getResource(resources, "gpus").toInt + var taskGPUs = executorGpus val taskCPUs = executorCores(offerCPUs) val taskMemory = executorMemory(sc) @@ -482,6 +483,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = { val offerMem = getResource(resources, "mem") val offerCPUs = getResource(resources, "cpus").toInt + val offerGPUs = getResource(resources, "gpus").toInt val cpus = executorCores(offerCPUs) val mem = executorMemory(sc) val ports = getRangeResource(resources, "ports") @@ -491,6 +493,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( cpus <= offerCPUs && cpus + totalCoresAcquired <= maxCores && mem <= offerMem && + executorGpus <= offerGPUs && + executorGpus + totalGpusAcquired <= maxGpus && numExecutors() < executorLimit && slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES && meetsPortRequirements diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 0418bfbaa5ed..1a515d64aa4b 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -165,18 +165,47 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite } - test("mesos does not acquire more than spark.mesos.gpus.max") { - val maxGpus = 5 - setBackend(Map("spark.mesos.gpus.max" -> maxGpus.toString)) + test("mesos acquires spark.mesos.executor.gpus number of gpus per executor") { + setBackend(Map("spark.mesos.gpus.max" -> "5", + "spark.mesos.executor.gpus" -> "2")) val executorMemory = backend.executorMemory(sc) - offerResources(List(Resources(executorMemory, 1, maxGpus + 1))) + offerResources(List(Resources(executorMemory, 1, 5))) val taskInfos = verifyTaskLaunched(driver, "o1") assert(taskInfos.length == 1) val gpus = backend.getResource(taskInfos.head.getResourcesList, "gpus") - assert(gpus == maxGpus) + assert(gpus == 2) + } + + + test("mesos declines offers that cannot satisfy spark.mesos.executor.gpus") { + setBackend(Map("spark.mesos.gpus.max" -> "5", + "spark.mesos.executor.gpus" -> "2")) + + val executorMemory = backend.executorMemory(sc) + offerResources(List(Resources(executorMemory, 1, 1))) + verifyDeclinedOffer(driver, createOfferId("o1")) + } + + + test("mesos declines offers that exceed spark.mesos.gpus.max") { + setBackend(Map("spark.mesos.gpus.max" -> "5", + "spark.mesos.executor.gpus" -> "2")) + + val executorMemory = backend.executorMemory(sc) + offerResources(List(Resources(executorMemory, 1, 2), + Resources(executorMemory, 1, 2), + Resources(executorMemory, 1, 2))) + + val taskInfos1 = verifyTaskLaunched(driver, "o1") + assert(backend.getResource(taskInfos1.head.getResourcesList, "gpus") == 2) + + val taskInfos2 = verifyTaskLaunched(driver, "o2") + assert(backend.getResource(taskInfos2.head.getResourcesList, "gpus") == 2) + + verifyDeclinedOffer(driver, createOfferId("o3")) }