Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ public Resource getTotalCapability() {
return node.getTotalCapability();
}

/**
 * Returns the allocated container resources reported by the wrapped
 * {@code node}; the call is forwarded unchanged.
 */
@Override
public Resource getAllocatedContainerResource() {
return node.getAllocatedContainerResource();
}

@Override
public String getRackName() {
return node.getRackName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
* Node managers information on available resources
Expand Down Expand Up @@ -104,6 +105,14 @@ public interface RMNode {
*/
public Resource getTotalCapability();

/**
* Get the total resources currently allocated to containers on this node.
Copy link
Contributor

@bibinchundatt bibinchundatt Nov 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Start with upper case in comments. This will include the sum of O+G containers queued + running + paused on the node. The comment can be more explanatory.

* @return the total allocated resources.
*/
default Resource getAllocatedContainerResource() {
// Implementations that do not override this method report no allocation.
return Resources.none();
}

/**
* If the total available resources has been updated.
* @return If the capability has been updated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
/* Snapshot of total resources before receiving decommissioning command */
private volatile Resource originalTotalCapability;
private volatile Resource totalCapability;
private volatile Resource allocatedContainerResource =
Resource.newInstance(Resources.none());
private volatile boolean updatedCapability = false;
private final Node node;

Expand Down Expand Up @@ -464,6 +466,11 @@ public Resource getTotalCapability() {
return this.totalCapability;
}

/**
 * Returns the value currently held in the volatile
 * {@code allocatedContainerResource} field, which
 * {@code handleContainerStatus} replaces after summing the capabilities of
 * the reported NEW and RUNNING containers. The field's own instance is
 * returned, not a copy.
 */
@Override
public Resource getAllocatedContainerResource() {
return this.allocatedContainerResource;
}

@Override
public boolean isUpdatedCapability() {
return this.updatedCapability;
Expand Down Expand Up @@ -1554,6 +1561,8 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
List<Map.Entry<ApplicationId, ContainerStatus>> needUpdateContainers =
new ArrayList<Map.Entry<ApplicationId, ContainerStatus>>();
int numRemoteRunningContainers = 0;
final Resource allocatedResource = Resource.newInstance(Resources.none());

for (ContainerStatus remoteContainer : containerStatuses) {
ContainerId containerId = remoteContainer.getContainerId();

Expand Down Expand Up @@ -1622,8 +1631,16 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
containerAllocationExpirer
.unregister(new AllocationExpirationInfo(containerId));
}

if ((remoteContainer.getState() == ContainerState.RUNNING ||
remoteContainer.getState() == ContainerState.NEW) &&
remoteContainer.getCapability() != null) {
Resources.addTo(allocatedResource, remoteContainer.getCapability());
}
}

allocatedContainerResource = allocatedResource;

List<ContainerStatus> lostContainers =
findLostContainers(numRemoteRunningContainers, containerStatuses);
for (ContainerStatus remoteContainer : lostContainers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
Expand Down Expand Up @@ -79,6 +80,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -231,6 +233,24 @@ private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() {
return event;
}

/**
 * Creates a mocked {@link ContainerStatus} for a container whose execution
 * type is {@link ExecutionType#GUARANTEED}.
 *
 * @param containerId id the mock reports from {@code getContainerId()}
 * @param capability resource the mock reports from {@code getCapability()}
 * @param containerState state the mock reports from {@code getState()}
 * @return the configured mock
 */
private static ContainerStatus getMockContainerStatus(
    final ContainerId containerId, final Resource capability,
    final ContainerState containerState) {
  final ExecutionType defaultExecutionType = ExecutionType.GUARANTEED;
  return getMockContainerStatus(
      containerId, capability, containerState, defaultExecutionType);
}

/**
 * Creates a mocked {@link ContainerStatus} whose getters return the given
 * values.
 *
 * @param containerId id the mock reports from {@code getContainerId()}
 * @param capability resource the mock reports from {@code getCapability()}
 * @param containerState state the mock reports from {@code getState()}
 * @param executionType type the mock reports from
 *                      {@code getExecutionType()}
 * @return the configured mock
 */
private static ContainerStatus getMockContainerStatus(
    final ContainerId containerId, final Resource capability,
    final ContainerState containerState, final ExecutionType executionType) {
  final ContainerStatus mockedStatus = mock(ContainerStatus.class);
  // The stubbings are independent of one another, so their order is free.
  doReturn(executionType).when(mockedStatus).getExecutionType();
  doReturn(capability).when(mockedStatus).getCapability();
  doReturn(containerState).when(mockedStatus).getState();
  doReturn(containerId).when(mockedStatus).getContainerId();
  return mockedStatus;
}

@Test (timeout = 5000)
public void testExpiredContainer() {
NodeStatus mockNodeStatus = createMockNodeStatus();
Expand All @@ -248,8 +268,8 @@ public void testExpiredContainer() {
// Now verify that scheduler isn't notified of an expired container
// by checking number of 'completedContainers' it got in the previous event
RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(null);
ContainerStatus containerStatus = mock(ContainerStatus.class);
doReturn(completedContainerId).when(containerStatus).getContainerId();
ContainerStatus containerStatus = getMockContainerStatus(
completedContainerId, null, ContainerState.COMPLETE);
doReturn(Collections.singletonList(containerStatus)).
when(statusEvent).getContainers();
node.handle(statusEvent);
Expand Down Expand Up @@ -321,12 +341,13 @@ public void testContainerUpdate() throws InterruptedException{
RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null);
RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent(null);

ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class);
ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class);
ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class);
ContainerStatus containerStatusFromNode1 = getMockContainerStatus(
completedContainerIdFromNode1, null, ContainerState.COMPLETE);
ContainerStatus containerStatusFromNode2_1 = getMockContainerStatus(
completedContainerIdFromNode2_1, null, ContainerState.COMPLETE);
ContainerStatus containerStatusFromNode2_2 = getMockContainerStatus(
completedContainerIdFromNode2_2, null, ContainerState.COMPLETE);

doReturn(completedContainerIdFromNode1).when(containerStatusFromNode1)
.getContainerId();
doReturn(Collections.singletonList(containerStatusFromNode1))
.when(statusEventFromNode1).getContainers();
node.handle(statusEventFromNode1);
Expand All @@ -336,13 +357,9 @@ public void testContainerUpdate() throws InterruptedException{

completedContainers.clear();

doReturn(completedContainerIdFromNode2_1).when(containerStatusFromNode2_1)
.getContainerId();
doReturn(Collections.singletonList(containerStatusFromNode2_1))
.when(statusEventFromNode2_1).getContainers();

doReturn(completedContainerIdFromNode2_2).when(containerStatusFromNode2_2)
.getContainerId();
doReturn(Collections.singletonList(containerStatusFromNode2_2))
.when(statusEventFromNode2_2).getContainers();

Expand All @@ -358,6 +375,119 @@ public void testContainerUpdate() throws InterruptedException{
.getContainerId());
}

/**
 * Tests that allocated container resources are counted correctly in
 * {@link org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode}
 * upon a node update. Resources should be counted for both GUARANTEED
 * and OPPORTUNISTIC containers in the NEW or RUNNING state, while
 * COMPLETE containers of either execution type must not be counted.
 */
@Test (timeout = 5000)
public void testAllocatedContainerUpdate() {
  NodeStatus mockNodeStatus = createMockNodeStatus();
  //Start the node
  node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));

  // Make sure that the node starts with no allocated resources
  Assert.assertEquals(Resources.none(), node.getAllocatedContainerResource());

  ApplicationId app0 = BuilderUtils.newApplicationId(0, 0);
  final ContainerId newContainerId = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(app0, 0), 0);
  final ContainerId runningContainerId = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(app0, 0), 1);

  rmContext.getRMApps().put(app0, Mockito.mock(RMApp.class));

  RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null);

  // This list is shared by all status events below and only grows, so each
  // later event reports every container added so far.
  final List<ContainerStatus> containerStatuses = new ArrayList<>();

  // Use different memory and VCores for new and running state containers
  // to test that they add up correctly
  final Resource newContainerCapability =
      Resource.newInstance(100, 1);
  final Resource runningContainerCapability =
      Resource.newInstance(200, 2);
  final Resource completedContainerCapability =
      Resource.newInstance(50, 3);
  final ContainerStatus newContainerStatusFromNode = getMockContainerStatus(
      newContainerId, newContainerCapability, ContainerState.NEW);
  final ContainerStatus runningContainerStatusFromNode =
      getMockContainerStatus(runningContainerId, runningContainerCapability,
          ContainerState.RUNNING);

  containerStatuses.addAll(Arrays.asList(
      newContainerStatusFromNode, runningContainerStatusFromNode));
  doReturn(containerStatuses).when(statusEventFromNode1).getContainers();
  node.handle(statusEventFromNode1);
  Assert.assertEquals(Resource.newInstance(300, 3),
      node.getAllocatedContainerResource());

  final ContainerId newOppContainerId = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(app0, 0), 2);
  final ContainerId runningOppContainerId = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(app0, 0), 3);

  // Use the same resource capability as in previous for opportunistic case
  RMNodeStatusEvent statusEventFromNode2 = getMockRMNodeStatusEvent(null);
  final ContainerStatus newOppContainerStatusFromNode =
      getMockContainerStatus(newOppContainerId, newContainerCapability,
          ContainerState.NEW, ExecutionType.OPPORTUNISTIC);
  final ContainerStatus runningOppContainerStatusFromNode =
      getMockContainerStatus(runningOppContainerId,
          runningContainerCapability, ContainerState.RUNNING,
          ExecutionType.OPPORTUNISTIC);

  containerStatuses.addAll(Arrays.asList(
      newOppContainerStatusFromNode, runningOppContainerStatusFromNode));

  // Pass in both guaranteed and opportunistic container statuses
  doReturn(containerStatuses).when(statusEventFromNode2).getContainers();

  node.handle(statusEventFromNode2);

  // The result here should be double the first check,
  // since allocated resources are doubled, just
  // with different execution types
  Assert.assertEquals(Resource.newInstance(600, 6),
      node.getAllocatedContainerResource());

  RMNodeStatusEvent statusEventFromNode3 = getMockRMNodeStatusEvent(null);
  final ContainerId completedContainerId = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(app0, 0), 4);
  final ContainerId completedOppContainerId = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(app0, 0), 5);
  // One completed container per execution type: the three-argument helper
  // yields a GUARANTEED status, the four-argument call an OPPORTUNISTIC one.
  final ContainerStatus completedContainerStatusFromNode =
      getMockContainerStatus(completedContainerId,
          completedContainerCapability, ContainerState.COMPLETE);
  final ContainerStatus completedOppContainerStatusFromNode =
      getMockContainerStatus(completedOppContainerId,
          completedContainerCapability, ContainerState.COMPLETE,
          ExecutionType.OPPORTUNISTIC);

  containerStatuses.addAll(Arrays.asList(
      completedContainerStatusFromNode, completedOppContainerStatusFromNode));

  doReturn(containerStatuses).when(statusEventFromNode3).getContainers();
  node.handle(statusEventFromNode3);

  // Adding completed containers should not have changed
  // the resources allocated
  Assert.assertEquals(Resource.newInstance(600, 6),
      node.getAllocatedContainerResource());

  RMNodeStatusEvent emptyStatusEventFromNode =
      getMockRMNodeStatusEvent(null);

  doReturn(Collections.emptyList())
      .when(emptyStatusEventFromNode).getContainers();
  node.handle(emptyStatusEventFromNode);

  // Passing an empty containers list should yield no resources allocated
  Assert.assertEquals(Resources.none(),
      node.getAllocatedContainerResource());
}

