Skip to content

Commit bc91bb1

Browse files
Avoid shuffle every time we schedule the driver using round robin
1 parent bbc7087 commit bc91bb1

File tree

1 file changed

+9
-5
lines changed
  • core/src/main/scala/org/apache/spark/deploy/master

1 file changed

+9
-5
lines changed

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -481,17 +481,21 @@ private[spark] class Master(
481481
if (state != RecoveryState.ALIVE) { return }
482482

483483
// First schedule drivers, they take strict precedence over applications
484+
val shuffledAliveWorkers = Random.shuffle(workers.filter(_.state == WorkerState.ALIVE)) // Randomization helps balance drivers
485+
val aliveWorkerNum = shuffledAliveWorkers.size
486+
var curPos = aliveWorkerNum - 1
484487
for (driver <- List(waitingDrivers: _*)) {
485-
val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
486-
val shuffledWorkersIter = shuffledWorkers.iterator
488+
val startFlag = curPos
489+
curPos = (curPos + 1) % aliveWorkerNum
487490
var launched = false
488-
while(shuffledWorkersIter.hasNext && !launched) {
489-
val worker = shuffledWorkersIter.next()
490-
if (worker.state == WorkerState.ALIVE && worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
491+
while (curPos != startFlag && !launched) {
492+
val worker = shuffledAliveWorkers(curPos)
493+
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
491494
launchDriver(worker, driver)
492495
waitingDrivers -= driver
493496
launched = true
494497
}
498+
curPos = (curPos + 1) % aliveWorkerNum
495499
}
496500
}
497501

0 commit comments

Comments
 (0)