Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,21 @@ public void allocate(ApplicationAttemptId appAttemptId,
((AbstractYarnScheduler)getScheduler())
.getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());

response.setNumClusterNodes(getScheduler().getNumClusterNodes());
String label="";
try {
label = rmContext.getScheduler()
.getQueueInfo(app.getQueue(), false, false)
.getDefaultNodeLabelExpression();
} catch (Exception e){
//Queue may not exist since it could be auto-created in case of
// dynamic queues
}

if (label == null || label.equals("")) {
response.setNumClusterNodes(getScheduler().getNumClusterNodes());
} else {
response.setNumClusterNodes(rmContext.getNodeLabelManager().getActiveNMCountPerLabel(label));
}

// add collector address for this application
if (timelineServiceV2Enabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,46 @@

package org.apache.hadoop.yarn.server.resourcemanager;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

import com.google.common.collect.ImmutableMap;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @zhangxiping1 and @brumi1024.
We need to use the relocated Hadoop thirdparty package instead of importing com.google.common.* directly:

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tasanuma Thank you for your check. This PR has been closed. How should we deal with this situation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhangxiping1 Could you create another PR to fix it with the title of "YARN-11107. Addendum. When NodeLabel is enabled for a YARN cluster, AM blacklist program does not work properly."?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I created a follow-up PR: https://github.com//pull/4175

import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSet;
import static org.junit.Assert.fail;

/**
Expand Down Expand Up @@ -208,4 +218,157 @@ public void testPriorityInAllocatedResponse() throws Exception {
Assert.assertEquals(appPriority2, response2.getApplicationPriority());
rm.stop();
}

/**
 * Checks the cluster node count reported in an allocate response when this test
 * configures no node labels.
 *
 * <p>Three NodeManagers are registered, one application is submitted without a queue
 * override, and an empty allocate request is sent; the response is expected to
 * report 3 cluster nodes.
 *
 * @throws Exception if the mock cluster, the application submission or the AM
 *     registration fails
 */
@Test(timeout = 300000)
public void testGetNMNumInAllocatedResponseWithOutNodeLabel() throws Exception {
  conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
  MockRM rm = new MockRM(conf);
  // Stop the RM in a finally block so that a failing assertion does not leave a
  // running MockRM behind for the tests that follow.
  try {
    rm.start();

    // Register node1 node2 node3
    MockNM nm1 = rm.registerNode("host1:1234", 6 * GB);
    MockNM nm2 = rm.registerNode("host2:1234", 6 * GB);
    MockNM nm3 = rm.registerNode("host3:1234", 6 * GB);

    // Submit an application
    MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder
        .createWithMemory(2048, rm)
        .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm, data);

    nm1.nodeHeartbeat(true);
    nm2.nodeHeartbeat(true);
    nm3.nodeHeartbeat(true);

    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
    am1.registerAppAttempt();

    // An allocate call with empty ask/release lists is enough to obtain a response.
    AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
    List<ContainerId> release = new ArrayList<>();
    List<ResourceRequest> ask = new ArrayList<>();
    allocateRequest.setReleaseList(release);
    allocateRequest.setAskList(ask);

    AllocateResponse response1 = am1.allocate(allocateRequest);
    Assert.assertEquals(3, response1.getNumClusterNodes());
  } finally {
    rm.stop();
  }
}

/**
 * Wraps {@code config} in a {@link CapacitySchedulerConfiguration} that defines two
 * queues under root: {@code a}, tied to node label {@code x}, and {@code b}, tied to
 * node label {@code y}.
 *
 * @param config the configuration the capacity-scheduler configuration is built from
 * @return the capacity-scheduler configuration holding the queue and label settings
 */
private Configuration getConfigurationWithQueueLabels(Configuration config) {
  CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(config);
  final String root = CapacitySchedulerConfiguration.ROOT;

  // Root level: two child queues, and full capacity for both labels.
  csConf.setQueues(root, new String[] {"a", "b"});
  csConf.setCapacityByLabel(root, "x", 100);
  csConf.setCapacityByLabel(root, "y", 100);

  setUpLabeledLeafQueue(csConf, root + ".a", "x");
  setUpLabeledLeafQueue(csConf, root + ".b", "y");

  return csConf;
}

/**
 * Applies the per-queue settings shared by both test queues: 50 capacity, 100 maximum
 * capacity, and {@code label} as the only accessible label, the default node label
 * expression, and a label with capacity 100.
 */
