diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 29ccec2adcac3..0957e94bb1bf0 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.yarn
 
 import java.lang.{Boolean => JBoolean}
 import java.util.{Collections, Set => JSet}
-import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
+import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap, LinkedBlockingQueue}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection
@@ -89,6 +89,8 @@ private[yarn] class YarnAllocationHandler(
   private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
   // Containers to be released in next request to RM
   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
+  // ContainerRequests which have been added to AMRMClient.
+  private val containerRequestList = new LinkedBlockingQueue[ContainerRequest]()
 
   // Additional memory overhead - in mb.
   private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
@@ -254,6 +256,9 @@ private[yarn] class YarnAllocationHandler(
         val executorMemoryOverhead = (executorMemory + memoryOverhead)
         assert(container.getResource.getMemory >= executorMemoryOverhead)
 
+        if (!containerRequestList.isEmpty) {
+          amClient.removeContainerRequest(containerRequestList.take())
+        }
         if (numExecutorsRunningNow > maxExecutors) {
           logInfo("""Ignoring container %s at host %s, since we already have the required number of
@@ -474,6 +479,7 @@ private[yarn] class YarnAllocationHandler(
     }
 
     for (request <- containerRequests) {
+      containerRequestList.put(request)
       amClient.addContainerRequest(request)
     }
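
For context, here is a minimal, self-contained sketch (not part of the patch) of the bookkeeping pattern the change introduces: every ContainerRequest handed to the AMRMClient is also tracked locally, and one tracked request is removed from the client when a container is actually allocated, so the ResourceManager does not keep satisfying requests that have already been fulfilled. The RequestTracker class, its method names, and the resource/priority values are illustrative assumptions; only AMRMClient.addContainerRequest/removeContainerRequest and the LinkedBlockingQueue come from the patch itself.

```scala
import java.util.concurrent.LinkedBlockingQueue

import org.apache.hadoop.yarn.api.records.{Container, Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

// Hypothetical wrapper, for illustration only.
class RequestTracker(amClient: AMRMClient[ContainerRequest]) {

  // Requests that have been submitted to the AMRMClient but not yet
  // matched to an allocated container.
  private val outstanding = new LinkedBlockingQueue[ContainerRequest]()

  def requestContainer(memoryMb: Int, cores: Int): Unit = {
    val request = new ContainerRequest(
      Resource.newInstance(memoryMb, cores),
      null, // nodes: no locality preference in this sketch
      null, // racks: no locality preference in this sketch
      Priority.newInstance(1))
    // Track the request locally before handing it to the AMRMClient.
    outstanding.put(request)
    amClient.addContainerRequest(request)
  }

  def onContainerAllocated(container: Container): Unit = {
    // Drop one outstanding request so it is not re-satisfied on the next
    // allocate() heartbeat. poll() avoids blocking if the queue is empty.
    val satisfied = outstanding.poll()
    if (satisfied != null) {
      amClient.removeContainerRequest(satisfied)
    }
  }
}
```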