Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public final class RouterMetrics {
private MutableGaugeInt numMultipleAppsFailedRetrieved;
@Metric("# of applicationAttempt reports failed to be retrieved")
private MutableGaugeInt numAppAttemptsFailedRetrieved;
@Metric("# of getClusterMetrics failed to be retrieved")
private MutableGaugeInt numGetClusterMetricsFailedRetrieved;

// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
Expand All @@ -69,6 +71,9 @@ public final class RouterMetrics {
@Metric("Total number of successful Retrieved " +
"appAttempt reports and latency(ms)")
private MutableRate totalSucceededAppAttemptsRetrieved;
@Metric("Total number of successful Retrieved getClusterMetrics and "
+ "latency(ms)")
private MutableRate totalSucceededGetClusterMetricsRetrieved;


/**
Expand All @@ -80,6 +85,7 @@ public final class RouterMetrics {
private MutableQuantiles getApplicationReportLatency;
private MutableQuantiles getApplicationsReportLatency;
private MutableQuantiles getApplicationAttemptReportLatency;
private MutableQuantiles getClusterMetricsLatency;

private static volatile RouterMetrics INSTANCE = null;
private static MetricsRegistry registry;
Expand All @@ -103,6 +109,9 @@ private RouterMetrics() {
registry.newQuantiles("getApplicationAttemptReportLatency",
"latency of get applicationattempt " +
"report", "ops", "latency", 10);
getClusterMetricsLatency =
registry.newQuantiles("getClusterMetricsLatency",
"latency of get cluster metrics", "ops", "latency", 10);
}

public static RouterMetrics getMetrics() {
Expand Down Expand Up @@ -154,6 +163,11 @@ public long getNumSucceededMultipleAppsRetrieved() {
return totalSucceededMultipleAppsRetrieved.lastStat().numSamples();
}

/**
 * Reports how many samples the last statistic of the successful
 * getClusterMetrics rate metric contains.
 *
 * @return the sample count of {@code totalSucceededGetClusterMetricsRetrieved}
 */
@VisibleForTesting
public long getNumSucceededGetClusterMetricsRetrieved() {
  final long sampleCount =
      this.totalSucceededGetClusterMetricsRetrieved.lastStat().numSamples();
  return sampleCount;
}

@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
Expand Down Expand Up @@ -184,6 +198,11 @@ public double getLatencySucceededMultipleGetAppReport() {
return totalSucceededMultipleAppsRetrieved.lastStat().mean();
}

/**
 * Reports the mean of the last statistic of the successful
 * getClusterMetrics rate metric.
 *
 * @return the mean of {@code totalSucceededGetClusterMetricsRetrieved}
 */
@VisibleForTesting
public double getLatencySucceededGetClusterMetricsRetrieved() {
  final double meanLatency =
      this.totalSucceededGetClusterMetricsRetrieved.lastStat().mean();
  return meanLatency;
}

@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
Expand Down Expand Up @@ -214,6 +233,11 @@ public int getMultipleAppsFailedRetrieved() {
return numMultipleAppsFailedRetrieved.value();
}

/**
 * Reports the current value of the failed getClusterMetrics gauge.
 *
 * @return the value of {@code numGetClusterMetricsFailedRetrieved}
 */
@VisibleForTesting
public int getClusterMetricsFailedRetrieved() {
  final int failedCount = this.numGetClusterMetricsFailedRetrieved.value();
  return failedCount;
}

public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
Expand Down Expand Up @@ -244,6 +268,11 @@ public void succeededAppAttemptsRetrieved(long duration) {
getApplicationAttemptReportLatency.add(duration);
}

/**
 * Records one successful getClusterMetrics call.
 *
 * @param duration the value added to both the aggregate rate metric and the
 *                 latency quantiles
 */
public void succeededGetClusterMetricsRetrieved(long duration) {
  // Aggregate rate first, then the quantile estimator, as before.
  this.totalSucceededGetClusterMetricsRetrieved.add(duration);
  this.getClusterMetricsLatency.add(duration);
}

/**
 * Increments the {@code numAppsFailedCreated} gauge by one.
 */
public void incrAppsFailedCreated() {
  this.numAppsFailedCreated.incr();
}
Expand All @@ -268,4 +297,8 @@ public void incrAppAttemptsFailedRetrieved() {
numAppAttemptsFailedRetrieved.incr();
}

/**
 * Increments the {@code numGetClusterMetricsFailedRetrieved} gauge by one.
 */
public void incrGetClusterMetricsFailedRetrieved() {
  this.numGetClusterMetricsFailedRetrieved.incr();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -628,18 +628,29 @@ public GetApplicationReportResponse getApplicationReport(
public GetApplicationsResponse getApplications(GetApplicationsRequest request)
throws YarnException, IOException {
if (request == null) {
routerMetrics.incrMultipleAppsFailedRetrieved();
RouterServerUtil.logAndThrowException(
"Missing getApplications request.",
null);
}
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subclusters =
federationFacade.getSubClusters(true);
ClientMethod remoteMethod = new ClientMethod("getApplications",
new Class[] {GetApplicationsRequest.class}, new Object[] {request});
Map<SubClusterId, GetApplicationsResponse> applications =
invokeConcurrent(subclusters.keySet(), remoteMethod,
GetApplicationsResponse.class);
Map<SubClusterId, GetApplicationsResponse> applications;

try {
applications = invokeConcurrent(subclusters.keySet(), remoteMethod,
GetApplicationsResponse.class);

} catch (Exception ex) {
routerMetrics.incrMultipleAppsFailedRetrieved();
LOG.error("Unable to get applications due to exception.", ex);
throw ex;
}
long stopTime = clock.getTime();
routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime);
// Merge the Application Reports
return RouterYarnClientUtils.mergeApplications(applications.values(),
returnPartialReport);
Expand All @@ -648,14 +659,26 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
/**
 * Invokes {@code getClusterMetrics} on the sub-clusters returned by
 * {@code federationFacade.getSubClusters(true)} and merges the individual
 * responses into one.
 *
 * <p>If the concurrent invocation throws, the failure counter is
 * incremented, the error is logged and the same exception is rethrown.
 * Otherwise the elapsed time between the start of this method and the end of
 * the invocation is recorded as a successful retrieval.
 *
 * @param request the request passed on to each sub-cluster
 * @return the result of {@code RouterYarnClientUtils.merge} over all
 *         sub-cluster responses
 * @throws YarnException propagated from the sub-cluster lookup or the
 *         concurrent invocation
 * @throws IOException propagated from the concurrent invocation
 */
@Override
public GetClusterMetricsResponse getClusterMetrics(
    GetClusterMetricsRequest request) throws YarnException, IOException {
  long startTime = clock.getTime();
  Map<SubClusterId, SubClusterInfo> subclusters =
      federationFacade.getSubClusters(true);
  ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
      new Class[] {GetClusterMetricsRequest.class}, new Object[] {request});
  ArrayList<SubClusterId> clusterList = new ArrayList<>(subclusters.keySet());

  // Declared once and assigned only inside the try block, so that every
  // invocation failure passes through the failure metric below.
  Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics;
  try {
    clusterMetrics = invokeConcurrent(clusterList, remoteMethod,
        GetClusterMetricsResponse.class);
  } catch (Exception ex) {
    routerMetrics.incrGetClusterMetricsFailedRetrieved();
    LOG.error("Unable to get cluster metrics due to exception.", ex);
    throw ex;
  }

  long stopTime = clock.getTime();
  routerMetrics.succeededGetClusterMetricsRetrieved(stopTime - startTime);
  return RouterYarnClientUtils.merge(clusterMetrics.values());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,37 @@ public void testMulipleAppsReportFailed() {
metrics.getMultipleAppsFailedRetrieved());
}

/**
 * Checks the success metric for getClusterMetrics: every mocked successful
 * call adds one sample, and the reported latency is 100 after a call of 100
 * and 150 after a further call of 200.
 */
@Test
public void testSucceededGetClusterMetrics() {
  final long baseline = metrics.getNumSucceededGetClusterMetricsRetrieved();

  // First successful call.
  goodSubCluster.getClusterMetrics(100);
  final long expectedAfterFirst = baseline + 1;
  Assert.assertEquals(expectedAfterFirst,
      metrics.getNumSucceededGetClusterMetricsRetrieved());
  final double latencyAfterFirst =
      metrics.getLatencySucceededGetClusterMetricsRetrieved();
  Assert.assertEquals(100, latencyAfterFirst, 0);

  // Second successful call.
  goodSubCluster.getClusterMetrics(200);
  final long expectedAfterSecond = baseline + 2;
  Assert.assertEquals(expectedAfterSecond,
      metrics.getNumSucceededGetClusterMetricsRetrieved());
  final double latencyAfterSecond =
      metrics.getLatencySucceededGetClusterMetricsRetrieved();
  Assert.assertEquals(150, latencyAfterSecond, 0);
}

/**
 * Checks the failure metric for getClusterMetrics: one mocked failed call
 * raises the failure counter by exactly one.
 */
@Test
public void testGetClusterMetricsFailed() {
  final long totalBadBefore = metrics.getClusterMetricsFailedRetrieved();

  badSubCluster.getClusterMetrics();

  final long expectedAfter = totalBadBefore + 1;
  Assert.assertEquals(expectedAfter,
      metrics.getClusterMetricsFailedRetrieved());
}

// Records failures for all calls
private class MockBadSubCluster {
public void getNewApplication() {
Expand Down Expand Up @@ -310,6 +341,11 @@ public void getApplicationsReport() {
LOG.info("Mocked: failed getApplicationsReport call");
metrics.incrMultipleAppsFailedRetrieved();
}

/**
 * Mocks a failed getClusterMetrics call: logs it and bumps the failure
 * counter on the shared metrics object.
 */
public void getClusterMetrics() {
  final String message = "Mocked: failed getClusterMetrics call";
  LOG.info(message);
  metrics.incrGetClusterMetricsFailedRetrieved();
}
}

// Records successes for all calls
Expand Down Expand Up @@ -350,5 +386,11 @@ public void getApplicationsReport(long duration) {
duration);
metrics.succeededMultipleAppsRetrieved(duration);
}

/**
 * Mocks a successful getClusterMetrics call: logs it and records the given
 * duration on the shared metrics object.
 *
 * @param duration the value passed to
 *                 {@code succeededGetClusterMetricsRetrieved}
 */
public void getClusterMetrics(long duration) {
  final String template =
      "Mocked: successful getClusterMetrics call with duration {}";
  LOG.info(template, duration);
  metrics.succeededGetClusterMetricsRetrieved(duration);
}
}
}