Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ private[yarn] class YarnAllocationHandler(
releaseList.add(container.getId())
}

override protected def removeContainerRequest() = {
}

private def createRackResourceRequests(hostContainers: List[ResourceRequest]):
List[ResourceRequest] = {
// First generate modified racks and new set of hosts under it : then issue requests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ private[yarn] abstract class YarnAllocator(

val executorMemoryOverhead = (executorMemory + memoryOverhead)
assert(container.getResource.getMemory >= executorMemoryOverhead)
removeContainerRequest()

if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
Expand Down Expand Up @@ -506,6 +507,9 @@ private[yarn] abstract class YarnAllocator(
/** Called to release a previously allocated container. */
protected def releaseContainer(container: Container): Unit

/** Called to remove container request. */
protected def removeContainerRequest(): Unit

/**
* Defines the interface for an allocate response from the RM. This is needed since the alpha
* and stable interfaces differ here in ways that cannot be fixed using other routes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.util.Records
import java.util.concurrent.LinkedBlockingQueue

/**
* Acquires resources for executors from a ResourceManager and launches executors in new containers.
Expand All @@ -43,10 +44,20 @@ private[yarn] class YarnAllocationHandler(
securityMgr: SecurityManager)
extends YarnAllocator(conf, sparkConf, appAttemptId, args, preferredNodes, securityMgr) {


// ContainerRequests which have been added to AMRMClient.
private val containerRequestList = new LinkedBlockingQueue[ContainerRequest]()

override protected def releaseContainer(container: Container) = {
amClient.releaseAssignedContainer(container.getId())
}

override protected def removeContainerRequest() = {
if(!containerRequestList.isEmpty()){
amClient.removeContainerRequest(containerRequestList.take())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so even though the preferred node locality stuff isn't working right now I think I would rather see us attempt to do the right thing here. If there weren't any preferred nodes specified then taking any of the requests works just fine, but if there were host or rack specific requests we should remove the ones that were actually allocated to us.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i donot think so. because it is FIFO, we can remove head of request queue because head of queue is allocated by Yarn when we receive allocated Containers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you request containers from yarn and specify hosts or racks then it is not guaranteed that you get them back in the order you requests them. Certain hosts may be busy, not available, etc. You might get a rack request fullfilled instead of a host request, etc. Since the preferred locality stuff is broken right now anyway (SPARK-2089), we can fix this up separately if you would prefer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes,i know what you mean. preferred locality donot work in our application. i will take a look at it.thanks.

}
}

// pending isn't used on stable as the AMRMClient handles incremental asks
override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
addResourceRequests(count)
Expand Down Expand Up @@ -133,6 +144,7 @@ private[yarn] class YarnAllocationHandler(
}

for (request <- containerRequests) {
containerRequestList.put(request)
amClient.addContainerRequest(request)
}

Expand Down