Skip to content

Commit f674e59

Browse files
add comment and minor fix
1 parent 2835929 commit f674e59

File tree

1 file changed

+13
-9
lines changed
  • core/src/main/scala/org/apache/spark/deploy/master

1 file changed

+13
-9
lines changed

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -482,22 +482,26 @@ private[spark] class Master(
482482

483483
// First schedule drivers, they take strict precedence over applications
484484
// Randomization helps balance drivers
485-
val shuffledAliveWorkers = Random.shuffle(workers).toArray
486-
val aliveWorkerNum = shuffledAliveWorkers.size
487-
var curPos = aliveWorkerNum - 1
488-
for (driver <- List(waitingDrivers: _*)) {
489-
val startFlag = curPos
490-
curPos = (curPos + 1) % aliveWorkerNum
485+
val shuffledWorkers = Random.shuffle(workers).toArray
486+
val workerNum = shuffledWorkers.size
487+
var curPos = 0
488+
for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers
489+
// For each waiting driver we pick a worker that has enough resources to launch it.
490+
// The picking does in a round-robin fashion, starting from position behind last
491+
// worker on which driver was just launched and ending with driver being launched
492+
// or we have iterated over all workers.
493+
val startPos = curPos
494+
curPos = (curPos + 1) % workerNum
491495
var launched = false
492-
while (curPos != startFlag && !launched) {
493-
val worker = shuffledAliveWorkers(curPos)
496+
while (curPos - 1 != startPos && !launched) {
497+
val worker = shuffledWorkers(curPos)
494498
if (worker.state == WorkerState.ALIVE
495499
&& worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
496500
launchDriver(worker, driver)
497501
waitingDrivers -= driver
498502
launched = true
499503
}
500-
curPos = (curPos + 1) % aliveWorkerNum
504+
curPos = (curPos + 1) % workerNum
501505
}
502506
}
503507

0 commit comments

Comments
 (0)