Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,20 @@ public static YarnClusterMetrics newInstance(int numNodeManagers) {
@Unstable
public abstract void setNumNodeManagers(int numNodeManagers);

/**
* Get the number of <code>DecommissioningNodeManager</code>s in the cluster.
*
* @return number of <code>DecommissioningNodeManager</code>s in the cluster;
*         the protobuf-backed implementation yields <code>0</code> when the
*         value was never set
*/
@Public
@Unstable
public abstract int getNumDecommissioningNodeManagers();

/**
* Set the number of <code>DecommissioningNodeManager</code>s in the cluster.
*
* @param numDecommissioningNodeManagers number of
*          <code>DecommissioningNodeManager</code>s in the cluster
*/
@Private
@Unstable
public abstract void setNumDecommissioningNodeManagers(
int numDecommissioningNodeManagers);

/**
* Get the number of <code>DecommissionedNodeManager</code>s in the cluster.
*
Expand Down Expand Up @@ -119,4 +133,16 @@ public abstract void setNumDecommissionedNodeManagers(
@Unstable
public abstract void setNumRebootedNodeManagers(int numRebootedNodeManagers);

/**
* Get the number of <code>ShutdownNodeManager</code>s in the cluster.
*
* @return number of <code>ShutdownNodeManager</code>s in the cluster; the
*         protobuf-backed implementation yields <code>0</code> when the
*         value was never set
*/
@Public
@Unstable
public abstract int getNumShutdownNodeManagers();

/**
* Set the number of <code>ShutdownNodeManager</code>s in the cluster.
*
* @param numShutdownNodeManagers number of
*          <code>ShutdownNodeManager</code>s in the cluster
*/
@Private
@Unstable
public abstract void setNumShutdownNodeManagers(int numShutdownNodeManagers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,8 @@ message YarnClusterMetricsProto {
optional int32 num_lost_nms = 4;
optional int32 num_unhealthy_nms = 5;
optional int32 num_rebooted_nms = 6;
optional int32 num_decommissioning_nms = 7;
optional int32 num_shutdown_nms = 8;
}

enum QueueStateProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,11 @@ private static class NodesInformation {
int totalNodes;
int runningNodes;
int unhealthyNodes;
int decommissioningNodes;
int decommissionedNodes;
int lostNodes;
int rebootedNodes;
int shutdownNodes;
}

private static class QueueMetrics {
Expand Down Expand Up @@ -696,13 +698,16 @@ protected NodesInformation getNodesInfo() {
return nodeInfo;
}

nodeInfo.decommissioningNodes =
yarnClusterMetrics.getNumDecommissioningNodeManagers();
nodeInfo.decommissionedNodes =
yarnClusterMetrics.getNumDecommissionedNodeManagers();
nodeInfo.totalNodes = yarnClusterMetrics.getNumNodeManagers();
nodeInfo.runningNodes = yarnClusterMetrics.getNumActiveNodeManagers();
nodeInfo.lostNodes = yarnClusterMetrics.getNumLostNodeManagers();
nodeInfo.unhealthyNodes = yarnClusterMetrics.getNumUnhealthyNodeManagers();
nodeInfo.rebootedNodes = yarnClusterMetrics.getNumRebootedNodeManagers();
nodeInfo.shutdownNodes = yarnClusterMetrics.getNumShutdownNodeManagers();
return nodeInfo;
}

Expand Down Expand Up @@ -880,11 +885,11 @@ String getHeader(QueueMetrics queueMetrics, NodesInformation nodes) {
ret.append(CLEAR_LINE)
.append(limitLineLength(String.format(
"NodeManager(s)"
+ ": %d total, %d active, %d unhealthy, %d decommissioned,"
+ " %d lost, %d rebooted%n",
+ ": %d total, %d active, %d unhealthy, %d decommissioning,"
+ " %d decommissioned, %d lost, %d rebooted, %d shutdown%n",
nodes.totalNodes, nodes.runningNodes, nodes.unhealthyNodes,
nodes.decommissionedNodes, nodes.lostNodes,
nodes.rebootedNodes), terminalWidth, true));
nodes.decommissioningNodes, nodes.decommissionedNodes, nodes.lostNodes,
nodes.rebootedNodes, nodes.shutdownNodes), terminalWidth, true));

