Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,23 @@ public static boolean isAclEnabled(Configuration conf) {
/** Prefix for all node manager configs.*/
public static final String NM_PREFIX = "yarn.nodemanager.";

/** Max Queue length of <code>OPPORTUNISTIC</code> containers on the NM. */
/**
* At the NM, the policy to determine whether to queue an
* <code>OPPORTUNISTIC</code> container or not.
* If set to <code>BY_QUEUE_LEN</code>, uses the queue capacity, as set by
* {@link YarnConfiguration#NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH},
* to limit how many containers to accept/queue.
* If set to <code>BY_RESOURCES</code>, limits the number of containers
* accepted based on the resource capacity of the node.
*/
public static final String NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY =
NM_PREFIX + "opportunistic-containers-queue-policy";
public static final String DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY =
"BY_QUEUE_LEN";

/** Max Queue length of <code>OPPORTUNISTIC</code> containers on the NM.
* If set to 0, NM does not accept any <code>OPPORTUNISTIC</code> containers.
* If set to {@literal > 0}, enforces the queue capacity. */
public static final String NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH =
NM_PREFIX + "opportunistic-containers-max-queue-length";
public static final int DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1265,8 +1265,27 @@
</property>

<property>
<description>Max number of OPPORTUNISTIC containers to queue at the
nodemanager.</description>
<description>
At the NM, the policy to determine whether to queue an
OPPORTUNISTIC container or not.
If set to BY_QUEUE_LEN, uses the queue capacity, as set by
yarn.nodemanager.opportunistic-containers-max-queue-length
to limit how many containers to accept/queue.
If set to BY_RESOURCES, limits the number of containers
accepted based on the resource capacity of the node.
</description>
<name>yarn.nodemanager.opportunistic-containers-queue-policy</name>
<value>BY_QUEUE_LEN</value>
</property>

<property>
<description>
Max number of OPPORTUNISTIC containers to queue at the
nodemanager (NM). If the value is 0, NMs do not allow any
OPPORTUNISTIC containers.
If the value is positive, the NM caps the number of OPPORTUNISTIC
containers that can be queued at the NM.
</description>
<name>yarn.nodemanager.opportunistic-containers-max-queue-length</name>
<value>0</value>
</property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
Expand All @@ -34,6 +35,9 @@ public class AllocationBasedResourceUtilizationTracker implements
private static final Logger LOG =
LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class);

private static final long LEFT_SHIFT_MB_IN_BYTES = 20;
private static final int RIGHT_SHIFT_BYTES_IN_MB = 20;

private ResourceUtilization containersAllocation;
private ContainerScheduler scheduler;

Expand Down Expand Up @@ -80,10 +84,24 @@ public void subtractContainerResource(Container container) {
*/
@Override
public boolean hasResourcesAvailable(Container container) {
long pMemBytes = container.getResource().getMemorySize() * 1024 * 1024L;
return hasResourcesAvailable(pMemBytes,
(long) (getContainersMonitor().getVmemRatio()* pMemBytes),
container.getResource().getVirtualCores());
return hasResourcesAvailable(container.getResource());
}

private static long convertMBToBytes(final long memMB) {
return memMB << LEFT_SHIFT_MB_IN_BYTES;
}

private static long convertBytesToMB(final long bytes) {
return bytes >> RIGHT_SHIFT_BYTES_IN_MB;
}

@Override
public boolean hasResourcesAvailable(Resource resource) {
long pMemBytes = convertMBToBytes(resource.getMemorySize());
final long vmemBytes = (long)
(getContainersMonitor().getVmemRatio() * pMemBytes);
return hasResourcesAvailable(
pMemBytes, vmemBytes, resource.getVirtualCores());
}

private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes,
Expand All @@ -92,29 +110,32 @@ private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes,
if (LOG.isDebugEnabled()) {
LOG.debug("pMemCheck [current={} + asked={} > allowed={}]",
this.containersAllocation.getPhysicalMemory(),
(pMemBytes >> 20),
(getContainersMonitor().getPmemAllocatedForContainers() >> 20));
convertBytesToMB(pMemBytes),
convertBytesToMB(
getContainersMonitor().getPmemAllocatedForContainers()));
}
if (this.containersAllocation.getPhysicalMemory() +
(int) (pMemBytes >> 20) >
(int) (getContainersMonitor()
.getPmemAllocatedForContainers() >> 20)) {
(int) convertBytesToMB(pMemBytes) >
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to cast?

(int) convertBytesToMB(getContainersMonitor()
.getPmemAllocatedForContainers())) {
return false;
}

