Skip to content

Commit 7633933

Browse files
dgshepdbtsai
authored andcommitted
[SPARK-20483] Mesos Coarse mode may starve other Mesos frameworks
## What changes were proposed in this pull request? Set maxCores to be a multiple of the smallest executor that can be launched. This ensures that we correctly detect the condition where no more executors will be launched when spark.cores.max is not a multiple of spark.executor.cores ## How was this patch tested? This was manually tested with other sample frameworks measuring their incoming offers to determine if starvation would occur. dbtsai mgummelt Author: Davis Shepherd <[email protected]> Closes #17786 from dgshep/fix_mesos_max_cores.
1 parent ba76662 commit 7633933

File tree

1 file changed

+12
-3
lines changed

1 file changed

+12
-3
lines changed

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,16 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
6060

6161
private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)
6262

63+
private val executorCoresOption = conf.getOption("spark.executor.cores").map(_.toInt)
64+
65+
private val minCoresPerExecutor = executorCoresOption.getOrElse(1)
66+
6367
// Maximum number of cores to acquire
64-
private val maxCores = maxCoresOption.getOrElse(Int.MaxValue)
68+
private val maxCores = {
69+
val cores = maxCoresOption.getOrElse(Int.MaxValue)
70+
// Set maxCores to a multiple of smallest executor we can launch
71+
cores - (cores % minCoresPerExecutor)
72+
}
6573

6674
private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
6775

@@ -489,8 +497,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
489497
}
490498

491499
private def executorCores(offerCPUs: Int): Int = {
492-
sc.conf.getInt("spark.executor.cores",
493-
math.min(offerCPUs, maxCores - totalCoresAcquired))
500+
executorCoresOption.getOrElse(
501+
math.min(offerCPUs, maxCores - totalCoresAcquired)
502+
)
494503
}
495504

496505
override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) {

0 commit comments

Comments
 (0)