diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetApplicationsResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetApplicationsResponse.java index 45e8fdf097115..dd411daec1fe5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetApplicationsResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetApplicationsResponse.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.api.protocolrecords; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -52,6 +54,16 @@ public static GetApplicationsResponse newInstance( return response; } + @Private + @Unstable + public static GetApplicationsResponse newInstance( + Collection applications) { + GetApplicationsResponse response = + Records.newRecord(GetApplicationsResponse.class); + response.setApplicationList(new ArrayList<>(applications)); + return response; + } + /** * Get ApplicationReport for applications. * @return ApplicationReport for applications diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 2ebf79cfae3f3..d3cec1402641b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3991,6 +3991,15 @@ public static boolean isAclEnabled(Configuration conf) { ROUTER_PREFIX + "submit.retry"; public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3; + /** + * The interceptor class used in FederationClientInterceptor should return + * partial ApplicationReports. + */ + public static final String ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED = + ROUTER_PREFIX + "partial-result.enabled"; + public static final boolean DEFAULT_ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED = + false; + public static final String ROUTER_WEBAPP_PREFIX = ROUTER_PREFIX + "webapp."; public static final String ROUTER_USER_CLIENT_THREADS_SIZE = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 3dcd5cc3bed60..84e4b561e5d16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -184,6 +184,8 @@ public void initializeMemberVariables() { configurationPrefixToSkipCompare .add(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY); + configurationPrefixToSkipCompare + .add(YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED); configurationPrefixToSkipCompare .add(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED); configurationPrefixToSkipCompare diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 7e8e7af3c7ae3..08636bbc10b06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Random; @@ -161,6 +162,7 @@ public class FederationClientInterceptor private RouterMetrics routerMetrics; private ThreadPoolExecutor executorService; private final Clock clock = new MonotonicClock(); + private boolean returnPartialReport; @Override public void init(String userName) { @@ -196,6 +198,10 @@ public void init(String userName) { clientRMProxies = new ConcurrentHashMap(); routerMetrics = RouterMetrics.getMetrics(); + + returnPartialReport = conf.getBoolean( + YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED); } @Override @@ -599,10 +605,44 @@ public GetApplicationReportResponse getApplicationReport( return response; } + /** + * The Yarn Router will forward the request to all the Yarn RMs in parallel, + * after that it will group all the ApplicationReports by the ApplicationId. + * + * Possible failure: + * + * Client: identical behavior as {@code ClientRMService}. + * + * Router: the Client will timeout and resubmit the request. + * + * ResourceManager: the Router calls each Yarn RM in parallel. In case a + * Yarn RM fails, a single call will timeout. However the Router will + * merge the ApplicationReports it got, and provides a partial list to + * the client. + * + * State Store: the Router will timeout and it will retry depending on the + * FederationFacade settings - if the failure happened before the select + * operation. + */ @Override public GetApplicationsResponse getApplications(GetApplicationsRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null) { + RouterServerUtil.logAndThrowException( + "Missing getApplications request.", + null); + } + Map subclusters = + federationFacade.getSubClusters(true); + ClientMethod remoteMethod = new ClientMethod("getApplications", + new Class[] {GetApplicationsRequest.class}, new Object[] {request}); + Map applications = + invokeConcurrent(subclusters.keySet(), remoteMethod, + GetApplicationsResponse.class); + + // Merge the Application Reports + return RouterYarnClientUtils.mergeApplications(applications.values(), + returnPartialReport); } @Override @@ -676,6 +716,12 @@ public Object call() throws Exception { return results; } + Map invokeConcurrent(Collection clusterIds, + ClientMethod request, Class clazz) throws YarnException, IOException { + ArrayList clusterIdList = new ArrayList<>(clusterIds); + return invokeConcurrent(clusterIdList, request, clazz); + } + @Override public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) throws YarnException, IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java index 50abcf40a8088..9c36f30952a9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java @@ -18,14 +18,25 @@ package org.apache.hadoop.yarn.server.router.clientrm; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; +import org.apache.hadoop.yarn.util.resource.Resources; /** * Util class for Router Yarn client API calls. */ public final class RouterYarnClientUtils { + private final static String PARTIAL_REPORT = "Partial Report "; + private RouterYarnClientUtils() { } @@ -52,4 +63,130 @@ public static GetClusterMetricsResponse merge( } return GetClusterMetricsResponse.newInstance(tmp); } + + /** + * Merges a list of ApplicationReports grouping by ApplicationId. + * Our current policy is to merge the application reports from the reachable + * SubClusters. + * @param responses a list of ApplicationResponse to merge + * @param returnPartialResult if the merge ApplicationReports should contain + * partial result or not + * @return the merged ApplicationsResponse + */ + public static GetApplicationsResponse mergeApplications( + Collection responses, + boolean returnPartialResult){ + Map federationAM = new HashMap<>(); + Map federationUAMSum = new HashMap<>(); + + for (GetApplicationsResponse appResponse : responses){ + for (ApplicationReport appReport : appResponse.getApplicationList()){ + ApplicationId appId = appReport.getApplicationId(); + // Check if this ApplicationReport is an AM + if (!appReport.isUnmanagedApp()) { + // Insert in the list of AM + federationAM.put(appId, appReport); + // Check if there are any UAM found before + if (federationUAMSum.containsKey(appId)) { + // Merge the current AM with the found UAM + mergeAMWithUAM(appReport, federationUAMSum.get(appId)); + // Remove the sum of the UAMs + federationUAMSum.remove(appId); + } + // This ApplicationReport is an UAM + } else if (federationAM.containsKey(appId)) { + // Merge the current UAM with its own AM + mergeAMWithUAM(federationAM.get(appId), appReport); + } else if (federationUAMSum.containsKey(appId)) { + // Merge the current UAM with its own UAM and update the list of UAM + ApplicationReport mergedUAMReport = + mergeUAMWithUAM(federationUAMSum.get(appId), appReport); + federationUAMSum.put(appId, mergedUAMReport); + } else { + // Insert in the list of UAM + federationUAMSum.put(appId, appReport); + } + } + } + // Check the remaining UAMs are depending or not from federation + for (ApplicationReport appReport : federationUAMSum.values()) { + if (mergeUamToReport(appReport.getName(), returnPartialResult)) { + federationAM.put(appReport.getApplicationId(), appReport); + } + } + + return GetApplicationsResponse.newInstance(federationAM.values()); + } + + private static ApplicationReport mergeUAMWithUAM(ApplicationReport uam1, + ApplicationReport uam2){ + uam1.setName(PARTIAL_REPORT + uam1.getApplicationId()); + mergeAMWithUAM(uam1, uam2); + return uam1; + } + + private static void mergeAMWithUAM(ApplicationReport am, + ApplicationReport uam){ + ApplicationResourceUsageReport amResourceReport = + am.getApplicationResourceUsageReport(); + + ApplicationResourceUsageReport uamResourceReport = + uam.getApplicationResourceUsageReport(); + + amResourceReport.setNumUsedContainers( + amResourceReport.getNumUsedContainers() + + uamResourceReport.getNumUsedContainers()); + + amResourceReport.setNumReservedContainers( + amResourceReport.getNumReservedContainers() + + uamResourceReport.getNumReservedContainers()); + + amResourceReport.setUsedResources(Resources.add( + amResourceReport.getUsedResources(), + uamResourceReport.getUsedResources())); + + amResourceReport.setReservedResources(Resources.add( + amResourceReport.getReservedResources(), + uamResourceReport.getReservedResources())); + + amResourceReport.setNeededResources(Resources.add( + amResourceReport.getNeededResources(), + uamResourceReport.getNeededResources())); + + amResourceReport.setMemorySeconds( + amResourceReport.getMemorySeconds() + + uamResourceReport.getMemorySeconds()); + + amResourceReport.setVcoreSeconds( + amResourceReport.getVcoreSeconds() + + uamResourceReport.getVcoreSeconds()); + + amResourceReport.setQueueUsagePercentage( + amResourceReport.getQueueUsagePercentage() + + uamResourceReport.getQueueUsagePercentage()); + + amResourceReport.setClusterUsagePercentage( + amResourceReport.getClusterUsagePercentage() + + uamResourceReport.getClusterUsagePercentage()); + + am.setApplicationResourceUsageReport(amResourceReport); + } + + /** + * Returns whether or not to add an unmanaged application to the report. + * @param appName Application Name + * @param returnPartialResult if the merge ApplicationReports should contain + * partial result or not + */ + private static boolean mergeUamToReport(String appName, + boolean returnPartialResult){ + if (returnPartialResult) { + return true; + } + if (appName == null) { + return false; + } + return !(appName.startsWith(UnmanagedApplicationManager.APP_NAME) || + appName.startsWith(PARTIAL_REPORT)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 125dfcfbeeed9..4ffd3cb2e1df3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -22,16 +22,20 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; import java.util.List; - import java.util.Map; +import java.util.Set; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.MockApps; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; @@ -40,11 +44,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; @@ -531,4 +536,109 @@ public void testGetClusterMetricsRequest() throws YarnException, IOException { GetClusterMetricsResponse.class); Assert.assertEquals(true, clusterMetrics.isEmpty()); } + + /** + * This test validates the correctness of + * GetApplicationsResponse in case the + * application exists in the cluster. + */ + @Test + public void testGetApplicationsResponse() + throws YarnException, IOException, InterruptedException { + LOG.info("Test FederationClientInterceptor: Get Applications Response"); + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + Set appTypes = Collections.singleton("MockApp"); + GetApplicationsRequest requestGet = + GetApplicationsRequest.newInstance(appTypes); + + GetApplicationsResponse responseGet = + interceptor.getApplications(requestGet); + + Assert.assertNotNull(responseGet); + } + + /** + * This test validates + * the correctness of GetApplicationsResponse in case of + * empty request. + */ + @Test + public void testGetApplicationsNullRequest() throws Exception { + LOG.info("Test FederationClientInterceptor: Get Applications request"); + LambdaTestUtils.intercept(YarnException.class, + "Missing getApplications request.", + () -> interceptor.getApplications(null)); + } + + /** + * This test validates + * the correctness of GetApplicationsResponse in case applications + * with given type does not exist. + */ + @Test + public void testGetApplicationsApplicationTypeNotExists() throws Exception{ + LOG.info("Test FederationClientInterceptor: Application with type does " + + "not exist"); + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + Set appTypes = Collections.singleton("SPARK"); + + GetApplicationsRequest requestGet = + GetApplicationsRequest.newInstance(appTypes); + + GetApplicationsResponse responseGet = + interceptor.getApplications(requestGet); + + Assert.assertNotNull(responseGet); + Assert.assertTrue(responseGet.getApplicationList().isEmpty()); + } + + /** + * This test validates + * the correctness of GetApplicationsResponse in case applications + * with given YarnApplicationState does not exist. + */ + @Test + public void testGetApplicationsApplicationStateNotExists() throws Exception{ + LOG.info("Test FederationClientInterceptor:" + + " Application with state does not exist"); + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + EnumSet applicationStates = EnumSet.noneOf( + YarnApplicationState.class); + applicationStates.add(YarnApplicationState.KILLED); + + GetApplicationsRequest requestGet = + GetApplicationsRequest.newInstance(applicationStates); + + GetApplicationsResponse responseGet = + interceptor.getApplications(requestGet); + + Assert.assertNotNull(responseGet); + Assert.assertTrue(responseGet.getApplicationList().isEmpty()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java index d062f9d0b590f..3b64c2310768c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java @@ -19,8 +19,19 @@ package org.apache.hadoop.yarn.server.router.clientrm; import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.junit.Assert; import org.junit.Test; @@ -29,6 +40,8 @@ */ public class TestRouterYarnClientUtils { + private final static String PARTIAL_REPORT = "Partial Report "; + @Test public void testClusterMetricsMerge() { ArrayList responses = new ArrayList<>(); @@ -54,4 +67,108 @@ public GetClusterMetricsResponse getClusterMetricsResponse(int value) { metrics.setNumNodeManagers(value); return GetClusterMetricsResponse.newInstance(metrics); } + + /** + * This test validates the correctness of + * RouterYarnClientUtils#mergeApplications. + */ + @Test + public void testMergeApplications() { + ArrayList responses = new ArrayList<>(); + responses.add(getApplicationsResponse(1, false)); + responses.add(getApplicationsResponse(2, false)); + GetApplicationsResponse result = RouterYarnClientUtils. + mergeApplications(responses, false); + Assert.assertNotNull(result); + Assert.assertEquals(2, result.getApplicationList().size()); + + String appName1 = result.getApplicationList().get(0).getName(); + String appName2 = result.getApplicationList().get(1).getName(); + + // Check that no Unmanaged applications are added to the result + Assert.assertEquals(false, + appName1.contains(UnmanagedApplicationManager.APP_NAME)); + Assert.assertEquals(false, + appName2.contains(UnmanagedApplicationManager.APP_NAME)); + } + + /** + * This test validates the correctness of + * RouterYarnClientUtils#mergeApplications. + */ + @Test + public void testMergeUnmanagedApplications() { + ArrayList responses = new ArrayList<>(); + responses.add(getApplicationsResponse(1, true)); + + // Check response if partial results are enabled + GetApplicationsResponse result = RouterYarnClientUtils. + mergeApplications(responses, true); + Assert.assertNotNull(result); + Assert.assertEquals(1, result.getApplicationList().size()); + ApplicationReport appReport = result.getApplicationList().iterator().next(); + String appName = appReport.getName(); + Assert.assertTrue(appName.startsWith(PARTIAL_REPORT)); + + // Check ApplicationResourceUsageReport merge + ApplicationResourceUsageReport resourceUsageReport = + appReport.getApplicationResourceUsageReport(); + + Assert.assertEquals(2, resourceUsageReport.getNumUsedContainers()); + Assert.assertEquals(4, resourceUsageReport.getNumReservedContainers()); + + // Check response if partial results are disabled + result = RouterYarnClientUtils. + mergeApplications(responses, false); + Assert.assertNotNull(result); + Assert.assertTrue(result.getApplicationList().isEmpty()); + } + + /** + * This generates a GetApplicationsResponse with 2 applications with + * same ApplicationId. + * @param value Used as Id in ApplicationId + * @param uamOnly If set to true, only unmanaged applications are added in + * response, else one managed and one unmanaged applications + * are added with same ApplicationId. + * @return GetApplicationsResponse + */ + private GetApplicationsResponse getApplicationsResponse(int value, + boolean uamOnly) { + String appName = uamOnly? UnmanagedApplicationManager.APP_NAME: "appname"; + List applications = new ArrayList<>(); + + // Create first application report. This is a managed app by default. + // If uamOnly is true, this becomes unmanaged application. + ApplicationId appId = ApplicationId.newInstance(1234, value); + Resource resource = Resource.newInstance(1024, 1); + ApplicationResourceUsageReport appResourceUsageReport = + ApplicationResourceUsageReport.newInstance( + 1, 2, resource, resource, + resource, null, 0.1f, + 0.1f, null); + + ApplicationReport appReport = ApplicationReport.newInstance( + appId, ApplicationAttemptId.newInstance(appId, 1), + "user", "queue", appName, "host", + 124, null, YarnApplicationState.RUNNING, + "diagnostics", "url", 0, 0, + 0, FinalApplicationStatus.SUCCEEDED, appResourceUsageReport, "N/A", + 0.53789f, "YARN", null, null, uamOnly, null, null, null); + + // Create second application report. This is always unmanaged application. + ApplicationId appId2 = ApplicationId.newInstance(1234, value); + ApplicationReport appReport2 = ApplicationReport.newInstance( + appId2, ApplicationAttemptId.newInstance(appId, 1), + "user", "queue", UnmanagedApplicationManager.APP_NAME, "host", + 124, null, YarnApplicationState.RUNNING, + "diagnostics", "url", 0, 0, + 0, FinalApplicationStatus.SUCCEEDED, appResourceUsageReport, "N/A", + 0.53789f, "YARN", null, null, true, null, null, null); + + applications.add(appReport); + applications.add(appReport2); + + return GetApplicationsResponse.newInstance(applications); + } }