if (LOG.isDebugEnabled()) {
LOG.debug("before vMemCheck" +
"[isEnabled={}, current={} + asked={} > allowed={}]",
getContainersMonitor().isVmemCheckEnabled(),
this.containersAllocation.getVirtualMemory(), (vMemBytes >> 20),
(getContainersMonitor().getVmemAllocatedForContainers() >> 20));
this.containersAllocation.getVirtualMemory(),
convertBytesToMB(vMemBytes),
convertBytesToMB(
getContainersMonitor().getVmemAllocatedForContainers()));
}
// Check virtual memory.
if (getContainersMonitor().isVmemCheckEnabled() &&
this.containersAllocation.getVirtualMemory() +
(int) (vMemBytes >> 20) >
(int) (getContainersMonitor()
.getVmemAllocatedForContainers() >> 20)) {
(int) convertBytesToMB(vMemBytes) >
(int) convertBytesToMB(getContainersMonitor()
.getVmemAllocatedForContainers())) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
Expand All @@ -46,6 +47,7 @@
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -74,6 +76,7 @@ public class ContainerScheduler extends AbstractService implements
private final Context context;
// Capacity of the queue for opportunistic Containers.
private final int maxOppQueueLength;
private final boolean forceStartGuaranteedContainers;

// Queue of Guaranteed Containers waiting for resources to run
private final LinkedHashMap<ContainerId, Container>
Expand Down Expand Up @@ -106,9 +109,39 @@ public class ContainerScheduler extends AbstractService implements

private final AsyncDispatcher dispatcher;
private final NodeManagerMetrics metrics;
private final OpportunisticContainersQueuePolicy oppContainersQueuePolicy;

private Boolean usePauseEventForPreemption = false;

private static int getMaxOppQueueLengthFromConf(final Context context) {
if (context == null || context.getConf() == null) {
return YarnConfiguration
.DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH;
}

return context.getConf().getInt(
YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
YarnConfiguration.DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH
);
}

private static OpportunisticContainersQueuePolicy
getOppContainersQueuePolicyFromConf(final Context context) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getOppContainersQueuePolicyFromConf(final Context context) {: 'getOppContainersQueuePolicyFromConf' has incorrect indentation level 2, expected level should be 6. [Indentation]

final String queuePolicy;
if (context == null || context.getConf() == null) {
queuePolicy = YarnConfiguration
.DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY;
} else {
queuePolicy = context.getConf().get(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Configuration#getEnum() could be an option

YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY,
YarnConfiguration
.DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY
);
}

return OpportunisticContainersQueuePolicy.valueOf(queuePolicy);
}

@VisibleForTesting
ResourceHandlerChain resourceHandlerChain = null;

