From 616e381c9fb9b6f831fd0adaa836d7b77e32d7e2 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Mon, 20 Nov 2023 20:55:25 +0800 Subject: [PATCH] YARN-11577. Improve FederationInterceptorREST Method Result. (#6190) Contributed by Shilun Fan. Reviewed-by: Inigo Goiri Signed-off-by: Shilun Fan --- .../yarn/server/router/RouterServerUtil.java | 35 +- .../webapp/FederationInterceptorREST.java | 17 +- .../router/webapp/RouterWebServices.java | 4 +- .../subcluster/TestFederationSubCluster.java | 135 +++++ ...stYarnFederationWithCapacityScheduler.java | 539 +++++++++++++++++ .../TestYarnFederationWithFairScheduler.java | 564 ++++++++++++++++++ .../src/test/resources/yarn-site.xml | 4 + 7 files changed, 1275 insertions(+), 23 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java index 130503065d6b9..23902bdc61909 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java @@ -105,8 +105,9 @@ public static void logAndThrowException(Throwable t, String errMsgFormat, Object throws YarnException { String msg = String.format(errMsgFormat, args); if (t != null) { - LOG.error(msg, t); - throw new YarnException(msg, t); + String newErrMsg = getErrorMsg(msg, t); + LOG.error(newErrMsg, t); + throw new YarnException(newErrMsg, t); } else { LOG.error(msg); throw new YarnException(msg); @@ -234,8 +235,9 @@ private static List getInterceptorClassNames(Configuration conf, public static void logAndThrowIOException(String errMsg, Throwable t) throws IOException { if (t != null) { - LOG.error(errMsg, t); - throw new IOException(errMsg, t); + String newErrMsg = getErrorMsg(errMsg, t); + LOG.error(newErrMsg, t); + throw new IOException(newErrMsg, t); } else { LOG.error(errMsg); throw new IOException(errMsg); @@ -256,8 +258,9 @@ public static void logAndThrowIOException(Throwable t, String errMsgFormat, Obje throws IOException { String msg = String.format(errMsgFormat, args); if (t != null) { - LOG.error(msg, t); - throw new IOException(msg, t); + String newErrMsg = getErrorMsg(msg, t); + LOG.error(newErrMsg, t); + throw new IOException(newErrMsg, t); } else { LOG.error(msg); throw new IOException(msg); @@ -276,8 +279,9 @@ public static void logAndThrowIOException(Throwable t, String errMsgFormat, Obje public static void logAndThrowRunTimeException(String errMsg, Throwable t) throws RuntimeException { if (t != null) { - LOG.error(errMsg, t); - throw new RuntimeException(errMsg, t); + String newErrMsg = getErrorMsg(errMsg, t); + LOG.error(newErrMsg, t); + throw new RuntimeException(newErrMsg, t); } else { LOG.error(errMsg); throw new RuntimeException(errMsg); @@ -298,8 +302,9 @@ public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat, throws RuntimeException { String msg = String.format(errMsgFormat, args); if (t != null) { - LOG.error(msg, t); - throw new RuntimeException(msg, t); + String newErrMsg = getErrorMsg(msg, t); + LOG.error(newErrMsg, t); + throw new RuntimeException(newErrMsg, t); } else { LOG.error(msg); throw new RuntimeException(msg); @@ -320,8 +325,9 @@ public static RuntimeException logAndReturnRunTimeException( Throwable t, String errMsgFormat, Object... args) { String msg = String.format(errMsgFormat, args); if (t != null) { - LOG.error(msg, t); - return new RuntimeException(msg, t); + String newErrMsg = getErrorMsg(msg, t); + LOG.error(newErrMsg, t); + return new RuntimeException(newErrMsg, t); } else { LOG.error(msg); return new RuntimeException(msg); @@ -356,8 +362,9 @@ public static YarnRuntimeException logAndReturnYarnRunTimeException( Throwable t, String errMsgFormat, Object... args) { String msg = String.format(errMsgFormat, args); if (t != null) { - LOG.error(msg, t); - return new YarnRuntimeException(msg, t); + String newErrMsg = getErrorMsg(msg, t); + LOG.error(newErrMsg, t); + return new YarnRuntimeException(newErrMsg, t); } else { LOG.error(msg); return new YarnRuntimeException(msg); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 7f9446878b3e6..fcfc7fa300e62 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -341,6 +341,7 @@ protected DefaultRequestInterceptorREST getOrCreateInterceptorByAppId(String app // Get homeSubCluster By appId SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); + LOG.info("appId = {} : subClusterInfo = {}.", appId, subClusterInfo.getSubClusterId()); return getOrCreateInterceptorForSubCluster(subClusterInfo); } @@ -827,7 +828,7 @@ public AppsInfo getApps(HttpServletRequest hsr, String stateQuery, }); if (apps.getApps().isEmpty()) { - return null; + return new AppsInfo(); } // Merge all the application reports got from all the available YARN RMs @@ -1135,7 +1136,7 @@ public AppState getAppState(HttpServletRequest hsr, String appId) } catch (YarnException | IllegalArgumentException e) { LOG.error("getHomeSubClusterInfoByAppId error, applicationId = {}.", appId, e); } - return null; + return new AppState(); } @Override @@ -3371,17 +3372,19 @@ private Map invokeConcurrent(Collection c } Exception exception = result.getException(); - - // If allowPartialResult=false, it means that if an exception occurs in a subCluster, - // an exception will be thrown directly. - if (!allowPartialResult && exception != null) { + if (exception != null) { throw exception; } } catch (Throwable e) { String subClusterId = subClusterInfo != null ? subClusterInfo.getSubClusterId().getId() : "UNKNOWN"; LOG.error("SubCluster {} failed to {} report.", subClusterId, request.getMethodName(), e); - throw new YarnRuntimeException(e.getCause().getMessage(), e); + // If allowPartialResult=false, it means that if an exception occurs in a subCluster, + // an exception will be thrown directly. + if (!allowPartialResult) { + throw new YarnException("SubCluster " + subClusterId + + " failed to " + request.getMethodName() + " report.", e); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java index c9c56c46c7c33..4e0d97e83e959 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java @@ -106,7 +106,7 @@ * main difference with AMRMProxyService is the protocol they implement. **/ @Singleton -@Path("/ws/v1/cluster") +@Path(RMWSConsts.RM_WEB_SERVICE_PATH) public class RouterWebServices implements RMWebServiceProtocol { private static final Logger LOG = @@ -424,7 +424,7 @@ public BulkActivitiesInfo getBulkActivities( MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) @Override public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, - @QueryParam(RMWSConsts.APP_ID) String appId, + @PathParam(RMWSConsts.APPID) String appId, @QueryParam(RMWSConsts.MAX_TIME) String time, @QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set requestPriorities, @QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/TestFederationSubCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/TestFederationSubCluster.java index f9cd707821814..71034558687c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/TestFederationSubCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/TestFederationSubCluster.java @@ -32,6 +32,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; @@ -39,6 +40,13 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; +import org.apache.hadoop.yarn.server.router.webapp.HTTPMethods; import org.apache.hadoop.yarn.server.router.webapp.JavaProcess; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,11 +56,20 @@ import java.security.PrivilegedExceptionAction; import java.util.LinkedList; import java.util.List; +import java.util.ArrayList; import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static javax.servlet.http.HttpServletResponse.SC_OK; +import static javax.ws.rs.core.MediaType.APPLICATION_JSON; import static javax.ws.rs.core.MediaType.APPLICATION_XML; import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_NEW_APPLICATION; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_NEW; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.ADD_NODE_LABELS; import static org.apache.hadoop.yarn.server.router.webapp.TestRouterWebServicesREST.waitWebAppRunning; import static org.junit.Assert.assertEquals; @@ -190,6 +207,8 @@ public static T performGetCalls(final String routerAddress, final String pat final String queryValue) throws IOException, InterruptedException { Client clientToRouter = Client.create(); + clientToRouter.setReadTimeout(5000); + clientToRouter.setConnectTimeout(5000); WebResource toRouter = clientToRouter.resource(routerAddress).path(path); final WebResource.Builder toRouterBuilder; @@ -207,4 +226,120 @@ public static T performGetCalls(final String routerAddress, final String pat return response.getEntity(returnType); }); } + + public static ClientResponse performCall(final String routerAddress, final String webAddress, + final String queryKey, final String queryValue, final Object context, + final HTTPMethods method) throws IOException, InterruptedException { + + return UserGroupInformation.createRemoteUser(userName).doAs( + (PrivilegedExceptionAction) () -> { + Client clientToRouter = Client.create(); + WebResource toRouter = clientToRouter.resource(routerAddress).path(webAddress); + + WebResource toRouterWR = toRouter; + if (queryKey != null && queryValue != null) { + toRouterWR = toRouterWR.queryParam(queryKey, queryValue); + } + + WebResource.Builder builder; + if (context != null) { + builder = toRouterWR.entity(context, APPLICATION_JSON); + builder = builder.accept(APPLICATION_JSON); + } else { + builder = toRouterWR.accept(APPLICATION_JSON); + } + + ClientResponse response = null; + + switch (method) { + case DELETE: + response = builder.delete(ClientResponse.class); + break; + case POST: + response = builder.post(ClientResponse.class); + break; + case PUT: + response = builder.put(ClientResponse.class); + break; + default: + break; + } + + return response; + }); + } + + public String getNodeId(String rmAddress) { + Client clientToRM = Client.create(); + clientToRM.setConnectTimeout(3000); + clientToRM.setReadTimeout(3000); + WebResource toRM = clientToRM.resource(rmAddress).path(RM_WEB_SERVICE_PATH + NODES); + ClientResponse response = + toRM.accept(APPLICATION_XML).get(ClientResponse.class); + NodesInfo ci = response.getEntity(NodesInfo.class); + List nodes = ci.getNodes(); + if (nodes.isEmpty()) { + return null; + } + clientToRM.destroy(); + return nodes.get(0).getNodeId(); + } + + public NewApplication getNewApplicationId(String routerAddress) { + Client clientToRM = Client.create(); + clientToRM.setConnectTimeout(3000); + clientToRM.setReadTimeout(3000); + WebResource toRM = clientToRM.resource(routerAddress).path( + RM_WEB_SERVICE_PATH + APPS_NEW_APPLICATION); + ClientResponse response = toRM.accept(APPLICATION_XML).post(ClientResponse.class); + clientToRM.destroy(); + return response.getEntity(NewApplication.class); + } + + public String submitApplication(String routerAddress) { + ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo(); + String appId = getNewApplicationId(routerAddress).getApplicationId(); + context.setApplicationId(appId); + Client clientToRouter = Client.create(); + clientToRouter.setConnectTimeout(3000); + clientToRouter.setReadTimeout(3000); + WebResource toRM = clientToRouter.resource(routerAddress).path( + RM_WEB_SERVICE_PATH + APPS); + toRM.entity(context, APPLICATION_XML).accept(APPLICATION_XML).post(ClientResponse.class); + clientToRouter.destroy(); + return appId; + } + + public NewReservation getNewReservationId(String routerAddress) { + Client clientToRM = Client.create(); + clientToRM.setConnectTimeout(3000); + clientToRM.setReadTimeout(3000); + WebResource toRM = clientToRM.resource(routerAddress). + path(RM_WEB_SERVICE_PATH + RESERVATION_NEW); + ClientResponse response = toRM.accept(APPLICATION_XML).post(ClientResponse.class); + return response.getEntity(NewReservation.class); + } + + public String addNodeLabel(String routerAddress) { + Client clientToRM = Client.create(); + clientToRM.setConnectTimeout(3000); + clientToRM.setReadTimeout(3000); + WebResource toRM = clientToRM.resource(routerAddress) + .path(RM_WEB_SERVICE_PATH + ADD_NODE_LABELS); + List nodeLabels = new ArrayList<>(); + nodeLabels.add(NodeLabel.newInstance("default")); + NodeLabelsInfo context = new NodeLabelsInfo(nodeLabels); + ClientResponse response = toRM + .entity(context, APPLICATION_XML) + .accept(APPLICATION_XML) + .post(ClientResponse.class); + return response.getEntity(String.class); + } + + public static String format(String format, Object... args) { + Pattern p = Pattern.compile("\\{.*?}"); + Matcher m = p.matcher(format); + String newFormat = m.replaceAll("%s"); + return String.format(newFormat, args); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/capacity/TestYarnFederationWithCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/capacity/TestYarnFederationWithCapacityScheduler.java index f37e1245bdcdd..9fc4b5fd03671 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/capacity/TestYarnFederationWithCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/capacity/TestYarnFederationWithCapacityScheduler.java @@ -17,21 +17,93 @@ */ package org.apache.hadoop.yarn.server.router.subcluster.capacity; +import static javax.servlet.http.HttpServletResponse.SC_ACCEPTED; +import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST; +import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE; +import com.sun.jersey.api.client.ClientResponse; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Sets; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterUserInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList; import org.apache.hadoop.yarn.server.router.subcluster.TestFederationSubCluster; import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo; +import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterUserInfo; +import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo; +import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo; +import org.apache.hadoop.yarn.server.webapp.dao.AppInfo; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.TimeoutException; +import static javax.servlet.http.HttpServletResponse.SC_OK; import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.INFO; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.CLUSTER_USER_INFO; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.METRICS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.STATES; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER_ACTIVITIES; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_NEW_APPLICATION; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APP_STATISTICS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_APPATTEMPTS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_STATE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_PRIORITY; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_QUEUE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_TIMEOUTS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_TIMEOUT; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_NEW; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_SUBMIT; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_UPDATE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_DELETE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.GET_NODE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID_GETLABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABEL_MAPPINGS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.ADD_NODE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.GET_NODE_TO_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.REMOVE_NODE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.REPLACE_NODE_TO_LABELS; +import static org.apache.hadoop.yarn.server.router.subcluster.TestFederationSubCluster.format; +import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.POST; +import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.PUT; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -41,6 +113,8 @@ public class TestYarnFederationWithCapacityScheduler { private static TestFederationSubCluster testFederationSubCluster; private static Set subClusters; private static final String ROUTER_WEB_ADDRESS = "http://localhost:18089"; + private static final String SC1_RM_WEB_ADDRESS = "http://localhost:18088"; + private static final String SC2_RM_WEB_ADDRESS = "http://localhost:28088"; @BeforeClass public static void setUp() @@ -73,4 +147,469 @@ public void testGetClusterInfo() throws InterruptedException, IOException { assertTrue(subClusters.contains(clusterInfo.getSubClusterId())); } } + + @Test + public void testInfo() throws InterruptedException, IOException { + FederationClusterInfo federationClusterInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH + INFO, + FederationClusterInfo.class, null, null); + List clusterInfos = federationClusterInfo.getList(); + assertNotNull(clusterInfos); + assertEquals(2, clusterInfos.size()); + for (ClusterInfo clusterInfo : clusterInfos) { + assertNotNull(clusterInfo); + assertTrue(subClusters.contains(clusterInfo.getSubClusterId())); + } + } + + @Test + public void testClusterUserInfo() throws Exception { + FederationClusterUserInfo federationClusterUserInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + CLUSTER_USER_INFO, + FederationClusterUserInfo.class, null, null); + List clusterUserInfos = federationClusterUserInfo.getList(); + assertNotNull(clusterUserInfos); + assertEquals(2, clusterUserInfos.size()); + for (ClusterUserInfo clusterUserInfo : clusterUserInfos) { + assertNotNull(clusterUserInfo); + assertTrue(subClusters.contains(clusterUserInfo.getSubClusterId())); + } + } + + @Test + public void testMetricsInfo() throws Exception { + // It takes time to start the sub-cluster. + // We need to wait for the sub-cluster to be completely started, + // so we need to set the waiting time. + // The resources of the two sub-clusters we registered are 24C and 12G, + // so the resources that the Router should collect are 48C and 24G. + GenericTestUtils.waitFor(() -> { + try { + ClusterMetricsInfo clusterMetricsInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + METRICS, ClusterMetricsInfo.class, null, null); + assertNotNull(clusterMetricsInfo); + return (48 == clusterMetricsInfo.getTotalVirtualCores() && + 24576 == clusterMetricsInfo.getTotalMB()); + } catch (Exception e) { + return false; + } + }, 5000, 50 * 5000); + } + + @Test + public void testSchedulerInfo() throws Exception { + FederationSchedulerTypeInfo schedulerTypeInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + SCHEDULER, FederationSchedulerTypeInfo.class, null, null); + assertNotNull(schedulerTypeInfo); + List schedulerTypeInfos = schedulerTypeInfo.getList(); + assertNotNull(schedulerTypeInfos); + assertEquals(2, schedulerTypeInfos.size()); + for (SchedulerTypeInfo schedulerTypeInfoItem : schedulerTypeInfos) { + assertNotNull(schedulerTypeInfoItem); + assertTrue(subClusters.contains(schedulerTypeInfoItem.getSubClusterId())); + CapacitySchedulerInfo schedulerInfo = + (CapacitySchedulerInfo) schedulerTypeInfoItem.getSchedulerInfo(); + assertNotNull(schedulerInfo); + assertEquals(3, schedulerInfo.getQueues().getQueueInfoList().size()); + } + } + + @Test + public void testNodesEmpty() throws Exception { + // We are in 2 sub-clusters, each with 3 nodes, so our Router should correspond to 6 nodes. + GenericTestUtils.waitFor(() -> { + try { + NodesInfo nodesInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + NODES, NodesInfo.class, null, null); + assertNotNull(nodesInfo); + ArrayList nodes = nodesInfo.getNodes(); + assertNotNull(nodes); + return (6 == nodes.size()); + } catch (Exception e) { + return false; + } + }, 5000, 50 * 5000); + } + + @Test + public void testNodesLost() throws Exception { + GenericTestUtils.waitFor(() -> { + try { + NodesInfo nodesInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + NODES, NodesInfo.class, STATES, "LOST"); + assertNotNull(nodesInfo); + ArrayList nodes = nodesInfo.getNodes(); + assertNotNull(nodes); + return nodes.isEmpty(); + } catch (Exception e) { + return false; + } + }, 5000, 50 * 5000); + } + + @Test + public void testNode() throws Exception { + String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS); + NodeInfo nodeInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(NODES_NODEID, rm1NodeId), + NodeInfo.class, null, null); + assertNotNull(nodeInfo); + assertEquals(rm1NodeId, nodeInfo.getNodeId()); + + String rm2NodeId = testFederationSubCluster.getNodeId(SC2_RM_WEB_ADDRESS); + NodeInfo nodeInfo2 = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(NODES_NODEID, rm2NodeId), + NodeInfo.class, null, null); + assertNotNull(nodeInfo2); + assertEquals(rm2NodeId, nodeInfo2.getNodeId()); + } + + @Test + public void testUpdateNodeResource() throws Exception { + // wait until a node shows up and check the resources + GenericTestUtils.waitFor(() -> testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS) != null, + 100, 5 * 1000); + String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS); + + // assert memory and default vcores + NodeInfo nodeInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(NODES_NODEID, rm1NodeId), + NodeInfo.class, null, null); + assertEquals(4096, nodeInfo.getTotalResource().getMemorySize()); + assertEquals(8, nodeInfo.getTotalResource().getvCores()); + } + + @Test + public void testActivies() throws Exception { + // wait until a node shows up and check the resources + GenericTestUtils.waitFor(() -> testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS) != null, + 100, 5 * 1000); + String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS); + + ActivitiesInfo activitiesInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + SCHEDULER_ACTIVITIES, ActivitiesInfo.class, "nodeId", rm1NodeId); + + assertNotNull(activitiesInfo); + assertEquals(rm1NodeId, activitiesInfo.getNodeId()); + } + + @Test + public void testAppActivitiesXML() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + AppActivitiesInfo appActivitiesInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + "/scheduler/app-activities/" + appId, + AppActivitiesInfo.class, null, null); + assertNotNull(appActivitiesInfo); + assertEquals(appId, appActivitiesInfo.getApplicationId()); + } + + @Test + public void testAppStatistics() throws Exception { + ApplicationStatisticsInfo applicationStatisticsInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + APP_STATISTICS, ApplicationStatisticsInfo.class, STATES, "RUNNING"); + assertNotNull(applicationStatisticsInfo); + ArrayList statItems = applicationStatisticsInfo.getStatItems(); + assertNotNull(statItems); + assertEquals(1, statItems.size()); + } + + @Test + public void testNewApplication() throws Exception { + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + APPS_NEW_APPLICATION, null, + null, null, POST); + assertEquals(SC_OK, response.getStatus()); + NewApplication ci = response.getEntity(NewApplication.class); + assertNotNull(ci); + } + + @Test + public void testSubmitApplicationXML() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + assertNotNull(appId); + } + + @Test + public void testApps() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + assertNotNull(appId); + AppsInfo appsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + APPS, AppsInfo.class, null, null); + assertNotNull(appsInfo); + assertEquals(1, appsInfo.getApps().size()); + } + + @Test + public void testApp() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + assertNotNull(appId); + AppInfo appInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_APPID, appId), + AppInfo.class, null, null); + assertNotNull(appInfo); + } + + @Test + public void testAppAttempt() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + assertNotNull(appId); + AppAttemptsInfo appAttemptsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_APPID_APPATTEMPTS, appId), + AppAttemptsInfo.class, null, null); + assertNotNull(appAttemptsInfo); + ArrayList attempts = appAttemptsInfo.getAttempts(); + assertNotNull(attempts); + assertEquals(1, attempts.size()); + } + + @Test + public void testAppState() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + assertNotNull(appId); + AppState appState = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_APPID_STATE, appId), + AppState.class, null, null); + assertNotNull(appState); + String state = appState.getState(); + assertNotNull(state); + assertEquals("ACCEPTED", state); + } + + @Test + public void testUpdateAppState() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + assertNotNull(appId); + AppState appState = new AppState("KILLED"); + String pathApp = RM_WEB_SERVICE_PATH + format(APPS_APPID_STATE, appId); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + pathApp, null, null, appState, PUT); + assertNotNull(response); + assertEquals(SC_ACCEPTED, response.getStatus()); + AppState appState1 = response.getEntity(AppState.class); + assertNotNull(appState1); + assertNotNull(appState1.getState()); + assertEquals("KILLING", appState1.getState()); + } + + @Test + public void testAppPriority() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + assertNotNull(appId); + AppPriority appPriority = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_APPID_PRIORITY, appId), + AppPriority.class, null, null); + assertNotNull(appPriority); + assertEquals(-1, appPriority.getPriority()); + } + + @Test + public void testUpdateAppPriority() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + AppPriority appPriority = new AppPriority(1); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_APPID_PRIORITY, appId), + null, null, appPriority, PUT); + + assertEquals(SC_OK, response.getStatus()); + AppPriority ci = response.getEntity(AppPriority.class); + assertNotNull(ci); + assertNotNull(ci.getPriority()); + assertEquals(1, ci.getPriority()); + } + + @Test + public void testAppQueue() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + AppQueue appQueue = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_APPID_QUEUE, appId), + AppQueue.class, null, null); + assertNotNull(appQueue); + String queue = appQueue.getQueue(); + assertEquals("root.default", queue); + } + + @Test + public void testUpdateAppQueue() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + AppQueue appQueue = new AppQueue("root.default"); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_APPID_QUEUE, appId), + null, null, appQueue, PUT); + assertEquals(SC_OK, response.getStatus()); + AppQueue appQueue1 = response.getEntity(AppQueue.class); + assertNotNull(appQueue1); + String queue1 = appQueue1.getQueue(); + assertEquals("root.default", queue1); + } + + @Test + public void testAppTimeouts() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + AppTimeoutsInfo appTimeoutsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_TIMEOUTS, appId), + AppTimeoutsInfo.class, null, null); + assertNotNull(appTimeoutsInfo); + ArrayList appTimeouts = appTimeoutsInfo.getAppTimeouts(); + assertNotNull(appTimeouts); + assertEquals(1, appTimeouts.size()); + AppTimeoutInfo appTimeoutInfo = appTimeouts.get(0); + assertNotNull(appTimeoutInfo); + assertEquals(ApplicationTimeoutType.LIFETIME, appTimeoutInfo.getTimeoutType()); + assertEquals("UNLIMITED", appTimeoutInfo.getExpireTime()); + } + + @Test + public void testAppTimeout() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + String pathApp = RM_WEB_SERVICE_PATH + format(APPS_TIMEOUTS, appId); + AppTimeoutInfo appTimeoutInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + pathApp + "/" + "LIFETIME", AppTimeoutInfo.class, null, null); + assertNotNull(appTimeoutInfo); + } + + @Test + public void testUpdateAppTimeouts() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + AppTimeoutInfo appTimeoutInfo = new AppTimeoutInfo(); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_TIMEOUT, appId), + null, null, appTimeoutInfo, PUT); + assertEquals(SC_BAD_REQUEST, response.getStatus()); + String entity = response.getEntity(String.class); + assertNotNull(entity); + } + + @Test + public void testNewReservation() throws Exception { + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + RESERVATION_NEW, + null, null, null, POST); + assertEquals(SC_OK, response.getStatus()); + NewReservation ci = response.getEntity(NewReservation.class); + assertNotNull(ci); + } + + @Test + public void testSubmitReservation() throws Exception { + ReservationSubmissionRequestInfo context = new ReservationSubmissionRequestInfo(); + NewReservation newReservationId = + testFederationSubCluster.getNewReservationId(ROUTER_WEB_ADDRESS); + context.setReservationId(newReservationId.getReservationId()); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + RESERVATION_SUBMIT, null, null, context, POST); + assertEquals(SC_BAD_REQUEST, response.getStatus()); + String entity = response.getEntity(String.class); + assertNotNull(entity); + } + + @Test + public void testUpdateReservation() throws Exception { + NewReservation newReservationId = + testFederationSubCluster.getNewReservationId(ROUTER_WEB_ADDRESS); + String reservationId = newReservationId.getReservationId(); + ReservationUpdateRequestInfo context = new ReservationUpdateRequestInfo(); + context.setReservationId(reservationId); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + RESERVATION_UPDATE, null, null, context, POST); + assertEquals(SC_BAD_REQUEST, response.getStatus()); + String entity = response.getEntity(String.class); + assertNotNull(entity); + } + + @Test + public void testDeleteReservation() throws Exception { + NewReservation newReservationId = + testFederationSubCluster.getNewReservationId(ROUTER_WEB_ADDRESS); + String reservationId = newReservationId.getReservationId(); + ReservationDeleteRequestInfo context = new ReservationDeleteRequestInfo(); + context.setReservationId(reservationId); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + RESERVATION_DELETE, null, null, context, POST); + assertEquals(SC_SERVICE_UNAVAILABLE, response.getStatus()); + String entity = response.getEntity(String.class); + assertNotNull(entity); + } + + @Test + public void testGetClusterNodeLabels() throws Exception { + NodeLabelsInfo nodeLabelsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + GET_NODE_LABELS, NodeLabelsInfo.class, null, null); + assertNotNull(nodeLabelsInfo); + } + + @Test + public void testGetLabelsOnNode() throws Exception { + String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS); + NodeLabelsInfo nodeLabelsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(NODES_NODEID_GETLABELS, rm1NodeId), + NodeLabelsInfo.class, null, null); + assertNotNull(nodeLabelsInfo); + } + + @Test + public void testGetLabelsMappingEmpty() throws Exception { + LabelsToNodesInfo labelsToNodesInfo = TestFederationSubCluster.performGetCalls( + ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH + LABEL_MAPPINGS, + LabelsToNodesInfo.class, null, null); + assertNotNull(labelsToNodesInfo); + } + + @Test + public void testGetLabelsMapping() throws Exception { + LabelsToNodesInfo labelsToNodesInfo = TestFederationSubCluster.performGetCalls( + ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH + LABEL_MAPPINGS, + LabelsToNodesInfo.class, LABELS, "label1"); + assertNotNull(labelsToNodesInfo); + } + + @Test + public void testAddToClusterNodeLabels() throws Exception { + List nodeLabels = new ArrayList<>(); + nodeLabels.add(NodeLabel.newInstance("default")); + NodeLabelsInfo context = new NodeLabelsInfo(nodeLabels); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + ADD_NODE_LABELS, null, null, context, POST); + assertEquals(SC_OK, response.getStatus()); + String entity = response.getEntity(String.class); + assertNotNull(entity); + } + + @Test + public void testGetNodeToLabels() throws Exception { + NodeToLabelsInfo nodeToLabelsInfo = TestFederationSubCluster.performGetCalls( + ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH + GET_NODE_TO_LABELS, + NodeToLabelsInfo.class, null, null); + assertNotNull(nodeToLabelsInfo); + } + + @Test + public void testRemoveFromClusterNodeLabels() throws Exception { + testFederationSubCluster.addNodeLabel(ROUTER_WEB_ADDRESS); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + REMOVE_NODE_LABELS, + LABELS, "default", null, POST); + assertEquals(SC_OK, response.getStatus()); + String entity = response.getEntity(String.class); + assertNotNull(entity); + } + + @Test + public void testReplaceLabelsOnNodes() throws Exception { + testFederationSubCluster.addNodeLabel(ROUTER_WEB_ADDRESS); + NodeToLabelsEntryList context = new NodeToLabelsEntryList(); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + REPLACE_NODE_TO_LABELS, + null, null, context, POST); + String entity = response.getEntity(String.class); + assertNotNull(entity); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/fair/TestYarnFederationWithFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/fair/TestYarnFederationWithFairScheduler.java index ce27d5a3fc7e6..8af40193149c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/fair/TestYarnFederationWithFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/fair/TestYarnFederationWithFairScheduler.java @@ -17,21 +17,99 @@ */ package org.apache.hadoop.yarn.server.router.subcluster.fair; +import static javax.servlet.http.HttpServletResponse.SC_ACCEPTED; +import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE; +import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST; +import com.sun.jersey.api.client.ClientResponse; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Sets; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterUserInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList; import org.apache.hadoop.yarn.server.router.subcluster.TestFederationSubCluster; import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo; +import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterUserInfo; +import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo; +import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo; +import org.codehaus.jettison.json.JSONObject; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.TimeoutException; import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.INFO; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.CLUSTER_USER_INFO; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.METRICS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.STATES; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID_REPLACE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER_ACTIVITIES; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_NEW_APPLICATION; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APP_STATISTICS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APP_ID; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_APPATTEMPTS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_STATE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_PRIORITY; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_QUEUE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_TIMEOUTS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_TIMEOUT; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_NEW; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_SUBMIT; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_UPDATE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_DELETE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.GET_NODE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID_GETLABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABEL_MAPPINGS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.ADD_NODE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.GET_NODE_TO_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.REMOVE_NODE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.REPLACE_NODE_TO_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODE_RESOURCE; +import static org.apache.hadoop.yarn.server.router.subcluster.TestFederationSubCluster.format; +import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.POST; +import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.PUT; +import static org.apache.http.HttpStatus.SC_OK; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -40,6 +118,8 @@ public class TestYarnFederationWithFairScheduler { private static TestFederationSubCluster testFederationSubCluster; private static Set subClusters; private static final String ROUTER_WEB_ADDRESS = "http://localhost:28089"; + private static final String SC1_RM_WEB_ADDRESS = "http://localhost:38088"; + private static final String SC2_RM_WEB_ADDRESS = "http://localhost:48088"; @BeforeClass public static void setUp() @@ -72,4 +152,488 @@ public void testGetClusterInfo() throws InterruptedException, IOException { assertTrue(subClusters.contains(clusterInfo.getSubClusterId())); } } + + @Test + public void testInfo() throws InterruptedException, IOException { + FederationClusterInfo federationClusterInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH + INFO, + FederationClusterInfo.class, null, null); + List clusterInfos = federationClusterInfo.getList(); + assertNotNull(clusterInfos); + assertEquals(2, clusterInfos.size()); + for (ClusterInfo clusterInfo : clusterInfos) { + assertNotNull(clusterInfo); + assertTrue(subClusters.contains(clusterInfo.getSubClusterId())); + } + } + + @Test + public void testClusterUserInfo() throws Exception { + FederationClusterUserInfo federationClusterUserInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + CLUSTER_USER_INFO, + FederationClusterUserInfo.class, null, null); + List clusterUserInfos = federationClusterUserInfo.getList(); + assertNotNull(clusterUserInfos); + assertEquals(2, clusterUserInfos.size()); + for (ClusterUserInfo clusterUserInfo : clusterUserInfos) { + assertNotNull(clusterUserInfo); + assertTrue(subClusters.contains(clusterUserInfo.getSubClusterId())); + } + } + + @Test + public void testMetricsInfo() throws Exception { + // It takes time to start the sub-cluster. + // We need to wait for the sub-cluster to be completely started, + // so we need to set the waiting time. + // The resources of the two sub-clusters we registered are 24C and 12G, + // so the resources that the Router should collect are 48C and 24G. + GenericTestUtils.waitFor(() -> { + try { + ClusterMetricsInfo clusterMetricsInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + METRICS, ClusterMetricsInfo.class, null, null); + assertNotNull(clusterMetricsInfo); + return (48 == clusterMetricsInfo.getTotalVirtualCores() && + 24576 == clusterMetricsInfo.getTotalMB()); + } catch (Exception e) { + return false; + } + }, 5000, 50 * 5000); + } + + @Test + public void testSchedulerInfo() throws Exception { + FederationSchedulerTypeInfo schedulerTypeInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + SCHEDULER, FederationSchedulerTypeInfo.class, null, null); + assertNotNull(schedulerTypeInfo); + List schedulerTypeInfos = schedulerTypeInfo.getList(); + assertNotNull(schedulerTypeInfos); + assertEquals(2, schedulerTypeInfos.size()); + for (SchedulerTypeInfo schedulerTypeInfoItem : schedulerTypeInfos) { + assertNotNull(schedulerTypeInfoItem); + assertTrue(subClusters.contains(schedulerTypeInfoItem.getSubClusterId())); + FairSchedulerQueueInfo rootQueueInfo = + ((FairSchedulerInfo) schedulerTypeInfoItem.getSchedulerInfo()).getRootQueueInfo(); + assertNotNull(rootQueueInfo); + assertEquals("fair", rootQueueInfo.getSchedulingPolicy()); + } + } + + @Test + public void testNodesEmpty() throws Exception { + // We are in 2 sub-clusters, each with 3 nodes, so our Router should correspond to 6 nodes. + GenericTestUtils.waitFor(() -> { + try { + NodesInfo nodesInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + NODES, NodesInfo.class, null, null); + assertNotNull(nodesInfo); + ArrayList nodes = nodesInfo.getNodes(); + assertNotNull(nodes); + return (6 == nodes.size()); + } catch (Exception e) { + return false; + } + }, 5000, 50 * 5000); + } + + @Test + public void testNodesLost() throws Exception { + GenericTestUtils.waitFor(() -> { + try { + NodesInfo nodesInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + NODES, NodesInfo.class, STATES, "LOST"); + assertNotNull(nodesInfo); + ArrayList nodes = nodesInfo.getNodes(); + assertNotNull(nodes); + return nodes.isEmpty(); + } catch (Exception e) { + return false; + } + }, 5000, 50 * 5000); + } + + @Test + public void testNode() throws Exception { + String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS); + NodeInfo nodeInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(NODES_NODEID, rm1NodeId), + NodeInfo.class, null, null); + assertNotNull(nodeInfo); + assertEquals(rm1NodeId, nodeInfo.getNodeId()); + + String rm2NodeId = testFederationSubCluster.getNodeId(SC2_RM_WEB_ADDRESS); + NodeInfo nodeInfo2 = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(NODES_NODEID, rm2NodeId), + NodeInfo.class, null, null); + assertNotNull(nodeInfo2); + assertEquals(rm2NodeId, nodeInfo2.getNodeId()); + } + + @Test + public void testUpdateNodeResource() throws Exception { + // wait until a node shows up and check the resources + GenericTestUtils.waitFor(() -> testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS) != null, + 100, 5 * 1000); + String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS); + + // assert memory and default vcores + NodeInfo nodeInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(NODES_NODEID, rm1NodeId), + NodeInfo.class, null, null); + assertEquals(4096, nodeInfo.getTotalResource().getMemorySize()); + assertEquals(8, nodeInfo.getTotalResource().getvCores()); + + Resource resource = Resource.newInstance(4096, 5); + ResourceOptionInfo resourceOption = new ResourceOptionInfo( + ResourceOption.newInstance(resource, 1000)); + ClientResponse routerResponse = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(NODE_RESOURCE, rm1NodeId), + null, null, resourceOption, POST); + JSONObject json = routerResponse.getEntity(JSONObject.class); + JSONObject totalResource = json.getJSONObject("resourceInfo"); + assertEquals(resource.getMemorySize(), totalResource.getLong("memory")); + assertEquals(resource.getVirtualCores(), totalResource.getLong("vCores")); + + // assert updated memory and cores + NodeInfo nodeInfo1 = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(NODES_NODEID, rm1NodeId), + NodeInfo.class, null, null); + assertEquals(4096, nodeInfo1.getTotalResource().getMemorySize()); + assertEquals(5, nodeInfo1.getTotalResource().getvCores()); + } + + @Test + public void testActivies() throws Exception { + // wait until a node shows up and check the resources + GenericTestUtils.waitFor(() -> testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS) != null, + 100, 5 * 1000); + String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS); + + ActivitiesInfo activitiesInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + SCHEDULER_ACTIVITIES, ActivitiesInfo.class, "nodeId", rm1NodeId); + + assertNotNull(activitiesInfo); + assertEquals(rm1NodeId, activitiesInfo.getNodeId()); + assertEquals("Not Capacity Scheduler", activitiesInfo.getDiagnostic()); + } + + @Test + public void testAppActivities() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + AppActivitiesInfo appActivitiesInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + "/scheduler/app-activities/" + appId, + AppActivitiesInfo.class, APP_ID, appId); + assertNotNull(appActivitiesInfo); + assertEquals(appId, appActivitiesInfo.getApplicationId()); + assertEquals("Not Capacity Scheduler", appActivitiesInfo.getDiagnostic()); + } + + @Test + public void testAppStatistics() throws Exception { + ApplicationStatisticsInfo applicationStatisticsInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + APP_STATISTICS, ApplicationStatisticsInfo.class, STATES, "RUNNING"); + assertNotNull(applicationStatisticsInfo); + ArrayList statItems = applicationStatisticsInfo.getStatItems(); + assertNotNull(statItems); + assertEquals(1, statItems.size()); + } + + @Test + public void testNewApplication() throws Exception { + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + APPS_NEW_APPLICATION, null, + null, null, POST); + assertEquals(SC_OK, response.getStatus()); + NewApplication ci = response.getEntity(NewApplication.class); + assertNotNull(ci); + } + + @Test + public void testSubmitApplication() { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + assertNotNull(appId); + } + + @Test + public void testApps() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + assertNotNull(appId); + AppsInfo appsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + APPS, AppsInfo.class, null, null); + assertNotNull(appsInfo); + assertEquals(1, appsInfo.getApps().size()); + } + + @Test + public void testAppAttempt() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + assertNotNull(appId); + AppAttemptsInfo appAttemptsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_APPID_APPATTEMPTS, appId), + AppAttemptsInfo.class, null, null); + assertNotNull(appAttemptsInfo); + ArrayList attempts = appAttemptsInfo.getAttempts(); + assertNotNull(attempts); + assertEquals(1, attempts.size()); + } + + @Test + public void testAppState() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + assertNotNull(appId); + AppState appState = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_APPID_STATE, appId), + AppState.class, null, null); + assertNotNull(appState); + String state = appState.getState(); + assertNotNull(state); + assertEquals("ACCEPTED", state); + } + + @Test + public void testUpdateAppState() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + assertNotNull(appId); + AppState appState = new AppState("KILLED"); + String pathApp = RM_WEB_SERVICE_PATH + format(APPS_APPID_STATE, appId); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + pathApp, null, null, appState, PUT); + assertNotNull(response); + assertEquals(SC_ACCEPTED, response.getStatus()); + AppState appState1 = response.getEntity(AppState.class); + assertNotNull(appState1); + assertNotNull(appState1.getState()); + assertEquals("KILLING", appState1.getState()); + } + + @Test + public void testAppPriority() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + assertNotNull(appId); + AppPriority appPriority = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_APPID_PRIORITY, appId), + AppPriority.class, null, null); + assertNotNull(appPriority); + assertEquals(0, appPriority.getPriority()); + } + + @Test + public void testUpdateAppPriority() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + AppPriority appPriority = new AppPriority(1); + // FairScheduler does not support Update Application Priority. + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_APPID_PRIORITY, appId), + null, null, appPriority, PUT); + assertEquals(SC_SERVICE_UNAVAILABLE, response.getStatus()); + } + + @Test + public void testAppQueue() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + AppQueue appQueue = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_APPID_QUEUE, appId), + AppQueue.class, null, null); + assertNotNull(appQueue); + String queue = appQueue.getQueue(); + assertEquals("root.dr_dot_who", queue); + } + + @Test + public void testUpdateAppQueue() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + AppQueue appQueue = new AppQueue("root.a"); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_APPID_QUEUE, appId), + null, null, appQueue, PUT); + assertEquals(SC_OK, response.getStatus()); + AppQueue appQueue1 = response.getEntity(AppQueue.class); + assertNotNull(appQueue1); + String queue1 = appQueue1.getQueue(); + assertEquals("root.a", queue1); + } + + @Test + public void testAppTimeouts() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + AppTimeoutsInfo appTimeoutsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_TIMEOUTS, appId), + AppTimeoutsInfo.class, null, null); + assertNotNull(appTimeoutsInfo); + ArrayList appTimeouts = appTimeoutsInfo.getAppTimeouts(); + assertNotNull(appTimeouts); + assertEquals(1, appTimeouts.size()); + AppTimeoutInfo appTimeoutInfo = appTimeouts.get(0); + assertNotNull(appTimeoutInfo); + assertEquals(ApplicationTimeoutType.LIFETIME, appTimeoutInfo.getTimeoutType()); + assertEquals("UNLIMITED", appTimeoutInfo.getExpireTime()); + } + + @Test + public void testAppTimeout() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + String pathApp = RM_WEB_SERVICE_PATH + format(APPS_TIMEOUTS, appId); + AppTimeoutInfo appTimeoutInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + pathApp + "/" + "LIFETIME", AppTimeoutInfo.class, null, null); + assertNotNull(appTimeoutInfo); + } + + @Test + public void testUpdateAppTimeouts() throws Exception { + String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS); + AppTimeoutInfo appTimeoutInfo = new AppTimeoutInfo(); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(APPS_TIMEOUT, appId), + null, null, appTimeoutInfo, PUT); + assertEquals(SC_BAD_REQUEST, response.getStatus()); + String entity = response.getEntity(String.class); + assertNotNull(entity); + } + + @Test + public void testNewReservation() throws Exception { + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + RESERVATION_NEW, + null, null, null, POST); + assertEquals(SC_OK, response.getStatus()); + NewReservation ci = response.getEntity(NewReservation.class); + assertNotNull(ci); + } + + @Test + public void testSubmitReservation() throws Exception { + ReservationSubmissionRequestInfo context = new ReservationSubmissionRequestInfo(); + NewReservation newReservationId = + testFederationSubCluster.getNewReservationId(ROUTER_WEB_ADDRESS); + context.setReservationId(newReservationId.getReservationId()); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + RESERVATION_SUBMIT, null, null, context, POST); + assertEquals(SC_BAD_REQUEST, response.getStatus()); + String entity = response.getEntity(String.class); + assertNotNull(entity); + } + + @Test + public void testUpdateReservation() throws Exception { + NewReservation newReservationId = + testFederationSubCluster.getNewReservationId(ROUTER_WEB_ADDRESS); + String reservationId = newReservationId.getReservationId(); + ReservationUpdateRequestInfo context = new ReservationUpdateRequestInfo(); + context.setReservationId(reservationId); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + RESERVATION_UPDATE, null, null, context, POST); + assertEquals(SC_BAD_REQUEST, response.getStatus()); + String entity = response.getEntity(String.class); + assertNotNull(entity); + } + + @Test + public void testDeleteReservation() throws Exception { + NewReservation newReservationId = + testFederationSubCluster.getNewReservationId(ROUTER_WEB_ADDRESS); + String reservationId = newReservationId.getReservationId(); + ReservationDeleteRequestInfo context = new ReservationDeleteRequestInfo(); + context.setReservationId(reservationId); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + RESERVATION_DELETE, null, null, context, POST); + assertEquals(SC_SERVICE_UNAVAILABLE, response.getStatus()); + String entity = response.getEntity(String.class); + assertNotNull(entity); + } + + @Test + public void testGetClusterNodeLabels() throws Exception { + NodeLabelsInfo nodeLabelsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + GET_NODE_LABELS, NodeLabelsInfo.class, null, null); + assertNotNull(nodeLabelsInfo); + } + + @Test + public void testGetLabelsOnNode() throws Exception { + String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS); + NodeLabelsInfo nodeLabelsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + format(NODES_NODEID_GETLABELS, rm1NodeId), + NodeLabelsInfo.class, null, null); + assertNotNull(nodeLabelsInfo); + } + + @Test + public void testGetLabelsMappingEmpty() throws Exception { + LabelsToNodesInfo labelsToNodesInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + LABEL_MAPPINGS, LabelsToNodesInfo.class, null, null); + assertNotNull(labelsToNodesInfo); + } + + @Test + public void testGetLabelsMapping() throws Exception { + LabelsToNodesInfo labelsToNodesInfo = TestFederationSubCluster.performGetCalls( + ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH + LABEL_MAPPINGS, + LabelsToNodesInfo.class, LABELS, "label1"); + assertNotNull(labelsToNodesInfo); + } + + @Test + public void testAddToClusterNodeLabels() throws Exception { + List nodeLabels = new ArrayList<>(); + nodeLabels.add(NodeLabel.newInstance("default")); + NodeLabelsInfo context = new NodeLabelsInfo(nodeLabels); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + ADD_NODE_LABELS, null, null, context, POST); + assertEquals(SC_OK, response.getStatus()); + String entity = response.getEntity(String.class); + assertNotNull(entity); + } + + @Test + public void testGetNodeToLabels() throws Exception { + NodeToLabelsInfo nodeToLabelsInfo = TestFederationSubCluster.performGetCalls( + ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH + GET_NODE_TO_LABELS, + NodeToLabelsInfo.class, null, null); + assertNotNull(nodeToLabelsInfo); + } + + @Test + public void testRemoveFromClusterNodeLabels() throws Exception { + testFederationSubCluster.addNodeLabel(ROUTER_WEB_ADDRESS); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + REMOVE_NODE_LABELS, + LABELS, "default", null, POST); + assertEquals(SC_OK, response.getStatus()); + String entity = response.getEntity(String.class); + assertNotNull(entity); + } + + @Test + public void testReplaceLabelsOnNodes() throws Exception { + testFederationSubCluster.addNodeLabel(ROUTER_WEB_ADDRESS); + NodeToLabelsEntryList context = new NodeToLabelsEntryList(); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + RM_WEB_SERVICE_PATH + REPLACE_NODE_TO_LABELS, + null, null, context, POST); + String entity = response.getEntity(String.class); + assertNotNull(entity); + } + + @Test + public void testReplaceLabelsOnNode() throws Exception { + String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS); + String pathNode = RM_WEB_SERVICE_PATH + + format(NODES_NODEID_REPLACE_LABELS, rm1NodeId); + testFederationSubCluster.addNodeLabel(ROUTER_WEB_ADDRESS); + ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS, + pathNode, LABELS, "default", null, POST); + assertEquals(SC_OK, response.getStatus()); + String entity = response.getEntity(String.class); + assertNotNull(entity); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml index 4a28627a9a194..94b7972dae764 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml @@ -47,4 +47,8 @@ yarn.resourcemanager.cluster-id local-cluster + + yarn.router.interceptor.allow-partial-result.enable + true +