Skip to content
Closed
118 changes: 84 additions & 34 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ private[master] class Master(

/**
* Schedule executors to be launched on the workers.
* Returns an array containing number of cores assigned to each worker (None if scheduling fails)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, for if another change is needed: this could be a @return tag

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, this shouldn't say None anymore

*
* There are two modes of launching executors. The first attempts to spread out an application's
* executors on as many workers as possible, while the second does the opposite (i.e. launch them
Expand All @@ -543,59 +544,108 @@ private[master] class Master(
* multiple executors from the same application may be launched on the same worker if the worker
* has enough cores and memory. Otherwise, each executor grabs all the cores available on the
* worker by default, in which case only one executor may be launched on each worker.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add to this java doc what the new return type is? Also, it would be good to add a short paragraph to explain the necessity to allocate N cores at a time, and give an example of where allocating 1 core at a time fails.

*
* It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
* at a time). Consider the following example: cluster has 4 workers with 16 cores each.
* User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
* allocated at a time, 12 cores from each worker would be assigned to each executor.
* Since 12 < 16, no executors would launch [SPARK-8881].
*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
private[master] def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
// If the number of cores per executor is not specified, then we can just schedule
// 1 core at a time since we expect a single executor to be launched on each worker
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then we should add a comment here to say that:

// If the number of cores per executor is not specified, then we can just schedule
// 1 core at a time since we expect a single executor to be launched on each worker

val memoryPerExecutor = app.desc.memoryPerExecutorMB
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var pos = 0
var lastCoresToAssign = coresToAssign
if (spreadOutApps) {
// Try to spread out each app among all the workers, until it has all its cores
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var pos = 0
while (toAssign > 0) {
if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
toAssign -= 1
assigned(pos) += 1
}
pos = (pos + 1) % numUsable
// Try to spread out executors among workers (sparse scheduling)
while (coresToAssign > 0) {
if (usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor) {
coresToAssign -= coresPerExecutor
assignedCores(pos) += coresPerExecutor
assignedMemory(pos) += memoryPerExecutor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I stared at this loop for a little bit and I think it could bring us into an infinite loop.

E.g. We have 3 workers, with 3, 3, and 4 free cores respectively, so that coresToAssign == 10. Now let's say coresPerExecutor == 3, so after allocating 3 executors we end up with coresToAssign == 1. What happens next? Well, none of the usable workers can accommodate a new executor, and coresToAssign > 0 is still true, so this loop never exits.

Let me know if I'm missing something.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(same for the non spread out case)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't app.coresLeft be a multiple of 3 in this case? so 9 or 12 rather than 10? but yeah it still raises the question of what happens if there simply aren't enough cores on one worker: I want 4x 3-core executors, and I have 3x 4-core workers. It will never schedule. Previously I think we'd just manage to schedule 3x 3-core executors but I think this would keep looping. I think there needs to be some logic for detecting when there is no worker left that could possibly fit another.

I haven't thought this through either but are there race condition problems here too? as long as the worst case is just that resources that looked available aren't anymore and fail to schedule, that's fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"resources that looked available aren't anymore and fail to schedule, that's fine." This is the assumption being made here. If the user didn't care about the size of the executor, they would skip executor.cores and the algorithm would proceed as before (best-effort: one-core at a time). If they do, we should either schedule as requested or not at all. If we care to be extra-friendly, we could add a check to log a message from within the loop: "Not enough resources, please check spark.cores.max and spark.executor.cores" ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems OK but I think there is an infinite loop problem here still?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could potentially return assignedCores that we have thus far and proceed with scheduling. But as discussed earlier, we are better off failing than scheduling incorrectly. Do you feel otherwise?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't see your note. I would think that by failing and allowing the user to reconfigure, we would be doing them a favor. But I can see the value in scheduling whatever we can as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we have both versions. We can choose to keep this or revert to the previous one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the previous behavior was to schedule as much as possible? since before it would only try to assign as many cores as are available, not necessarily as many as are requested. If so I think it's best to retain that behavior.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't think it's really a user error. The contract of setting spark.executor.cores is that every executor has that exactly many cores. If the total number of cores across the cluster is not a multiple of that then there will be some unused cores, but scheduling should still work.

}
// Now that we've decided how many cores to give on each node, let's actually give them
for (pos <- 0 until numUsable if assigned(pos) > 0) {
allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
pos = (pos + 1) % numUsable
if (pos == 0) {
if (lastCoresToAssign == coresToAssign) {
return assignedCores
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to say just break here, but you can't in Scala eh? I think the doc about the return type needs to update if this is returned now

}
lastCoresToAssign = coresToAssign
}
}
} else {
// Pack each app into as few workers as possible until we've assigned all its cores
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (app <- waitingApps if app.coresLeft > 0) {
allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
// Pack executors into as few workers as possible (dense scheduling)
while (coresToAssign > 0) {
while (usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor &&
coresToAssign > 0) {
coresToAssign -= coresPerExecutor
assignedCores(pos) += coresPerExecutor
assignedMemory(pos) += memoryPerExecutor
}
pos = (pos + 1) % numUsable
if (pos == 0) {
if (lastCoresToAssign == coresToAssign) {
return assignedCores
}
lastCoresToAssign = coresToAssign
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a lot of duplicated code between the two cases now. I think there's a nicer way to express this. The termination condition is really (1) if we have allocated all cores, or (2) if there are no more workers with enough resources. Something like the following:

...
val assignedMemory = ...
var coresToAssign = ...
var freeWorkers = (1 to numUsable)

def canLaunchExecutor(pos: Int): Boolean = {
  usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
  usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor
}

while (coresToAssign > 0 && freeWorkers.nonEmpty) {
  freeWorkers = freeWorkers.filter(canLaunchExecutor)
  freeWorkers.foreach { pos =>
    var keepScheduling = true
    while (keepScheduling && canLaunchExecutor(pos) && coresToAssign > 0) {
      coresToAssign -= coresPerExecutor
      assignedCores(pos) += coresPerExecutor
      assignedMemory(pos) += memoryPerExecutor

      // Spreading out an application means spreading out its executors across as
      // many workers as possible. If we are not spreading out, then we should keep
      // scheduling executors on this worker until we use all of its resources.
      // Otherwise, just move on to the next worker.
      if (spreadOutApp) {
        keepScheduling = false
      }
    }
  }
}

This basically says you just keep narrowing the available set of workers until you have assigned all the cores. If it turns out that there are no more available workers, then we should just stop. The other thing is that I merged the spread-out vs non-spread-out cases with a common while loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely cleaner. Will try and incorporate this. Thanks!

}
}
}
assignedCores
}

/**
* Schedule and launch executors on workers
*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
for (app <- waitingApps if app.coresLeft > 0) {
val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
// Filter out workers that don't have enough resources to launch an executor
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scheduleExecutorsOnWorkers actually makes an assumption that usableWorkers has enough resources for at least 1 executor. If this assumption turns out to be false, the method could go into an infinite loop as @srowen mentioned earlier.

I think it would simplify things if we just move the declaration of usableWorkers into scheduleExecutorsOnWorkers. Then we can avoid the implicit logical coupling here between the two methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

usableWorkers is being used in startExecutorsOnWorkers as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, you're right. In the latest comment I've suggested an alternative that removes this logical coupling.


// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}

/**
* Allocate a worker's resources to one or more executors.
* @param app the info of the application which the executors belong to
* @param coresToAllocate cores on this worker to be allocated to this application
* @param assignedCores number of cores on this worker for this application
* @param coresPerExecutor number of cores per executor
* @param worker the worker info
*/
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
coresToAllocate: Int,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
var coresLeft = coresToAllocate
while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
val exec = app.addExecutor(worker, coresPerExecutor)
coresLeft -= coresPerExecutor
// If the number of cores per executor is specified, we divide the cores assigned
// to this worker evenly among the executors with no remainder.
// Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, doesn't this change default behavior? Previously a single executor would acquire all cores on a worker. Now there will be N executors on the worker, each occupying exactly one core.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might have to pass in an option of coresPerExecutor to this method. If it's not defined, we want to launch 1 executor that grabs all the cores allocated on this worker:

// If the number of cores per executor is explicitly specified, then we divide
// the cores assigned to this worker evenly among the executors with no remainder.
// Otherwise, we launch a single executor that grabs all the cores allocated to
// this application on this worker.
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assignment of cores per worker is done by the time this method is invoked. coresPerExecutor is 1 by default. If we have N workers and N executors with coresPerExecutor = 1 and we are spreading out, assignedCores = 1 and we would launch 1 executor per worker with 1 core each, as expected.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coresPerExecutor is 1 by default

That's not true (see documentation). If spark.executor.cores is not specified then the executor should grab all the cores allocated on the worker. This number could be and usually is > 1.

Example: We have 3 workers with 8 cores each, spark.cores.max is 12 and spark.executor.cores is not set. Then we'll allocate 4 cores on each worker for this application, and we should end up with 3 executors, one on each worker, each with 4 cores.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. I thought the default behavior had inherited the bug, didn't realize it was intentional. Will fix this in a few mins.

val exec = app.addExecutor(worker, coresToAssign)
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
// into SPARK-6688.
val conf = getLoggingConf(testDirPath, compressionCodec)
.set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
val sc = new SparkContext("local-cluster[2,2,1024]", "test", conf)
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get
val eventLogPath = eventLogger.logPath
Expand Down