@@ -200,6 +200,11 @@ private[spark] class MesosSchedulerBackend(
200200 }
201201 }
202202
203+ def toWorkerOffer (offer : Offer ) = new WorkerOffer (
204+ offer.getSlaveId.getValue,
205+ offer.getHostname,
206+ getResource(offer.getResourcesList, " cpus" ).toInt)
207+
203208 override def disconnected (d : SchedulerDriver ) {}
204209
205210 override def reregistered (d : SchedulerDriver , masterInfo : MasterInfo ) {}
@@ -212,62 +217,39 @@ private[spark] class MesosSchedulerBackend(
212217 override def resourceOffers (d : SchedulerDriver , offers : JList [Offer ]) {
213218 val oldClassLoader = setClassLoader()
214219 try {
215- synchronized {
216- // Build a big list of the offerable workers, and remember their indices so that we can
217- // figure out which Offer to reply to for each worker
218- val offerableWorkers = new ArrayBuffer [WorkerOffer ]
219- val offerableIndices = new HashMap [String , Int ]
220-
221- def sufficientOffer (o : Offer ) = {
222- val mem = getResource(o.getResourcesList, " mem" )
223- val cpus = getResource(o.getResourcesList, " cpus" )
224- val slaveId = o.getSlaveId.getValue
225- (mem >= MemoryUtils .calculateTotalMemory(sc) &&
226- // need at least 1 for executor, 1 for task
227- cpus >= 2 * scheduler.CPUS_PER_TASK ) ||
228- (slaveIdsWithExecutors.contains(slaveId) &&
229- cpus >= scheduler.CPUS_PER_TASK )
230- }
231-
232- for ((offer, index) <- offers.zipWithIndex if sufficientOffer(offer)) {
233- val slaveId = offer.getSlaveId.getValue
234- offerableIndices.put(slaveId, index)
235- val cpus = if (slaveIdsWithExecutors.contains(slaveId)) {
236- getResource(offer.getResourcesList, " cpus" ).toInt
237- } else {
238- // If the executor doesn't exist yet, subtract CPU for executor
239- getResource(offer.getResourcesList, " cpus" ).toInt -
240- scheduler.CPUS_PER_TASK
241- }
242- offerableWorkers += new WorkerOffer (
243- offer.getSlaveId.getValue,
244- offer.getHostname,
245- cpus)
246- }
247-
248- // Call into the TaskSchedulerImpl
249- val taskLists = scheduler.resourceOffers(offerableWorkers)
250-
251- // Build a list of Mesos tasks for each slave
252- val mesosTasks = offers.map(o => new JArrayList [MesosTaskInfo ]())
253- for ((taskList, index) <- taskLists.zipWithIndex) {
254- if (! taskList.isEmpty) {
255- for (taskDesc <- taskList) {
256- val slaveId = taskDesc.executorId
257- val offerNum = offerableIndices(slaveId)
258- slaveIdsWithExecutors += slaveId
259- taskIdToSlaveId(taskDesc.taskId) = slaveId
260- mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
261- }
262- }
263- }
264-
265- // Reply to the offers
266- val filters = Filters .newBuilder().setRefuseSeconds(1 ).build() // TODO: lower timeout?
267- for (i <- 0 until offers.size) {
268- d.launchTasks(Collections .singleton(offers(i).getId), mesosTasks(i), filters)
220+ val (acceptedOffers, declinedOffers) = offers.partition(o => {
221+ val mem = getResource(o.getResourcesList, " mem" )
222+ val slaveId = o.getSlaveId.getValue
223+ mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
224+ })
225+
226+ val offerableWorkers = acceptedOffers.map(toWorkerOffer)
227+
228+ val slaveIdToOffer = acceptedOffers.map(o => o.getSlaveId.getValue -> o).toMap
229+
230+ val mesosTasks = new HashMap [String , JArrayList [MesosTaskInfo ]]
231+
232+ // Call into the TaskSchedulerImpl
233+ scheduler.resourceOffers(offerableWorkers)
234+ .filter(! _.isEmpty)
235+ .foreach(_.foreach(taskDesc => {
236+ val slaveId = taskDesc.executorId
237+ slaveIdsWithExecutors += slaveId
238+ taskIdToSlaveId(taskDesc.taskId) = slaveId
239+ mesosTasks.getOrElseUpdate(slaveId, new JArrayList [MesosTaskInfo ])
240+ .add(createMesosTask(taskDesc, slaveId))
241+ }))
242+
243+ // Reply to the offers
244+ val filters = Filters .newBuilder().setRefuseSeconds(1 ).build() // TODO: lower timeout?
245+
246+ mesosTasks.foreach {
247+ case (slaveId, tasks) => {
248+ d.launchTasks(Collections .singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
269249 }
270250 }
251+
252+ declinedOffers.foreach(o => d.declineOffer(o.getId))
271253 } finally {
272254 restoreClassLoader(oldClassLoader)
273255 }
0 commit comments