
Commit bb2b9ff

Added an option to spread out jobs in the standalone mode.
1 parent b3b52c9 commit bb2b9ff

3 files changed: +56 -18 lines changed

core/src/main/scala/spark/deploy/master/Master.scala

Lines changed: 51 additions & 12 deletions
@@ -31,6 +31,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   val waitingJobs = new ArrayBuffer[JobInfo]
   val completedJobs = new ArrayBuffer[JobInfo]
 
+  // As a temporary workaround before better ways of configuring memory, we allow users to set
+  // a flag that will perform round-robin scheduling across the nodes (spreading out each job
+  // among all the nodes) instead of trying to consolidate each job onto a small # of nodes.
+  val spreadOutJobs = System.getProperty("spark.deploy.spreadOut", "false").toBoolean
+
   override def preStart() {
     logInfo("Starting Spark master at spark://" + ip + ":" + port)
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -127,24 +132,58 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     }
   }
 
+  /**
+   * Can a job use the given worker? True if the worker has enough memory and we haven't already
+   * launched an executor for the job on it (right now the standalone backend doesn't like having
+   * two executors on the same worker).
+   */
+  def canUse(job: JobInfo, worker: WorkerInfo): Boolean = {
+    worker.memoryFree >= job.desc.memoryPerSlave && !worker.hasExecutor(job)
+  }
+
   /**
    * Schedule the currently available resources among waiting jobs. This method will be called
    * every time a new job joins or resource availability changes.
    */
   def schedule() {
-    // Right now this is a very simple FIFO scheduler. We keep looking through the jobs
-    // in order of submission time and launching the first one that fits on each node.
-    for (worker <- workers if worker.coresFree > 0) {
-      for (job <- waitingJobs.clone()) {
-        val jobMemory = job.desc.memoryPerSlave
-        if (worker.memoryFree >= jobMemory) {
-          val coresToUse = math.min(worker.coresFree, job.coresLeft)
-          val exec = job.addExecutor(worker, coresToUse)
-          launchExecutor(worker, exec)
+    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first job
+    // in the queue, then the second job, etc.
+    if (spreadOutJobs) {
+      // Try to spread out each job among all the nodes, until it has all its cores
+      for (job <- waitingJobs if job.coresLeft > 0) {
+        val usableWorkers = workers.toArray.filter(canUse(job, _)).sortBy(_.coresFree).reverse
+        val numUsable = usableWorkers.length
+        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
+        var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum)
+        var pos = 0
+        while (toAssign > 0) {
+          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
+            toAssign -= 1
+            assigned(pos) += 1
+          }
+          pos = (pos + 1) % numUsable
         }
-        if (job.coresLeft == 0) {
-          waitingJobs -= job
-          job.state = JobState.RUNNING
+        // Now that we've decided how many cores to give on each node, let's actually give them
+        for (pos <- 0 until numUsable) {
+          if (assigned(pos) > 0) {
+            val exec = job.addExecutor(usableWorkers(pos), assigned(pos))
+            launchExecutor(usableWorkers(pos), exec)
+            job.state = JobState.RUNNING
+          }
+        }
+      }
+    } else {
+      // Pack each job into as few nodes as possible until we've assigned all its cores
+      for (worker <- workers if worker.coresFree > 0) {
+        for (job <- waitingJobs if job.coresLeft > 0) {
+          if (canUse(job, worker)) {
+            val coresToUse = math.min(worker.coresFree, job.coresLeft)
+            if (coresToUse > 0) {
+              val exec = job.addExecutor(worker, coresToUse)
+              launchExecutor(worker, exec)
+              job.state = JobState.RUNNING
            }
          }
        }
      }
    }
  }
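For reference, the spread-out branch boils down to a round-robin core assignment. The following self-contained sketch is illustrative only and not part of this commit; assignCores, freeCores, and SpreadOutSketch are made-up names standing in for the logic inside schedule():

    // A minimal sketch of the round-robin assignment performed when spreadOutJobs
    // is true. freeCores holds each usable worker's free core count, sorted
    // descending, mirroring sortBy(_.coresFree).reverse in the real code.
    object SpreadOutSketch {
      def assignCores(coresLeft: Int, freeCores: Array[Int]): Array[Int] = {
        val assigned = new Array[Int](freeCores.length) // cores granted per worker
        var toAssign = math.min(coresLeft, freeCores.sum)
        var pos = 0
        while (toAssign > 0) {
          // Grant one core if this worker still has spare capacity, then move on
          if (freeCores(pos) - assigned(pos) > 0) {
            toAssign -= 1
            assigned(pos) += 1
          }
          pos = (pos + 1) % freeCores.length
        }
        assigned
      }

      def main(args: Array[String]): Unit = {
        // A job wanting 5 cores across workers with 4, 2 and 1 free cores is
        // spread as 2, 2, 1 rather than packed as 4 + 1 onto the first two.
        println(assignCores(5, Array(4, 2, 1)).mkString(", ")) // prints 2, 2, 1
      }
    }

Since the flag is read via System.getProperty, it should be possible to enable it by starting the master JVM with -Dspark.deploy.spreadOut=true (or by calling System.setProperty before the Master actor starts); it defaults to false.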

core/src/main/scala/spark/deploy/master/WorkerInfo.scala

Lines changed: 4 additions & 0 deletions
@@ -33,6 +33,10 @@ private[spark] class WorkerInfo(
       memoryUsed -= exec.memory
     }
   }
+
+  def hasExecutor(job: JobInfo): Boolean = {
+    executors.values.exists(_.job == job)
+  }
 
   def webUiAddress : String = {
     "http://" + this.host + ":" + this.webUiPort

core/src/main/twirl/spark/deploy/master/job_row.scala.html

Lines changed: 1 addition & 6 deletions
@@ -10,12 +10,7 @@
     </td>
     <td>@job.desc.name</td>
     <td>
-      @job.coresGranted Granted
-      @if(job.desc.cores == Integer.MAX_VALUE) {
-
-      } else {
-        , @job.coresLeft
-      }
+      @job.coresGranted
     </td>
     <td>@Utils.memoryMegabytesToString(job.desc.memoryPerSlave)</td>
     <td>@formatDate(job.submitDate)</td>