Expand All @@ -120,10 +153,9 @@ public class ContainerScheduler extends AbstractService implements
*/
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
NodeManagerMetrics metrics) {
this(context, dispatcher, metrics, context.getConf().getInt(
YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
YarnConfiguration.
DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
this(context, dispatcher, metrics,
getOppContainersQueuePolicyFromConf(context),
getMaxOppQueueLengthFromConf(context));
}


Expand All @@ -149,13 +181,35 @@ public void serviceInit(Configuration conf) throws Exception {
@VisibleForTesting
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
NodeManagerMetrics metrics, int qLength) {
this(context, dispatcher, metrics,
getOppContainersQueuePolicyFromConf(context), qLength);
}

@VisibleForTesting
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
NodeManagerMetrics metrics,
OpportunisticContainersQueuePolicy oppContainersQueuePolicy,
int qLength) {
super(ContainerScheduler.class.getName());
this.context = context;
this.dispatcher = dispatcher;
this.metrics = metrics;
this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
this.utilizationTracker =
new AllocationBasedResourceUtilizationTracker(this);
this.oppContainersQueuePolicy = oppContainersQueuePolicy;
switch (oppContainersQueuePolicy) {
case BY_RESOURCES:
this.maxOppQueueLength = 0;
this.forceStartGuaranteedContainers = false;
LOG.info("Setting max opportunistic queue length to 0,"
+ " as {} is incompatible with queue length",
oppContainersQueuePolicy);
break;
case BY_QUEUE_LEN:
default:
this.maxOppQueueLength = qLength;
this.forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
}
this.opportunisticContainersStatus =
OpportunisticContainersStatus.newInstance();
}
Expand Down Expand Up @@ -187,7 +241,7 @@ public void handle(ContainerSchedulerEvent event) {
shedQueuedOpportunisticContainers();
break;
case RECOVERY_COMPLETED:
startPendingContainers(maxOppQueueLength <= 0);
startPendingContainers(forceStartGuaranteedContainers);
metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
queuedGuaranteedContainers.size());
break;
Expand Down Expand Up @@ -243,7 +297,7 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
LOG.warn(String.format("Could not update resources on " +
"continer update of %s", containerId), ex);
}
startPendingContainers(maxOppQueueLength <= 0);
startPendingContainers(forceStartGuaranteedContainers);
metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
queuedGuaranteedContainers.size());
}
Expand Down Expand Up @@ -371,7 +425,6 @@ private void onResourcesReclaimed(Container container) {
ExecutionType.OPPORTUNISTIC) {
this.metrics.completeOpportunisticContainer(container.getResource());
}
boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
startPendingContainers(forceStartGuaranteedContainers);
}
this.metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
Expand All @@ -380,13 +433,13 @@ private void onResourcesReclaimed(Container container) {

/**
* Start pending containers in the queue.
* @param forceStartGuaranteedContaieners When this is true, start guaranteed
* @param forceStartGContainers When this is true, start guaranteed
* container without looking at available resource
*/
private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
private void startPendingContainers(boolean forceStartGContainers) {
// Start guaranteed containers that are paused, if resources available.
boolean resourcesAvailable = startContainers(
queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
queuedGuaranteedContainers.values(), forceStartGContainers);
// Start opportunistic containers, if resources available.
if (resourcesAvailable) {
startContainers(queuedOpportunisticContainers.values(), false);
Expand Down Expand Up @@ -429,6 +482,21 @@ private boolean resourceAvailableToStartContainer(Container container) {
return this.utilizationTracker.hasResourcesAvailable(container);
}

private boolean resourceAvailableToQueueOppContainer(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, we are doing this check after localization is done. Shouldn't this check be done at the time of container start to avoid wasting NM resources.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC I didn't do that since it's much more complicated and requires the ContainerScheduler to expose internal state.
Could you point me to where this can be done cleanly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked code and looks like it is same case with existing code also. I think there is scope of improvement there and that can be handle as separate jira. @goiri what do you think ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this JIRA we can leave it as is for consistency with the rest.
We can open a JIRA to move it everywhere (it might have side effects).

Container newOppContainer) {
final Resource cumulativeResource = Resource.newInstance(Resources.none());
for (final Container container : queuedGuaranteedContainers.values()) {
Resources.addTo(cumulativeResource, container.getResource());
}

for (final Container container : queuedOpportunisticContainers.values()) {
Resources.addTo(cumulativeResource, container.getResource());
}

Resources.addTo(cumulativeResource, newOppContainer.getResource());
return this.utilizationTracker.hasResourcesAvailable(cumulativeResource);
}

private boolean enqueueContainer(Container container) {
boolean isGuaranteedContainer = container.getContainerTokenIdentifier().
getExecutionType() == ExecutionType.GUARANTEED;
Expand All @@ -438,7 +506,21 @@ private boolean enqueueContainer(Container container) {
queuedGuaranteedContainers.put(container.getContainerId(), container);
isQueued = true;
} else {
if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
switch (oppContainersQueuePolicy) {
case BY_RESOURCES:
isQueued = resourceAvailableToQueueOppContainer(container);
break;
case BY_QUEUE_LEN:
default:
if (maxOppQueueLength <= 0) {
isQueued = false;
} else {
isQueued =
queuedOpportunisticContainers.size() < maxOppQueueLength;
}
}

if (isQueued) {
LOG.info("Opportunistic container {} will be queued at the NM.",
container.getContainerId());
queuedOpportunisticContainers.put(
Expand All @@ -451,7 +533,6 @@ private boolean enqueueContainer(Container container) {
container.sendKillEvent(
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
"Opportunistic container queue is full.");
isQueued = false;
}
}

Expand Down Expand Up @@ -484,7 +565,6 @@ protected void scheduleContainer(Container container) {
// When opportunistic container not allowed (which is determined by
// max-queue length of pending opportunistic containers <= 0), start
// guaranteed containers without looking at available resources.
boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
startPendingContainers(forceStartGuaranteedContainers);

// if the guaranteed container is queued, we need to preempt opportunistic
Expand Down
Loading