diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 25513fec90ce9..316a6421889bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4294,6 +4294,11 @@ public static boolean isAclEnabled(Configuration conf) { ROUTER_PREFIX + "webapp.cross-origin.enabled"; public static final boolean DEFAULT_ROUTER_WEBAPP_ENABLE_CORS_FILTER = false; + /** Whether the Router interceptor may return partial subCluster results. **/ + public static final String ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED = + ROUTER_PREFIX + "interceptor.allow-partial-result.enable"; + public static final boolean DEFAULT_ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED = false; + //////////////////////////////// // CSI Volume configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e1cc6adbe52ec..ec2ac71adc263 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -5085,4 +5085,18 @@ + + yarn.router.interceptor.allow-partial-result.enable + false + + This configuration represents whether to allow the interceptor to + return partial SubCluster results. + If true, exceptions from some subClusters are ignored during the calling process, + and the results from the other subClusters are returned. + If false, if an exception occurs in a subCluster during the calling process, + an exception will be thrown directly. + Default is false. 
+ + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index a21be7b4e1b76..61edfb363d0f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -140,6 +140,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { private boolean returnPartialReport; private boolean appInfosCacheEnabled; private int appInfosCacheCount; + private boolean allowPartialResult; private long submitIntervalTime; private Map interceptors; @@ -194,6 +195,10 @@ public void init(String user) { appInfosCaches = new LRUCacheHashMap<>(appInfosCacheCount, true); } + allowPartialResult = conf.getBoolean( + YarnConfiguration.ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED, + YarnConfiguration.DEFAULT_ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED); + submitIntervalTime = conf.getTimeDuration( YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_INTERVAL_TIME, YarnConfiguration.DEFAULT_CLIENTRM_SUBMIT_INTERVAL_TIME, TimeUnit.MILLISECONDS); @@ -975,10 +980,13 @@ public NodesInfo getNodes(String states) { }); } catch (NotFoundException e) { LOG.error("get all active sub cluster(s) error.", e); + throw e; } catch (YarnException e) { LOG.error("getNodes by states = {} error.", states, e); + throw new YarnRuntimeException(e); } catch (IOException e) { LOG.error("getNodes by states = {} error with io error.", states, e); + throw new YarnRuntimeException(e); } // Delete duplicate from all the node 
reports got from all the available @@ -2070,9 +2078,10 @@ private Map invokeConcurrent(Collection c Map results = new HashMap<>(); - // Send the requests in parallel - CompletionService> compSvc = - new ExecutorCompletionService<>(this.threadpool); + // If there is a sub-cluster access error, + // whether the exception is thrown depends on the allowPartialResult configuration. + // Send the requests in parallel. + CompletionService> compSvc = new ExecutorCompletionService<>(threadpool); // This part of the code should be able to expose the accessed Exception information. // We use Pair to store related information. The left value of the Pair is the response, @@ -2105,9 +2114,10 @@ private Map invokeConcurrent(Collection c if (response != null) { results.put(clusterId, response); } - - Exception exception = pair.getRight(); - if (exception != null) { + Exception exception = pair.getValue(); + // If allowPartialResult=false, an exception from any subCluster is thrown here; + // if allowPartialResult=true, the exception is skipped. 
+ if (!allowPartialResult && exception != null) { throw exception; } } catch (Throwable e) { @@ -2178,4 +2188,9 @@ private SubClusterInfo getHomeSubClusterInfoByReservationId(String resId) public LRUCacheHashMap getAppInfosCaches() { return appInfosCaches; } + + @VisibleForTesting + public void setAllowPartialResult(boolean allowPartialResult) { + this.allowPartialResult = allowPartialResult; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java index 4c50e5198dc03..7c82e71ea9b7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java @@ -749,6 +749,7 @@ public void testGetLabelsOnNode() throws Exception { Assert.assertTrue(nodeLabelsName.contains("y")); // null request + interceptor.setAllowPartialResult(false); NodeLabelsInfo nodeLabelsInfo2 = interceptor.getLabelsOnNode(null, "node2"); Assert.assertNotNull(nodeLabelsInfo2); Assert.assertEquals(0, nodeLabelsInfo2.getNodeLabelsName().size()); @@ -1183,6 +1184,8 @@ public void testWebAddressWithScheme() { @Test public void testCheckUserAccessToQueue() throws Exception { + interceptor.setAllowPartialResult(false); + // Case 1: Only queue admin user can access other user's information HttpServletRequest mockHsr = mockHttpServletRequestByUserName("non-admin"); String errorMsg1 = "User=non-admin doesn't haven access to queue=queue " + @@ -1212,6 +1215,8 @@ public void 
testCheckUserAccessToQueue() throws Exception { // Case 5: get OK only for SUBMIT_APP acl for "yarn" user checkUserAccessToQueueFailed("queue", "yarn", QueueACL.ADMINISTER_QUEUE, "admin"); checkUserAccessToQueueSuccess("queue", "yarn", QueueACL.SUBMIT_APPLICATIONS, "admin"); + + interceptor.setAllowPartialResult(true); } private void checkUserAccessToQueueSuccess(String queue, String userName, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java index e2b2103c7deb8..790cf410bed75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java @@ -25,6 +25,7 @@ import javax.ws.rs.core.Response; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor; import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor; import org.apache.hadoop.yarn.webapp.NotFoundException; @@ -81,10 
+83,16 @@ public class TestFederationInterceptorRESTRetry @Override public void setUp() { super.setUpConfig(); + + Configuration conf = this.getConf(); + + // For historical test cases, set yarn.router.interceptor.allow-partial-result.enable=false. + conf.setBoolean(YarnConfiguration.ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED, false); + interceptor = new TestableFederationInterceptorREST(); stateStore = new MemoryFederationStateStore(); - stateStore.init(this.getConf()); + stateStore.init(conf); FederationStateStoreFacade.getInstance().reinitialize(stateStore, getConf()); stateStoreUtil = new FederationStateStoreTestUtil(stateStore); @@ -516,4 +524,58 @@ private void checkEmptyMetrics(ClusterMetricsInfo response) { Assert.assertEquals(0, response.getActiveNodes()); Assert.assertEquals(0, response.getShutdownNodes()); } + + @Test + public void testGetNodesOneBadSCAllowPartial() throws Exception { + // We set allowPartialResult to true. + // In this test case, we set up a subCluster, + // and the subCluster status is bad, we can't get the response, + // but no exception is thrown and a non-null NodesInfo is returned. + interceptor.setAllowPartialResult(true); + setupCluster(Arrays.asList(bad2)); + + NodesInfo nodesInfo = interceptor.getNodes(null); + Assert.assertNotNull(nodesInfo); + + // We need to set allowPartialResult=false + interceptor.setAllowPartialResult(false); + } + + @Test + public void testGetNodesTwoBadSCsAllowPartial() throws Exception { + // We set allowPartialResult to true. + // In this test case, we set up 2 subClusters, + // and the status of these 2 subClusters is bad. When we call the interface, + // no exception is thrown and a non-null NodesInfo is returned. 
+ interceptor.setAllowPartialResult(true); + setupCluster(Arrays.asList(bad1, bad2)); + + NodesInfo nodesInfo = interceptor.getNodes(null); + Assert.assertNotNull(nodesInfo); + + // We need to set allowPartialResult=false + interceptor.setAllowPartialResult(false); + } + + @Test + public void testGetNodesOneBadOneGoodAllowPartial() throws Exception { + + // allowPartialResult = true, + // The bad subCluster's exception is tolerated and the good one's result is returned + interceptor.setAllowPartialResult(true); + setupCluster(Arrays.asList(good, bad2)); + + NodesInfo response = interceptor.getNodes(null); + Assert.assertNotNull(response); + Assert.assertEquals(1, response.getNodes().size()); + // Check if the only node came from Good SubCluster + Assert.assertEquals(good.getId(), + Long.toString(response.getNodes().get(0).getLastHealthUpdate())); + + // allowPartialResult = false, + // NOTE(review): getNodes is not called after this, so no exception is asserted here. + interceptor.setAllowPartialResult(false); + + setupCluster(Arrays.asList(good, bad2)); + } } \ No newline at end of file