Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ public Resource getTotalCapability() {
return node.getTotalCapability();
}

@Override
public Resource getAllocatedContainerResource() {
  // Delegates to the wrapped node and hands its answer back unchanged.
  final Resource allocated = node.getAllocatedContainerResource();
  return allocated;
}

@Override
public String getRackName() {
return node.getRackName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
* Node managers information on available resources
Expand Down Expand Up @@ -104,6 +105,14 @@ public interface RMNode {
*/
public Resource getTotalCapability();

/**
* The total resources allocated to containers on this node.
Copy link
Contributor

@bibinchundatt bibinchundatt Nov 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Start with upper case in comments. This will include the sum of O+G containers queued + running + paused on the node. The comment can be more explanatory.

* @return the total allocated resources.
*/
default Resource getAllocatedContainerResource() {
  // Default behaviour: report Resources.none(); implementations may override.
  final Resource nothingAllocated = Resources.none();
  return nothingAllocated;
}

/**
* If the total available resources has been updated.
* @return If the capability has been updated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
/* Snapshot of total resources before receiving decommissioning command */
private volatile Resource originalTotalCapability;
private volatile Resource totalCapability;
/*
 * Resources currently counted as allocated to containers. Initialized from
 * Resources.none() via Resource.newInstance, then replaced in
 * handleContainerStatus with the summed capability of the reported
 * containers that are in the RUNNING or NEW state.
 */
private volatile Resource allocatedContainerResource =
Resource.newInstance(Resources.none());
private volatile boolean updatedCapability = false;
private final Node node;

Expand Down Expand Up @@ -464,6 +466,11 @@ public Resource getTotalCapability() {
return this.totalCapability;
}

@Override
public Resource getAllocatedContainerResource() {
  // Read the field once and return that value.
  final Resource current = this.allocatedContainerResource;
  return current;
}

@Override
public boolean isUpdatedCapability() {
return this.updatedCapability;
Expand Down Expand Up @@ -1554,6 +1561,8 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
List<Map.Entry<ApplicationId, ContainerStatus>> needUpdateContainers =
new ArrayList<Map.Entry<ApplicationId, ContainerStatus>>();
int numRemoteRunningContainers = 0;
final Resource allocatedResource = Resource.newInstance(Resources.none());

for (ContainerStatus remoteContainer : containerStatuses) {
ContainerId containerId = remoteContainer.getContainerId();

Expand Down Expand Up @@ -1622,8 +1631,16 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
containerAllocationExpirer
.unregister(new AllocationExpirationInfo(containerId));
}

if ((remoteContainer.getState() == ContainerState.RUNNING ||
remoteContainer.getState() == ContainerState.NEW) &&
remoteContainer.getCapability() != null) {
Resources.addTo(allocatedResource, remoteContainer.getCapability());
}
}

allocatedContainerResource = allocatedResource;

List<ContainerStatus> lostContainers =
findLostContainers(numRemoteRunningContainers, containerStatuses);
for (ContainerStatus remoteContainer : lostContainers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
Expand Down Expand Up @@ -79,6 +80,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -358,6 +360,94 @@ public void testContainerUpdate() throws InterruptedException{
.getContainerId());
}

/**
* Tests that allocated container resources are counted correctly in
* {@link org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode}
* upon a node update. Resources should be counted for both GUARANTEED
* and OPPORTUNISTIC containers.
*/
@Test (timeout = 5000)
public void testAllocatedContainerUpdate() {
NodeStatus mockNodeStatus = createMockNodeStatus();
//Start the node
node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));

NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);

ApplicationId app0 = BuilderUtils.newApplicationId(0, 0);
ContainerId newContainerId = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(app0, 0), 0);
ContainerId runningContainerId = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(app0, 0), 1);
ContainerId newOppContainerId = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(app0, 0), 2);
ContainerId runningOppContainerId = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(app0, 0), 3);

rmContext.getRMApps().put(app0, Mockito.mock(RMApp.class));

RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null);
ContainerStatus newContainerStatusFromNode = mock(ContainerStatus.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create this mock in a static method to make the test easier to read?

ContainerStatus runningContainerStatusFromNode =
mock(ContainerStatus.class);

final Resource newContainerCapability =
Resource.newInstance(100, 1);
final Resource runningContainerCapability =
Resource.newInstance(200, 2);
doReturn(newContainerId).when(newContainerStatusFromNode)
.getContainerId();
doReturn(ContainerState.NEW).when(newContainerStatusFromNode)
.getState();
doReturn(newContainerCapability).when(newContainerStatusFromNode)
.getCapability();
doReturn(runningContainerId).when(runningContainerStatusFromNode)
.getContainerId();
doReturn(ContainerState.RUNNING).when(runningContainerStatusFromNode)
.getState();
doReturn(runningContainerCapability).when(runningContainerStatusFromNode)
.getCapability();
doReturn(Arrays.asList(
newContainerStatusFromNode, runningContainerStatusFromNode))
.when(statusEventFromNode1).getContainers();
node.handle(statusEventFromNode1);
Assert.assertTrue(Resources.equals(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource has an equals() method, which is what this actually calls; can't we just use assertEquals()?

node.getAllocatedContainerResource(),
Resource.newInstance(300, 3)));

RMNodeStatusEvent statusEventFromNode2 = getMockRMNodeStatusEvent(null);
ContainerStatus newOppContainerStatusFromNode = mock(ContainerStatus.class);
ContainerStatus runningOppContainerStatusFromNode =
mock(ContainerStatus.class);
doReturn(newOppContainerId).when(newOppContainerStatusFromNode)
.getContainerId();
doReturn(ContainerState.NEW).when(newOppContainerStatusFromNode)
.getState();
doReturn(newContainerCapability).when(newOppContainerStatusFromNode)
.getCapability();
doReturn(ExecutionType.OPPORTUNISTIC)
.when(newOppContainerStatusFromNode)
.getExecutionType();
doReturn(runningOppContainerId).when(runningOppContainerStatusFromNode)
.getContainerId();
doReturn(ContainerState.RUNNING).when(runningOppContainerStatusFromNode)
.getState();
doReturn(runningContainerCapability).when(runningOppContainerStatusFromNode)
.getCapability();
doReturn(ExecutionType.OPPORTUNISTIC)
.when(runningOppContainerStatusFromNode)
.getExecutionType();
doReturn(Arrays.asList(
newContainerStatusFromNode, runningContainerStatusFromNode,
newOppContainerStatusFromNode, runningOppContainerStatusFromNode))
.when(statusEventFromNode2).getContainers();

node.handle(statusEventFromNode2);
Assert.assertTrue(Resources.equals(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertEquals()

node.getAllocatedContainerResource(),
Resource.newInstance(600, 6)));
}

@Test (timeout = 5000)
public void testStatusChange(){
NodeStatus mockNodeStatus = createMockNodeStatus();
Expand Down