diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 2a36ec4fa811..8f5b97ccb1f8 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -60,8 +60,16 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt) + private val executorCoresOption = conf.getOption("spark.executor.cores").map(_.toInt) + + private val minCoresPerExecutor = executorCoresOption.getOrElse(1) + // Maximum number of cores to acquire - private val maxCores = maxCoresOption.getOrElse(Int.MaxValue) + private val maxCores = { + val cores = maxCoresOption.getOrElse(Int.MaxValue) + // Set maxCores to a multiple of smallest executor we can launch + cores - (cores % minCoresPerExecutor) + } private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false) @@ -489,8 +497,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( } private def executorCores(offerCPUs: Int): Int = { - sc.conf.getInt("spark.executor.cores", - math.min(offerCPUs, maxCores - totalCoresAcquired)) + executorCoresOption.getOrElse( + math.min(offerCPUs, maxCores - totalCoresAcquired) + ) } override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) {