Skip to content

Commit 25311c2

Browse files
committed
[SPARK-3456] YarnAllocator on alpha can lose container requests to RM
Author: Thomas Graves <[email protected]>. Closes #2373 from tgravescs/SPARK-3456 and squashes the following commits: 77e9532 [Thomas Graves] [SPARK-3456] YarnAllocator on alpha can lose container requests to RM.
1 parent af25838 commit 25311c2

File tree

3 files changed

+14
-8
lines changed

3 files changed

+14
-8
lines changed

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,17 @@ private[yarn] class YarnAllocationHandler(
4848
private val lastResponseId = new AtomicInteger()
4949
private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()
5050

51-
override protected def allocateContainers(count: Int): YarnAllocateResponse = {
51+
override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
5252
var resourceRequests: List[ResourceRequest] = null
5353

54-
logDebug("numExecutors: " + count)
54+
logDebug("asking for additional executors: " + count + " with already pending: " + pending)
55+
val totalNumAsk = count + pending
5556
if (count <= 0) {
5657
resourceRequests = List()
5758
} else if (preferredHostToCount.isEmpty) {
5859
logDebug("host preferences is empty")
5960
resourceRequests = List(createResourceRequest(
60-
AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
61+
AllocationType.ANY, null, totalNumAsk, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
6162
} else {
6263
// request for all hosts in preferred nodes and for numExecutors -
6364
// candidates.size, request by default allocation policy.
@@ -80,7 +81,7 @@ private[yarn] class YarnAllocationHandler(
8081
val anyContainerRequests: ResourceRequest = createResourceRequest(
8182
AllocationType.ANY,
8283
resource = null,
83-
count,
84+
totalNumAsk,
8485
YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
8586

8687
val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
@@ -103,7 +104,7 @@ private[yarn] class YarnAllocationHandler(
103104
req.addAllReleases(releasedContainerList)
104105

105106
if (count > 0) {
106-
logInfo("Allocating %d executor containers with %d of memory each.".format(count,
107+
logInfo("Allocating %d executor containers with %d of memory each.".format(totalNumAsk,
107108
executorMemory + memoryOverhead))
108109
} else {
109110
logDebug("Empty allocation req .. release : " + releasedContainerList)

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ private[yarn] abstract class YarnAllocator(
112112
def allocateResources() = {
113113
val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
114114

115+
// The alpha allocator needs the current pending count; capture it here, before `missing` is added to numPendingAllocate below.
116+
val executorsPending = numPendingAllocate.get()
117+
115118
if (missing > 0) {
116119
numPendingAllocate.addAndGet(missing)
117120
logInfo("Will Allocate %d executor containers, each with %d memory".format(
@@ -121,7 +124,7 @@ private[yarn] abstract class YarnAllocator(
121124
logDebug("Empty allocation request ...")
122125
}
123126

124-
val allocateResponse = allocateContainers(missing)
127+
val allocateResponse = allocateContainers(missing, executorsPending)
125128
val allocatedContainers = allocateResponse.getAllocatedContainers()
126129

127130
if (allocatedContainers.size > 0) {
@@ -435,9 +438,10 @@ private[yarn] abstract class YarnAllocator(
435438
*
436439
* @param count Number of containers to allocate.
437440
* If zero, should still contact RM (as a heartbeat).
441+
* @param pending Number of containers already pending allocation. Only used on alpha.
438442
* @return Response to the allocation request.
439443
*/
440-
protected def allocateContainers(count: Int): YarnAllocateResponse
444+
protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse
441445

442446
/** Called to release a previously allocated container. */
443447
protected def releaseContainer(container: Container): Unit

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ private[yarn] class YarnAllocationHandler(
4747
amClient.releaseAssignedContainer(container.getId())
4848
}
4949

50-
override protected def allocateContainers(count: Int): YarnAllocateResponse = {
50+
// pending isn't used on stable as the AMRMClient handles incremental asks
51+
override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
5152
addResourceRequests(count)
5253

5354
// We have already set the container request. Poll the ResourceManager for a response.

0 commit comments

Comments
 (0)