Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ public final class RouterMetrics {
private MutableGaugeInt numGetClusterMetricsFailedRetrieved;
// Failure gauges: each one is bumped by the matching incr*FailedRetrieved() method.
@Metric("# of getClusterNodes failed to be retrieved")
private MutableGaugeInt numGetClusterNodesFailedRetrieved;
@Metric("# of getNodeToLabels failed to be retrieved")
private MutableGaugeInt numGetNodeToLabelsFailedRetrieved;
// The description previously repeated the getNodeToLabels text (copy-paste);
// it now names the call this gauge actually tracks.
@Metric("# of getLabelsToNodes failed to be retrieved")
private MutableGaugeInt numGetLabelsToNodesFailedRetrieved;
@Metric("# of getClusterNodeLabels failed to be retrieved")
private MutableGaugeInt numGetClusterNodeLabelsFailedRetrieved;

// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
Expand All @@ -78,6 +84,12 @@ public final class RouterMetrics {
private MutableRate totalSucceededGetClusterMetricsRetrieved;
// Success rates: each one receives the call duration from the matching
// succeeded*Retrieved(long) method.
@Metric("Total number of successful Retrieved getClusterNodes and latency(ms)")
private MutableRate totalSucceededGetClusterNodesRetrieved;
@Metric("Total number of successful Retrieved getNodeToLabels and latency(ms)")
private MutableRate totalSucceededGetNodeToLabelsRetrieved;
// The description previously repeated the getNodeToLabels text (copy-paste);
// it now names the call this rate actually tracks.
@Metric("Total number of successful Retrieved getLabelsToNodes and latency(ms)")
private MutableRate totalSucceededGetLabelsToNodesRetrieved;
@Metric("Total number of successful Retrieved getClusterNodeLabels and latency(ms)")
private MutableRate totalSucceededGetClusterNodeLabelsRetrieved;

/**
* Provide quantile counters for all latencies.
Expand All @@ -90,6 +102,9 @@ public final class RouterMetrics {
private MutableQuantiles getApplicationAttemptReportLatency;
private MutableQuantiles getClusterMetricsLatency;
private MutableQuantiles getClusterNodesLatency;
// Quantiles for the node-label calls. Each is created in the constructor with
// registry.newQuantiles(...) and fed by the matching succeeded*Retrieved(long).
private MutableQuantiles getNodeToLabelsLatency;
// NOTE(review): spelled "LabelToNodes" (singular) while the related gauge and
// rate fields use "LabelsToNodes"; the registered metric name uses the same
// singular spelling, so renaming would change the published metric.
private MutableQuantiles getLabelToNodesLatency;
private MutableQuantiles getClusterNodeLabelsLatency;

// Presumably the shared instance handed out by getMetrics(); that method's
// body is not visible here -- TODO confirm.
private static volatile RouterMetrics INSTANCE = null;
// Registry used by the constructor to create the quantile metrics above.
private static MetricsRegistry registry;
Expand Down Expand Up @@ -120,6 +135,18 @@ private RouterMetrics() {
getClusterNodesLatency =
registry.newQuantiles("getClusterNodesLatency",
"latency of get cluster nodes", "ops", "latency", 10);

getNodeToLabelsLatency =
registry.newQuantiles("getNodeToLabelsLatency",
"latency of get node labels", "ops", "latency", 10);

getLabelToNodesLatency =
registry.newQuantiles("getLabelToNodesLatency",
"latency of get label nodes", "ops", "latency", 10);

getClusterNodeLabelsLatency =
registry.newQuantiles("getClusterNodeLabelsLatency",
"latency of get cluster node labels", "ops", "latency", 10);
}

public static RouterMetrics getMetrics() {
Expand Down Expand Up @@ -181,6 +208,21 @@ public long getNumSucceededGetClusterNodesRetrieved(){
return totalSucceededGetClusterNodesRetrieved.lastStat().numSamples();
}

/**
 * Reports how many samples the successful getNodeToLabels rate holds.
 *
 * @return {@code lastStat().numSamples()} of the getNodeToLabels success rate.
 */
@VisibleForTesting
public long getNumSucceededGetNodeToLabelsRetrieved() {
  final long sampleCount =
      this.totalSucceededGetNodeToLabelsRetrieved.lastStat().numSamples();
  return sampleCount;
}

/**
 * Reports how many samples the successful getLabelsToNodes rate holds.
 *
 * @return {@code lastStat().numSamples()} of the getLabelsToNodes success rate.
 */
@VisibleForTesting
public long getNumSucceededGetLabelsToNodesRetrieved() {
  final long sampleCount =
      this.totalSucceededGetLabelsToNodesRetrieved.lastStat().numSamples();
  return sampleCount;
}

/**
 * Reports how many samples the successful getClusterNodeLabels rate holds.
 *
 * @return {@code lastStat().numSamples()} of the getClusterNodeLabels success rate.
 */
@VisibleForTesting
public long getNumSucceededGetClusterNodeLabelsRetrieved() {
  final long sampleCount =
      this.totalSucceededGetClusterNodeLabelsRetrieved.lastStat().numSamples();
  return sampleCount;
}

@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
Expand Down Expand Up @@ -221,6 +263,21 @@ public double getLatencySucceededGetClusterNodesRetrieved() {
return totalSucceededGetClusterNodesRetrieved.lastStat().mean();
}

/**
 * Reports the mean recorded for successful getNodeToLabels calls.
 *
 * @return {@code lastStat().mean()} of the getNodeToLabels success rate.
 */
@VisibleForTesting
public double getLatencySucceededGetNodeToLabelsRetrieved() {
  final double meanLatency =
      this.totalSucceededGetNodeToLabelsRetrieved.lastStat().mean();
  return meanLatency;
}

/**
 * Reports the mean recorded for successful getLabelsToNodes calls.
 *
 * @return {@code lastStat().mean()} of the getLabelsToNodes success rate.
 */
@VisibleForTesting
public double getLatencySucceededGetLabelsToNodesRetrieved() {
  final double meanLatency =
      this.totalSucceededGetLabelsToNodesRetrieved.lastStat().mean();
  return meanLatency;
}

/**
 * Reports the mean recorded for successful getClusterNodeLabels calls.
 *
 * @return {@code lastStat().mean()} of the getClusterNodeLabels success rate.
 */
@VisibleForTesting
public double getLatencySucceededGetClusterNodeLabelsRetrieved() {
  final double meanLatency =
      this.totalSucceededGetClusterNodeLabelsRetrieved.lastStat().mean();
  return meanLatency;
}

@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
Expand Down Expand Up @@ -261,6 +318,21 @@ public int getClusterNodesFailedRetrieved() {
return numGetClusterNodesFailedRetrieved.value();
}

/**
 * Reads the current value of the getNodeToLabels failure gauge.
 *
 * @return the gauge value.
 */
@VisibleForTesting
public int getNodeToLabelsFailedRetrieved() {
  final int failures = this.numGetNodeToLabelsFailedRetrieved.value();
  return failures;
}

/**
 * Reads the current value of the getLabelsToNodes failure gauge.
 *
 * @return the gauge value.
 */
@VisibleForTesting
public int getLabelsToNodesFailedRetrieved() {
  final int failures = this.numGetLabelsToNodesFailedRetrieved.value();
  return failures;
}

/**
 * Reads the current value of the getClusterNodeLabels failure gauge.
 *
 * NOTE(review): the doubled "getGet" prefix differs from the sibling getters
 * in this class; the name is kept because callers may depend on it.
 *
 * @return the gauge value.
 */
@VisibleForTesting
public int getGetClusterNodeLabelsFailedRetrieved() {
  final int failures = this.numGetClusterNodeLabelsFailedRetrieved.value();
  return failures;
}

public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
Expand Down Expand Up @@ -301,6 +373,21 @@ public void succeededGetClusterNodesRetrieved(long duration) {
getClusterNodesLatency.add(duration);
}

/**
 * Records one successful getNodeToLabels call in both the rate metric and
 * the latency quantiles.
 *
 * @param duration elapsed time of the call, added as a sample to both metrics.
 */
public void succeededGetNodeToLabelsRetrieved(long duration) {
  final long sample = duration;
  this.totalSucceededGetNodeToLabelsRetrieved.add(sample);
  this.getNodeToLabelsLatency.add(sample);
}

/**
 * Records one successful getLabelsToNodes call in both the rate metric and
 * the latency quantiles.
 *
 * @param duration elapsed time of the call, added as a sample to both metrics.
 */
public void succeededGetLabelsToNodesRetrieved(long duration) {
  final long sample = duration;
  this.totalSucceededGetLabelsToNodesRetrieved.add(sample);
  this.getLabelToNodesLatency.add(sample);
}

/**
 * Records one successful getClusterNodeLabels call in both the rate metric
 * and the latency quantiles.
 *
 * @param duration elapsed time of the call, added as a sample to both metrics.
 */
public void succeededGetClusterNodeLabelsRetrieved(long duration) {
  final long sample = duration;
  this.totalSucceededGetClusterNodeLabelsRetrieved.add(sample);
  this.getClusterNodeLabelsLatency.add(sample);
}

/**
 * Increments the gauge backing {@code numAppsFailedCreated}.
 */
public void incrAppsFailedCreated() {
  this.numAppsFailedCreated.incr();
}
Expand Down Expand Up @@ -332,4 +419,16 @@ public void incrGetClusterMetricsFailedRetrieved() {
/**
 * Increments the getClusterNodes failure gauge.
 */
public void incrClusterNodesFailedRetrieved() {
  this.numGetClusterNodesFailedRetrieved.incr();
}

/**
 * Increments the getNodeToLabels failure gauge.
 */
public void incrNodeToLabelsFailedRetrieved() {
  this.numGetNodeToLabelsFailedRetrieved.incr();
}

/**
 * Increments the getLabelsToNodes failure gauge.
 */
public void incrLabelsToNodesFailedRetrieved() {
  this.numGetLabelsToNodesFailedRetrieved.incr();
}

/**
 * Increments the getClusterNodeLabels failure gauge.
 */
public void incrClusterNodeLabelsFailedRetrieved() {
  this.numGetClusterNodeLabelsFailedRetrieved.incr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -870,19 +870,88 @@ public ReservationDeleteResponse deleteReservation(
@Override
public GetNodesToLabelsResponse getNodeToLabels(
GetNodesToLabelsRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
if (request == null) {
routerMetrics.incrNodeToLabelsFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getNodesToLabels request.", null);
}
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subClusters =
federationFacade.getSubClusters(true);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we repeat this pattern a bunch of times.
Maybe we can generalize it with a lambda passed as a parameter?

@slfan1989 slfan1989 May 18, 2022

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to design a generic function like this?

  /**
   * Proposed generic helper: reflectively invokes one ApplicationClientProtocol
   * method on every sub-cluster and collects the per-sub-cluster results.
   *
   * @param filterInactiveSubClusters passed through to
   *          federationFacade.getSubClusters(...).
   * @param request supplies the method name, parameter types and arguments
   *          used for the reflective lookup and call.
   * @param clazz type each invocation result is cast to.
   * @param <R> result type of the invoked protocol method.
   * @return one result per sub-cluster, in stream encounter order.
   * @throws YarnException declared by the signature; inside the lambda it is
   *           caught and rethrown wrapped in a RuntimeException.
   * @throws RuntimeException wrapping any lookup or invocation failure.
   */
  private <R> Collection<R> invokeAppClientProtocolMethod(
          Boolean filterInactiveSubClusters, ClientMethod request, Class<R> clazz)
                throws YarnException, RuntimeException {
    Map<SubClusterId, SubClusterInfo> subClusters =
             federationFacade.getSubClusters(filterInactiveSubClusters);
    return subClusters.keySet().stream().map(subClusterId -> {
      try {
        ApplicationClientProtocol protocol =
                getClientRMProxyForSubCluster(subClusterId);
        // Resolve the target method by name and parameter types on the protocol interface.
        Method method = ApplicationClientProtocol.class
                .getMethod(request.getMethodName(), request.getTypes());
        return clazz.cast(method.invoke(protocol, request.getParams()));
      } catch (YarnException | NoSuchMethodException | IllegalAccessException |
               InvocationTargetException ex) {
        // Checked exceptions cannot escape the lambda, so they are wrapped.
        throw new RuntimeException(ex);
      }
    }).collect(Collectors.toList());
  }

Map<SubClusterId, GetNodesToLabelsResponse> clusterNodes = Maps.newHashMap();
for (SubClusterId subClusterId : subClusters.keySet()) {
ApplicationClientProtocol client;
try {
client = getClientRMProxyForSubCluster(subClusterId);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Declare `ApplicationClientProtocol client` inside the try block.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix it.

GetNodesToLabelsResponse response = client.getNodeToLabels(request);
clusterNodes.put(subClusterId, response);
} catch (Exception ex) {
routerMetrics.incrNodeToLabelsFailedRetrieved();
LOG.error("Unable to get node labels due to exception.", ex);
throw ex;
}
}
long stopTime = clock.getTime();
routerMetrics.succeededGetNodeToLabelsRetrieved(stopTime - startTime);
// Merge the NodesToLabelsResponse
return RouterYarnClientUtils.mergeNodesToLabelsResponse(clusterNodes.values());
}

/**
 * Collects the labels-to-nodes mapping from every sub-cluster returned by
 * {@code federationFacade.getSubClusters(true)} and merges the responses.
 *
 * The stale {@code NotImplementedException} throw that preceded this body
 * was removed; it made the rest of the method unreachable.
 *
 * @param request the request forwarded unchanged to each sub-cluster.
 * @return the merged response built by
 *         {@code RouterYarnClientUtils.mergeLabelsToNodes}.
 * @throws YarnException rethrown from a sub-cluster call (the failure gauge
 *           is incremented first).
 * @throws IOException rethrown from a sub-cluster call (the failure gauge
 *           is incremented first).
 */
@Override
public GetLabelsToNodesResponse getLabelsToNodes(
    GetLabelsToNodesRequest request) throws YarnException, IOException {
  if (request == null) {
    routerMetrics.incrLabelsToNodesFailedRetrieved();
    // Presumably always throws, so execution does not continue with a null
    // request -- TODO confirm against RouterServerUtil.
    RouterServerUtil.logAndThrowException("Missing getLabelsToNodes request.", null);
  }
  long startTime = clock.getTime();
  Map<SubClusterId, SubClusterInfo> subClusters =
      federationFacade.getSubClusters(true);
  Map<SubClusterId, GetLabelsToNodesResponse> clusterNodes = Maps.newHashMap();
  for (SubClusterId subClusterId : subClusters.keySet()) {
    try {
      // Scoped to the try block: the proxy is only needed for this one call.
      ApplicationClientProtocol client = getClientRMProxyForSubCluster(subClusterId);
      GetLabelsToNodesResponse response = client.getLabelsToNodes(request);
      clusterNodes.put(subClusterId, response);
    } catch (Exception ex) {
      // A failure in any sub-cluster fails the whole call.
      routerMetrics.incrLabelsToNodesFailedRetrieved();
      LOG.error("Unable to get label node due to exception.", ex);
      throw ex;
    }
  }
  long stopTime = clock.getTime();
  routerMetrics.succeededGetLabelsToNodesRetrieved(stopTime - startTime);
  // Merge the LabelsToNodesResponse
  return RouterYarnClientUtils.mergeLabelsToNodes(clusterNodes.values());
}

/**
 * Collects the cluster node labels from every sub-cluster returned by
 * {@code federationFacade.getSubClusters(true)} and merges the responses.
 *
 * The stale {@code NotImplementedException} throw that preceded this body
 * was removed; it made the rest of the method unreachable.
 *
 * @param request the request forwarded unchanged to each sub-cluster.
 * @return the merged response built by
 *         {@code RouterYarnClientUtils.mergeClusterNodeLabelsResponse}.
 * @throws YarnException rethrown from a sub-cluster call (the failure gauge
 *           is incremented first).
 * @throws IOException rethrown from a sub-cluster call (the failure gauge
 *           is incremented first).
 */
@Override
public GetClusterNodeLabelsResponse getClusterNodeLabels(
    GetClusterNodeLabelsRequest request) throws YarnException, IOException {
  if (request == null) {
    routerMetrics.incrClusterNodeLabelsFailedRetrieved();
    // Presumably always throws, so execution does not continue with a null
    // request -- TODO confirm against RouterServerUtil.
    RouterServerUtil.logAndThrowException("Missing getClusterNodeLabels request.", null);
  }
  long startTime = clock.getTime();
  Map<SubClusterId, SubClusterInfo> subClusters =
      federationFacade.getSubClusters(true);
  Map<SubClusterId, GetClusterNodeLabelsResponse> clusterNodes = Maps.newHashMap();
  for (SubClusterId subClusterId : subClusters.keySet()) {
    try {
      // Scoped to the try block: the proxy is only needed for this one call.
      ApplicationClientProtocol client = getClientRMProxyForSubCluster(subClusterId);
      GetClusterNodeLabelsResponse response = client.getClusterNodeLabels(request);
      clusterNodes.put(subClusterId, response);
    } catch (Exception ex) {
      // A failure in any sub-cluster fails the whole call.
      routerMetrics.incrClusterNodeLabelsFailedRetrieved();
      LOG.error("Unable to get cluster nodeLabels due to exception.", ex);
      throw ex;
    }
  }
  long stopTime = clock.getTime();
  routerMetrics.succeededGetClusterNodeLabelsRetrieved(stopTime - startTime);
  // Merge the ClusterNodeLabelsResponse
  return RouterYarnClientUtils.mergeClusterNodeLabelsResponse(clusterNodes.values());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,22 @@
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;

import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
Expand Down Expand Up @@ -218,4 +225,75 @@ public static GetClusterNodesResponse mergeClusterNodesResponse(
clusterNodesResponse.setNodeReports(nodeReports);
return clusterNodesResponse;
}

/**
 * Combines several GetNodesToLabelsResponse objects into a single one.
 * Null responses and responses with a null map are skipped; when the same
 * NodeId occurs in more than one response, the later response wins
 * ({@code Map.putAll} semantics).
 *
 * @param responses a list of GetNodesToLabelsResponse to merge.
 * @return the merged GetNodesToLabelsResponse.
 */
public static GetNodesToLabelsResponse mergeNodesToLabelsResponse(
    Collection<GetNodesToLabelsResponse> responses) {
  GetNodesToLabelsResponse merged =
      Records.newRecord(GetNodesToLabelsResponse.class);
  Map<NodeId, Set<String>> combined = new HashMap<>();
  for (GetNodesToLabelsResponse current : responses) {
    if (current == null || current.getNodeToLabels() == null) {
      continue;
    }
    combined.putAll(current.getNodeToLabels());
  }
  merged.setNodeToLabels(combined);
  return merged;
}

/**
 * Merges a list of GetLabelsToNodesResponse.
 *
 * Node sets reported for the same label by different responses are unioned.
 * Every set in the result is a fresh {@code HashSet}, so the sets owned by
 * the input responses are never stored or modified. Null responses, null
 * maps and null node sets are tolerated; a label with a null node set is
 * kept with no nodes added for it.
 *
 * @param responses a list of GetLabelsToNodesResponse to merge.
 * @return the merged GetLabelsToNodesResponse.
 */
public static GetLabelsToNodesResponse mergeLabelsToNodes(
    Collection<GetLabelsToNodesResponse> responses){
  GetLabelsToNodesResponse labelsToNodesResponse = Records.newRecord(
      GetLabelsToNodesResponse.class);
  Map<String, Set<NodeId>> labelsToNodesMap = new HashMap<>();
  for (GetLabelsToNodesResponse response : responses) {
    if (response == null) {
      continue;
    }
    Map<String, Set<NodeId>> clusterLabelsToNodesMap = response.getLabelsToNodes();
    if (clusterLabelsToNodesMap == null) {
      continue;
    }
    for (Map.Entry<String, Set<NodeId>> entry : clusterLabelsToNodesMap.entrySet()) {
      // Accumulate into a set owned by the merged map; storing the response's
      // own set and calling addAll on it later would mutate the caller's data
      // (or fail outright if that set is unmodifiable).
      Set<NodeId> allNodes =
          labelsToNodesMap.computeIfAbsent(entry.getKey(), label -> new HashSet<>());
      Set<NodeId> clusterNodes = entry.getValue();
      if (clusterNodes != null) {
        allNodes.addAll(clusterNodes);
      }
    }
  }
  labelsToNodesResponse.setLabelsToNodes(labelsToNodesMap);
  return labelsToNodesResponse;
}

/**
* Merges a list of GetClusterNodeLabelsResponse.
*
* @param responses a list of GetClusterNodeLabelsResponse to merge.
* @return the merged GetClusterNodeLabelsResponse.
*/
public static GetClusterNodeLabelsResponse mergeClusterNodeLabelsResponse(
Comment thread
goiri marked this conversation as resolved.
Collection<GetClusterNodeLabelsResponse> responses) {
GetClusterNodeLabelsResponse nodeLabelsResponse = Records.newRecord(
GetClusterNodeLabelsResponse.class);
Set<NodeLabel> nodeLabelsList = new HashSet<>();
for (GetClusterNodeLabelsResponse response : responses) {
if (response != null && response.getNodeLabelList() != null) {
nodeLabelsList.addAll(response.getNodeLabelList());
}
}
nodeLabelsResponse.setNodeLabelList(new ArrayList<>(nodeLabelsList));
return nodeLabelsResponse;
}
}

Loading