private static void setUpLabeledLeafQueue(CapacitySchedulerConfiguration csConf,
    String queuePath, String label) {
  csConf.setCapacity(queuePath, 50);
  csConf.setMaximumCapacity(queuePath, 100);
  csConf.setAccessibleNodeLabels(queuePath, toSet(label));
  csConf.setDefaultNodeLabelExpression(queuePath, label);
  csConf.setCapacityByLabel(queuePath, label, 100);
}

/**
 * Checks the cluster node count reported in an allocate response when the
 * application's queue has a default node label expression.
 *
 * <p>Labels {@code x} and {@code y} are added to the cluster; host1, host2 and host3
 * are mapped to {@code x} and host4 to {@code y}. One application is submitted to
 * {@code root.a} and another to {@code root.b}. The test expects the first
 * application's response to report 3 cluster nodes and the second's to report 1.
 *
 * @throws Exception if the mock cluster, the label setup, an application submission
 *     or an AM registration fails
 */
@Test(timeout = 300000)
public void testGetNMNumInAllocatedResponseWithNodeLabel() throws Exception {
  conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
  conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
  MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
    @Override
    protected RMNodeLabelsManager createNodeLabelManager() {
      RMNodeLabelsManager mgr = new RMNodeLabelsManager();
      mgr.init(getConfig());
      return mgr;
    }
  };

  // Stop the RM in a finally block so that a failing assertion or setup step does
  // not leave the MockRM behind for the tests that follow.
  try {
    // add node label "x","y" and set node to label mapping
    Set<String> clusterNodeLabels = new HashSet<>();
    clusterNodeLabels.add("x");
    clusterNodeLabels.add("y");

    RMNodeLabelsManager nodeLabelManager = rm.getRMContext().getNodeLabelManager();
    nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(clusterNodeLabels);

    // has 3 nodes with node label "x", 1 node with node label "y"
    nodeLabelManager
        .addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host1", 1234), toSet("x")));
    nodeLabelManager
        .addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host2", 1234), toSet("x")));
    nodeLabelManager
        .addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host3", 1234), toSet("x")));
    nodeLabelManager
        .addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host4", 1234), toSet("y")));
    rm.start();

    // Register node1 node2 node3 node4
    MockNM nm1 = rm.registerNode("host1:1234", 6 * GB);
    MockNM nm2 = rm.registerNode("host2:1234", 6 * GB);
    MockNM nm3 = rm.registerNode("host3:1234", 6 * GB);
    MockNM nm4 = rm.registerNode("host4:1234", 6 * GB);

    // submit an application to queue root.a expression as "x"
    MockRMAppSubmissionData data1 = MockRMAppSubmissionData.Builder
        .createWithMemory(2048, rm)
        .withAppName("someApp1")
        .withUser("someUser")
        .withQueue("root.a")
        .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm, data1);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);

    // submit an application to queue root.b expression as "y"
    MockRMAppSubmissionData data2 = MockRMAppSubmissionData.Builder
        .createWithMemory(2048, rm)
        .withAppName("someApp2")
        .withUser("someUser")
        .withQueue("root.b")
        .build();
    RMApp app2 = MockRMAppSubmitter.submit(rm, data2);
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm4);

    // The same empty request is reused for both application masters.
    AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
    List<ContainerId> release = new ArrayList<>();
    List<ResourceRequest> ask = new ArrayList<>();
    allocateRequest.setReleaseList(release);
    allocateRequest.setAskList(ask);

    AllocateResponse response1 = am1.allocate(allocateRequest);
    AllocateResponse response2 = am2.allocate(allocateRequest);

    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
    RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
    RMNode rmNode4 = rm.getRMContext().getRMNodes().get(nm4.getNodeId());

    // Do node heartbeats many times
    // NOTE(review): both responses were obtained before this loop, so these scheduler
    // updates presumably do not influence the values asserted below - confirm whether
    // the loop is still needed.
    for (int i = 0; i < 3; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
      cs.handle(new NodeUpdateSchedulerEvent(rmNode3));
      cs.handle(new NodeUpdateSchedulerEvent(rmNode4));
    }

    // has 3 nodes with node label "x"
    Assert.assertEquals(3, response1.getNumClusterNodes());

    // has 1 node with node label "y"
    Assert.assertEquals(1, response2.getNumClusterNodes());
  } finally {
    rm.stop();
  }
}
}