Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.deploy.yarn

import java.lang.{Boolean => JBoolean}
import java.util.{Collections, Set => JSet}
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap, LinkedBlockingQueue}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection
Expand Down Expand Up @@ -89,6 +89,8 @@ private[yarn] class YarnAllocationHandler(
private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
// Containers to be released in next request to RM
private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
// ContainerRequests which have been added to AMRMClient.
private val containerRequestList = new LinkedBlockingQueue[ContainerRequest]()

// Additional memory overhead - in mb.
private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
Expand Down Expand Up @@ -254,6 +256,9 @@ private[yarn] class YarnAllocationHandler(

val executorMemoryOverhead = (executorMemory + memoryOverhead)
assert(container.getResource.getMemory >= executorMemoryOverhead)
if(!containerRequestList.isEmpty()){
amClient.removeContainerRequest(containerRequestList.take())
}

if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
Expand Down Expand Up @@ -474,6 +479,7 @@ private[yarn] class YarnAllocationHandler(
}

for (request <- containerRequests) {
containerRequestList.put(request)
amClient.addContainerRequest(request)
}

Expand Down