core/src/main/scala/org/apache/spark/deploy/master/Master.scala (112 changes: 75 additions & 37 deletions)
@@ -533,6 +533,7 @@ private[master] class Master(

/**
* Schedule executors to be launched on the workers.
* Returns an array containing number of cores assigned to each worker.
*
* There are two modes of launching executors. The first attempts to spread out an application's
* executors on as many workers as possible, while the second does the opposite (i.e. launch them
@@ -543,59 +544,96 @@
* multiple executors from the same application may be launched on the same worker if the worker
* has enough cores and memory. Otherwise, each executor grabs all the cores available on the
* worker by default, in which case only one executor may be launched on each worker.
Contributor: Can you add to this javadoc what the new return type is? Also, it would be good to add a short paragraph explaining the necessity of allocating N cores at a time, with an example of where allocating 1 core at a time fails.

*
* It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
* at a time). Consider the following example: cluster has 4 workers with 16 cores each.
* User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
* allocated at a time, 12 cores from each worker would be assigned to each executor.
* Since 12 < 16, no executors would launch [SPARK-8881].
*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
// Try to spread out each app among all the workers, until it has all its cores
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var pos = 0
while (toAssign > 0) {
if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
toAssign -= 1
assigned(pos) += 1
private[master] def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
// If the number of cores per executor is not specified, then we can just schedule
// 1 core at a time since we expect a single executor to be launched on each worker
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
Contributor: then we should add a comment here to say that:

// If the number of cores per executor is not specified, then we can just schedule
// 1 core at a time since we expect a single executor to be launched on each worker

val memoryPerExecutor = app.desc.memoryPerExecutorMB
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var freeWorkers = (0 until numUsable).toIndexedSeq

def canLaunchExecutor(pos: Int): Boolean = {
usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor
}

while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) {
freeWorkers = freeWorkers.filter(canLaunchExecutor)
freeWorkers.foreach { pos =>
var keepScheduling = true
while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) {
coresToAssign -= coresPerExecutor
assignedCores(pos) += coresPerExecutor
assignedMemory(pos) += memoryPerExecutor

// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
// scheduling executors on this worker until we use all of its resources.
// Otherwise, just move on to the next worker.
if (spreadOutApps) {
keepScheduling = false
}
pos = (pos + 1) % numUsable
}
// Now that we've decided how many cores to give on each node, let's actually give them
for (pos <- 0 until numUsable if assigned(pos) > 0) {
allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
}
}
} else {
// Pack each app into as few workers as possible until we've assigned all its cores
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (app <- waitingApps if app.coresLeft > 0) {
allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
}
}
assignedCores
}
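
To make the javadoc example concrete, here is a minimal standalone sketch of the scheduling loop above, run on the SPARK-8881 scenario. The Worker case class and the schedule signature are simplified stand-ins, not Spark's real WorkerInfo/ApplicationInfo API, and the memory figures are assumed.

object ScheduleSketch {
  case class Worker(coresFree: Int, memoryFree: Int)

  def schedule(
      workers: Array[Worker],
      coresLeft: Int,
      coresPerExecutor: Int,
      memoryPerExecutor: Int,
      spreadOut: Boolean): Array[Int] = {
    val assignedCores = new Array[Int](workers.length)
    val assignedMemory = new Array[Int](workers.length)
    var coresToAssign = math.min(coresLeft, workers.map(_.coresFree).sum)
    var freeWorkers = workers.indices.toIndexedSeq

    def canLaunchExecutor(pos: Int): Boolean =
      workers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
        workers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor

    // Allocate coresPerExecutor at a time; workers that can no longer fit a
    // whole executor are filtered out on each pass.
    while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) {
      freeWorkers = freeWorkers.filter(canLaunchExecutor)
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) {
          coresToAssign -= coresPerExecutor
          assignedCores(pos) += coresPerExecutor
          assignedMemory(pos) += memoryPerExecutor
          if (spreadOut) keepScheduling = false // move on to the next worker
        }
      }
    }
    assignedCores
  }

  def main(args: Array[String]): Unit = {
    // SPARK-8881 example: 4 workers x 16 cores, spark.cores.max = 48,
    // spark.executor.cores = 16.
    val workers = Array.fill(4)(Worker(coresFree = 16, memoryFree = 32768))
    val cores = schedule(workers, coresLeft = 48, coresPerExecutor = 16,
      memoryPerExecutor = 1024, spreadOut = true)
    println(cores.toSeq) // three workers get 16 cores, the fourth gets 0, so
                         // three 16-core executors launch. Allocating 1 core at
                         // a time would instead leave 12 cores on each worker,
                         // and 12 < 16 means no executor could ever launch.
  }
}

Note that re-filtering freeWorkers inside the outer loop is also what keeps it from spinning forever once no worker can fit another executor.
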

/**
* Schedule and launch executors on workers
*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
for (app <- waitingApps if app.coresLeft > 0) {
val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
// Filter out workers that don't have enough resources to launch an executor
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
Contributor: The scheduleExecutorsOnWorkers actually makes an assumption that usableWorkers has enough resources for at least 1 executor. If this assumption turns out to be false, the method could go into an infinite loop as @srowen mentioned earlier.

I think it would simplify things if we just move the declaration of usableWorkers into scheduleExecutorsOnWorkers. Then we can avoid the implicit logical coupling here between the two methods.

Contributor Author: usableWorkers is being used in startExecutorsOnWorkers as well

Contributor: hm, you're right. In the latest comment I've suggested an alternative that removes this logical coupling.


// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}
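
As a side illustration of the filtering step in startExecutorsOnWorkers, here is a minimal sketch with a simplified Worker case class standing in for Spark's WorkerInfo; the field names and resource figures are assumptions.

case class Worker(alive: Boolean, coresFree: Int, memoryFree: Int)

val memoryPerExecutorMB = 2048              // assumed app setting
val coresPerExecutor: Option[Int] = Some(4) // assumed app setting

val workers = Seq(
  Worker(alive = true,  coresFree = 8, memoryFree = 4096),
  Worker(alive = true,  coresFree = 2, memoryFree = 4096), // too few cores
  Worker(alive = false, coresFree = 8, memoryFree = 4096), // not ALIVE
  Worker(alive = true,  coresFree = 6, memoryFree = 1024)) // too little memory

val usableWorkers = workers
  .filter(_.alive)
  .filter(w => w.memoryFree >= memoryPerExecutorMB &&
    w.coresFree >= coresPerExecutor.getOrElse(1))
  .sortBy(_.coresFree).reverse
// usableWorkers keeps only the first worker: dead or under-resourced workers
// are dropped before scheduling, and the rest are sorted by descending free cores.
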

/**
* Allocate a worker's resources to one or more executors.
* @param app the info of the application which the executors belong to
* @param coresToAllocate cores on this worker to be allocated to this application
* @param assignedCores number of cores on this worker for this application
* @param coresPerExecutor number of cores per executor
* @param worker the worker info
*/
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
coresToAllocate: Int,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
var coresLeft = coresToAllocate
while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
val exec = app.addExecutor(worker, coresPerExecutor)
coresLeft -= coresPerExecutor
// If the number of cores per executor is specified, we divide the cores assigned
// to this worker evenly among the executors with no remainder.
// Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
Contributor: actually, doesn't this change default behavior? Previously a single executor would acquire all cores on a worker. Now there will be N executors on the worker, each occupying exactly one core.

Contributor: we might have to pass in an option of coresPerExecutor to this method. If it's not defined, we want to launch 1 executor that grabs all the cores allocated on this worker:

// If the number of cores per executor is explicitly specified, then we divide
// the cores assigned to this worker evenly among the executors with no remainder.
// Otherwise, we launch a single executor that grabs all the cores allocated to
// this application on this worker.
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)

Contributor Author: Assignment of cores per worker is done by the time this method is invoked. coresPerExecutor is 1 by default. If we have N workers and N executors with coresPerExecutor = 1 and we are spreading out, assignedCores = 1 and we would launch 1 executor per worker with 1 core each, as expected.

Contributor: "coresPerExecutor is 1 by default"

That's not true (see the documentation). If spark.executor.cores is not specified, the executor should grab all the cores allocated on the worker. This number can be, and usually is, greater than 1.

Example: We have 3 workers with 8 cores each, spark.cores.max is 12 and spark.executor.cores is not set. Then we'll allocate 4 cores on each worker for this application, and we should end up with 3 executors, one on each worker, each with 4 cores.

Contributor Author: Ah, I see. I thought the default behavior had inherited the bug, didn't realize it was intentional. Will fix this in a few mins.

val exec = app.addExecutor(worker, coresToAssign)
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
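A quick standalone check of the executor-count arithmetic discussed in the thread above; executorsFor is a hypothetical helper that mirrors the numExecutors and coresToAssign lines, not part of Spark:

def executorsFor(assignedCores: Int, coresPerExecutor: Option[Int]): (Int, Int) = {
  val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  (numExecutors, coresToAssign)
}

// The reviewer's example: 3 workers x 8 cores, spark.cores.max = 12, and
// spark.executor.cores unset. Scheduling assigns 4 cores per worker, and each
// worker launches one executor that grabs all 4 of them.
println(executorsFor(4, None))    // (1, 4): one executor with all 4 cores
println(executorsFor(4, Some(2))) // (2, 2): two executors with 2 cores each
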
@@ -180,7 +180,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
// into SPARK-6688.
val conf = getLoggingConf(testDirPath, compressionCodec)
.set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
val sc = new SparkContext("local-cluster[2,2,1024]", "test", conf)
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get
val eventLogPath = eventLogger.logPath