From a403687d5fbe0cc4fcc67f24bb0a3a714fa77bee Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 21 Oct 2022 10:09:35 +0800 Subject: [PATCH 1/7] YARN-11357. Fix FederationClientInterceptor#submitApplication Can't Update SubClusterId. --- .../server/router/clientrm/FederationClientInterceptor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index d2596343a5fbc..26e4ea2871852 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -543,7 +543,7 @@ private SubmitApplicationResponse invokeSubmitApplication( ApplicationHomeSubCluster appHomeSubCluster = ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); - if (exists || retryCount == 0) { + if (!exists || retryCount == 0) { addApplicationHomeSubCluster(applicationId, appHomeSubCluster); } else { updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster); @@ -563,7 +563,7 @@ private SubmitApplicationResponse invokeSubmitApplication( } catch (Exception e) { RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN, TARGET_CLIENT_RM_SERVICE, e.getMessage(), applicationId, subClusterId); - LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {} error = {}.", + LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {}.", applicationId, subClusterId, e); if (subClusterId != null) { blackList.add(subClusterId); From 8321970ca838be57b4e687f9b9c2507e4d6644e1 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 21 Oct 2022 22:27:00 +0800 Subject: [PATCH 2/7] YARN-11357. Fix CheckStyle. --- .../clientrm/FederationClientInterceptor.java | 13 ++-- .../TestFederationClientInterceptorRetry.java | 50 ++++++++++++++- .../TestSequentialBroadcastPolicyManager.java | 31 +++++++++ .../clientrm/TestSequentialRouterPolicy.java | 63 +++++++++++++++++++ 4 files changed, 152 insertions(+), 5 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 26e4ea2871852..faffbf602be81 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -314,7 +314,7 @@ public GetNewApplicationResponse getNewApplication( // Try calling the getNewApplication method List blacklist = new ArrayList<>(); int activeSubClustersCount = getActiveSubClustersCount(); - int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1; + int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries); try { GetNewApplicationResponse response = @@ -470,7 +470,7 @@ public SubmitApplicationResponse submitApplication( // but if the number of Active SubClusters is less than this number at this time, // we should provide a high number of retry according to the number of Active SubClusters. int activeSubClustersCount = getActiveSubClustersCount(); - int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1; + int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries); // Try calling the SubmitApplication method SubmitApplicationResponse response = @@ -484,7 +484,7 @@ public SubmitApplicationResponse submitApplication( return response; } - } catch (Exception e){ + } catch (Exception e) { routerMetrics.incrAppsFailedSubmitted(); RouterServerUtil.logAndThrowException(e.getMessage(), e); } @@ -564,7 +564,7 @@ private SubmitApplicationResponse invokeSubmitApplication( RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN, TARGET_CLIENT_RM_SERVICE, e.getMessage(), applicationId, subClusterId); LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {}.", - applicationId, subClusterId, e); + applicationId, retryCount, subClusterId, e); if (subClusterId != null) { blackList.add(subClusterId); } @@ -1948,4 +1948,9 @@ private void updateReservationHomeSubCluster(SubClusterId subClusterId, } } } + + @VisibleForTesting + public void setNumSubmitRetries(int numSubmitRetries) { + this.numSubmitRetries = numSubmitRetries; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java index f52c9acbd490c..ee4efaf6f002d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.router.clientrm; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_POLICY_MANAGER; import static org.mockito.Mockito.mock; import java.io.IOException; @@ -87,6 +88,8 @@ public class TestFederationClientInterceptorRetry @Override public void setUp() throws IOException { super.setUpConfig(); + this.getConf().setStrings(FEDERATION_POLICY_MANAGER, + "org.apache.hadoop.yarn.server.router.clientrm.TestSequentialBroadcastPolicyManager"); interceptor = new TestableFederationClientInterceptor(); stateStore = new MemoryFederationStateStore(); @@ -150,7 +153,7 @@ protected YarnConfiguration createConfiguration() { mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + "," + TestableFederationClientInterceptor.class.getName()); - conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER, + conf.set(FEDERATION_POLICY_MANAGER, UniformBroadcastPolicyManager.class.getName()); // Disable StateStoreFacade cache @@ -283,4 +286,49 @@ public void testSubmitApplicationOneBadOneGood() SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster(); Assert.assertEquals(good, respSubClusterId); } + + @Test + public void testSubmitApplicationTwoBadOneGood() throws Exception { + LOG.info("Test submitApplication with two bad, one good SC."); + setupCluster(Arrays.asList(bad1, bad2, good)); + final ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + + // Use the TestSequentialRouterPolicy strategy, + // which will sort the SubClusterId because good=0, bad1=1, bad2=2 + // We will get 2, 1, 0 [bad2, bad1, good] + // Set the retryNum to 1 + // 1st time will use bad2, 2nd time will use bad1 + // bad1 is updated to stateStore + interceptor.setNumSubmitRetries(1); + final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + LambdaTestUtils.intercept(YarnException.class, "RM is stopped", + () -> interceptor.submitApplication(request)); + + // We will get bad1 + checkSubmitSubCluster(appId, bad1); + + // Set the retryNum to 1 + // 1st time will use bad2, 2nd time will use bad1, 3rd good + interceptor.setNumSubmitRetries(2); + SubmitApplicationResponse submitAppResponse = interceptor.submitApplication(request); + Assert.assertNotNull(submitAppResponse); + + // We will get good + checkSubmitSubCluster(appId, good); + } + + private void checkSubmitSubCluster(ApplicationId appId, SubClusterId expectSubCluster) + throws YarnException { + GetApplicationHomeSubClusterRequest getAppRequest = + GetApplicationHomeSubClusterRequest.newInstance(appId); + GetApplicationHomeSubClusterResponse getAppResponse = + stateStore.getApplicationHomeSubCluster(getAppRequest); + Assert.assertNotNull(getAppResponse); + Assert.assertNotNull(getAppResponse); + ApplicationHomeSubCluster responseHomeSubCluster = getAppResponse.getApplicationHomeSubCluster(); + Assert.assertNotNull(responseHomeSubCluster); + SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster(); + Assert.assertEquals(expectSubCluster, respSubClusterId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java new file mode 100644 index 0000000000000..ace925a108118 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.manager.AbstractPolicyManager; + +public class TestSequentialBroadcastPolicyManager extends AbstractPolicyManager { + public TestSequentialBroadcastPolicyManager() { + // this structurally hard-codes two compatible policies for Router and + // AMRMProxy. + routerFederationPolicy = TestSequentialRouterPolicy.class; + amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java new file mode 100644 index 0000000000000..f39892677fd91 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.AbstractRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class TestSequentialRouterPolicy extends AbstractRouterPolicy { + + @Override + public void reinitialize(FederationPolicyInitializationContext policyContext) + throws FederationPolicyInitializationException { + FederationPolicyInitializationContextValidator.validate(policyContext, + this.getClass().getCanonicalName()); + setPolicyContext(policyContext); + } + + @Override + protected SubClusterId chooseSubCluster(String queue, + Map preSelectSubClusters) throws YarnException { + /** + * This strategy is only suitable for testing. We need to obtain subClusters sequentially. + * We have 3 subClusters, 1 goodSubCluster and 2 badSubClusters. + * The sc-id of goodSubCluster is 0, and the sc-id of badSubCluster is 1 and 2. + * We hope Return in reverse order, that is, return 2, 1, 0 + * Return to badCluster first. + */ + List subClusterIds = new ArrayList<>(preSelectSubClusters.keySet()); + if(subClusterIds.size() > 1){ + subClusterIds.sort((o1, o2) -> Integer.parseInt(o2.getId()) - Integer.parseInt(o1.getId())); + } + if(CollectionUtils.isNotEmpty(subClusterIds)){ + return subClusterIds.get(0); + } + return null; + } +} From 75a99045c8692554ca47476dc1e488db002c3553 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 21 Oct 2022 23:20:17 +0800 Subject: [PATCH 3/7] YARN-11357. Fix CheckStyle. --- .../router/clientrm/TestFederationClientInterceptorRetry.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java index ee4efaf6f002d..2b85693a3b50d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java @@ -88,8 +88,6 @@ public class TestFederationClientInterceptorRetry @Override public void setUp() throws IOException { super.setUpConfig(); - this.getConf().setStrings(FEDERATION_POLICY_MANAGER, - "org.apache.hadoop.yarn.server.router.clientrm.TestSequentialBroadcastPolicyManager"); interceptor = new TestableFederationClientInterceptor(); stateStore = new MemoryFederationStateStore(); @@ -154,7 +152,7 @@ protected YarnConfiguration createConfiguration() { + "," + TestableFederationClientInterceptor.class.getName()); conf.set(FEDERATION_POLICY_MANAGER, - UniformBroadcastPolicyManager.class.getName()); + TestSequentialBroadcastPolicyManager.class.getName()); // Disable StateStoreFacade cache conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); From e14c380bc5a32a50b6cec21e1252623a45c132e9 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sat, 22 Oct 2022 05:15:03 +0800 Subject: [PATCH 4/7] YARN-11357. Fix CheckStyle. --- .../clientrm/TestFederationClientInterceptorRetry.java | 6 +++--- .../server/router/clientrm/TestSequentialRouterPolicy.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java index 2b85693a3b50d..6ebedf1285326 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java @@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; @@ -290,7 +289,7 @@ public void testSubmitApplicationTwoBadOneGood() throws Exception { LOG.info("Test submitApplication with two bad, one good SC."); setupCluster(Arrays.asList(bad1, bad2, good)); final ApplicationId appId = - ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationId.newInstance(System.currentTimeMillis(), 1); // Use the TestSequentialRouterPolicy strategy, // which will sort the SubClusterId because good=0, bad1=1, bad2=2 @@ -324,7 +323,8 @@ private void checkSubmitSubCluster(ApplicationId appId, SubClusterId expectSubCl stateStore.getApplicationHomeSubCluster(getAppRequest); Assert.assertNotNull(getAppResponse); Assert.assertNotNull(getAppResponse); - ApplicationHomeSubCluster responseHomeSubCluster = getAppResponse.getApplicationHomeSubCluster(); + ApplicationHomeSubCluster responseHomeSubCluster = + getAppResponse.getApplicationHomeSubCluster(); Assert.assertNotNull(responseHomeSubCluster); SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster(); Assert.assertEquals(expectSubCluster, respSubClusterId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java index f39892677fd91..f3926f8cfa2b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java @@ -35,7 +35,7 @@ public class TestSequentialRouterPolicy extends AbstractRouterPolicy { @Override public void reinitialize(FederationPolicyInitializationContext policyContext) - throws FederationPolicyInitializationException { + throws FederationPolicyInitializationException { FederationPolicyInitializationContextValidator.validate(policyContext, this.getClass().getCanonicalName()); setPolicyContext(policyContext); @@ -43,7 +43,7 @@ public void reinitialize(FederationPolicyInitializationContext policyContext) @Override protected SubClusterId chooseSubCluster(String queue, - Map preSelectSubClusters) throws YarnException { + Map preSelectSubClusters) throws YarnException { /** * This strategy is only suitable for testing. We need to obtain subClusters sequentially. * We have 3 subClusters, 1 goodSubCluster and 2 badSubClusters. From fa8d7ca5e8759f70d8196e820db25b3b4dab4f87 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Tue, 25 Oct 2022 03:57:38 +0800 Subject: [PATCH 5/7] YARN-11357. Fix CheckStyle. --- .../TestFederationClientInterceptorRetry.java | 29 +++++++++++++++++-- .../clientrm/TestSequentialRouterPolicy.java | 16 +++++++++- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java index 6ebedf1285326..eca726b10e1c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java @@ -24,8 +24,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; @@ -38,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; @@ -49,6 +52,9 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,14 +70,22 @@ * It tests the case with SubClusters down and the Router logic of retries. We * have 1 good SubCluster and 2 bad ones for all the tests. */ +@RunWith(Parameterized.class) public class TestFederationClientInterceptorRetry extends BaseRouterClientRMTest { private static final Logger LOG = LoggerFactory.getLogger(TestFederationClientInterceptorRetry.class); + @Parameters + public static Collection getParameters() { + return Arrays.asList(new String[][] {{UniformBroadcastPolicyManager.class.getName()}, + {TestSequentialBroadcastPolicyManager.class.getName()}}); + } + private TestableFederationClientInterceptor interceptor; private MemoryFederationStateStore stateStore; private FederationStateStoreTestUtil stateStoreUtil; + private String routerPolicyManagerName; private String user = "test-user"; @@ -84,6 +98,10 @@ public class TestFederationClientInterceptorRetry private static List scs = new ArrayList<>(); + public TestFederationClientInterceptorRetry(String policyManagerName) { + this.routerPolicyManagerName = policyManagerName; + } + @Override public void setUp() throws IOException { super.setUpConfig(); @@ -150,8 +168,7 @@ protected YarnConfiguration createConfiguration() { mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + "," + TestableFederationClientInterceptor.class.getName()); - conf.set(FEDERATION_POLICY_MANAGER, - TestSequentialBroadcastPolicyManager.class.getName()); + conf.set(FEDERATION_POLICY_MANAGER, this.routerPolicyManagerName); // Disable StateStoreFacade cache conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); @@ -286,7 +303,15 @@ public void testSubmitApplicationOneBadOneGood() @Test public void testSubmitApplicationTwoBadOneGood() throws Exception { + LOG.info("Test submitApplication with two bad, one good SC."); + + // This test must require the TestSequentialRouterPolicy policy + if (StringUtils.equals(routerPolicyManagerName, + UniformBroadcastPolicyManager.class.getName())) { + return; + } + setupCluster(Arrays.asList(bad1, bad2, good)); final ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java index f3926f8cfa2b0..51f0e73fed902 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java @@ -31,6 +31,20 @@ import java.util.List; import java.util.Map; +/** + * This is a test strategy, + * the purpose of this strategy is to return subClusters in descending order of subClusterId. + * + * This strategy is to verify the situation of Retry during the use of FederationClientInterceptor. + * The conditions of use are as follows: + * 1.We require subClusterId to be an integer. + * 2.The larger the subCluster, the sooner the representative is selected. + * + * We have 4 subClusters, 2 normal subClusters, 2 bad subClusters. + * We expect to select badSubClusters first and then goodSubClusters during testing. + * We can set the subCluster like this, good1 = [0], good2 = [1], bad1 = [2], bad2 = [3]. + * This strategy will return [3, 2, 1, 0], The selection order of subCluster is bad2, bad1, good2, good1. + */ public class TestSequentialRouterPolicy extends AbstractRouterPolicy { @Override @@ -52,7 +66,7 @@ protected SubClusterId chooseSubCluster(String queue, * Return to badCluster first. */ List subClusterIds = new ArrayList<>(preSelectSubClusters.keySet()); - if(subClusterIds.size() > 1){ + if (subClusterIds.size() > 1) { subClusterIds.sort((o1, o2) -> Integer.parseInt(o2.getId()) - Integer.parseInt(o1.getId())); } if(CollectionUtils.isNotEmpty(subClusterIds)){ From 5281573ee0e1bb0c1db972a20918512ae02a187e Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Tue, 25 Oct 2022 04:23:44 +0800 Subject: [PATCH 6/7] YARN-11357. Fix CheckStyle. --- .../clientrm/TestFederationClientInterceptorRetry.java | 2 +- .../clientrm/TestSequentialBroadcastPolicyManager.java | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java index eca726b10e1c8..5ff2b538b5d11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java @@ -330,7 +330,7 @@ public void testSubmitApplicationTwoBadOneGood() throws Exception { // We will get bad1 checkSubmitSubCluster(appId, bad1); - // Set the retryNum to 1 + // Set the retryNum to 2 // 1st time will use bad2, 2nd time will use bad1, 3rd good interceptor.setNumSubmitRetries(2); SubmitApplicationResponse submitAppResponse = interceptor.submitApplication(request); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java index ace925a108118..dfa8c7136d76c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java @@ -21,6 +21,14 @@ import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.manager.AbstractPolicyManager; +/** + * This PolicyManager is used for testing and will contain the + * {@link TestSequentialRouterPolicy} policy. + * + * When we test FederationClientInterceptor Retry, + * we hope that SubCluster can return in a certain order, not randomly. + * We can view the policy description by linking to TestSequentialRouterPolicy. + */ public class TestSequentialBroadcastPolicyManager extends AbstractPolicyManager { public TestSequentialBroadcastPolicyManager() { // this structurally hard-codes two compatible policies for Router and From 90c2834af509e6e648204c41512aac2af8723232 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Tue, 25 Oct 2022 09:04:03 +0800 Subject: [PATCH 7/7] YARN-11357. Fix CheckStyle. --- .../clientrm/TestFederationClientInterceptorRetry.java | 9 ++++----- .../router/clientrm/TestSequentialRouterPolicy.java | 3 ++- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java index 5ff2b538b5d11..2d0bc6b3507db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.router.clientrm; import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_POLICY_MANAGER; +import static org.hamcrest.CoreMatchers.is; import static org.mockito.Mockito.mock; import java.io.IOException; @@ -27,7 +28,6 @@ import java.util.Collection; import java.util.List; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -307,10 +308,8 @@ public void testSubmitApplicationTwoBadOneGood() throws Exception { LOG.info("Test submitApplication with two bad, one good SC."); // This test must require the TestSequentialRouterPolicy policy - if (StringUtils.equals(routerPolicyManagerName, - UniformBroadcastPolicyManager.class.getName())) { - return; - } + Assume.assumeThat(routerPolicyManagerName, + is(TestSequentialBroadcastPolicyManager.class.getName())); setupCluster(Arrays.asList(bad1, bad2, good)); final ApplicationId appId = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java index 51f0e73fed902..e702b764fede7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java @@ -43,7 +43,8 @@ * We have 4 subClusters, 2 normal subClusters, 2 bad subClusters. * We expect to select badSubClusters first and then goodSubClusters during testing. * We can set the subCluster like this, good1 = [0], good2 = [1], bad1 = [2], bad2 = [3]. - * This strategy will return [3, 2, 1, 0], The selection order of subCluster is bad2, bad1, good2, good1. + * This strategy will return [3, 2, 1, 0], + * The selection order of subCluster is bad2, bad1, good2, good1. */ public class TestSequentialRouterPolicy extends AbstractRouterPolicy {