diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index 7a068471662f3..1399b2f4b228b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -51,7 +51,7 @@ public final class RouterMetrics { private MutableGaugeInt numAppsFailedRetrieved; @Metric("# of multiple applications reports failed to be retrieved") private MutableGaugeInt numMultipleAppsFailedRetrieved; - @Metric("# of applicationAttempt reports failed to be retrieved") + @Metric("# of getApplicationAttempts failed to be retrieved") private MutableGaugeInt numAppAttemptsFailedRetrieved; @Metric("# of getClusterMetrics failed to be retrieved") private MutableGaugeInt numGetClusterMetricsFailedRetrieved; @@ -63,6 +63,18 @@ public final class RouterMetrics { private MutableGaugeInt numGetLabelsToNodesFailedRetrieved; @Metric("# of getClusterNodeLabels failed to be retrieved") private MutableGaugeInt numGetClusterNodeLabelsFailedRetrieved; + @Metric("# of getApplicationAttemptReports failed to be retrieved") + private MutableGaugeInt numAppAttemptReportFailedRetrieved; + @Metric("# of getQueueUserAcls failed to be retrieved") + private MutableGaugeInt numGetQueueUserAclsFailedRetrieved; + @Metric("# of getContainerReport failed to be retrieved") + private MutableGaugeInt numGetContainerReportFailedRetrieved; + @Metric("# of getContainers failed to be retrieved") + private MutableGaugeInt numGetContainersFailedRetrieved; + @Metric("# of listReservations failed to be retrieved") + private MutableGaugeInt 
numListReservationsFailedRetrieved; + @Metric("# of getResourceTypeInfo failed to be retrieved") + private MutableGaugeInt numGetResourceTypeInfo; // Aggregate metrics are shared, and don't have to be looked up per call @Metric("Total number of successful Submitted apps and latency(ms)") @@ -90,6 +102,18 @@ public final class RouterMetrics { private MutableRate totalSucceededGetLabelsToNodesRetrieved; @Metric("Total number of successful Retrieved getClusterNodeLabels and latency(ms)") private MutableRate totalSucceededGetClusterNodeLabelsRetrieved; + @Metric("Total number of successful Retrieved getApplicationAttemptReport and latency(ms)") + private MutableRate totalSucceededAppAttemptReportRetrieved; + @Metric("Total number of successful Retrieved getQueueUserAcls and latency(ms)") + private MutableRate totalSucceededGetQueueUserAclsRetrieved; + @Metric("Total number of successful Retrieved getContainerReport and latency(ms)") + private MutableRate totalSucceededGetContainerReportRetrieved; + @Metric("Total number of successful Retrieved getContainers and latency(ms)") + private MutableRate totalSucceededGetContainersRetrieved; + @Metric("Total number of successful Retrieved listReservations and latency(ms)") + private MutableRate totalSucceededListReservationsRetrieved; + @Metric("Total number of successful Retrieved getResourceTypeInfo and latency(ms)") + private MutableRate totalSucceededGetResourceTypeInfoRetrieved; /** * Provide quantile counters for all latencies. 
@@ -105,6 +129,12 @@ public final class RouterMetrics { private MutableQuantiles getNodeToLabelsLatency; private MutableQuantiles getLabelToNodesLatency; private MutableQuantiles getClusterNodeLabelsLatency; + private MutableQuantiles getApplicationAttemptsLatency; + private MutableQuantiles getQueueUserAclsLatency; + private MutableQuantiles getContainerReportLatency; + private MutableQuantiles getContainerLatency; + private MutableQuantiles listReservationsLatency; + private MutableQuantiles listResourceTypeInfoLatency; private static volatile RouterMetrics INSTANCE = null; private static MetricsRegistry registry; @@ -147,6 +177,30 @@ private RouterMetrics() { getClusterNodeLabelsLatency = registry.newQuantiles("getClusterNodeLabelsLatency", "latency of get cluster node labels", "ops", "latency", 10); + + getApplicationAttemptsLatency = + registry.newQuantiles("getApplicationAttemptsLatency", + "latency of get application attempts", "ops", "latency", 10); + + getQueueUserAclsLatency = + registry.newQuantiles("getQueueUserAclsLatency", + "latency of get queue user acls", "ops", "latency", 10); + + getContainerReportLatency = + registry.newQuantiles("getContainerReportLatency", + "latency of get container report", "ops", "latency", 10); + + getContainerLatency = + registry.newQuantiles("getContainerLatency", + "latency of get container", "ops", "latency", 10); + + listReservationsLatency = + registry.newQuantiles("listReservationsLatency", + "latency of list reservations", "ops", "latency", 10); + + listResourceTypeInfoLatency = + registry.newQuantiles("getResourceTypeInfoLatency", + "latency of get resource type info", "ops", "latency", 10); } public static RouterMetrics getMetrics() { @@ -223,6 +277,36 @@ public long getNumSucceededGetClusterNodeLabelsRetrieved(){ return totalSucceededGetClusterNodeLabelsRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededAppAttemptReportRetrieved(){ + return 
totalSucceededAppAttemptReportRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededGetQueueUserAclsRetrieved(){ + return totalSucceededGetQueueUserAclsRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededGetContainerReportRetrieved() { + return totalSucceededGetContainerReportRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededGetContainersRetrieved() { + return totalSucceededGetContainersRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededListReservationsRetrieved() { + return totalSucceededListReservationsRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededGetResourceTypeInfoRetrieved() { + return totalSucceededGetResourceTypeInfoRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public double getLatencySucceededAppsCreated() { return totalSucceededAppsCreated.lastStat().mean(); @@ -240,7 +324,7 @@ public double getLatencySucceededAppsKilled() { @VisibleForTesting public double getLatencySucceededGetAppAttemptReport() { - return totalSucceededAppAttemptsRetrieved.lastStat().mean(); + return totalSucceededAppAttemptReportRetrieved.lastStat().mean(); } @VisibleForTesting @@ -278,6 +362,36 @@ public double getLatencySucceededGetClusterNodeLabelsRetrieved() { return totalSucceededGetClusterNodeLabelsRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededAppAttemptRetrieved() { + return totalSucceededAppAttemptsRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededGetQueueUserAclsRetrieved() { + return totalSucceededGetQueueUserAclsRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededGetContainerReportRetrieved() { + return totalSucceededGetContainerReportRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double 
getLatencySucceededGetContainersRetrieved() { + return totalSucceededGetContainersRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededListReservationsRetrieved() { + return totalSucceededListReservationsRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededGetResourceTypeInfoRetrieved() { + return totalSucceededGetResourceTypeInfoRetrieved.lastStat().mean(); + } + @VisibleForTesting public int getAppsFailedCreated() { return numAppsFailedCreated.value(); @@ -300,7 +414,7 @@ public int getAppsFailedRetrieved() { @VisibleForTesting public int getAppAttemptsFailedRetrieved() { - return numAppsFailedRetrieved.value(); + return numAppAttemptsFailedRetrieved.value(); } @VisibleForTesting @@ -333,6 +447,36 @@ public int getGetClusterNodeLabelsFailedRetrieved() { return numGetClusterNodeLabelsFailedRetrieved.value(); } + @VisibleForTesting + public int getAppAttemptReportFailedRetrieved() { + return numAppAttemptReportFailedRetrieved.value(); + } + + @VisibleForTesting + public int getQueueUserAclsFailedRetrieved() { + return numGetQueueUserAclsFailedRetrieved.value(); + } + + @VisibleForTesting + public int getContainerReportFailedRetrieved() { + return numGetContainerReportFailedRetrieved.value(); + } + + @VisibleForTesting + public int getContainersFailedRetrieved() { + return numGetContainersFailedRetrieved.value(); + } + + @VisibleForTesting + public int getListReservationsFailedRetrieved() { + return numListReservationsFailedRetrieved.value(); + } + + @VisibleForTesting + public int getGetResourceTypeInfoRetrieved() { + return numGetResourceTypeInfo.value(); + } + public void succeededAppsCreated(long duration) { totalSucceededAppsCreated.add(duration); getNewApplicationLatency.add(duration); @@ -360,7 +504,7 @@ public void succeededMultipleAppsRetrieved(long duration) { public void succeededAppAttemptsRetrieved(long duration) { totalSucceededAppAttemptsRetrieved.add(duration); - 
getApplicationAttemptReportLatency.add(duration); + getApplicationAttemptsLatency.add(duration); } public void succeededGetClusterMetricsRetrieved(long duration) { @@ -388,6 +532,36 @@ public void succeededGetClusterNodeLabelsRetrieved(long duration) { getClusterNodeLabelsLatency.add(duration); } + public void succeededAppAttemptReportRetrieved(long duration) { + totalSucceededAppAttemptReportRetrieved.add(duration); + getApplicationAttemptReportLatency.add(duration); + } + + public void succeededGetQueueUserAclsRetrieved(long duration) { + totalSucceededGetQueueUserAclsRetrieved.add(duration); + getQueueUserAclsLatency.add(duration); + } + + public void succeededGetContainerReportRetrieved(long duration) { + totalSucceededGetContainerReportRetrieved.add(duration); + getContainerReportLatency.add(duration); + } + + public void succeededGetContainersRetrieved(long duration) { + totalSucceededGetContainersRetrieved.add(duration); + getContainerLatency.add(duration); + } + + public void succeededListReservationsRetrieved(long duration) { + totalSucceededListReservationsRetrieved.add(duration); + listReservationsLatency.add(duration); + } + + public void succeededGetResourceTypeInfoRetrieved(long duration) { + totalSucceededGetResourceTypeInfoRetrieved.add(duration); + listResourceTypeInfoLatency.add(duration); + } + public void incrAppsFailedCreated() { numAppsFailedCreated.incr(); } @@ -431,4 +605,28 @@ public void incrLabelsToNodesFailedRetrieved() { public void incrClusterNodeLabelsFailedRetrieved() { numGetClusterNodeLabelsFailedRetrieved.incr(); } + + public void incrAppAttemptReportFailedRetrieved() { + numAppAttemptReportFailedRetrieved.incr(); + } + + public void incrQueueUserAclsFailedRetrieved() { + numGetQueueUserAclsFailedRetrieved.incr(); + } + + public void incrContainerReportFailedRetrieved() { + numGetContainerReportFailedRetrieved.incr(); + } + + public void incrContainerFailedRetrieved() { + numGetContainersFailedRetrieved.incr(); + } + + public void 
incrListReservationsFailedRetrieved() { + numListReservationsFailedRetrieved.incr(); + } + + public void incrResourceTypeInfoFailedRetrieved() { + numGetResourceTypeInfo.incr(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 71e3d2f1629c7..f92e3566ca2be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -828,7 +828,26 @@ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) @Override public GetQueueUserAclsInfoResponse getQueueUserAcls( GetQueueUserAclsInfoRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if(request == null){ + routerMetrics.incrQueueUserAclsFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing getQueueUserAcls request.", null); + } + long startTime = clock.getTime(); + ClientMethod remoteMethod = new ClientMethod("getQueueUserAcls", + new Class[] {GetQueueUserAclsInfoRequest.class}, new Object[] {request}); + Collection queueUserAcls; + try { + queueUserAcls = invokeAppClientProtocolMethod(true, remoteMethod, + GetQueueUserAclsInfoResponse.class); + } catch (Exception ex) { + routerMetrics.incrQueueUserAclsFailedRetrieved(); + LOG.error("Unable to get queue user Acls due to exception.", ex); + throw ex; + } + long stopTime = clock.getTime(); + routerMetrics.succeededGetQueueUserAclsRetrieved(stopTime - startTime); + // Merge the 
QueueUserAclsInfoResponse + return RouterYarnClientUtils.mergeQueueUserAcls(queueUserAcls); } @Override @@ -853,7 +872,26 @@ public ReservationSubmissionResponse submitReservation( @Override public ReservationListResponse listReservations( ReservationListRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null || request.getReservationId() == null) { + routerMetrics.incrListReservationsFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing listReservations request.", null); + } + long startTime = clock.getTime(); + ClientMethod remoteMethod = new ClientMethod("listReservations", + new Class[] {ReservationListRequest.class}, new Object[] {request}); + Collection listResponses; + try { + listResponses = invokeAppClientProtocolMethod(true, remoteMethod, + ReservationListResponse.class); + } catch (Exception ex) { + routerMetrics.incrListReservationsFailedRetrieved(); + LOG.error("Unable to list reservations node due to exception.", ex); + throw ex; + } + long stopTime = clock.getTime(); + routerMetrics.succeededListReservationsRetrieved(stopTime - startTime); + // Merge the ReservationListResponse + return RouterYarnClientUtils.mergeReservationsList(listResponses); } @Override @@ -982,38 +1020,31 @@ public GetApplicationAttemptReportResponse getApplicationAttemptReport( GetApplicationAttemptReportRequest request) throws YarnException, IOException { - long startTime = clock.getTime(); - if (request == null || request.getApplicationAttemptId() == null || request.getApplicationAttemptId().getApplicationId() == null) { - routerMetrics.incrAppAttemptsFailedRetrieved(); + routerMetrics.incrAppAttemptReportFailedRetrieved(); RouterServerUtil.logAndThrowException( - "Missing getApplicationAttemptReport " + - "request or applicationId " + - "or applicationAttemptId information.", - null); + "Missing getApplicationAttemptReport request or applicationId " + + "or applicationAttemptId 
information.", null); } + long startTime = clock.getTime(); SubClusterId subClusterId = null; - + ApplicationId applicationId = request.getApplicationAttemptId().getApplicationId(); try { - subClusterId = federationFacade - .getApplicationHomeSubCluster( - request.getApplicationAttemptId().getApplicationId()); + subClusterId = getApplicationHomeSubCluster(applicationId); } catch (YarnException e) { - routerMetrics.incrAppAttemptsFailedRetrieved(); - RouterServerUtil - .logAndThrowException("ApplicationAttempt " + - request.getApplicationAttemptId() + - "belongs to Application " + - request.getApplicationAttemptId().getApplicationId() + - " does not exist in FederationStateStore", e); + routerMetrics.incrAppAttemptReportFailedRetrieved(); + RouterServerUtil.logAndThrowException("ApplicationAttempt " + + request.getApplicationAttemptId() + " belongs to Application " + + request.getApplicationAttemptId().getApplicationId() + + " does not exist in FederationStateStore.", e); } ApplicationClientProtocol clientRMProxy = - getClientRMProxyForSubCluster(subClusterId); + getClientRMProxyForSubCluster(subClusterId); - GetApplicationAttemptReportResponse response = null; + GetApplicationAttemptReportResponse response; try { response = clientRMProxy.getApplicationAttemptReport(request); } catch (Exception e) { @@ -1031,26 +1062,134 @@ public GetApplicationAttemptReportResponse getApplicationAttemptReport( } long stopTime = clock.getTime(); - routerMetrics.succeededAppAttemptsRetrieved(stopTime - startTime); + routerMetrics.succeededAppAttemptReportRetrieved(stopTime - startTime); return response; } @Override public GetApplicationAttemptsResponse getApplicationAttempts( GetApplicationAttemptsRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null || request.getApplicationId() == null) { + routerMetrics.incrAppAttemptsFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing 
getApplicationAttempts " + + "request or application id.", null); + } + + long startTime = clock.getTime(); + ApplicationId applicationId = request.getApplicationId(); + SubClusterId subClusterId = null; + try { + subClusterId = getApplicationHomeSubCluster(applicationId); + } catch (YarnException ex) { + routerMetrics.incrAppAttemptsFailedRetrieved(); + RouterServerUtil.logAndThrowException("Application " + applicationId + + " does not exist in FederationStateStore.", ex); + } + + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + GetApplicationAttemptsResponse response = null; + try { + response = clientRMProxy.getApplicationAttempts(request); + } catch (Exception ex) { + routerMetrics.incrAppAttemptsFailedRetrieved(); + RouterServerUtil.logAndThrowException("Unable to get the application attempts for " + + applicationId + " from SubCluster " + subClusterId.getId(), ex); + } + + if (response == null) { + LOG.error("No response when attempting to retrieve the attempts list of " + + "the application = {} to SubCluster = {}.", applicationId, + subClusterId.getId()); + } + + long stopTime = clock.getTime(); + routerMetrics.succeededAppAttemptsRetrieved(stopTime - startTime); + return response; } @Override public GetContainerReportResponse getContainerReport( GetContainerReportRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if(request == null || request.getContainerId() == null){ + routerMetrics.incrContainerReportFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing getContainerReport request " + + "or containerId", null); + } + + long startTime = clock.getTime(); + ApplicationId applicationId = request.getContainerId(). 
+ getApplicationAttemptId().getApplicationId(); + SubClusterId subClusterId = null; + try { + subClusterId = getApplicationHomeSubCluster(applicationId); + } catch (YarnException ex) { + routerMetrics.incrContainerReportFailedRetrieved(); + RouterServerUtil.logAndThrowException("Application " + applicationId + + " does not exist in FederationStateStore.", ex); + } + + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + GetContainerReportResponse response = null; + + try { + response = clientRMProxy.getContainerReport(request); + } catch (Exception ex) { + routerMetrics.incrContainerReportFailedRetrieved(); + RouterServerUtil.logAndThrowException("Unable to get the container report for " + + applicationId + " from SubCluster " + subClusterId.getId(), ex); + } + + if (response == null) { + LOG.error("No response when attempting to retrieve the container report of " + + "the ContainerId = {} From SubCluster = {}.", request.getContainerId(), + subClusterId.getId()); + } + + long stopTime = clock.getTime(); + routerMetrics.succeededGetContainerReportRetrieved(stopTime - startTime); + return response; } @Override public GetContainersResponse getContainers(GetContainersRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null || request.getApplicationAttemptId() == null) { + routerMetrics.incrContainerFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing getContainers request or ApplicationAttemptId.", null); + } + + long startTime = clock.getTime(); + ApplicationId applicationId = request.getApplicationAttemptId().getApplicationId(); + SubClusterId subClusterId = null; + try { + subClusterId = getApplicationHomeSubCluster(applicationId); + } catch (YarnException ex) { + routerMetrics.incrContainerFailedRetrieved(); + RouterServerUtil.logAndThrowException("Application " + applicationId + + " does not exist in FederationStateStore.", ex); + } + + 
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + GetContainersResponse response = null; + + try { + response = clientRMProxy.getContainers(request); + } catch (Exception ex) { + routerMetrics.incrContainerFailedRetrieved(); + RouterServerUtil.logAndThrowException("Unable to get the containers for " + + applicationId + " from SubCluster " + subClusterId.getId(), ex); + } + + if (response == null) { + LOG.error("No response when attempting to retrieve the container report of " + + "the ApplicationId = {} From SubCluster = {}.", applicationId, + subClusterId.getId()); + } + + long stopTime = clock.getTime(); + routerMetrics.succeededGetContainersRetrieved(stopTime - startTime); + return response; } @Override @@ -1112,7 +1251,26 @@ public GetResourceProfileResponse getResourceProfile( @Override public GetAllResourceTypeInfoResponse getResourceTypeInfo( GetAllResourceTypeInfoRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null) { + routerMetrics.incrResourceTypeInfoFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing getResourceTypeInfo request.", null); + } + long startTime = clock.getTime(); + ClientMethod remoteMethod = new ClientMethod("getResourceTypeInfo", + new Class[] {GetAllResourceTypeInfoRequest.class}, new Object[] {request}); + Collection listResourceTypeInfo; + try { + listResourceTypeInfo = invokeAppClientProtocolMethod(true, remoteMethod, + GetAllResourceTypeInfoResponse.class); + } catch (Exception ex) { + routerMetrics.incrResourceTypeInfoFailedRetrieved(); + LOG.error("Unable to get all resource type info node due to exception.", ex); + throw ex; + } + long stopTime = clock.getTime(); + routerMetrics.succeededGetResourceTypeInfoRetrieved(stopTime - startTime); + // Merge the GetAllResourceTypeInfoResponse + return RouterYarnClientUtils.mergeResourceTypes(listResourceTypeInfo); } @Override @@ -1139,4 +1297,61 @@ 
public GetNodesToAttributesResponse getNodesToAttributes( GetNodesToAttributesRequest request) throws YarnException, IOException { throw new NotImplementedException("Code is not implemented"); } + + protected SubClusterId getApplicationHomeSubCluster( + ApplicationId applicationId) throws YarnException { + if (applicationId == null) { + LOG.error("ApplicationId is Null, Can't find in SubCluster."); + throw new YarnException("ApplicationId is Null, Can't find in SubCluster."); + } + + SubClusterId resultSubClusterId = null; + + // try looking for applicationId in Home SubCluster + try { + resultSubClusterId = federationFacade. + getApplicationHomeSubCluster(applicationId); + } catch (YarnException ex) { + if(LOG.isDebugEnabled()){ + LOG.debug("can't find applicationId = {} in home sub cluster, " + + " try foreach sub clusters.", applicationId); + } + } + if (resultSubClusterId != null) { + return resultSubClusterId; + } + + // if applicationId not found in Home SubCluster, + // foreach Clusters + Map<SubClusterId, SubClusterInfo> subClusters = + federationFacade.getSubClusters(true); + for (SubClusterId subClusterId : subClusters.keySet()) { + try { + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + if(clientRMProxy == null) { + continue; + } + GetApplicationReportRequest appReportRequest = + GetApplicationReportRequest.newInstance(applicationId); + GetApplicationReportResponse appReportResponse = + clientRMProxy.getApplicationReport(appReportRequest); + + if(appReportResponse!=null && applicationId.equals( + appReportResponse.getApplicationReport().getApplicationId())){ + resultSubClusterId = federationFacade.addApplicationHomeSubCluster( + ApplicationHomeSubCluster.newInstance(applicationId, subClusterId)); + return resultSubClusterId; + } + + } catch (Exception ex) { + if(LOG.isDebugEnabled()){ + LOG.debug("Can't Find ApplicationId = {} in Sub Cluster!", applicationId); + } + } + } + + String errorMsg = + String.format("Can't Found applicationId = %s in any sub clusters", applicationId); + throw new 
YarnException(errorMsg); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java index 65f973626e594..46915738c965a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java @@ -31,6 +31,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; @@ -38,6 +41,9 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.ReservationAllocationState; +import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.Resources; @@ -295,5 +301,69 @@ 
public static GetClusterNodeLabelsResponse mergeClusterNodeLabelsResponse( nodeLabelsResponse.setNodeLabelList(new ArrayList<>(nodeLabelsList)); return nodeLabelsResponse; } + + /** + * Merges a list of GetQueueUserAclsInfoResponse. + * + * @param responses a list of GetQueueUserAclsInfoResponse to merge. + * @return the merged GetQueueUserAclsInfoResponse. + */ + public static GetQueueUserAclsInfoResponse mergeQueueUserAcls( + Collection<GetQueueUserAclsInfoResponse> responses) { + GetQueueUserAclsInfoResponse aclsInfoResponse = Records.newRecord( + GetQueueUserAclsInfoResponse.class); + Set<QueueUserACLInfo> queueUserACLInfos = new HashSet<>(); + for (GetQueueUserAclsInfoResponse response : responses) { + if (response != null && response.getUserAclsInfoList() != null) { + queueUserACLInfos.addAll(response.getUserAclsInfoList()); + } + } + aclsInfoResponse.setUserAclsInfoList(new ArrayList<>(queueUserACLInfos)); + return aclsInfoResponse; + } + + /** + * Merges a list of ReservationListResponse. + * + * @param responses a list of ReservationListResponse to merge. + * @return the merged ReservationListResponse. + */ + public static ReservationListResponse mergeReservationsList( + Collection<ReservationListResponse> responses) { + ReservationListResponse reservationListResponse = + Records.newRecord(ReservationListResponse.class); + List<ReservationAllocationState> reservationAllocationStates = + new ArrayList<>(); + for (ReservationListResponse response : responses) { + if (response != null && response.getReservationAllocationState() != null) { + reservationAllocationStates.addAll( + response.getReservationAllocationState()); + } + } + reservationListResponse.setReservationAllocationState( + reservationAllocationStates); + return reservationListResponse; + } + + /** + * Merges a list of GetAllResourceTypeInfoResponse. + * + * @param responses a list of GetAllResourceTypeInfoResponse to merge. + * @return the merged GetAllResourceTypeInfoResponse. 
+ */ + public static GetAllResourceTypeInfoResponse mergeResourceTypes( + Collection<GetAllResourceTypeInfoResponse> responses) { + GetAllResourceTypeInfoResponse resourceTypeInfoResponse = + Records.newRecord(GetAllResourceTypeInfoResponse.class); + Set<ResourceTypeInfo> resourceTypeInfoSet = new HashSet<>(); + for (GetAllResourceTypeInfoResponse response : responses) { + if (response != null && response.getResourceTypeInfo() != null) { + resourceTypeInfoSet.addAll(response.getResourceTypeInfo()); + } + } + resourceTypeInfoResponse.setResourceTypeInfo( + new ArrayList<>(resourceTypeInfoSet)); + return resourceTypeInfoResponse; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index c139515bfa158..a4df82f9dcbfc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -210,21 +210,21 @@ public void testAppsReportFailed() { @Test public void testSucceededAppAttemptReport() { - long totalGoodBefore = metrics.getNumSucceededAppAttemptsRetrieved(); + long totalGoodBefore = metrics.getNumSucceededAppAttemptReportRetrieved(); goodSubCluster.getApplicationAttemptReport(100); Assert.assertEquals(totalGoodBefore + 1, - metrics.getNumSucceededAppAttemptsRetrieved()); + metrics.getNumSucceededAppAttemptReportRetrieved()); Assert.assertEquals(100, - metrics.getLatencySucceededGetAppAttemptReport(), 0); + metrics.getLatencySucceededGetAppAttemptReport(), ASSERT_DOUBLE_DELTA); goodSubCluster.getApplicationAttemptReport(200); Assert.assertEquals(totalGoodBefore + 2, - 
metrics.getNumSucceededAppAttemptsRetrieved()); + metrics.getNumSucceededAppAttemptReportRetrieved()); Assert.assertEquals(150, - metrics.getLatencySucceededGetAppAttemptReport(), 0); + metrics.getLatencySucceededGetAppAttemptReport(), ASSERT_DOUBLE_DELTA); } /** @@ -234,12 +234,12 @@ public void testSucceededAppAttemptReport() { @Test public void testAppAttemptReportFailed() { - long totalBadbefore = metrics.getAppAttemptsFailedRetrieved(); + long totalBadBefore = metrics.getAppAttemptReportFailedRetrieved(); badSubCluster.getApplicationAttemptReport(); - Assert.assertEquals(totalBadbefore + 1, - metrics.getAppAttemptsFailedRetrieved()); + Assert.assertEquals(totalBadBefore + 1, + metrics.getAppAttemptReportFailedRetrieved()); } /** @@ -336,7 +336,7 @@ public void getApplicationReport() { public void getApplicationAttemptReport() { LOG.info("Mocked: failed getApplicationAttemptReport call"); - metrics.incrAppsFailedRetrieved(); + metrics.incrAppAttemptReportFailedRetrieved(); } public void getApplicationsReport() { @@ -368,6 +368,36 @@ public void getClusterNodeLabels() { LOG.info("Mocked: failed getClusterNodeLabels call"); metrics.incrClusterNodeLabelsFailedRetrieved(); } + + public void getQueueUserAcls() { + LOG.info("Mocked: failed getQueueUserAcls call"); + metrics.incrQueueUserAclsFailedRetrieved(); + } + + public void getListReservations() { + LOG.info("Mocked: failed listReservations call"); + metrics.incrListReservationsFailedRetrieved(); + } + + public void getApplicationAttempts() { + LOG.info("Mocked: failed getApplicationAttempts call"); + metrics.incrAppAttemptsFailedRetrieved(); + } + + public void getContainerReport() { + LOG.info("Mocked: failed getContainerReport call"); + metrics.incrContainerReportFailedRetrieved(); + } + + public void getContainer() { + LOG.info("Mocked: failed getContainer call"); + metrics.incrContainerFailedRetrieved(); + } + + public void getResourceTypeInfo() { + LOG.info("Mocked: failed getResourceTypeInfo call"); + 
metrics.incrResourceTypeInfoFailedRetrieved(); + } } // Records successes for all calls @@ -397,10 +427,9 @@ public void getApplicationReport(long duration) { } public void getApplicationAttemptReport(long duration) { - LOG.info("Mocked: successful " + - "getApplicationAttemptReport call with duration {}", - duration); - metrics.succeededAppAttemptsRetrieved(duration); + LOG.info("Mocked: successful getApplicationAttemptReport call " + + "with duration {}", duration); + metrics.succeededAppAttemptReportRetrieved(duration); } public void getApplicationsReport(long duration) { @@ -434,6 +463,36 @@ public void getClusterNodeLabels(long duration) { LOG.info("Mocked: successful getClusterNodeLabels call with duration {}", duration); metrics.succeededGetClusterNodeLabelsRetrieved(duration); } + + public void getQueueUserAcls(long duration) { + LOG.info("Mocked: successful getQueueUserAcls call with duration {}", duration); + metrics.succeededGetQueueUserAclsRetrieved(duration); + } + + public void getListReservations(long duration) { + LOG.info("Mocked: successful listReservations call with duration {}", duration); + metrics.succeededListReservationsRetrieved(duration); + } + + public void getApplicationAttempts(long duration) { + LOG.info("Mocked: successful getApplicationAttempts call with duration {}", duration); + metrics.succeededAppAttemptsRetrieved(duration); + } + + public void getContainerReport(long duration) { + LOG.info("Mocked: successful getContainerReport call with duration {}", duration); + metrics.succeededGetContainerReportRetrieved(duration); + } + + public void getContainer(long duration) { + LOG.info("Mocked: successful getContainer call with duration {}", duration); + metrics.succeededGetContainersRetrieved(duration); + } + + public void getResourceTypeInfo(long duration) { + LOG.info("Mocked: successful getResourceTypeInfo call with duration {}", duration); + metrics.succeededGetResourceTypeInfoRetrieved(duration); + } } @Test @@ -517,4 +576,136 @@ 
public void testClusterNodeLabelsFailed() { badSubCluster.getClusterNodeLabels(); Assert.assertEquals(totalBadBefore + 1, metrics.getGetClusterNodeLabelsFailedRetrieved()); } + + @Test + public void testSucceededQueueUserAcls() { + long totalGoodBefore = metrics.getNumSucceededGetQueueUserAclsRetrieved(); + goodSubCluster.getQueueUserAcls(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededGetQueueUserAclsRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededGetQueueUserAclsRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getQueueUserAcls(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededGetQueueUserAclsRetrieved()); + Assert.assertEquals(225, metrics.getLatencySucceededGetQueueUserAclsRetrieved(), + ASSERT_DOUBLE_DELTA); + } + + @Test + public void testQueueUserAclsFailed() { + long totalBadBefore = metrics.getQueueUserAclsFailedRetrieved(); + badSubCluster.getQueueUserAcls(); + Assert.assertEquals(totalBadBefore + 1, metrics.getQueueUserAclsFailedRetrieved()); + } + @Test + public void testSucceededListReservations() { + long totalGoodBefore = metrics.getNumSucceededListReservationsRetrieved(); + goodSubCluster.getListReservations(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededListReservationsRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededListReservationsRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getListReservations(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededListReservationsRetrieved()); + Assert.assertEquals(225, metrics.getLatencySucceededListReservationsRetrieved(), + ASSERT_DOUBLE_DELTA); + } + + @Test + public void testListReservationsFailed() { + long totalBadBefore = metrics.getListReservationsFailedRetrieved(); + badSubCluster.getListReservations(); + Assert.assertEquals(totalBadBefore + 1, metrics.getListReservationsFailedRetrieved()); + } + + @Test + public void testSucceededGetApplicationAttempts() { 
+ long totalGoodBefore = metrics.getNumSucceededAppAttemptsRetrieved(); + goodSubCluster.getApplicationAttempts(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededAppAttemptsRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededAppAttemptRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getApplicationAttempts(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededAppAttemptsRetrieved()); + Assert.assertEquals(225, metrics.getLatencySucceededAppAttemptRetrieved(), + ASSERT_DOUBLE_DELTA); + } + + @Test + public void testGetApplicationAttemptsFailed() { + long totalBadBefore = metrics.getAppAttemptsFailedRetrieved(); + badSubCluster.getApplicationAttempts(); + Assert.assertEquals(totalBadBefore + 1, metrics.getAppAttemptsFailedRetrieved()); + } + + @Test + public void testSucceededGetContainerReport() { + long totalGoodBefore = metrics.getNumSucceededGetContainerReportRetrieved(); + goodSubCluster.getContainerReport(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededGetContainerReportRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededGetContainerReportRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getContainerReport(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededGetContainerReportRetrieved()); + Assert.assertEquals(225, metrics.getLatencySucceededGetContainerReportRetrieved(), + ASSERT_DOUBLE_DELTA); + } + + @Test + public void testGetContainerReportFailed() { + long totalBadBefore = metrics.getContainerReportFailedRetrieved(); + badSubCluster.getContainerReport(); + Assert.assertEquals(totalBadBefore + 1, metrics.getContainerReportFailedRetrieved()); + } + + @Test + public void testSucceededGetContainers() { + long totalGoodBefore = metrics.getNumSucceededGetContainersRetrieved(); + goodSubCluster.getContainer(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededGetContainersRetrieved()); + Assert.assertEquals(150, 
+ metrics.getLatencySucceededGetContainersRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getContainer(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededGetContainersRetrieved()); + Assert.assertEquals(225, metrics.getLatencySucceededGetContainersRetrieved(), + ASSERT_DOUBLE_DELTA); + } + + @Test + public void testGetContainerFailed() { + long totalBadBefore = metrics.getContainersFailedRetrieved(); + badSubCluster.getContainer(); + Assert.assertEquals(totalBadBefore + 1, metrics.getContainersFailedRetrieved()); + } + + @Test + public void testSucceededGetResourceTypeInfo() { + long totalGoodBefore = metrics.getNumSucceededGetResourceTypeInfoRetrieved(); + goodSubCluster.getResourceTypeInfo(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededGetResourceTypeInfoRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededGetResourceTypeInfoRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getResourceTypeInfo(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededGetResourceTypeInfoRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededGetResourceTypeInfoRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testGetResourceTypeInfoFailed() { + long totalBadBefore = metrics.getGetResourceTypeInfoRetrieved(); + badSubCluster.getResourceTypeInfo(); + Assert.assertEquals(totalBadBefore + 1, metrics.getGetResourceTypeInfoRetrieved()); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 02ebdbdd6c07f..8fa52e8f92bb3 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.MockApps; @@ -52,12 +53,28 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import 
org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; @@ -435,13 +452,10 @@ public void testGetApplicationEmptyRequest() @Test public void testGetApplicationAttemptReport() throws YarnException, IOException, InterruptedException { - LOG.info("Test FederationClientInterceptor: " + - "Get ApplicationAttempt Report"); + LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report."); ApplicationId appId = - ApplicationId.newInstance(System.currentTimeMillis(), 1); - ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, 1); + ApplicationId.newInstance(System.currentTimeMillis(), 1); SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); @@ -451,11 +465,26 @@ public void testGetApplicationAttemptReport() Assert.assertNotNull(response); Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + // Call GetApplicationAttempts Get ApplicationAttemptId + GetApplicationAttemptsRequest attemptsRequest = + GetApplicationAttemptsRequest.newInstance(appId); + GetApplicationAttemptsResponse attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + + // Wait for app to start + while(attemptsResponse.getApplicationAttemptList().size() == 0) { + attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + } + + Assert.assertNotNull(attemptsResponse); + GetApplicationAttemptReportRequest requestGet = - 
GetApplicationAttemptReportRequest.newInstance(appAttemptId); + GetApplicationAttemptReportRequest.newInstance( + attemptsResponse.getApplicationAttemptList().get(0).getApplicationAttemptId()); GetApplicationAttemptReportResponse responseGet = - interceptor.getApplicationAttemptReport(requestGet); + interceptor.getApplicationAttemptReport(requestGet); Assert.assertNotNull(responseGet); } @@ -479,8 +508,8 @@ public void testGetApplicationAttemptNotExists() GetApplicationAttemptReportRequest.newInstance(appAttemptID); LambdaTestUtils.intercept(YarnException.class, "ApplicationAttempt " + - appAttemptID + "belongs to Application " + - appId + " does not exist in FederationStateStore", + appAttemptID + " belongs to Application " + + appId + " does not exist in FederationStateStore.", () -> interceptor.getApplicationAttemptReport(requestGet)); } @@ -697,4 +726,176 @@ public void testClusterNodeLabelsRequest() throws Exception { interceptor.getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance()); Assert.assertEquals(0, response.getNodeLabelList().size()); } + + @Test + public void testGetQueueUserAcls() throws Exception { + LOG.info("Test FederationClientInterceptor : Get QueueUserAcls request"); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing getQueueUserAcls request.", + () -> interceptor.getQueueUserAcls(null)); + + // normal request + GetQueueUserAclsInfoResponse response = interceptor.getQueueUserAcls( + GetQueueUserAclsInfoRequest.newInstance()); + + Assert.assertNotNull(response); + + List submitAndAdministerAcl = new ArrayList<>(); + submitAndAdministerAcl.add(QueueACL.SUBMIT_APPLICATIONS); + submitAndAdministerAcl.add(QueueACL.ADMINISTER_QUEUE); + + QueueUserACLInfo exceptRootQueueACLInfo = QueueUserACLInfo.newInstance("root", + submitAndAdministerAcl); + + QueueUserACLInfo queueRootQueueACLInfo = response.getUserAclsInfoList().stream(). + filter(acl->acl.getQueueName().equals("root")). 
+ collect(Collectors.toList()).get(0); + + Assert.assertEquals(exceptRootQueueACLInfo, queueRootQueueACLInfo); + } + + @Test + public void testListReservations() throws Exception { + LOG.info("Test FederationClientInterceptor : Get ListReservations request"); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing listReservations request.", + () -> interceptor.listReservations(null)); + + // normal request + ReservationId reservationId = ReservationId.newInstance(1653487680L, 1L); + ReservationListResponse response = interceptor.listReservations( + ReservationListRequest.newInstance("root.decided", reservationId.toString())); + Assert.assertNotNull(response); + Assert.assertEquals(0, response.getReservationAllocationState().size()); + } + + @Test + public void testGetContainersRequest() throws Exception { + LOG.info("Test FederationClientInterceptor : Get Containers request"); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing getContainers request " + + "or ApplicationAttemptId.", () -> interceptor.getContainers(null)); + + // normal request + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + + // Submit the application + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + // Call GetApplicationAttempts + GetApplicationAttemptsRequest attemptsRequest = + GetApplicationAttemptsRequest.newInstance(appId); + GetApplicationAttemptsResponse attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + + // Wait for app to start + while(attemptsResponse.getApplicationAttemptList().size() == 0) { + attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + } + + Assert.assertNotNull(attemptsResponse); + + // Call GetContainers + GetContainersRequest 
containersRequest = + GetContainersRequest.newInstance( + attemptsResponse.getApplicationAttemptList().get(0).getApplicationAttemptId()); + GetContainersResponse containersResponse = + interceptor.getContainers(containersRequest); + + Assert.assertNotNull(containersResponse); + } + + @Test + public void testGetContainerReportRequest() throws Exception { + LOG.info("Test FederationClientInterceptor : Get Container Report request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing getContainerReport request " + + "or containerId", () -> interceptor.getContainerReport(null)); + + // normal request + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + + // Submit the application + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + // Call GetApplicationAttempts + GetApplicationAttemptsRequest attemptsRequest = + GetApplicationAttemptsRequest.newInstance(appId); + GetApplicationAttemptsResponse attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + + // Wait for app to start + while(attemptsResponse.getApplicationAttemptList().size() == 0) { + attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + } + Assert.assertNotNull(attemptsResponse); + + ApplicationAttemptId attemptId = attemptsResponse.getApplicationAttemptList(). 
+ get(0).getApplicationAttemptId(); + ContainerId containerId = ContainerId.newContainerId(attemptId, 1); + + // Call getContainerReport; the RM has not allocated a container, so the response is null + GetContainerReportRequest containerReportRequest = + GetContainerReportRequest.newInstance(containerId); + GetContainerReportResponse containerReportResponse = + interceptor.getContainerReport(containerReportRequest); + + Assert.assertEquals(containerReportResponse, null); + } + + @Test + public void getApplicationAttempts() throws Exception { + LOG.info("Test FederationClientInterceptor : Get Application Attempts request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing getApplicationAttempts " + + "request or application id.", () -> interceptor.getApplicationAttempts(null)); + + // normal request + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + + // Submit the application + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + // Call GetApplicationAttempts + GetApplicationAttemptsRequest attemptsRequest = + GetApplicationAttemptsRequest.newInstance(appId); + GetApplicationAttemptsResponse attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + + Assert.assertNotNull(attemptsResponse); + } + + @Test + public void testGetResourceTypeInfoRequest() throws Exception { + LOG.info("Test FederationClientInterceptor : Get Resource TypeInfo request"); + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing getResourceTypeInfo request.", + () -> interceptor.getResourceTypeInfo(null)); + // normal request. 
+ GetAllResourceTypeInfoResponse response = + interceptor.getResourceTypeInfo(GetAllResourceTypeInfoRequest.newInstance()); + Assert.assertEquals(2, response.getResourceTypeInfo().size()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java index 691a801e4469b..435a8bd85176b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java @@ -32,6 +32,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -42,6 +45,12 @@ import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.ReservationAllocationState; +import 
org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.junit.Assert; @@ -367,4 +376,167 @@ public void testMergeLabelsToNodes(){ Assert.assertEquals(expectedResponse, response.getLabelsToNodes()); } + + @Test + public void testMergeQueueUserAclsResponse() { + + List submitOnlyAcl = new ArrayList<>(); + submitOnlyAcl.add(QueueACL.SUBMIT_APPLICATIONS); + + List administerOnlyAcl = new ArrayList<>(); + administerOnlyAcl.add(QueueACL.ADMINISTER_QUEUE); + + List submitAndAdministerAcl = new ArrayList<>(); + submitAndAdministerAcl.add(QueueACL.ADMINISTER_QUEUE); + submitAndAdministerAcl.add(QueueACL.SUBMIT_APPLICATIONS); + + QueueUserACLInfo queueUserACLInfo1 = QueueUserACLInfo.newInstance( + "root", submitAndAdministerAcl); + + QueueUserACLInfo queueUserACLInfo2 = QueueUserACLInfo.newInstance( + "default", submitOnlyAcl); + + QueueUserACLInfo queueUserACLInfo3 = QueueUserACLInfo.newInstance( + "root", submitAndAdministerAcl); + + QueueUserACLInfo queueUserACLInfo4 = QueueUserACLInfo.newInstance( + "yarn", administerOnlyAcl); + + List queueUserACLInfoList1 = new ArrayList<>(); + List queueUserACLInfoList2 = new ArrayList<>(); + + queueUserACLInfoList1.add(queueUserACLInfo1); + queueUserACLInfoList1.add(queueUserACLInfo2); + queueUserACLInfoList2.add(queueUserACLInfo3); + queueUserACLInfoList2.add(queueUserACLInfo4); + + // normal response + GetQueueUserAclsInfoResponse response1 = Records.newRecord( + GetQueueUserAclsInfoResponse.class); + response1.setUserAclsInfoList(queueUserACLInfoList1); + GetQueueUserAclsInfoResponse response2 = Records.newRecord( + GetQueueUserAclsInfoResponse.class); + response2.setUserAclsInfoList(queueUserACLInfoList2); + + // empty response + GetQueueUserAclsInfoResponse response3 = Records.newRecord( 
+ GetQueueUserAclsInfoResponse.class); + + // null response + GetQueueUserAclsInfoResponse response4 = null; + + List responses = new ArrayList<>(); + responses.add(response1); + responses.add(response2); + responses.add(response3); + responses.add(response4); + + // expected user acls + List expectedOutput = new ArrayList<>(); + expectedOutput.add(queueUserACLInfo1); + expectedOutput.add(queueUserACLInfo2); + expectedOutput.add(queueUserACLInfo4); + + GetQueueUserAclsInfoResponse response = + RouterYarnClientUtils.mergeQueueUserAcls(responses); + Assert.assertTrue(CollectionUtils.isEqualCollection(expectedOutput, + response.getUserAclsInfoList())); + } + + @Test + public void testMergeReservationsList() { + + // normal response + ReservationListResponse response1 = createReservationListResponse( + 165348678000L, 165348690000L, 165348678000L, 1L); + + ReservationListResponse response2 = createReservationListResponse( + 165348750000L, 165348768000L, 165348750000L, 1L); + + // empty response + ReservationListResponse response3 = ReservationListResponse.newInstance(new ArrayList<>()); + + // null response + ReservationListResponse response4 = null; + + List responses = new ArrayList<>(); + responses.add(response1); + responses.add(response2); + responses.add(response3); + responses.add(response4); + + // expected response + List expectedResponse = new ArrayList<>(); + expectedResponse.addAll(response1.getReservationAllocationState()); + expectedResponse.addAll(response2.getReservationAllocationState()); + + ReservationListResponse response = + RouterYarnClientUtils.mergeReservationsList(responses); + Assert.assertEquals(expectedResponse, response.getReservationAllocationState()); + } + + private ReservationListResponse createReservationListResponse(long startTime, + long endTime, long reservationTime, long reservationNumber) { + List reservationsList = new ArrayList<>(); + ReservationDefinition reservationDefinition = + Records.newRecord(ReservationDefinition.class); 
+ reservationDefinition.setArrival(startTime); + reservationDefinition.setDeadline(endTime); + ReservationAllocationState reservationAllocationState = + Records.newRecord(ReservationAllocationState.class); + ReservationId reservationId = ReservationId.newInstance(reservationTime, + reservationNumber); + reservationAllocationState.setReservationDefinition(reservationDefinition); + reservationAllocationState.setReservationId(reservationId); + reservationsList.add(reservationAllocationState); + return ReservationListResponse.newInstance(reservationsList); + } + + @Test + public void testMergeResourceTypes() { + + ResourceTypeInfo resourceTypeInfo1 = ResourceTypeInfo.newInstance("vcores"); + ResourceTypeInfo resourceTypeInfo2 = ResourceTypeInfo.newInstance("gpu"); + ResourceTypeInfo resourceTypeInfo3 = ResourceTypeInfo.newInstance("memory-mb"); + + List resourceTypeInfoList1 = new ArrayList<>(); + resourceTypeInfoList1.add(resourceTypeInfo1); + resourceTypeInfoList1.add(resourceTypeInfo3); + + List resourceTypeInfoList2 = new ArrayList<>(); + resourceTypeInfoList2.add(resourceTypeInfo3); + resourceTypeInfoList2.add(resourceTypeInfo2); + + // normal response + GetAllResourceTypeInfoResponse response1 = + Records.newRecord(GetAllResourceTypeInfoResponse.class); + response1.setResourceTypeInfo(resourceTypeInfoList1); + + GetAllResourceTypeInfoResponse response2 = + Records.newRecord(GetAllResourceTypeInfoResponse.class); + response2.setResourceTypeInfo(resourceTypeInfoList2); + + // empty response + GetAllResourceTypeInfoResponse response3 = + Records.newRecord(GetAllResourceTypeInfoResponse.class); + + // null response + GetAllResourceTypeInfoResponse response4 = null; + + List responses = new ArrayList<>(); + responses.add(response1); + responses.add(response2); + responses.add(response3); + responses.add(response4); + + // expected response + List expectedResponse = new ArrayList<>(); + expectedResponse.add(resourceTypeInfo1); + expectedResponse.add(resourceTypeInfo2); 
+ expectedResponse.add(resourceTypeInfo3); + GetAllResourceTypeInfoResponse response = + RouterYarnClientUtils.mergeResourceTypes(responses); + Assert.assertTrue(CollectionUtils.isEqualCollection(expectedResponse, + response.getResourceTypeInfo())); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/capacity-scheduler.xml index 90c5eeb097e8b..ffe1a5d4ca48c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/capacity-scheduler.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/capacity-scheduler.xml @@ -45,7 +45,7 @@ yarn.scheduler.capacity.root.queues - default + default,decided The queues at the this level (root is the root queue). @@ -97,6 +97,15 @@ + + yarn.scheduler.capacity.root.decided.reservable + true + + Indicates to the ReservationSystem that the queue’s resources + are available for users to reserve. + + + yarn.scheduler.capacity.node-locality-delay -1