ret.append(CLEAR_LINE)
.append(limitLineLength(String.format(
Expand Down Expand Up @@ -1039,7 +1044,8 @@ protected void showFieldsScreen() {
}
}

protected void showTopScreen() {
@VisibleForTesting
void showTopScreen() {
List<ApplicationInformation> appsInfo = new ArrayList<>();
List<ApplicationReport> apps;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@

package org.apache.hadoop.yarn.client.cli;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -27,9 +32,13 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

Expand All @@ -47,6 +56,9 @@ public class TestTopCLI {

private static Map<String, String> savedStaticResolution = new HashMap<>();

private PrintStream stdout;
private PrintStream stderr;

@BeforeClass
public static void initializeDummyHostnameResolution() throws Exception {
String previousIpAddress;
Expand All @@ -68,6 +80,18 @@ public static void restoreDummyHostnameResolution() throws Exception {
}
}

/**
 * Remembers the standard output and error streams that are current when a
 * test starts, so they can be reinstated once the test is over.
 */
@Before
public void before() {
  stdout = System.out;
  stderr = System.err;
}

/**
 * Reinstates the standard output and error streams that were captured
 * before the test ran.
 */
@After
public void after() {
  System.setOut(stdout);
  System.setErr(stderr);
}

@Test
public void testHAClusterInfoURL() throws IOException, InterruptedException {
TopCLI topcli = new TopCLI();
Expand Down Expand Up @@ -103,4 +127,44 @@ public void testHAClusterInfoURL() throws IOException, InterruptedException {
Assert.assertEquals("https", clusterUrl.getProtocol());
Assert.assertEquals(rm1Address, clusterUrl.getAuthority());
}
}

/**
 * Checks that the NodeManager line printed by {@code showTopScreen()}
 * contains every per-state count, including the decommissioning and
 * shutdown counts.
 *
 * @throws Exception if the mocked client or the output capture fails
 */
@Test
public void testHeaderNodeManagers() throws Exception {
  // Give each state a distinct value so a swapped column is detectable.
  YarnClusterMetrics ymetrics = mock(YarnClusterMetrics.class);
  when(ymetrics.getNumNodeManagers()).thenReturn(0);
  when(ymetrics.getNumDecommissioningNodeManagers()).thenReturn(1);
  when(ymetrics.getNumDecommissionedNodeManagers()).thenReturn(2);
  when(ymetrics.getNumActiveNodeManagers()).thenReturn(3);
  when(ymetrics.getNumLostNodeManagers()).thenReturn(4);
  when(ymetrics.getNumUnhealthyNodeManagers()).thenReturn(5);
  when(ymetrics.getNumRebootedNodeManagers()).thenReturn(6);
  when(ymetrics.getNumShutdownNodeManagers()).thenReturn(7);

  YarnClient client = mock(YarnClient.class);
  when(client.getYarnClusterMetrics()).thenReturn(ymetrics);

  TopCLI topcli = new TopCLI() {
    @Override
    protected void createAndStartYarnClient() {
      // Intentionally empty: the mocked client is injected via setClient().
    }
  };
  topcli.setClient(client);
  // Wide enough that the header line is not truncated before "shutdown".
  topcli.terminalWidth = 200;

  String actual;
  // Encode and decode with the same explicit charset; the platform default
  // used by PrintStream(OutputStream) is not guaranteed to be UTF-8.
  // The redirected streams are put back by the @After method of this class.
  try (ByteArrayOutputStream outStream = new ByteArrayOutputStream();
      PrintStream out = new PrintStream(outStream, false, "UTF-8")) {
    System.setOut(out);
    System.setErr(out);
    topcli.showTopScreen();
    out.flush();
    actual = outStream.toString("UTF-8");
  }

  String expected = "NodeManager(s)"
      + ": 0 total, 3 active, 5 unhealthy, 1 decommissioning,"
      + " 2 decommissioned, 4 lost, 6 rebooted, 7 shutdown";
  Assert.assertTrue(
      String.format(
          "Expected output to contain [%s], actual output was [%s].",
          expected, actual),
      actual.contains(expected));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,22 @@ public void setNumNodeManagers(int numNodeManagers) {
builder.setNumNodeManagers((numNodeManagers));
}

/**
 * Reads the decommissioning NodeManager count from the proto when
 * <code>viaProto</code> is set, otherwise from the builder.
 *
 * @return the stored count, or 0 when the field is not present
 */
@Override
public int getNumDecommissioningNodeManagers() {
  final YarnClusterMetricsProtoOrBuilder source = viaProto ? proto : builder;
  return source.hasNumDecommissioningNms()
      ? source.getNumDecommissioningNms()
      : 0;
}

/**
 * Writes the decommissioning NodeManager count into the builder, calling
 * <code>maybeInitBuilder()</code> first.
 *
 * @param numDecommissioningNodeManagers count to store
 */
@Override
public void setNumDecommissioningNodeManagers(
    int numDecommissioningNodeManagers) {
  maybeInitBuilder();
  builder.setNumDecommissioningNms(numDecommissioningNodeManagers);
}

@Override
public int getNumDecommissionedNodeManagers() {
YarnClusterMetricsProtoOrBuilder p = viaProto ? proto : builder;
Expand Down Expand Up @@ -165,4 +181,19 @@ public void setNumRebootedNodeManagers(int numRebootedNodeManagers) {
maybeInitBuilder();
builder.setNumRebootedNms((numRebootedNodeManagers));
}
}

/**
 * Reads the shutdown NodeManager count from the proto when
 * <code>viaProto</code> is set, otherwise from the builder.
 *
 * @return the stored count, or 0 when the field is not present
 */
@Override
public int getNumShutdownNodeManagers() {
  final YarnClusterMetricsProtoOrBuilder source = viaProto ? proto : builder;
  return source.hasNumShutdownNms() ? source.getNumShutdownNms() : 0;
}

/**
 * Writes the shutdown NodeManager count into the builder, calling
 * <code>maybeInitBuilder()</code> first.
 *
 * @param numShutdownNodeManagers count to store
 */
@Override
public void setNumShutdownNodeManagers(
    int numShutdownNodeManagers) {
  maybeInitBuilder();
  builder.setNumShutdownNms(numShutdownNodeManagers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -860,12 +860,14 @@ public GetClusterMetricsResponse getClusterMetrics(
.newRecordInstance(YarnClusterMetrics.class);
ymetrics.setNumNodeManagers(this.rmContext.getRMNodes().size());
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
ymetrics.setNumDecommissioningNodeManagers(clusterMetrics.getNumDecommissioningNMs());
ymetrics.setNumDecommissionedNodeManagers(clusterMetrics
.getNumDecommisionedNMs());
ymetrics.setNumActiveNodeManagers(clusterMetrics.getNumActiveNMs());
ymetrics.setNumLostNodeManagers(clusterMetrics.getNumLostNMs());
ymetrics.setNumUnhealthyNodeManagers(clusterMetrics.getUnhealthyNMs());
ymetrics.setNumRebootedNodeManagers(clusterMetrics.getNumRebootedNMs());
ymetrics.setNumShutdownNodeManagers(clusterMetrics.getNumShutdownNMs());
response.setClusterMetrics(ymetrics);
return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.MockApps;
Expand All @@ -75,6 +76,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
Expand Down Expand Up @@ -139,6 +141,7 @@
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
Expand Down Expand Up @@ -2809,10 +2812,63 @@ protected ClientRMService createClientRMService() {
rm.close();
}

/**
 * Checks that every NodeManager state counter held by
 * {@code ClusterMetrics}, including the decommissioning and shutdown
 * counters, is returned to a client calling {@code getClusterMetrics}.
 *
 * @throws Exception if the RM cannot be started or the RPC call fails
 */
@Test
public void testGetClusterMetrics() throws Exception {
  MockRM rm = new MockRM() {
    @Override
    protected ClientRMService createClientRMService() {
      return new ClientRMService(this.rmContext, scheduler,
          this.rmAppManager, this.applicationACLsManager,
          this.queueACLsManager,
          this.getRMContext().getRMDelegationTokenSecretManager());
    }
  };
  rm.start();

  Configuration conf = new Configuration();
  YarnRPC rpc = null;
  ApplicationClientProtocol client = null;
  try {
    // Give each state a distinct count so a mixed-up mapping is detectable.
    ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
    clusterMetrics.incrDecommissioningNMs();
    repeat(2, clusterMetrics::incrDecommisionedNMs);
    repeat(3, clusterMetrics::incrNumActiveNodes);
    repeat(4, clusterMetrics::incrNumLostNMs);
    repeat(5, clusterMetrics::incrNumUnhealthyNMs);
    repeat(6, clusterMetrics::incrNumRebootedNMs);
    repeat(7, clusterMetrics::incrNumShutdownNMs);

    // Create a client.
    rpc = YarnRPC.create(conf);
    InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    client = (ApplicationClientProtocol) rpc
        .getProxy(ApplicationClientProtocol.class, rmAddress, conf);

    YarnClusterMetrics ymetrics = client.getClusterMetrics(
        GetClusterMetricsRequest.newInstance()).getClusterMetrics();

    // This test registers no NodeManager, hence the expected total of 0.
    Assert.assertEquals(0, ymetrics.getNumNodeManagers());
    Assert.assertEquals(1, ymetrics.getNumDecommissioningNodeManagers());
    Assert.assertEquals(2, ymetrics.getNumDecommissionedNodeManagers());
    Assert.assertEquals(3, ymetrics.getNumActiveNodeManagers());
    Assert.assertEquals(4, ymetrics.getNumLostNodeManagers());
    Assert.assertEquals(5, ymetrics.getNumUnhealthyNodeManagers());
    Assert.assertEquals(6, ymetrics.getNumRebootedNodeManagers());
    Assert.assertEquals(7, ymetrics.getNumShutdownNodeManagers());
  } finally {
    // Release the proxy and the RM even when an assertion above fails, so
    // a failing run does not leave a started RM behind for later tests.
    try {
      if (client != null) {
        rpc.stopProxy(client, conf);
      }
    } finally {
      rm.close();
    }
  }
}

/**
 * Per-test cleanup: deletes the resource types file if one exists, then
 * destroys the cluster metrics and shuts down the default metrics system.
 */
@After
public void tearDown() {
  boolean hasResourceTypesFile =
      resourceTypesFile != null && resourceTypesFile.exists();
  if (hasResourceTypesFile) {
    resourceTypesFile.delete();
  }
  ClusterMetrics.destroy();
  DefaultMetricsSystem.shutdown();
}

/**
 * Runs the given action <code>n</code> times in sequence; does nothing when
 * <code>n</code> is zero or negative.
 *
 * @param n number of invocations
 * @param r action to invoke
 */
private static void repeat(int n, Runnable r) {
  int remaining = n;
  while (remaining > 0) {
    r.run();
    remaining--;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public static GetClusterMetricsResponse merge(
tmp.getNumNodeManagers() + metrics.getNumNodeManagers());
tmp.setNumActiveNodeManagers(
tmp.getNumActiveNodeManagers() + metrics.getNumActiveNodeManagers());
tmp.setNumDecommissioningNodeManagers(
tmp.getNumDecommissioningNodeManagers() + metrics
.getNumDecommissioningNodeManagers());
tmp.setNumDecommissionedNodeManagers(
tmp.getNumDecommissionedNodeManagers() + metrics
.getNumDecommissionedNodeManagers());
Expand All @@ -90,6 +93,9 @@ public static GetClusterMetricsResponse merge(
tmp.setNumUnhealthyNodeManagers(
tmp.getNumUnhealthyNodeManagers() + metrics
.getNumUnhealthyNodeManagers());
tmp.setNumShutdownNodeManagers(
tmp.getNumShutdownNodeManagers() + metrics
.getNumShutdownNodeManagers());
}
return GetClusterMetricsResponse.newInstance(tmp);
}
Expand Down Expand Up @@ -526,4 +532,3 @@ public static GetNodesToAttributesResponse mergeNodesToAttributesResponse(
return GetNodesToAttributesResponse.newInstance(attributesMap);
}
}

Loading