Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ public final class RouterMetrics {
private MutableGaugeInt numGetClusterMetricsFailedRetrieved;
// Failure gauges for the node/label related Router calls. Each gauge carries
// a description naming its own call so the two label-mapping gauges can be
// told apart in the metrics output.
@Metric("# of getClusterNodes failed to be retrieved")
private MutableGaugeInt numGetClusterNodesFailedRetrieved;
@Metric("# of getNodeToLabels failed to be retrieved")
private MutableGaugeInt numGetNodeToLabelsFailedRetrieved;
@Metric("# of getLabelsToNodes failed to be retrieved")
private MutableGaugeInt numGetLabelsToNodesFailedRetrieved;
@Metric("# of getClusterNodeLabels failed to be retrieved")
private MutableGaugeInt numGetClusterNodeLabelsFailedRetrieved;

// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
Expand All @@ -78,6 +84,12 @@ public final class RouterMetrics {
private MutableRate totalSucceededGetClusterMetricsRetrieved;
// Success rate metrics for the node/label related Router calls. Every
// description names the call it belongs to, so getNodeToLabels and
// getLabelsToNodes are reported under distinct descriptions.
@Metric("Total number of successful Retrieved getClusterNodes and latency(ms)")
private MutableRate totalSucceededGetClusterNodesRetrieved;
@Metric("Total number of successful Retrieved getNodeToLabels and latency(ms)")
private MutableRate totalSucceededGetNodeToLabelsRetrieved;
@Metric("Total number of successful Retrieved getLabelsToNodes and latency(ms)")
private MutableRate totalSucceededGetLabelsToNodesRetrieved;
@Metric("Total number of successful Retrieved getClusterNodeLabels and latency(ms)")
private MutableRate totalSucceededGetClusterNodeLabelsRetrieved;

/**
* Provide quantile counters for all latencies.
Expand All @@ -90,6 +102,9 @@ public final class RouterMetrics {
private MutableQuantiles getApplicationAttemptReportLatency;
private MutableQuantiles getClusterMetricsLatency;
// Created in the constructor via registry.newQuantiles and fed by
// succeededGetClusterNodesRetrieved(long).
private MutableQuantiles getClusterNodesLatency;
// Created in the constructor via registry.newQuantiles and fed by
// succeededGetNodeToLabelsRetrieved(long).
private MutableQuantiles getNodeToLabelsLatency;
// Fed by succeededGetLabelsToNodesRetrieved(long).
// NOTE(review): this field (and its registered metric name) says "LabelToNodes",
// while the matching rate and gauge fields say "LabelsToNodes".
private MutableQuantiles getLabelToNodesLatency;
// Created in the constructor via registry.newQuantiles and fed by
// succeededGetClusterNodeLabelsRetrieved(long).
private MutableQuantiles getClusterNodeLabelsLatency;

// Shared RouterMetrics instance; starts out as null.
// NOTE(review): presumably assigned lazily by getMetrics() - confirm against its body.
private static volatile RouterMetrics INSTANCE = null;
// Registry the constructor uses to create the latency quantiles (registry.newQuantiles).
private static MetricsRegistry registry;
Expand Down Expand Up @@ -120,6 +135,18 @@ private RouterMetrics() {
getClusterNodesLatency =
registry.newQuantiles("getClusterNodesLatency",
"latency of get cluster nodes", "ops", "latency", 10);

getNodeToLabelsLatency =
registry.newQuantiles("getNodeToLabelsLatency",
"latency of get node labels", "ops", "latency", 10);

getLabelToNodesLatency =
registry.newQuantiles("getLabelToNodesLatency",
"latency of get label nodes", "ops", "latency", 10);

getClusterNodeLabelsLatency =
registry.newQuantiles("getClusterNodeLabelsLatency",
"latency of get cluster node labels", "ops", "latency", 10);
}

public static RouterMetrics getMetrics() {
Expand Down Expand Up @@ -181,6 +208,21 @@ public long getNumSucceededGetClusterNodesRetrieved(){
return totalSucceededGetClusterNodesRetrieved.lastStat().numSamples();
}

/**
 * Reports how many samples the getNodeToLabels success rate metric holds.
 *
 * @return the numSamples() value of the metric's last stat.
 */
@VisibleForTesting
public long getNumSucceededGetNodeToLabelsRetrieved() {
  final long sampleCount =
      totalSucceededGetNodeToLabelsRetrieved.lastStat().numSamples();
  return sampleCount;
}

/**
 * Reports how many samples the getLabelsToNodes success rate metric holds.
 *
 * @return the numSamples() value of the metric's last stat.
 */
@VisibleForTesting
public long getNumSucceededGetLabelsToNodesRetrieved() {
  final long sampleCount =
      totalSucceededGetLabelsToNodesRetrieved.lastStat().numSamples();
  return sampleCount;
}

/**
 * Reports how many samples the getClusterNodeLabels success rate metric holds.
 *
 * @return the numSamples() value of the metric's last stat.
 */
@VisibleForTesting
public long getNumSucceededGetClusterNodeLabelsRetrieved() {
  final long sampleCount =
      totalSucceededGetClusterNodeLabelsRetrieved.lastStat().numSamples();
  return sampleCount;
}

@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
Expand Down Expand Up @@ -221,6 +263,21 @@ public double getLatencySucceededGetClusterNodesRetrieved() {
return totalSucceededGetClusterNodesRetrieved.lastStat().mean();
}

/**
 * Reports the mean recorded by the getNodeToLabels success rate metric.
 *
 * @return the mean() value of the metric's last stat.
 */
@VisibleForTesting
public double getLatencySucceededGetNodeToLabelsRetrieved() {
  final double meanLatency =
      totalSucceededGetNodeToLabelsRetrieved.lastStat().mean();
  return meanLatency;
}

/**
 * Reports the mean recorded by the getLabelsToNodes success rate metric.
 *
 * @return the mean() value of the metric's last stat.
 */
@VisibleForTesting
public double getLatencySucceededGetLabelsToNodesRetrieved() {
  final double meanLatency =
      totalSucceededGetLabelsToNodesRetrieved.lastStat().mean();
  return meanLatency;
}

/**
 * Reports the mean recorded by the getClusterNodeLabels success rate metric.
 *
 * @return the mean() value of the metric's last stat.
 */
@VisibleForTesting
public double getLatencySucceededGetClusterNodeLabelsRetrieved() {
  final double meanLatency =
      totalSucceededGetClusterNodeLabelsRetrieved.lastStat().mean();
  return meanLatency;
}

@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
Expand Down Expand Up @@ -261,6 +318,21 @@ public int getClusterNodesFailedRetrieved() {
return numGetClusterNodesFailedRetrieved.value();
}

/**
 * Reads the current value of the getNodeToLabels failure gauge.
 *
 * @return the gauge's value().
 */
@VisibleForTesting
public int getNodeToLabelsFailedRetrieved() {
  final int failures = numGetNodeToLabelsFailedRetrieved.value();
  return failures;
}

/**
 * Reads the current value of the getLabelsToNodes failure gauge.
 *
 * @return the gauge's value().
 */
@VisibleForTesting
public int getLabelsToNodesFailedRetrieved() {
  final int failures = numGetLabelsToNodesFailedRetrieved.value();
  return failures;
}

/**
 * Reads the current value of the getClusterNodeLabels failure gauge.
 *
 * <p>NOTE(review): the doubled "Get" prefix differs from the sibling getters
 * (for example getNodeToLabelsFailedRetrieved); the name is left untouched to
 * preserve the public method name.
 *
 * @return the gauge's value().
 */
@VisibleForTesting
public int getGetClusterNodeLabelsFailedRetrieved() {
  final int failures = numGetClusterNodeLabelsFailedRetrieved.value();
  return failures;
}

public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
Expand Down Expand Up @@ -301,6 +373,21 @@ public void succeededGetClusterNodesRetrieved(long duration) {
getClusterNodesLatency.add(duration);
}

/**
 * Records one successful getNodeToLabels call.
 *
 * @param duration latency of the call; the rate metric's description labels
 *     it as latency(ms).
 */
public void succeededGetNodeToLabelsRetrieved(long duration) {
// Aggregate rate metric (sample count and mean).
totalSucceededGetNodeToLabelsRetrieved.add(duration);
// Quantile metric registered as "getNodeToLabelsLatency".
getNodeToLabelsLatency.add(duration);
}

/**
 * Records one successful getLabelsToNodes call.
 *
 * @param duration latency of the call; the rate metric's description labels
 *     it as latency(ms).
 */
public void succeededGetLabelsToNodesRetrieved(long duration) {
// Aggregate rate metric (sample count and mean).
totalSucceededGetLabelsToNodesRetrieved.add(duration);
// Quantile metric registered as "getLabelToNodesLatency".
getLabelToNodesLatency.add(duration);
}

/**
 * Records one successful getClusterNodeLabels call.
 *
 * @param duration latency of the call; the rate metric's description labels
 *     it as latency(ms).
 */
public void succeededGetClusterNodeLabelsRetrieved(long duration) {
// Aggregate rate metric (sample count and mean).
totalSucceededGetClusterNodeLabelsRetrieved.add(duration);
// Quantile metric registered as "getClusterNodeLabelsLatency".
getClusterNodeLabelsLatency.add(duration);
}

/**
 * Increments the numAppsFailedCreated gauge, whose value is exposed through
 * getAppsFailedCreated().
 */
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
Expand Down Expand Up @@ -332,4 +419,16 @@ public void incrGetClusterMetricsFailedRetrieved() {
/**
 * Increments the gauge described as "# of getClusterNodes failed to be retrieved".
 */
public void incrClusterNodesFailedRetrieved() {
numGetClusterNodesFailedRetrieved.incr();
}

/**
 * Increments the getNodeToLabels failure gauge, whose value is exposed through
 * getNodeToLabelsFailedRetrieved().
 */
public void incrNodeToLabelsFailedRetrieved() {
numGetNodeToLabelsFailedRetrieved.incr();
}

/**
 * Increments the getLabelsToNodes failure gauge, whose value is exposed through
 * getLabelsToNodesFailedRetrieved().
 */
public void incrLabelsToNodesFailedRetrieved() {
numGetLabelsToNodesFailedRetrieved.incr();
}

/**
 * Increments the getClusterNodeLabels failure gauge, whose value is exposed
 * through getGetClusterNodeLabelsFailedRetrieved().
 */
public void incrClusterNodeLabelsFailedRetrieved() {
numGetClusterNodeLabelsFailedRetrieved.incr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -38,6 +39,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
Expand Down Expand Up @@ -867,22 +869,97 @@ public ReservationDeleteResponse deleteReservation(
throw new NotImplementedException("Code is not implemented");
}

/**
 * Invokes one ApplicationClientProtocol method, looked up reflectively by name
 * and parameter types, on every sub-cluster returned by the federation facade
 * and gathers the typed results.
 *
 * @param filterInactiveSubClusters passed through to
 *     federationFacade.getSubClusters.
 * @param request supplies the method name, parameter types and arguments.
 * @param clazz class every individual result is cast to.
 * @param <R> type of the individual results.
 * @return one result per sub-cluster, in the iteration order of the
 *     sub-cluster key set.
 * @throws YarnException declared for the sub-cluster lookup.
 * @throws RuntimeException wrapping any YarnException, NoSuchMethodException,
 *     IllegalAccessException or InvocationTargetException raised while
 *     invoking the method on a sub-cluster.
 */
private <R> Collection<R> invokeAppClientProtocolMethod(
    Boolean filterInactiveSubClusters, ClientMethod request, Class<R> clazz)
    throws YarnException, RuntimeException {
  Collection<SubClusterId> subClusterIds =
      federationFacade.getSubClusters(filterInactiveSubClusters).keySet();
  return subClusterIds.stream()
      .map(id -> invokeOnSubCluster(id, request, clazz))
      .collect(Collectors.toList());
}

/**
 * Invokes the requested protocol method on a single sub-cluster, rethrowing
 * checked failures as RuntimeException so it can be used inside a stream.
 */
private <R> R invokeOnSubCluster(
    SubClusterId subClusterId, ClientMethod request, Class<R> clazz) {
  try {
    ApplicationClientProtocol rmProxy = getClientRMProxyForSubCluster(subClusterId);
    Method target = ApplicationClientProtocol.class.getMethod(
        request.getMethodName(), request.getTypes());
    Object rawResult = target.invoke(rmProxy, request.getParams());
    return clazz.cast(rawResult);
  } catch (YarnException | NoSuchMethodException
      | IllegalAccessException | InvocationTargetException cause) {
    throw new RuntimeException(cause);
  }
}

/**
 * Retrieves the node-to-labels mapping from the sub-clusters and merges the
 * per-sub-cluster answers into a single response.
 *
 * <p>The remote call is issued through invokeAppClientProtocolMethod with
 * filterInactiveSubClusters set to true. A failure increments the
 * getNodeToLabels failure gauge, is logged and rethrown; a success records the
 * elapsed time before the responses are merged.
 *
 * @param request the getNodeToLabels request; a null request is counted as a
 *     failure and reported through RouterServerUtil.logAndThrowException.
 * @return the merged GetNodesToLabelsResponse.
 * @throws YarnException if the request is missing or a sub-cluster call fails.
 * @throws IOException declared by the overridden method.
 */
@Override
public GetNodesToLabelsResponse getNodeToLabels(
    GetNodesToLabelsRequest request) throws YarnException, IOException {
  if (request == null) {
    routerMetrics.incrNodeToLabelsFailedRetrieved();
    RouterServerUtil.logAndThrowException("Missing getNodesToLabels request.", null);
  }
  long startTime = clock.getTime();
  ClientMethod remoteMethod = new ClientMethod("getNodeToLabels",
      new Class[] {GetNodesToLabelsRequest.class}, new Object[] {request});
  Collection<GetNodesToLabelsResponse> clusterNodes;
  try {
    clusterNodes = invokeAppClientProtocolMethod(true, remoteMethod,
        GetNodesToLabelsResponse.class);
  } catch (Exception ex) {
    routerMetrics.incrNodeToLabelsFailedRetrieved();
    LOG.error("Unable to get node label due to exception.", ex);
    throw ex;
  }
  long stopTime = clock.getTime();
  routerMetrics.succeededGetNodeToLabelsRetrieved(stopTime - startTime);
  // Merge the NodesToLabelsResponse
  return RouterYarnClientUtils.mergeNodesToLabelsResponse(clusterNodes);
}

/**
 * Retrieves the label-to-nodes mapping from the sub-clusters and merges the
 * per-sub-cluster answers into a single response.
 *
 * <p>The remote call is issued through invokeAppClientProtocolMethod with
 * filterInactiveSubClusters set to true. A failure increments the
 * getLabelsToNodes failure gauge, is logged and rethrown; a success records
 * the elapsed time before the responses are merged.
 *
 * @param request the getLabelsToNodes request; a null request is counted as a
 *     failure and reported through RouterServerUtil.logAndThrowException.
 * @return the merged GetLabelsToNodesResponse.
 * @throws YarnException if the request is missing or a sub-cluster call fails.
 * @throws IOException declared by the overridden method.
 */
@Override
public GetLabelsToNodesResponse getLabelsToNodes(
    GetLabelsToNodesRequest request) throws YarnException, IOException {
  if (request == null) {
    routerMetrics.incrLabelsToNodesFailedRetrieved();
    RouterServerUtil.logAndThrowException("Missing getLabelsToNodes request.", null);
  }
  long startTime = clock.getTime();
  ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes",
      new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
  Collection<GetLabelsToNodesResponse> labelNodes;
  try {
    labelNodes = invokeAppClientProtocolMethod(true, remoteMethod,
        GetLabelsToNodesResponse.class);
  } catch (Exception ex) {
    routerMetrics.incrLabelsToNodesFailedRetrieved();
    LOG.error("Unable to get label node due to exception.", ex);
    throw ex;
  }
  long stopTime = clock.getTime();
  routerMetrics.succeededGetLabelsToNodesRetrieved(stopTime - startTime);
  // Merge the LabelsToNodesResponse
  return RouterYarnClientUtils.mergeLabelsToNodes(labelNodes);
}

/**
 * Retrieves the cluster node labels from the sub-clusters and merges the
 * per-sub-cluster answers into a single response.
 *
 * <p>The remote call is issued through invokeAppClientProtocolMethod with
 * filterInactiveSubClusters set to true. A failure increments the
 * getClusterNodeLabels failure gauge, is logged and rethrown; a success
 * records the elapsed time before the responses are merged.
 *
 * @param request the getClusterNodeLabels request; a null request is counted
 *     as a failure and reported through RouterServerUtil.logAndThrowException.
 * @return the merged GetClusterNodeLabelsResponse.
 * @throws YarnException if the request is missing or a sub-cluster call fails.
 * @throws IOException declared by the overridden method.
 */
@Override
public GetClusterNodeLabelsResponse getClusterNodeLabels(
    GetClusterNodeLabelsRequest request) throws YarnException, IOException {
  if (request == null) {
    routerMetrics.incrClusterNodeLabelsFailedRetrieved();
    RouterServerUtil.logAndThrowException("Missing getClusterNodeLabels request.", null);
  }
  long startTime = clock.getTime();
  ClientMethod remoteMethod = new ClientMethod("getClusterNodeLabels",
      new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
  Collection<GetClusterNodeLabelsResponse> nodeLabels;
  try {
    nodeLabels = invokeAppClientProtocolMethod(true, remoteMethod,
        GetClusterNodeLabelsResponse.class);
  } catch (Exception ex) {
    routerMetrics.incrClusterNodeLabelsFailedRetrieved();
    LOG.error("Unable to get cluster nodeLabels due to exception.", ex);
    throw ex;
  }
  long stopTime = clock.getTime();
  routerMetrics.succeededGetClusterNodeLabelsRetrieved(stopTime - startTime);
  // Merge the ClusterNodeLabelsResponse
  return RouterYarnClientUtils.mergeClusterNodeLabelsResponse(nodeLabels);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,22 @@
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;

import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
Expand Down Expand Up @@ -218,4 +225,75 @@ public static GetClusterNodesResponse mergeClusterNodesResponse(
clusterNodesResponse.setNodeReports(nodeReports);
return clusterNodesResponse;
}

/**
 * Combines several GetNodesToLabelsResponse objects into one.
 *
 * <p>Null responses and responses without a node-to-labels map are ignored.
 * Entries are copied with putAll, so when the same node appears in more than
 * one response the entry from the later response wins.
 *
 * @param responses the GetNodesToLabelsResponse objects to merge.
 * @return a new GetNodesToLabelsResponse holding the combined mapping.
 */
public static GetNodesToLabelsResponse mergeNodesToLabelsResponse(
    Collection<GetNodesToLabelsResponse> responses) {
  Map<NodeId, Set<String>> combined = new HashMap<>();
  for (GetNodesToLabelsResponse current : responses) {
    if (current == null) {
      continue;
    }
    Map<NodeId, Set<String>> perCluster = current.getNodeToLabels();
    if (perCluster != null) {
      combined.putAll(perCluster);
    }
  }
  GetNodesToLabelsResponse merged =
      Records.newRecord(GetNodesToLabelsResponse.class);
  merged.setNodeToLabels(combined);
  return merged;
}

/**
 * Merges a list of GetLabelsToNodesResponse.
 *
 * <p>Node sets reported for the same label by different responses are
 * unioned. Every set in the merged map is a fresh HashSet, so the sets owned
 * by the input responses are never modified. Null responses, responses
 * without a labels-to-nodes map, and null node sets are tolerated; a label
 * with a null node set still appears in the result with the nodes gathered
 * from the other responses (possibly none).
 *
 * @param responses a list of GetLabelsToNodesResponse to merge.
 * @return the merged GetLabelsToNodesResponse.
 */
public static GetLabelsToNodesResponse mergeLabelsToNodes(
    Collection<GetLabelsToNodesResponse> responses){
  GetLabelsToNodesResponse labelsToNodesResponse = Records.newRecord(
      GetLabelsToNodesResponse.class);
  Map<String, Set<NodeId>> labelsToNodesMap = new HashMap<>();
  for (GetLabelsToNodesResponse response : responses) {
    if (response == null) {
      continue;
    }
    Map<String, Set<NodeId>> clusterLabelsToNodesMap = response.getLabelsToNodes();
    if (clusterLabelsToNodesMap == null) {
      continue;
    }
    for (Map.Entry<String, Set<NodeId>> entry : clusterLabelsToNodesMap.entrySet()) {
      // Accumulate into a set owned by the merged map rather than reusing the
      // response's own set, which would be mutated by later addAll calls.
      Set<NodeId> allNodes =
          labelsToNodesMap.computeIfAbsent(entry.getKey(), label -> new HashSet<>());
      Set<NodeId> clusterNodes = entry.getValue();
      if (clusterNodes != null) {
        allNodes.addAll(clusterNodes);
      }
    }
  }
  labelsToNodesResponse.setLabelsToNodes(labelsToNodesMap);
  return labelsToNodesResponse;
}

/**
 * Combines several GetClusterNodeLabelsResponse objects into one.
 *
 * <p>Null responses and responses without a label list are ignored. Labels are
 * gathered in a HashSet, so equal labels reported by several responses appear
 * once in the result.
 *
 * @param responses the GetClusterNodeLabelsResponse objects to merge.
 * @return a new GetClusterNodeLabelsResponse holding the distinct labels.
 */
public static GetClusterNodeLabelsResponse mergeClusterNodeLabelsResponse(
    Collection<GetClusterNodeLabelsResponse> responses) {
  Set<NodeLabel> distinctLabels = new HashSet<>();
  for (GetClusterNodeLabelsResponse current : responses) {
    if (current == null) {
      continue;
    }
    Collection<NodeLabel> reported = current.getNodeLabelList();
    if (reported != null) {
      distinctLabels.addAll(reported);
    }
  }
  GetClusterNodeLabelsResponse merged =
      Records.newRecord(GetClusterNodeLabelsResponse.class);
  merged.setNodeLabelList(new ArrayList<>(distinctLabels));
  return merged;
}
}

Loading