94 changes: 59 additions & 35 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -544,58 +544,82 @@ private[master] class Master(
* has enough cores and memory. Otherwise, each executor grabs all the cores available on the
* worker by default, in which case only one executor may be launched on each worker.
Contributor:
Can you add to this javadoc what the new return type is? It would also be good to add a short paragraph explaining why we need to allocate N cores at a time, with an example of where allocating 1 core at a time fails.
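A possible shape for that javadoc, reusing the 4x16-core example discussed further down in this thread (the wording here is only a sketch, not the final doc):

/**
 * Schedule executors to be launched on the workers, and return an array containing
 * the number of cores assigned to each worker.
 *
 * When spark.executor.cores is set we must allocate cores to a worker in multiples of
 * coresPerExecutor, otherwise cores can be stranded. For example, with 4 workers of
 * 16 cores each, spark.cores.max = 48 and spark.executor.cores = 16, allocating one
 * core at a time leaves 12 cores on every worker, which is not enough for any single
 * 16-core executor, so nothing is launched.
 */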

*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.

Contributor:
remove this

private[master] def scheduleExecutorsOnWorkers(app: ApplicationInfo,
Contributor:
please keep this private (or do you plan to add tests for this?)

usableWorkers: Array[WorkerInfo], spreadOutApps: Boolean): Array[Int] = {
Contributor:
style:

private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  ...
}

val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
Contributor:
then we should add a comment here to say that:

// If the number of cores per executor is not specified, then we can just schedule
// 1 core at a time since we expect a single executor to be launched on each worker

val memoryPerExecutor = app.desc.memoryPerExecutorMB
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
Contributor:
can you call this coresToAssign to be more specific

var pos = 0
if (spreadOutApps) {
// Try to spread out each app among all the workers, until it has all its cores
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var pos = 0
while (toAssign > 0) {
if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
toAssign -= 1
assigned(pos) += 1
}
pos = (pos + 1) % numUsable
}
// Now that we've decided how many cores to give on each node, let's actually give them
for (pos <- 0 until numUsable if assigned(pos) > 0) {
allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
// Try to spread out executors among workers (sparse scheduling)
while (toAssign > 0) {
if (usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor) {
toAssign -= coresPerExecutor
Member:
If I understand what you're trying to change, this won't help. If there aren't enough cores on any worker, then this becomes an infinite loop.

Contributor Author:
Please read the code carefully.

Member:
Eh, sorry, that doesn't address my question, but I think I see the situation now: e.g. I need 16 cores for 2 eight-core executors and I have 4 workers, so no single worker ends up with enough free cores to launch an executor anywhere? An example or a test would be really helpful.

If so, I think this is also fixable by just considering no more workers than executors.

Member:
Also, yes, I see you still have the filtering on available cores, so this shouldn't keep looping over workers, right? Unless the available count can drop while this is in progress, but that is either not a problem or already a problem, so not directly relevant.

Contributor Author:
Consider the following: 4 workers, each with 16 cores; spark.cores.max = 48 and spark.executor.cores = 16. When we spread out, we allocate one core at a time and in doing so end up allocating 12 cores from each worker. First, we ended up ignoring spark.executor.cores during allocation, which isn't right. Second, when the condition while (coresLeft >= coresPerExecutor) is checked, coresLeft is 12 and coresPerExecutor is 16. As a result, no executors launch.
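For reference, a tiny standalone sketch of that scenario (hypothetical code, not the Master's actual structures) that reproduces the 12-cores-per-worker outcome:

// 4 workers with 16 free cores each, spark.cores.max = 48, spark.executor.cores = 16,
// spreading out one core at a time.
object OneCoreAtATimeExample {
  def main(args: Array[String]): Unit = {
    val coresFree = Array(16, 16, 16, 16)
    val assigned = new Array[Int](coresFree.length)
    var toAssign = math.min(48, coresFree.sum) // 48
    var pos = 0
    while (toAssign > 0) {
      if (coresFree(pos) - assigned(pos) > 0) {
        toAssign -= 1
        assigned(pos) += 1
      }
      pos = (pos + 1) % coresFree.length
    }
    // assigned is now Array(12, 12, 12, 12): no worker has 16 unallocated cores left,
    // so the later check coresLeft >= coresPerExecutor (12 >= 16) fails and no
    // executors are launched.
    println(assigned.mkString(", "))
  }
}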

Contributor Author:
Yep, that's right.

Member:
Yes, makes sense. Is it maybe more direct to never spread the allocation over more than 3 workers in this case, since only 3 executors are needed? Same effect, but I also see the value in allocating whole executors' worth of cores at a time for clarity.

Contributor Author:
Yep. Allocating spark.executor.cores at a time is cleaner and directly enforces the intended semantics.

assignedCores(pos) += coresPerExecutor
assignedMemory(pos) += memoryPerExecutor
Contributor:
So I stared at this loop for a little bit and I think it could bring us into an infinite loop.

E.g. We have 3 workers, with 3, 3, and 4 free cores respectively, so that coresToAssign == 10. Now let's say coresPerExecutor == 3, so after allocating 3 executors we end up with coresToAssign == 1. What happens next? Well, none of the usable workers can accommodate a new executor, and coresToAssign > 0 is still true, so this loop never exits.

Let me know if I'm missing something.
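One way out, sketched standalone below (hypothetical names, and it skips the round-robin spreading for brevity): only keep allocating while some worker can still fit a whole executor, so the leftover core is simply dropped instead of spinning forever.

// 3 workers with 3, 3, 4 free cores, coresPerExecutor = 3, 10 cores requested.
object GuardedAllocationExample {
  def main(args: Array[String]): Unit = {
    val coresFree = Array(3, 3, 4)
    val coresPerExecutor = 3
    val assigned = new Array[Int](coresFree.length)
    var toAssign = math.min(10, coresFree.sum) // 10

    // A worker can host another executor only if it still has enough unassigned cores.
    def canFit(pos: Int): Boolean = coresFree(pos) - assigned(pos) >= coresPerExecutor

    var candidates = coresFree.indices.filter(canFit)
    while (toAssign >= coresPerExecutor && candidates.nonEmpty) {
      val pos = candidates.head
      assigned(pos) += coresPerExecutor
      toAssign -= coresPerExecutor
      candidates = coresFree.indices.filter(canFit)
    }
    // assigned is Array(3, 3, 3) and the loop exits with toAssign == 1.
    println(assigned.mkString(", "))
  }
}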

Contributor:
(same for the non spread out case)

Member:
Wouldn't app.coresLeft be a multiple of 3 in this case, so 9 or 12 rather than 10? But yeah, it still raises the question of what happens if there simply aren't enough cores on one worker: I want 4x 3-core executors, and I have 3x 4-core workers. It will never schedule. Previously I think we'd just manage to schedule 3x 3-core executors, but I think this would keep looping. There needs to be some logic for detecting when no worker is left that could possibly fit another executor.

I haven't thought this through either, but are there race condition problems here too? As long as the worst case is just that resources that looked available aren't anymore and we fail to schedule, that's fine.

Contributor Author:
"resources that looked available aren't anymore and fail to schedule, that's fine." This is the assumption being made here. If the user didn't care about the size of the executor, they would skip executor.cores and the algorithm would proceed as before (best-effort: one-core at a time). If they do, we should either schedule as requested or not at all. If we care to be extra-friendly, we could add a check to log a message from within the loop: "Not enough resources, please check spark.cores.max and spark.executor.cores" ?

Member:
Seems OK but I think there is an infinite loop problem here still?

Contributor Author:
We could potentially return assignedCores that we have thus far and proceed with scheduling. But as discussed earlier, we are better off failing than scheduling incorrectly. Do you feel otherwise?

Contributor Author:
Didn't see your note. I would think that by failing and allowing the user to reconfigure, we would be doing them a favor. But I can see the value in scheduling whatever we can as well.

Contributor Author:
Now we have both versions. We can choose to keep this or revert to the previous one.

Member:
I think the previous behavior was to schedule as much as possible, since before it would only try to assign as many cores as are available, not necessarily as many as are requested. If so, I think it's best to retain that behavior.

Contributor:
Yeah, I don't think it's really a user error. The contract of setting spark.executor.cores is that every executor has exactly that many cores. If the total number of cores across the cluster is not a multiple of that, then there will be some unused cores, but scheduling should still work.
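A quick standalone illustration of that contract (hypothetical numbers): each worker contributes whole executors and any remainder cores simply stay unused, but scheduling still succeeds.

object WholeExecutorsOnlyExample {
  def main(args: Array[String]): Unit = {
    val coresPerExecutor = 4
    val coresFreePerWorker = Array(10, 7, 3) // 20 cores in total, not a multiple of 4
    val executorsPerWorker = coresFreePerWorker.map(_ / coresPerExecutor) // 2, 1, 0
    val unusedCores = coresFreePerWorker.zip(executorsPerWorker).map {
      case (free, n) => free - n * coresPerExecutor // 2, 3, 3
    }
    // 3 executors of exactly 4 cores each are launched; 8 cores stay unused.
    println(s"executors = ${executorsPerWorker.sum}, unused cores = ${unusedCores.sum}")
  }
}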

}
pos = (pos + 1) % numUsable
}
} else {
// Pack each app into as few workers as possible until we've assigned all its cores
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (app <- waitingApps if app.coresLeft > 0) {
allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
// Pack executors into as few workers as possible (dense scheduling)
while (toAssign > 0) {
Member:
This branch shouldn't change, right? It's now identical to the one above.

Contributor Author:
Not identical. In one we are spreading out, in another we aren't.

Member:
It is the while vs. if, I see. But then the inner loop is missing a condition on toAssign and over-allocates.

Member:
This still has a problem: say I want to assign 4 cores for one executor but there are 24 free (and plenty of memory). The inner while loop will assign all 24 cores to one worker and get 6 executors.

Contributor Author:
Good catch. The inner loop is missing a check on toAssign.

while (usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor &&
toAssign > 0) {
toAssign -= coresPerExecutor
assignedCores(pos) += coresPerExecutor
assignedMemory(pos) += memoryPerExecutor
}
pos = (pos + 1) % numUsable
}
}
assignedCores
}

/**
* Schedule and launch executors on workers
*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
for (app <- waitingApps if app.coresLeft > 0) {
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor)
.sortBy(_.coresFree).reverse
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
Contributor:
scheduleExecutorsOnWorkers actually assumes that usableWorkers has enough resources for at least one executor. If that assumption turns out to be false, the method could go into an infinite loop, as @srowen mentioned earlier.

I think it would simplify things if we just move the declaration of usableWorkers into scheduleExecutorsOnWorkers. Then we can avoid the implicit logical coupling here between the two methods.

Contributor Author:
usableWorkers is being used in startExecutorsOnWorkers as well

Contributor:
hm, you're right. In the latest comment I've suggested an alternative that removes this logical coupling.


// Now that we've decided how many cores to allocate on each worker, let's allocate them
var pos = 0
Contributor:
not needed?

Contributor:
Ping, can you remove this variable? The for loop below already declares it, so you don't need to declare it here again.

Contributor Author:
Sure, missed this one somehow

for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(app, assignedCores(pos), coresPerExecutor,
usableWorkers(pos))
Contributor:
style:

allocateWorkerResourceToExecutors(
  app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))

}
}
}

/**
* Allocate a worker's resources to one or more executors.
* Allocate a worker's resources to one or more executors
Contributor:
can you revert some of these changes? They have nothing to do with the issue (I actually prefer the period being there, but that's a minor point).

* @param app the info of the application which the executors belong to
* @param coresToAllocate cores on this worker to be allocated to this application
* @param assignedCores number of cores on this worker for this application
* @param coresPerExecutor number of cores per executor
* @param worker the worker info
*/
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
coresToAllocate: Int,
assignedCores: Int,
coresPerExecutor: Int,
worker: WorkerInfo): Unit = {
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
var coresLeft = coresToAllocate
while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {

var numExecutors = assignedCores/coresPerExecutor
Contributor:
please add spaces around /

Contributor:
also, this can be a val

Contributor:
can you add a comment here stating your implicit assumptions:

// If cores per executor is specified, then this division should have a remainder of zero

for (i <- 1 to numExecutors) {
Contributor:
actually, doesn't this change default behavior? Previously a single executor would acquire all cores on a worker. Now there will be N executors on the worker, each occupying exactly one core.

Contributor:
We might have to pass in an Option of coresPerExecutor to this method. If it's not defined, we want to launch a single executor that grabs all the cores allocated on this worker:

// If the number of cores per executor is explicitly specified, then we divide
// the cores assigned to this worker evenly among the executors with no remainder.
// Otherwise, we launch a single executor that grabs all the cores allocated to
// this application on this worker.
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)

Contributor Author:
Assignment of cores per worker is done by the time this method is invoked. coresPerExecutor is 1 by default. If we have N workers and N executors with coresPerExecutor = 1 and we are spreading out, assignedCores = 1 and we would launch 1 executor per worker with 1 core each, as expected.

Contributor:
"coresPerExecutor is 1 by default"

That's not true (see documentation). If spark.executor.cores is not specified, then the executor should grab all the cores allocated on the worker. This number can be, and usually is, greater than 1.

Example: We have 3 workers with 8 cores each, spark.cores.max is 12 and spark.executor.cores is not set. Then we'll allocate 4 cores on each worker for this application, and we should end up with 3 executors, one on each worker, each with 4 cores.
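A small standalone sketch of the Option-based computation from the earlier suggestion, plugged into this example (hypothetical helper, not the final patch):

object NumExecutorsPerWorkerExample {
  // How many executors to launch on one worker, given the cores assigned to it there.
  def numExecutors(assignedCores: Int, coresPerExecutor: Option[Int]): Int =
    coresPerExecutor.map(assignedCores / _).getOrElse(1)

  def main(args: Array[String]): Unit = {
    // spark.executor.cores unset: 3 workers x 8 cores, spark.cores.max = 12
    // => 4 cores assigned per worker, and one executor per worker takes all 4.
    println(numExecutors(assignedCores = 4, coresPerExecutor = None)) // 1

    // spark.executor.cores = 2 with 4 cores assigned => 2 executors of 2 cores each.
    println(numExecutors(assignedCores = 4, coresPerExecutor = Some(2))) // 2
  }
}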

Contributor Author:
Ah, I see. I thought the default behavior had inherited the bug, didn't realize it was intentional. Will fix this in a few mins.

val exec = app.addExecutor(worker, coresPerExecutor)
coresLeft -= coresPerExecutor
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
@@ -180,7 +180,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
// into SPARK-6688.
val conf = getLoggingConf(testDirPath, compressionCodec)
.set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
val sc = new SparkContext("local-cluster[2,2,1024]", "test", conf)
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get
val eventLogPath = eventLogger.logPath