@Test (timeout = 5000)
public void testStatusChange(){
NodeStatus mockNodeStatus = createMockNodeStatus();
Expand All @@ -376,14 +506,14 @@ public void testStatusChange(){
RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null);
RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent(null);

ContainerStatus containerStatus1 = mock(ContainerStatus.class);
ContainerStatus containerStatus2 = mock(ContainerStatus.class);
ContainerStatus containerStatus1 = getMockContainerStatus(
completedContainerId1, null, null);
ContainerStatus containerStatus2 = getMockContainerStatus(
completedContainerId2, null, null);

doReturn(completedContainerId1).when(containerStatus1).getContainerId();
doReturn(Collections.singletonList(containerStatus1))
.when(statusEvent1).getContainers();

doReturn(completedContainerId2).when(containerStatus2).getContainerId();
doReturn(Collections.singletonList(containerStatus2))
.when(statusEvent2).getContainers();

Expand Down Expand Up @@ -1153,9 +1283,9 @@ public void testForHandlingDuplicatedCompltedContainers() {

RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null);

ContainerStatus containerStatus1 = mock(ContainerStatus.class);
ContainerStatus containerStatus1 = getMockContainerStatus(
completedContainerId1, null, ContainerState.COMPLETE);

doReturn(completedContainerId1).when(containerStatus1).getContainerId();
doReturn(Collections.singletonList(containerStatus1)).when(statusEvent1)
.getContainers();

Expand Down