From d7c5819b9c7679ada3e31d8b816b163f0af83ebf Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Wed, 12 Oct 2022 19:24:33 +0800 Subject: [PATCH 1/8] YARN-11342. [Federation] Refactor FederationClientInterceptor#submitApplication Use FederationActionRetry. --- .../retry/FederationActionRetry.java | 4 +- .../FederationStateStoreService.java | 5 +- .../clientrm/FederationClientInterceptor.java | 234 +++++++++++++----- 3 files changed, 173 insertions(+), 70 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java index 634e76896456c..3068526c1eab1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java @@ -24,13 +24,13 @@ public interface FederationActionRetry { Logger LOG = LoggerFactory.getLogger(FederationActionRetry.class); - T run() throws Exception; + T run(int retry) throws Exception; default T runWithRetries(int retryCount, long retrySleepTime) throws Exception { int retry = 0; while (true) { try { - return run(); + return run(retry); } catch (Exception e) { LOG.info("Exception while executing an Federation operation.", e); if (++retry > retryCount) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 1d67af926d43e..1290c24991997 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -481,8 +481,9 @@ public boolean cleanUpFinishApplicationsWithRetries(ApplicationId appId, boolean DeleteApplicationHomeSubClusterRequest.newInstance(appId); // CleanUp Finish App. - return ((FederationActionRetry) () -> invokeCleanUpFinishApp(appId, isQuery, request)) - .runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); + return ((FederationActionRetry) (retry) -> + invokeCleanUpFinishApp(appId, isQuery, request)) + .runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index a027977e1345e..db1dc4a5cca9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -116,6 +116,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -124,6 +125,7 @@ import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; @@ -260,6 +262,17 @@ private SubClusterId getRandomActiveSubCluster( return list.get(rand.nextInt(list.size())); } + @VisibleForTesting + private int getActiveSubClustersCount() throws YarnException { + Map activeSubClusters = + federationFacade.getSubClusters(true); + if (activeSubClusters == null || activeSubClusters.isEmpty()) { + return 0; + } else { + return activeSubClusters.size(); + } + } + /** * YARN Router forwards every getNewApplication requests to any RM. During * this operation there will be no communication with the State Store. The @@ -400,98 +413,187 @@ public SubmitApplicationResponse submitApplication( RouterServerUtil.logAndThrowException(errMsg, null); } - SubmitApplicationResponse response = null; - long startTime = clock.getTime(); - ApplicationId applicationId = request.getApplicationSubmissionContext().getApplicationId(); - List blacklist = new ArrayList<>(); - for (int i = 0; i < numSubmitRetries; ++i) { + try { + + // We need to handle this situation, + // the user will provide us with an expected submitRetries, + // but if the number of Active SubClusters is less than this number at this time, + // we should provide a high number of retry according to the number of Active SubClusters. + int activeSubClustersCount = getActiveSubClustersCount(); + int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries); + + // Try calling the SubmitApplication method + SubmitApplicationResponse response = + ((FederationActionRetry) (retryCount) -> + invokeSubmitApplication(blacklist, request, retryCount)). + runWithRetries(actualRetryNums, 100); + + if (response != null) { + long stopTime = clock.getTime(); + routerMetrics.succeededAppsSubmitted(stopTime - startTime); + return response; + } + + } catch (Exception e){ + routerMetrics.incrAppsFailedSubmitted(); + RouterServerUtil.logAndThrowException(e.getMessage(), e); + } + + routerMetrics.incrAppsFailedSubmitted(); + String msg = String.format("Application %s with appId %s failed to be submitted.", + request.getApplicationSubmissionContext().getApplicationName(), applicationId); + RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, msg, applicationId); + throw new YarnException(msg); + } - SubClusterId subClusterId = policyFacade.getHomeSubcluster( - request.getApplicationSubmissionContext(), blacklist); + /** + * Invoke SubmitApplication to different subClusters. + * + * Step1. Select homeSubCluster for Application according to Policy. + * + * Step2. Query homeSubCluster according to ApplicationId, + * if homeSubCluster does not exist or first attempt(consider repeated submissions), write; + * if homeSubCluster exists, update. + * + * Step3. Find the clientRMProxy of the corresponding cluster according to homeSubCluster, + * and then call the SubmitApplication method. + * + * Step4. If SubmitApplicationResponse is empty, the request fails, + * if SubmitApplicationResponse is not empty, the request is successful. + * + * @param blackList Blacklist avoid repeated calls to unavailable subCluster. + * @param request submitApplicationRequest. + * @param retryCount number of retries. + * @return submitApplication response, If the response is empty, the request fails, + * if the response is not empty, the request is successful. + * @throws YarnException yarn exception. + */ + private SubmitApplicationResponse invokeSubmitApplication( + List blackList, SubmitApplicationRequest request, int retryCount) + throws YarnException { + // The request is not checked here, + // because the request has been checked before the method is called. + // We get applicationId and subClusterId from context. + ApplicationSubmissionContext appSubmissionContext = request.getApplicationSubmissionContext(); + ApplicationId applicationId = appSubmissionContext.getApplicationId(); + SubClusterId subClusterId = null; + + try { + + // Step1. Select homeSubCluster for Application according to Policy. + subClusterId = policyFacade.getHomeSubcluster(appSubmissionContext, blackList); LOG.info("submitApplication appId {} try #{} on SubCluster {}.", - applicationId, i, subClusterId); + applicationId, retryCount, subClusterId); + + // Step2. Query homeSubCluster according to ApplicationId. + Boolean exists = existsApplicationHomeSubCluster(applicationId); ApplicationHomeSubCluster appHomeSubCluster = ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); - if (i == 0) { - try { - // persist the mapping of applicationId and the subClusterId which has - // been selected as its home - subClusterId = - federationFacade.addApplicationHomeSubCluster(appHomeSubCluster); - } catch (YarnException e) { - routerMetrics.incrAppsFailedSubmitted(); - String message = - String.format("Unable to insert the ApplicationId %s into the FederationStateStore.", - applicationId); - RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN, - TARGET_CLIENT_RM_SERVICE, message, applicationId, subClusterId); - RouterServerUtil.logAndThrowException(message, e); - } + if (exists || retryCount == 0) { + addApplicationHomeSubCluster(applicationId, appHomeSubCluster); } else { - try { - // update the mapping of applicationId and the home subClusterId to - // the new subClusterId we have selected - federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster); - } catch (YarnException e) { - String message = - String.format("Unable to update the ApplicationId %s into the FederationStateStore.", - applicationId); - SubClusterId subClusterIdInStateStore = - federationFacade.getApplicationHomeSubCluster(applicationId); - if (subClusterId == subClusterIdInStateStore) { - LOG.info("Application {} already submitted on SubCluster {}.", - applicationId, subClusterId); - } else { - routerMetrics.incrAppsFailedSubmitted(); - RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN, - TARGET_CLIENT_RM_SERVICE, message, applicationId, subClusterId); - RouterServerUtil.logAndThrowException(message, e); - } - } + updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster); } - ApplicationClientProtocol clientRMProxy = - getClientRMProxyForSubCluster(subClusterId); - - try { - response = clientRMProxy.submitApplication(request); - } catch (Exception e) { - LOG.warn("Unable to submit the application {} to SubCluster {} error = {}.", - applicationId, subClusterId.getId(), e); - } + // Step3. SubmitApplication to the subCluster + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + SubmitApplicationResponse response = clientRMProxy.submitApplication(request); + // Step4. if SubmitApplicationResponse is not empty, the request is successful. if (response != null) { - LOG.info("Application {} with appId {} submitted on {}.", - request.getApplicationSubmissionContext().getApplicationName(), - applicationId, subClusterId); - long stopTime = clock.getTime(); - routerMetrics.succeededAppsSubmitted(stopTime - startTime); + LOG.info("Application {} submitted on subCluster {}.", applicationId, subClusterId); RouterAuditLogger.logSuccess(user.getShortUserName(), SUBMIT_NEW_APP, TARGET_CLIENT_RM_SERVICE, applicationId, subClusterId); return response; - } else { - // Empty response from the ResourceManager. - // Blacklist this subcluster for this request. - blacklist.add(subClusterId); } + } catch (Exception e) { + RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, e.getMessage(), applicationId, subClusterId); + LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {} error = {}.", + applicationId, subClusterId, e); + blackList.add(subClusterId); } - routerMetrics.incrAppsFailedSubmitted(); - String msg = String.format("Application %s with appId %s failed to be submitted.", - request.getApplicationSubmissionContext().getApplicationName(), applicationId); - RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN, - TARGET_CLIENT_RM_SERVICE, msg, applicationId); + // If SubmitApplicationResponse is empty, the request fails. + String msg = String.format("Application %s failed to be submitted SubCluster %s.", + applicationId, subClusterId); throw new YarnException(msg); } + /** + * Add ApplicationHomeSubCluster to FederationStateStore. + * + * @param applicationId applicationId. + * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy. + * @throws YarnException yarn exception. + */ + private void addApplicationHomeSubCluster(ApplicationId applicationId, + ApplicationHomeSubCluster homeSubCluster) throws YarnException { + try { + federationFacade.addApplicationHomeSubCluster(homeSubCluster); + } catch (YarnException e) { + RouterServerUtil.logAndThrowException(e, + "Unable to insert the ApplicationId %s into the FederationStateStore.", applicationId); + } + } + + /** + * Update ApplicationHomeSubCluster to FederationStateStore. + * + * @param subClusterId homeSubClusterId + * @param applicationId applicationId. + * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy. + * @throws YarnException yarn exception. + */ + private void updateApplicationHomeSubCluster(SubClusterId subClusterId, + ApplicationId applicationId, ApplicationHomeSubCluster homeSubCluster) throws YarnException { + try { + federationFacade.updateApplicationHomeSubCluster(homeSubCluster); + } catch (YarnException e) { + SubClusterId subClusterIdInStateStore = + federationFacade.getApplicationHomeSubCluster(applicationId); + if (subClusterId == subClusterIdInStateStore) { + LOG.info("Application {} already submitted on SubCluster {}.", + applicationId, subClusterId); + } else { + RouterServerUtil.logAndThrowException(e, + "Unable to update the ApplicationId %s into the FederationStateStore.", + applicationId); + } + } + } + + + /** + * Query SubClusterId By applicationId. + * + * If SubClusterId is not empty, it means it exists and returns true; + * if SubClusterId is empty, it means it does not exist and returns false. + * + * @param applicationId applicationId + * @return true, SubClusterId exists; false, SubClusterId not exists. + */ + private boolean existsApplicationHomeSubCluster(ApplicationId applicationId) { + try { + SubClusterId subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId); + if (subClusterId != null) { + return true; + } + } catch (YarnException e) { + LOG.warn("get homeSubCluster by applicationId = {} error.", applicationId, e); + } + return false; + } + /** * The YARN Router will forward to the respective YARN RM in which the AM is * running. From d22b8adefd914e682fe568ea46e5d23c85b383c4 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Thu, 13 Oct 2022 18:40:44 +0800 Subject: [PATCH 2/8] YARN-11342. Fix CheckStyle. --- .../clientrm/FederationClientInterceptor.java | 12 +- .../TestFederationClientInterceptorRetry.java | 109 +++++++----------- 2 files changed, 50 insertions(+), 71 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index db1dc4a5cca9f..042332074177b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -425,7 +425,7 @@ public SubmitApplicationResponse submitApplication( // but if the number of Active SubClusters is less than this number at this time, // we should provide a high number of retry according to the number of Active SubClusters. int activeSubClustersCount = getActiveSubClustersCount(); - int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries); + int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1; // Try calling the SubmitApplication method SubmitApplicationResponse response = @@ -476,7 +476,7 @@ public SubmitApplicationResponse submitApplication( */ private SubmitApplicationResponse invokeSubmitApplication( List blackList, SubmitApplicationRequest request, int retryCount) - throws YarnException { + throws YarnException, IOException { // The request is not checked here, // because the request has been checked before the method is called. @@ -520,12 +520,14 @@ private SubmitApplicationResponse invokeSubmitApplication( TARGET_CLIENT_RM_SERVICE, e.getMessage(), applicationId, subClusterId); LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {} error = {}.", applicationId, subClusterId, e); - blackList.add(subClusterId); + if (subClusterId != null) { + blackList.add(subClusterId); + } + throw e; } // If SubmitApplicationResponse is empty, the request fails. - String msg = String.format("Application %s failed to be submitted SubCluster %s.", - applicationId, subClusterId); + String msg = String.format("Application %s failed to be submitted.", applicationId); throw new YarnException(msg); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java index 096fa0639079a..864c26efa09d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java @@ -25,10 +25,12 @@ import java.util.Arrays; import java.util.List; +import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; @@ -38,7 +40,9 @@ import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; @@ -160,19 +164,13 @@ protected YarnConfiguration createConfiguration() { */ @Test public void testGetNewApplicationOneBadSC() - throws YarnException, IOException, InterruptedException { - - System.out.println("Test getNewApplication with one bad SubCluster"); + throws Exception { + LOG.info("Test getNewApplication with one bad SubCluster"); setupCluster(Arrays.asList(bad2)); - try { - interceptor.getNewApplication(GetNewApplicationRequest.newInstance()); - Assert.fail(); - } catch (Exception e) { - System.out.println(e.toString()); - Assert.assertTrue(e.getMessage() - .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE)); - } + GetNewApplicationRequest request = GetNewApplicationRequest.newInstance(); + LambdaTestUtils.intercept(YarnException.class, FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, + () -> interceptor.getNewApplication(request)); } /** @@ -181,18 +179,13 @@ public void testGetNewApplicationOneBadSC() */ @Test public void testGetNewApplicationTwoBadSCs() - throws YarnException, IOException, InterruptedException { - System.out.println("Test getNewApplication with two bad SubClusters"); + throws Exception { + LOG.info("Test getNewApplication with two bad SubClusters"); setupCluster(Arrays.asList(bad1, bad2)); - try { - interceptor.getNewApplication(GetNewApplicationRequest.newInstance()); - Assert.fail(); - } catch (Exception e) { - System.out.println(e.toString()); - Assert.assertTrue(e.getMessage() - .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE)); - } + GetNewApplicationRequest request = GetNewApplicationRequest.newInstance(); + LambdaTestUtils.intercept(YarnException.class, FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, + () -> interceptor.getNewApplication(request)); } /** @@ -201,16 +194,14 @@ public void testGetNewApplicationTwoBadSCs() */ @Test public void testGetNewApplicationOneBadOneGood() - throws YarnException, IOException, InterruptedException { - System.out.println("Test getNewApplication with one bad, one good SC"); + throws YarnException, IOException { + + LOG.info("Test getNewApplication with one bad, one good SC"); setupCluster(Arrays.asList(good, bad2)); - GetNewApplicationResponse response = null; - try { - response = - interceptor.getNewApplication(GetNewApplicationRequest.newInstance()); - } catch (Exception e) { - Assert.fail(); - } + GetNewApplicationRequest request = GetNewApplicationRequest.newInstance(); + GetNewApplicationResponse response = interceptor.getNewApplication(request); + + Assert.assertNotNull(response); Assert.assertEquals(ResourceManager.getClusterTimeStamp(), response.getApplicationId().getClusterTimestamp()); } @@ -221,24 +212,17 @@ public void testGetNewApplicationOneBadOneGood() */ @Test public void testSubmitApplicationOneBadSC() - throws YarnException, IOException, InterruptedException { + throws Exception { - System.out.println("Test submitApplication with one bad SubCluster"); + LOG.info("Test submitApplication with one bad SubCluster"); setupCluster(Arrays.asList(bad2)); final ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - final SubmitApplicationRequest request = mockSubmitApplicationRequest( - appId); - try { - interceptor.submitApplication(request); - Assert.fail(); - } catch (Exception e) { - System.out.println(e); - Assert.assertTrue(e.getMessage() - .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE)); - } + final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + LambdaTestUtils.intercept(YarnException.class, FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, + () -> interceptor.submitApplication(request)); } private SubmitApplicationRequest mockSubmitApplicationRequest( @@ -261,23 +245,16 @@ private SubmitApplicationRequest mockSubmitApplicationRequest( */ @Test public void testSubmitApplicationTwoBadSCs() - throws YarnException, IOException, InterruptedException { + throws Exception { System.out.println("Test submitApplication with two bad SubClusters"); setupCluster(Arrays.asList(bad1, bad2)); final ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - final SubmitApplicationRequest request = mockSubmitApplicationRequest( - appId); - try { - interceptor.submitApplication(request); - Assert.fail(); - } catch (Exception e) { - System.out.println(e.toString()); - Assert.assertTrue(e.getMessage() - .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE)); - } + final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + LambdaTestUtils.intercept(YarnException.class, FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, + () -> interceptor.submitApplication(request)); } /** @@ -293,18 +270,18 @@ public void testSubmitApplicationOneBadOneGood() final ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - final SubmitApplicationRequest request = mockSubmitApplicationRequest( - appId); - try { - interceptor.submitApplication(request); - } catch (Exception e) { - Assert.fail(); - } - Assert.assertEquals(good, - stateStore - .getApplicationHomeSubCluster( - GetApplicationHomeSubClusterRequest.newInstance(appId)) - .getApplicationHomeSubCluster().getHomeSubCluster()); - } + final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + SubmitApplicationResponse response = interceptor.submitApplication(request); + Assert.assertNotNull(response); + + GetApplicationHomeSubClusterRequest getAppRequest = + GetApplicationHomeSubClusterRequest.newInstance(appId); + GetApplicationHomeSubClusterResponse getAppResponse = + stateStore.getApplicationHomeSubCluster(getAppRequest); + Assert.assertNotNull(getAppResponse); + ApplicationHomeSubCluster responseHomeSubCluster = + getAppResponse.getApplicationHomeSubCluster(); + Assert.assertEquals(good,responseHomeSubCluster); + } } From f5c2d31562fa06f4b273036b2251d9b3284a90c6 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Thu, 13 Oct 2022 23:11:07 +0800 Subject: [PATCH 3/8] YARN-11342. Fix CheckStyle. --- .../router/clientrm/TestFederationClientInterceptorRetry.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java index 864c26efa09d7..96c9a5fd092dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java @@ -282,6 +282,8 @@ public void testSubmitApplicationOneBadOneGood() ApplicationHomeSubCluster responseHomeSubCluster = getAppResponse.getApplicationHomeSubCluster(); - Assert.assertEquals(good,responseHomeSubCluster); + Assert.assertNotNull(responseHomeSubCluster); + SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster(); + Assert.assertEquals(good, respSubClusterId); } } From 59a41ac9832826a38b4632e4388e9d43a583c91f Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Mon, 17 Oct 2022 21:26:25 +0800 Subject: [PATCH 4/8] YARN-11342. Fix CheckStyle. --- .../yarn/server/router/RouterServerUtil.java | 35 +++++++++- .../clientrm/FederationClientInterceptor.java | 64 +++++++++++++++---- 2 files changed, 83 insertions(+), 16 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java index 2355ef7f73dcd..546b1b7c3a4e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.router; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -27,14 +29,15 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import java.util.*; import java.io.IOException; /** @@ -53,6 +56,8 @@ public final class RouterServerUtil { private static final String EPOCH_PREFIX = "e"; + private static Random rand = new Random(System.currentTimeMillis()); + /** Disable constructor. */ private RouterServerUtil() { } @@ -446,4 +451,28 @@ public static void validateContainerId(String containerId) throw new IllegalArgumentException("Invalid ContainerId: " + containerId); } } + + /** + * Randomly pick ActiveSubCluster. + * During the selection process, we will exclude SubClusters from the blacklist. + * + * @param activeSubClusters List of active subClusters. + * @param blackList blacklist. + * @return Active SubClusterId + */ + public static SubClusterId getRandomActiveSubCluster( + Map activeSubClusters, List blackList) + throws YarnException { + if (MapUtils.isEmpty(activeSubClusters)) { + logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); + } + List list = new ArrayList<>(activeSubClusters.keySet()); + if (CollectionUtils.isNotEmpty(blackList)) { + list.removeAll(blackList); + } + if (CollectionUtils.isEmpty(list)) { + logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); + } + return list.get(rand.nextInt(list.size())); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index a2e278a072d14..dc9a4eaac572e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -306,25 +306,25 @@ public GetNewApplicationResponse getNewApplication( Map subClustersActive = federationFacade.getSubClusters(true); - for (int i = 0; i < numSubmitRetries; ++i) { - SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); - LOG.info("getNewApplication try #{} on SubCluster {}.", i, subClusterId); - ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); - GetNewApplicationResponse response = null; - try { - response = clientRMProxy.getNewApplication(request); - } catch (Exception e) { - LOG.warn("Unable to create a new ApplicationId in SubCluster {}.", subClusterId.getId(), e); - subClustersActive.remove(subClusterId); - } + // Try calling the getNewApplication method + List blacklist = new ArrayList<>(); + int activeSubClustersCount = getActiveSubClustersCount(); + int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1; + + try { + GetNewApplicationResponse response = + ((FederationActionRetry) (retryCount) -> + invokeGetNewApplication(subClustersActive, blacklist, request, retryCount)). + runWithRetries(actualRetryNums, 100); if (response != null) { long stopTime = clock.getTime(); routerMetrics.succeededAppsCreated(stopTime - startTime); - RouterAuditLogger.logSuccess(user.getShortUserName(), GET_NEW_APP, - TARGET_CLIENT_RM_SERVICE, response.getApplicationId()); return response; } + } catch (Exception e) { + routerMetrics.incrAppsFailedCreated(); + RouterServerUtil.logAndThrowException(e.getMessage(), e); } routerMetrics.incrAppsFailedCreated(); @@ -334,6 +334,44 @@ public GetNewApplicationResponse getNewApplication( throw new YarnException(errMsg); } + /** + * Invoke GetNewApplication to different subClusters. + * + * @param subClustersActive Active SubClusters + * @param blackList Blacklist avoid repeated calls to unavailable subCluster. + * @param request getNewApplicationRequest. + * @param retryCount number of retries. + * @return Get NewApplicationResponse response, If the response is empty, the request fails, + * if the response is not empty, the request is successful. + * @throws YarnException + * @throws IOException + */ + private GetNewApplicationResponse invokeGetNewApplication( + Map subClustersActive, + List blackList, GetNewApplicationRequest request, int retryCount) + throws YarnException, IOException { + SubClusterId subClusterId = + RouterServerUtil.getRandomActiveSubCluster(subClustersActive, blackList); + LOG.info("getNewApplication try #{} on SubCluster {}.", retryCount, subClusterId); + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + try { + GetNewApplicationResponse response = clientRMProxy.getNewApplication(request); + if (response != null) { + RouterAuditLogger.logSuccess(user.getShortUserName(), GET_NEW_APP, + TARGET_CLIENT_RM_SERVICE, response.getApplicationId()); + return response; + } + } catch (Exception e) { + LOG.warn("Unable to create a new ApplicationId in SubCluster {}.", subClusterId.getId(), e); + blackList.add(subClusterId); + throw e; + } + // If SubmitApplicationResponse is empty, the request fails. + String msg = String.format("Unable to create a new ApplicationId in SubCluster %s.", + subClusterId.getId()); + throw new YarnException(msg); + } + /** * Today, in YARN there are no checks of any applicationId submitted. * From 4fcace96260623c35d52b7b543e8142d5a5a05ef Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Tue, 18 Oct 2022 05:52:10 +0800 Subject: [PATCH 5/8] YARN-11342. Fix CheckStyle. --- .../hadoop/yarn/conf/YarnConfiguration.java | 8 ++++++ .../src/main/resources/yarn-default.xml | 9 ++++++ .../yarn/server/router/RouterAuditLogger.java | 18 ++++++++++++ .../yarn/server/router/RouterServerUtil.java | 28 +++++++++++++++---- .../clientrm/FederationClientInterceptor.java | 17 +++++++---- .../TestFederationClientInterceptorRetry.java | 11 ++++---- 6 files changed, 75 insertions(+), 16 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 2b7c01b0bd932..e53aeecbaeb83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4117,6 +4117,14 @@ public static boolean isAclEnabled(Configuration conf) { ROUTER_PREFIX + "submit.retry"; public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3; + /** + * GetNewApplication and SubmitApplication request retry interval time. + */ + public static final String ROUTER_CLIENTRM_SUBMIT_INTERVAL_TIME = + ROUTER_PREFIX + "submit.interval.time"; + public static final long DEFAULT_CLIENTRM_SUBMIT_INTERVAL_TIME = + TimeUnit.MILLISECONDS.toMillis(10); + /** * The interceptor class used in FederationClientInterceptor should return * partial ApplicationReports. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 638818ce33d9e..7d7a8c9f62139 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -5047,4 +5047,13 @@ + + yarn.router.submit.interval.time + 10 + + The interval Time between calling different subCluster requests. + Default is 10ms. + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java index a89d0e4462aa2..f3b428dab4a6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java @@ -196,6 +196,24 @@ public static void logFailure(String user, String operation, String perm, } } + /** + * Create a readable and parsable audit log string for a failed event. + * + * @param user User who made the service request. + * @param operation Operation requested by the user. + * @param perm Target permissions. + * @param target The target on which the operation is being performed. + * @param description Some additional information as to why the operation failed. + * @param subClusterId SubCluster Id in which operation was performed. + */ + public static void logFailure(String user, String operation, String perm, + String target, String description, SubClusterId subClusterId) { + if (LOG.isInfoEnabled()) { + LOG.info(createFailureLog(user, operation, perm, target, description, null, + subClusterId)); + } + } + /** * A helper api for creating an audit log for a failure event. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java index 546b1b7c3a4e4..f55a45d4ba485 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java @@ -37,7 +37,11 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Random; import java.io.IOException; /** @@ -458,21 +462,33 @@ public static void validateContainerId(String containerId) * * @param activeSubClusters List of active subClusters. * @param blackList blacklist. - * @return Active SubClusterId + * @return Active SubClusterId. + * @throws YarnException When there is no Active SubCluster, + * an exception will be thrown (No active SubCluster available to submit the request.) */ public static SubClusterId getRandomActiveSubCluster( Map activeSubClusters, List blackList) throws YarnException { + + // Check if activeSubClusters is empty, if it is empty, we need to throw an exception if (MapUtils.isEmpty(activeSubClusters)) { logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); } - List list = new ArrayList<>(activeSubClusters.keySet()); + + // Change activeSubClusters to List + List subClusterIds = new ArrayList<>(activeSubClusters.keySet()); + + // If the blacklist is not empty, we need to remove all the subClusters in the blacklist if (CollectionUtils.isNotEmpty(blackList)) { - list.removeAll(blackList); + subClusterIds.removeAll(blackList); } - if (CollectionUtils.isEmpty(list)) { + + // Check there are still active subcluster after removing the blacklist + if (CollectionUtils.isEmpty(subClusterIds)) { logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); } - return list.get(rand.nextInt(list.size())); + + // Randomly choose a SubCluster + return subClusterIds.get(rand.nextInt(subClusterIds.size())); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index dc9a4eaac572e..98b64f749b4aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -178,6 +178,7 @@ public class FederationClientInterceptor private ThreadPoolExecutor executorService; private final Clock clock = new MonotonicClock(); private boolean returnPartialReport; + private long submitIntervalTime; @Override public void init(String userName) { @@ -209,6 +210,10 @@ public void init(String userName) { YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); + submitIntervalTime = conf.getTimeDuration( + YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_INTERVAL_TIME, + YarnConfiguration.DEFAULT_CLIENTRM_SUBMIT_INTERVAL_TIME, TimeUnit.MILLISECONDS); + clientRMProxies = new ConcurrentHashMap<>(); routerMetrics = RouterMetrics.getMetrics(); @@ -315,7 +320,7 @@ public GetNewApplicationResponse getNewApplication( GetNewApplicationResponse response = ((FederationActionRetry) (retryCount) -> invokeGetNewApplication(subClustersActive, blacklist, request, retryCount)). - runWithRetries(actualRetryNums, 100); + runWithRetries(actualRetryNums, submitIntervalTime); if (response != null) { long stopTime = clock.getTime(); @@ -343,8 +348,8 @@ public GetNewApplicationResponse getNewApplication( * @param retryCount number of retries. * @return Get NewApplicationResponse response, If the response is empty, the request fails, * if the response is not empty, the request is successful. - * @throws YarnException - * @throws IOException + * @throws YarnException yarn exception. + * @throws IOException io error. */ private GetNewApplicationResponse invokeGetNewApplication( Map subClustersActive, @@ -358,10 +363,12 @@ private GetNewApplicationResponse invokeGetNewApplication( GetNewApplicationResponse response = clientRMProxy.getNewApplication(request); if (response != null) { RouterAuditLogger.logSuccess(user.getShortUserName(), GET_NEW_APP, - TARGET_CLIENT_RM_SERVICE, response.getApplicationId()); + TARGET_CLIENT_RM_SERVICE, response.getApplicationId(), subClusterId); return response; } } catch (Exception e) { + RouterAuditLogger.logFailure(user.getShortUserName(), GET_NEW_APP, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, e.getMessage(), subClusterId); LOG.warn("Unable to create a new ApplicationId in SubCluster {}.", subClusterId.getId(), e); blackList.add(subClusterId); throw e; @@ -469,7 +476,7 @@ public SubmitApplicationResponse submitApplication( SubmitApplicationResponse response = ((FederationActionRetry) (retryCount) -> invokeSubmitApplication(blacklist, request, retryCount)). - runWithRetries(actualRetryNums, 100); + runWithRetries(actualRetryNums, submitIntervalTime); if (response != null) { long stopTime = clock.getTime(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java index 96c9a5fd092dc..144669fadce57 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java @@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; @@ -53,6 +52,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE; + /** * Extends the {@code BaseRouterClientRMTest} and overrides methods in order to * use the {@code RouterClientRMService} pipeline test cases for testing the @@ -169,7 +170,7 @@ public void testGetNewApplicationOneBadSC() setupCluster(Arrays.asList(bad2)); GetNewApplicationRequest request = GetNewApplicationRequest.newInstance(); - LambdaTestUtils.intercept(YarnException.class, FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, + LambdaTestUtils.intercept(YarnException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE, () -> interceptor.getNewApplication(request)); } @@ -184,7 +185,7 @@ public void testGetNewApplicationTwoBadSCs() setupCluster(Arrays.asList(bad1, bad2)); GetNewApplicationRequest request = GetNewApplicationRequest.newInstance(); - LambdaTestUtils.intercept(YarnException.class, FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, + LambdaTestUtils.intercept(YarnException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE, () -> interceptor.getNewApplication(request)); } @@ -221,7 +222,7 @@ public void testSubmitApplicationOneBadSC() ApplicationId.newInstance(System.currentTimeMillis(), 1); final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); - LambdaTestUtils.intercept(YarnException.class, FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, + LambdaTestUtils.intercept(YarnException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE, () -> interceptor.submitApplication(request)); } @@ -253,7 +254,7 @@ public void testSubmitApplicationTwoBadSCs() ApplicationId.newInstance(System.currentTimeMillis(), 1); final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); - LambdaTestUtils.intercept(YarnException.class, FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, + LambdaTestUtils.intercept(YarnException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE, () -> interceptor.submitApplication(request)); } From 7756ec87e85a62267226dc28c8ce644959498946 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Tue, 18 Oct 2022 06:06:16 +0800 Subject: [PATCH 6/8] YARN-11342. Fix CheckStyle. --- .../yarn/server/router/clientrm/FederationClientInterceptor.java | 1 - .../hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 98b64f749b4aa..d2596343a5fbc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -619,7 +619,6 @@ private void updateApplicationHomeSubCluster(SubClusterId subClusterId, } } - /** * Query SubClusterId By applicationId. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md index b1791551b2bc7..087f38b7c7617 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md @@ -270,6 +270,7 @@ Optional: |`yarn.router.admin.address` | `0.0.0.0:8052` | Admin address at the router. | |`yarn.router.webapp.https.address` | `0.0.0.0:8091` | Secure webapp address at the router. | |`yarn.router.submit.retry` | `3` | The number of retries in the router before we give up. | +|`yarn.router.submit.interval.time` | `10` | The interval between two retry, the default value is 10ms. | |`yarn.federation.statestore.max-connections` | `10` | This is the maximum number of parallel connections each Router makes to the state-store. | |`yarn.federation.cache-ttl.secs` | `60` | The Router caches informations, and this is the time to leave before the cache is invalidated. | |`yarn.router.webapp.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client via REST interface. The last step of this pipeline must be the Federation Interceptor REST. | From f9cb71c02c810cc985397080a385662288c1a2be Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Wed, 19 Oct 2022 09:25:19 +0800 Subject: [PATCH 7/8] YARN-11342. Fix CheckStyle. --- .../hadoop-yarn-common/src/main/resources/yarn-default.xml | 2 +- .../federation/FederationStateStoreService.java | 7 +++---- .../hadoop-yarn-site/src/site/markdown/Federation.md | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 7d7a8c9f62139..e46473db0fc40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -5049,7 +5049,7 @@ yarn.router.submit.interval.time - 10 + 10ms The interval Time between calling different subCluster requests. Default is 10ms. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 61aee1a32cdfb..3cc4dab238618 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -502,13 +502,12 @@ public boolean cleanUpFinishApplicationsWithRetries(ApplicationId appId, boolean throws Exception { // Generate a request to delete data - DeleteApplicationHomeSubClusterRequest request = + DeleteApplicationHomeSubClusterRequest req = DeleteApplicationHomeSubClusterRequest.newInstance(appId); // CleanUp Finish App. - return ((FederationActionRetry) (retry) -> - invokeCleanUpFinishApp(appId, isQuery, request)) - .runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); + return ((FederationActionRetry) (retry) -> invokeCleanUpFinishApp(appId, isQuery, req)) + .runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md index 087f38b7c7617..f547e8d6b7744 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md @@ -270,7 +270,7 @@ Optional: |`yarn.router.admin.address` | `0.0.0.0:8052` | Admin address at the router. | |`yarn.router.webapp.https.address` | `0.0.0.0:8091` | Secure webapp address at the router. | |`yarn.router.submit.retry` | `3` | The number of retries in the router before we give up. | -|`yarn.router.submit.interval.time` | `10` | The interval between two retry, the default value is 10ms. | +|`yarn.router.submit.interval.time` | `10ms` | The interval between two retry, the default value is 10ms. | |`yarn.federation.statestore.max-connections` | `10` | This is the maximum number of parallel connections each Router makes to the state-store. | |`yarn.federation.cache-ttl.secs` | `60` | The Router caches informations, and this is the time to leave before the cache is invalidated. | |`yarn.router.webapp.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client via REST interface. The last step of this pipeline must be the Federation Interceptor REST. | From 7a7d3828597fb395ce863752fae819de8c686376 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Thu, 20 Oct 2022 09:44:55 +0800 Subject: [PATCH 8/8] YARN-11342. Fix CheckStyle. --- .../TestFederationClientInterceptorRetry.java | 42 +++++++++---------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java index 144669fadce57..f52c9acbd490c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java @@ -82,7 +82,7 @@ public class TestFederationClientInterceptorRetry private static SubClusterId bad1; private static SubClusterId bad2; - private static List scs = new ArrayList(); + private static List scs = new ArrayList<>(); @Override public void setUp() throws IOException { @@ -119,8 +119,7 @@ public void tearDown() { super.tearDown(); } - private void setupCluster(List scsToRegister) - throws YarnException { + private void setupCluster(List scsToRegister) throws YarnException { try { // Clean up the StateStore before every test @@ -137,6 +136,7 @@ private void setupCluster(List scsToRegister) @Override protected YarnConfiguration createConfiguration() { + YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); String mockPassThroughInterceptorClass = @@ -164,8 +164,8 @@ protected YarnConfiguration createConfiguration() { * cluster is composed of only 1 bad SubCluster. */ @Test - public void testGetNewApplicationOneBadSC() - throws Exception { + public void testGetNewApplicationOneBadSC() throws Exception { + LOG.info("Test getNewApplication with one bad SubCluster"); setupCluster(Arrays.asList(bad2)); @@ -179,8 +179,8 @@ public void testGetNewApplicationOneBadSC() * cluster is composed of only 2 bad SubClusters. */ @Test - public void testGetNewApplicationTwoBadSCs() - throws Exception { + public void testGetNewApplicationTwoBadSCs() throws Exception { + LOG.info("Test getNewApplication with two bad SubClusters"); setupCluster(Arrays.asList(bad1, bad2)); @@ -194,8 +194,7 @@ public void testGetNewApplicationTwoBadSCs() * cluster is composed of only 1 bad SubCluster and 1 good one. */ @Test - public void testGetNewApplicationOneBadOneGood() - throws YarnException, IOException { + public void testGetNewApplicationOneBadOneGood() throws YarnException, IOException { LOG.info("Test getNewApplication with one bad, one good SC"); setupCluster(Arrays.asList(good, bad2)); @@ -212,8 +211,7 @@ public void testGetNewApplicationOneBadOneGood() * cluster is composed of only 1 bad SubCluster. */ @Test - public void testSubmitApplicationOneBadSC() - throws Exception { + public void testSubmitApplicationOneBadSC() throws Exception { LOG.info("Test submitApplication with one bad SubCluster"); setupCluster(Arrays.asList(bad2)); @@ -226,17 +224,14 @@ public void testSubmitApplicationOneBadSC() () -> interceptor.submitApplication(request)); } - private SubmitApplicationRequest mockSubmitApplicationRequest( - ApplicationId appId) { + private SubmitApplicationRequest mockSubmitApplicationRequest(ApplicationId appId) { ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); ApplicationSubmissionContext context = ApplicationSubmissionContext .newInstance(appId, MockApps.newAppName(), "q1", - Priority.newInstance(0), amContainerSpec, false, false, -1, - Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB), - "MockApp"); - SubmitApplicationRequest request = SubmitApplicationRequest - .newInstance(context); + Priority.newInstance(0), amContainerSpec, false, false, -1, + Resources.createResource(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB), + "MockApp"); + SubmitApplicationRequest request = SubmitApplicationRequest.newInstance(context); return request; } @@ -245,9 +240,9 @@ private SubmitApplicationRequest mockSubmitApplicationRequest( * cluster is composed of only 2 bad SubClusters. */ @Test - public void testSubmitApplicationTwoBadSCs() - throws Exception { - System.out.println("Test submitApplication with two bad SubClusters"); + public void testSubmitApplicationTwoBadSCs() throws Exception { + + LOG.info("Test submitApplication with two bad SubClusters."); setupCluster(Arrays.asList(bad1, bad2)); final ApplicationId appId = @@ -265,7 +260,8 @@ public void testSubmitApplicationTwoBadSCs() @Test public void testSubmitApplicationOneBadOneGood() throws YarnException, IOException, InterruptedException { - System.out.println("Test submitApplication with one bad, one good SC"); + + LOG.info("Test submitApplication with one bad, one good SC."); setupCluster(Arrays.asList(good, bad2)); final ApplicationId appId =