Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public final class RouterMetrics {
private MutableGaugeInt numUpdateAppTimeoutsFailedRetrieved;
// Failure gauges for the signalToContainer, getQueueInfo and
// moveApplicationAcrossQueues calls; each one is bumped by the matching incr*FailedRetrieved().
@Metric("# of signalToContainer failed to be retrieved")
private MutableGaugeInt numSignalToContainerFailedRetrieved;
@Metric("# of getQueueInfo failed to be retrieved")
private MutableGaugeInt numGetQueueInfoFailedRetrieved;
@Metric("# of moveApplicationAcrossQueues failed to be retrieved")
private MutableGaugeInt numMoveApplicationAcrossQueuesFailedRetrieved;

// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
Expand Down Expand Up @@ -130,6 +134,10 @@ public final class RouterMetrics {
private MutableRate totalSucceededUpdateAppTimeoutsRetrieved;
// Success rate metrics (sample count and latency) for the signalToContainer, getQueueInfo
// and moveApplicationAcrossQueues calls; fed by the matching succeeded*Retrieved(duration).
@Metric("Total number of successful Retrieved signalToContainer and latency(ms)")
private MutableRate totalSucceededSignalToContainerRetrieved;
@Metric("Total number of successful Retrieved getQueueInfo and latency(ms)")
private MutableRate totalSucceededGetQueueInfoRetrieved;
@Metric("Total number of successful Retrieved moveApplicationAcrossQueues and latency(ms)")
private MutableRate totalSucceededMoveApplicationAcrossQueuesRetrieved;

/**
* Provide quantile counters for all latencies.
Expand All @@ -155,6 +163,8 @@ public final class RouterMetrics {
// Latency quantile counters, one per call type; the getQueueInfo and
// moveApplicationAcrossQueues entries are created in the constructor via registry.newQuantiles.
private MutableQuantiles updateAppPriorityLatency;
private MutableQuantiles updateAppTimeoutsLatency;
private MutableQuantiles signalToContainerLatency;
private MutableQuantiles getQueueInfoLatency;
private MutableQuantiles moveApplicationAcrossQueuesLatency;

private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
Expand Down Expand Up @@ -237,6 +247,14 @@ private RouterMetrics() {
signalToContainerLatency =
registry.newQuantiles("signalToContainerLatency",
"latency of signal to container timeouts", "ops", "latency", 10);

getQueueInfoLatency =
registry.newQuantiles("getQueueInfoLatency",
"latency of get queue info timeouts", "ops", "latency", 10);

moveApplicationAcrossQueuesLatency =
registry.newQuantiles("moveApplicationAcrossQueuesLatency",
"latency of move application across queues timeouts", "ops", "latency", 10);
}

public static RouterMetrics getMetrics() {
Expand Down Expand Up @@ -363,6 +381,16 @@ public long getNumSucceededSignalToContainerRetrieved() {
return totalSucceededSignalToContainerRetrieved.lastStat().numSamples();
}

/**
 * Reads the sample count of the successful getQueueInfo rate metric.
 *
 * @return numSamples() of the last stat of totalSucceededGetQueueInfoRetrieved.
 */
@VisibleForTesting
public long getNumSucceededGetQueueInfoRetrieved() {
return totalSucceededGetQueueInfoRetrieved.lastStat().numSamples();
}

/**
 * Reads the sample count of the successful moveApplicationAcrossQueues rate metric.
 *
 * @return numSamples() of the last stat of totalSucceededMoveApplicationAcrossQueuesRetrieved.
 */
@VisibleForTesting
public long getNumSucceededMoveApplicationAcrossQueuesRetrieved() {
return totalSucceededMoveApplicationAcrossQueuesRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
Expand Down Expand Up @@ -468,6 +496,16 @@ public double getLatencySucceededSignalToContainerRetrieved() {
return totalSucceededSignalToContainerRetrieved.lastStat().mean();
}

/**
 * Reads the mean latency recorded for successful getQueueInfo calls.
 *
 * @return mean() of the last stat of totalSucceededGetQueueInfoRetrieved.
 */
@VisibleForTesting
public double getLatencySucceededGetQueueInfoRetrieved() {
return totalSucceededGetQueueInfoRetrieved.lastStat().mean();
}

/**
 * Reads the mean latency recorded for successful moveApplicationAcrossQueues calls.
 *
 * @return mean() of the last stat of totalSucceededMoveApplicationAcrossQueuesRetrieved.
 */
@VisibleForTesting
public double getLatencySucceededMoveApplicationAcrossQueuesRetrieved() {
return totalSucceededMoveApplicationAcrossQueuesRetrieved.lastStat().mean();
}

@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
Expand Down Expand Up @@ -573,6 +611,14 @@ public int getSignalToContainerFailedRetrieved() {
return numSignalToContainerFailedRetrieved.value();
}

/**
 * Reads the failure gauge of the getQueueInfo call.
 *
 * @return current value of numGetQueueInfoFailedRetrieved.
 */
public int getQueueInfoFailedRetrieved() {
return numGetQueueInfoFailedRetrieved.value();
}

/**
 * Reads the failure gauge of the moveApplicationAcrossQueues call.
 *
 * @return current value of numMoveApplicationAcrossQueuesFailedRetrieved.
 */
public int getMoveApplicationAcrossQueuesFailedRetrieved() {
return numMoveApplicationAcrossQueuesFailedRetrieved.value();
}

public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
Expand Down Expand Up @@ -678,6 +724,16 @@ public void succeededSignalToContainerRetrieved(long duration) {
signalToContainerLatency.add(duration);
}

/**
 * Records one successful getQueueInfo call in both the rate metric and the
 * latency quantiles.
 *
 * @param duration the value added to both metrics; callers in this change pass
 *                 the elapsed time of the call.
 */
public void succeededGetQueueInfoRetrieved(long duration) {
totalSucceededGetQueueInfoRetrieved.add(duration);
getQueueInfoLatency.add(duration);
}

/**
 * Records one successful moveApplicationAcrossQueues call in both the rate
 * metric and the latency quantiles.
 *
 * @param duration the value added to both metrics; callers in this change pass
 *                 the elapsed time of the call.
 */
public void succeededMoveApplicationAcrossQueuesRetrieved(long duration) {
totalSucceededMoveApplicationAcrossQueuesRetrieved.add(duration);
moveApplicationAcrossQueuesLatency.add(duration);
}

/**
 * Increments the numAppsFailedCreated gauge.
 */
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
Expand Down Expand Up @@ -761,4 +817,12 @@ public void incrUpdateApplicationTimeoutsRetrieved() {
/**
 * Increments the failure gauge of the signalToContainer call.
 */
public void incrSignalToContainerFailedRetrieved() {
numSignalToContainerFailedRetrieved.incr();
}

/**
 * Increments the failure gauge of the getQueueInfo call.
 */
public void incrGetQueueInfoFailedRetrieved() {
numGetQueueInfoFailedRetrieved.incr();
}

/**
 * Increments the failure gauge of the moveApplicationAcrossQueues call.
 */
public void incrMoveApplicationAcrossQueuesFailedRetrieved() {
numMoveApplicationAcrossQueuesFailedRetrieved.incr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,27 @@ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
@Override
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
    throws YarnException, IOException {
  // A usable request must exist and must name a queue; anything else is
  // counted as a failure and rejected before any remote invocation.
  boolean hasQueueName = request != null && request.getQueueName() != null;
  if (!hasQueueName) {
    routerMetrics.incrGetQueueInfoFailedRetrieved();
    RouterServerUtil.logAndThrowException("Missing getQueueInfo request or queueName.", null);
  }

  // The timer starts only after validation has passed.
  final long begin = clock.getTime();
  Class[] parameterTypes = new Class[]{GetQueueInfoRequest.class};
  Object[] arguments = new Object[]{request};
  ClientMethod method = new ClientMethod("getQueueInfo", parameterTypes, arguments);

  Collection<GetQueueInfoResponse> collectedResponses = null;
  try {
    collectedResponses =
        invokeAppClientProtocolMethod(true, method, GetQueueInfoResponse.class);
  } catch (Exception ex) {
    routerMetrics.incrGetQueueInfoFailedRetrieved();
    RouterServerUtil.logAndThrowException("Unable to get queue [" +
        request.getQueueName() + "] to exception.", ex);
  }

  final long elapsed = clock.getTime() - begin;
  routerMetrics.succeededGetQueueInfoRetrieved(elapsed);

  // Collapse the collected responses into a single GetQueueInfoResponse.
  return RouterYarnClientUtils.mergeQueues(collectedResponses);
}

@Override
Expand Down Expand Up @@ -854,7 +874,44 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls(
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
    MoveApplicationAcrossQueuesRequest request)
    throws YarnException, IOException {
  // Flow: validate the request, look up the application's home sub-cluster in
  // the federation facade, forward the request to that sub-cluster's client
  // proxy, and record the success/failure metrics of this call.
  if (request == null || request.getApplicationId() == null
      || request.getTargetQueue() == null) {
    routerMetrics.incrMoveApplicationAcrossQueuesFailedRetrieved();
    RouterServerUtil.logAndThrowException("Missing moveApplicationAcrossQueues request or " +
        "applicationId or target queue.", null);
  }

  long startTime = clock.getTime();
  SubClusterId subClusterId = null;

  ApplicationId applicationId = request.getApplicationId();
  try {
    subClusterId = federationFacade
        .getApplicationHomeSubCluster(applicationId);
  } catch (YarnException e) {
    routerMetrics.incrMoveApplicationAcrossQueuesFailedRetrieved();
    RouterServerUtil.logAndThrowException("Application " +
        applicationId + " does not exist in FederationStateStore.", e);
  }

  ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
  MoveApplicationAcrossQueuesResponse response = null;
  try {
    response = clientRMProxy.moveApplicationAcrossQueues(request);
  } catch (Exception e) {
    // Count the failure against this call's own gauge (previously it was
    // booked on the unrelated app-attempts failure gauge).
    routerMetrics.incrMoveApplicationAcrossQueuesFailedRetrieved();
    RouterServerUtil.logAndThrowException("Unable to moveApplicationAcrossQueues for " +
        applicationId + " to SubCluster " + subClusterId.getId(), e);
  }

  // A missing response is only logged; the call is still reported as
  // succeeded below and null is handed back to the caller.
  if (response == null) {
    LOG.error("No response when moveApplicationAcrossQueues "
        + "the applicationId {} to Queue {} In SubCluster {}.",
        request.getApplicationId(), request.getTargetQueue(), subClusterId.getId());
  }

  long stopTime = clock.getTime();
  routerMetrics.succeededMoveApplicationAcrossQueuesRetrieved(stopTime - startTime);
  return response;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
Expand All @@ -44,6 +45,7 @@
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
Expand Down Expand Up @@ -365,5 +367,55 @@ public static GetAllResourceTypeInfoResponse mergeResourceTypes(
new ArrayList<>(resourceTypeInfoSet));
return resourceTypeInfoResponse;
}

/**
 * Merges a list of GetQueueInfoResponse.
 *
 * <p>The QueueInfo of the first response that carries one becomes the merge
 * target and is modified in place: the capacity, maximum capacity and current
 * capacity of every further QueueInfo are added to it, and child queues,
 * applications and accessible node labels are combined. Child queues and
 * applications are concatenated without de-duplication; node labels are
 * collected into a set. Any of these collections that is {@code null} on
 * either side is treated as empty.
 *
 * @param responses a list of GetQueueInfoResponse to merge; {@code null}
 *                  entries, entries without a QueueInfo and a {@code null}
 *                  collection are ignored.
 * @return the merged GetQueueInfoResponse; its QueueInfo is {@code null} when
 *         no response carried one.
 */
public static GetQueueInfoResponse mergeQueues(
    Collection<GetQueueInfoResponse> responses) {
  GetQueueInfoResponse queueResponse = Records.newRecord(
      GetQueueInfoResponse.class);

  QueueInfo queueInfo = null;
  if (responses != null) {
    for (GetQueueInfoResponse response : responses) {
      if (response == null || response.getQueueInfo() == null) {
        continue;
      }
      QueueInfo current = response.getQueueInfo();
      if (queueInfo == null) {
        // The first usable QueueInfo is the accumulator for all later ones.
        queueInfo = current;
        continue;
      }

      // set Capacity\MaximumCapacity\CurrentCapacity
      queueInfo.setCapacity(queueInfo.getCapacity() + current.getCapacity());
      queueInfo.setMaximumCapacity(
          queueInfo.getMaximumCapacity() + current.getMaximumCapacity());
      queueInfo.setCurrentCapacity(
          queueInfo.getCurrentCapacity() + current.getCurrentCapacity());

      // set childQueues
      List<QueueInfo> childQueues = new ArrayList<>();
      addAllIfPresent(childQueues, queueInfo.getChildQueues());
      addAllIfPresent(childQueues, current.getChildQueues());
      queueInfo.setChildQueues(childQueues);

      // set applications
      List<ApplicationReport> applicationReports = new ArrayList<>();
      addAllIfPresent(applicationReports, queueInfo.getApplications());
      addAllIfPresent(applicationReports, current.getApplications());
      queueInfo.setApplications(applicationReports);

      // set accessibleNodeLabels
      Set<String> accessibleNodeLabels = new HashSet<>();
      addAllIfPresent(accessibleNodeLabels, queueInfo.getAccessibleNodeLabels());
      addAllIfPresent(accessibleNodeLabels, current.getAccessibleNodeLabels());
      queueInfo.setAccessibleNodeLabels(accessibleNodeLabels);
    }
  }
  queueResponse.setQueueInfo(queueInfo);
  return queueResponse;
}

/**
 * Adds every element of {@code source} to {@code target}, doing nothing when
 * {@code source} is {@code null}.
 */
private static <T> void addAllIfPresent(Collection<T> target,
    Collection<? extends T> source) {
  if (source != null) {
    target.addAll(source);
  }
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,16 @@ public void getSignalContainer() {
LOG.info("Mocked: failed signalContainer call");
metrics.incrSignalToContainerFailedRetrieved();
}

// Simulates a failed getQueueInfo call: logs it and bumps the failure metric.
public void getQueueInfo() {
LOG.info("Mocked: failed getQueueInfo call");
metrics.incrGetQueueInfoFailedRetrieved();
}

// Simulates a failed moveApplicationAcrossQueues call: logs it and bumps the failure metric.
public void moveApplicationAcrossQueuesFailed() {
LOG.info("Mocked: failed moveApplicationAcrossQueuesFailed call");
metrics.incrMoveApplicationAcrossQueuesFailedRetrieved();
}
}

// Records successes for all calls
Expand Down Expand Up @@ -533,6 +543,16 @@ public void getSignalToContainerTimeouts(long duration) {
LOG.info("Mocked: successful signalToContainer call with duration {}", duration);
metrics.succeededSignalToContainerRetrieved(duration);
}

// Simulates a successful getQueueInfo call and records the given duration in the metrics.
public void getQueueInfoRetrieved(long duration) {
LOG.info("Mocked: successful getQueueInfo call with duration {}", duration);
metrics.succeededGetQueueInfoRetrieved(duration);
}

// Simulates a successful moveApplicationAcrossQueues call and records the given duration.
public void moveApplicationAcrossQueuesRetrieved(long duration) {
LOG.info("Mocked: successful moveApplicationAcrossQueues call with duration {}", duration);
metrics.succeededMoveApplicationAcrossQueuesRetrieved(duration);
}
}

@Test
Expand Down Expand Up @@ -839,4 +859,50 @@ public void testSignalToContainerFailed() {
metrics.getSignalToContainerFailedRetrieved());
}

/**
 * Feeds two successful getQueueInfo samples (150, then 300) and checks after
 * each one that the success count advanced by one and that the reported mean
 * latency is 150 and then 225.
 */
@Test
public void testSucceededGetQueueInfoRetrieved() {
  final long baseline = metrics.getNumSucceededGetQueueInfoRetrieved();

  // First sample.
  goodSubCluster.getQueueInfoRetrieved(150);
  long countAfterFirst = metrics.getNumSucceededGetQueueInfoRetrieved();
  Assert.assertEquals(baseline + 1, countAfterFirst);
  double meanAfterFirst = metrics.getLatencySucceededGetQueueInfoRetrieved();
  Assert.assertEquals(150, meanAfterFirst, ASSERT_DOUBLE_DELTA);

  // Second sample.
  goodSubCluster.getQueueInfoRetrieved(300);
  long countAfterSecond = metrics.getNumSucceededGetQueueInfoRetrieved();
  Assert.assertEquals(baseline + 2, countAfterSecond);
  double meanAfterSecond = metrics.getLatencySucceededGetQueueInfoRetrieved();
  Assert.assertEquals(225, meanAfterSecond, ASSERT_DOUBLE_DELTA);
}

/**
 * Triggers one mocked getQueueInfo failure and checks that the failure gauge
 * grew by exactly one.
 */
@Test
public void testGetQueueInfoFailed() {
  final long failuresBefore = metrics.getQueueInfoFailedRetrieved();

  badSubCluster.getQueueInfo();

  long failuresAfter = metrics.getQueueInfoFailedRetrieved();
  Assert.assertEquals(failuresBefore + 1, failuresAfter);
}

/**
 * Feeds two successful moveApplicationAcrossQueues samples (150, then 300)
 * and checks after each one that the success count advanced by one and that
 * the reported mean latency is 150 and then 225.
 */
@Test
public void testSucceededMoveApplicationAcrossQueuesRetrieved() {
  final long baseline = metrics.getNumSucceededMoveApplicationAcrossQueuesRetrieved();

  // First sample.
  goodSubCluster.moveApplicationAcrossQueuesRetrieved(150);
  long countAfterFirst = metrics.getNumSucceededMoveApplicationAcrossQueuesRetrieved();
  Assert.assertEquals(baseline + 1, countAfterFirst);
  double meanAfterFirst = metrics.getLatencySucceededMoveApplicationAcrossQueuesRetrieved();
  Assert.assertEquals(150, meanAfterFirst, ASSERT_DOUBLE_DELTA);

  // Second sample.
  goodSubCluster.moveApplicationAcrossQueuesRetrieved(300);
  long countAfterSecond = metrics.getNumSucceededMoveApplicationAcrossQueuesRetrieved();
  Assert.assertEquals(baseline + 2, countAfterSecond);
  double meanAfterSecond = metrics.getLatencySucceededMoveApplicationAcrossQueuesRetrieved();
  Assert.assertEquals(225, meanAfterSecond, ASSERT_DOUBLE_DELTA);
}

/**
 * Triggers one mocked moveApplicationAcrossQueues failure and checks that the
 * failure gauge grew by exactly one.
 */
@Test
public void testMoveApplicationAcrossQueuesRetrievedFailed() {
  final long failuresBefore = metrics.getMoveApplicationAcrossQueuesFailedRetrieved();

  badSubCluster.moveApplicationAcrossQueuesFailed();

  long failuresAfter = metrics.getMoveApplicationAcrossQueuesFailedRetrieved();
  Assert.assertEquals(failuresBefore + 1, failuresAfter);
}

}
Loading