-
Notifications
You must be signed in to change notification settings - Fork 9.2k
YARN-11228. [Federation] Add getAppAttempts, getAppAttempt REST APIs for Router. #4695
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
01c679e
a4052f6
854758d
4eb1a5e
7005b71
b4ad85e
a856cfe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1365,7 +1365,30 @@ public Response updateApplicationTimeout(AppTimeoutInfo appTimeout, | |
|
|
||
| @Override | ||
| public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) { | ||
| throw new NotImplementedException("Code is not implemented"); | ||
| if (appId == null || appId.isEmpty()) { | ||
| throw new IllegalArgumentException("Parameter error, the appId is empty or null."); | ||
| } | ||
|
|
||
| try { | ||
| ApplicationId applicationId = ApplicationId.fromString(appId); | ||
| SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId); | ||
|
|
||
| if (subClusterInfo == null) { | ||
| RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " | ||
| + applicationId, null); | ||
| } | ||
|
|
||
| DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( | ||
| subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); | ||
| return interceptor.getAppAttempts(hsr, appId); | ||
| } catch (IllegalArgumentException e) { | ||
| String msg = String.format("Unable to get the AppAttempt appId: %s.", appId); | ||
| RouterServerUtil.logAndThrowRunTimeException(msg, e); | ||
| } catch (YarnException e) { | ||
| RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e); | ||
| } | ||
|
|
||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -1377,7 +1400,35 @@ public RMQueueAclInfo checkUserAccessToQueue(String queue, String username, | |
| @Override | ||
| public AppAttemptInfo getAppAttempt(HttpServletRequest req, | ||
| HttpServletResponse res, String appId, String appAttemptId) { | ||
| throw new NotImplementedException("Code is not implemented"); | ||
|
|
||
| if (appId == null || appId.isEmpty()) { | ||
| throw new IllegalArgumentException("Parameter error, the appId is empty or null."); | ||
| } | ||
| if (appAttemptId == null || appAttemptId.isEmpty()) { | ||
| throw new IllegalArgumentException("Parameter error, the appAttemptId is empty or null."); | ||
| } | ||
|
|
||
| try { | ||
| ApplicationId applicationId = ApplicationId.fromString(appId); | ||
| SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId); | ||
|
|
||
| if (subClusterInfo == null) { | ||
| RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " | ||
| + applicationId, null); | ||
| } | ||
|
|
||
| DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( | ||
| subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); | ||
| return interceptor.getAppAttempt(req, res, appId, appAttemptId); | ||
| } catch (IllegalArgumentException e) { | ||
| String msg = String.format("Unable to get the AppAttempt appId: %s, appAttemptId: %s.", | ||
| appId, appAttemptId); | ||
| RouterServerUtil.logAndThrowRunTimeException(msg, e); | ||
|
||
| } catch (YarnException e) { | ||
| RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e); | ||
| } | ||
|
|
||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -1432,8 +1483,8 @@ public ContainerInfo getContainer(HttpServletRequest req, | |
| SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId); | ||
|
|
||
| if (subClusterInfo == null) { | ||
| RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " + | ||
| applicationId, null); | ||
| RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " | ||
| + applicationId, null); | ||
| } | ||
|
|
||
| DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( | ||
|
|
@@ -1491,8 +1542,8 @@ public Response signalToContainer(String containerId, String command, | |
| SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId); | ||
|
|
||
| if (subClusterInfo == null) { | ||
| RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " + | ||
| applicationId, null); | ||
| RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " | ||
| + applicationId, null); | ||
| } | ||
|
|
||
| DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( | ||
|
|
@@ -1565,7 +1616,8 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c | |
| * @return HomeSubCluster | ||
| * @throws YarnException on failure | ||
| */ | ||
| private SubClusterInfo getHomeSubClusterInfoByAppId(ApplicationId applicationId) throws YarnException { | ||
| private SubClusterInfo getHomeSubClusterInfoByAppId(ApplicationId applicationId) | ||
| throws YarnException { | ||
| SubClusterInfo subClusterInfo = null; | ||
| SubClusterId subClusterId = null; | ||
| try { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,6 +49,11 @@ | |
| import org.apache.hadoop.yarn.api.records.ContainerReport; | ||
| import org.apache.hadoop.yarn.api.records.NodeLabel; | ||
| import org.apache.hadoop.yarn.api.records.SignalContainerCommand; | ||
| import org.apache.hadoop.yarn.api.records.ApplicationReport; | ||
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; | ||
| import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; | ||
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; | ||
| import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; | ||
| import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; | ||
| import org.apache.hadoop.yarn.exceptions.YarnException; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; | ||
|
|
@@ -67,6 +72,8 @@ | |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo; | ||
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; | ||
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo; | ||
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo; | ||
| import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; | ||
| import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; | ||
| import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; | ||
| import org.apache.hadoop.yarn.webapp.NotFoundException; | ||
|
|
@@ -412,4 +419,50 @@ public Response signalToContainer(String containerId, String command, | |
|
|
||
| return Response.status(Status.OK).build(); | ||
| } | ||
|
|
||
| @Override | ||
| public org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo getAppAttempt(HttpServletRequest req, HttpServletResponse res, | ||
|
||
| String appId, String appAttemptId) { | ||
| if (!isRunning) { | ||
| throw new RuntimeException("RM is stopped"); | ||
| } | ||
|
|
||
| ApplicationId applicationId = ApplicationId.fromString(appId); | ||
| if (!applicationMap.contains(applicationId)) { | ||
| throw new NotFoundException("app with id: " + appId + " not found"); | ||
| } | ||
|
|
||
| ApplicationReport newApplicationReport = ApplicationReport.newInstance( | ||
| applicationId, ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId)), "user", | ||
| "queue", "appname", "host", 124, null, | ||
| YarnApplicationState.RUNNING, "diagnostics", "url", 1, 2, 3, 4, | ||
| FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); | ||
|
|
||
| ApplicationAttemptReport attempt = ApplicationAttemptReport.newInstance( | ||
| ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId)), | ||
| "host", 124, "url", "oUrl", "diagnostics", | ||
| YarnApplicationAttemptState.FINISHED, ContainerId.newContainerId( | ||
| newApplicationReport.getCurrentApplicationAttemptId(), 1)); | ||
|
|
||
| return new AppAttemptInfo(attempt); | ||
| } | ||
|
|
||
| @Override | ||
| public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) { | ||
| if (!isRunning) { | ||
| throw new RuntimeException("RM is stopped"); | ||
| } | ||
|
|
||
| ApplicationId applicationId = ApplicationId.fromString(appId); | ||
| if (!applicationMap.contains(applicationId)) { | ||
| throw new NotFoundException("app with id: " + appId + " not found"); | ||
| } | ||
|
|
||
| AppAttemptsInfo infos = new AppAttemptsInfo(); | ||
| org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo | ||
| appAttemptInfo1 = new org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo(); | ||
| infos.add(appAttemptInfo1); | ||
|
|
||
| return infos; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,6 +56,7 @@ | |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; | ||
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo; | ||
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo; | ||
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo; | ||
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo; | ||
| import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; | ||
| import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; | ||
|
|
@@ -717,4 +718,37 @@ public void testGetContainer() | |
| appId.toString(), appAttemptId.toString(), "0"); | ||
| Assert.assertNotNull(containerInfo); | ||
| } | ||
|
|
||
| @Test | ||
| public void testGetAppAttempts() | ||
| throws IOException, InterruptedException, YarnException { | ||
| // Submit application to multiSubCluster | ||
| ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); | ||
| ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo(); | ||
| context.setApplicationId(appId.toString()); | ||
|
|
||
| Assert.assertNotNull(interceptor.submitApplication(context, null)); | ||
|
|
||
| AppAttemptsInfo appAttemptsInfo = interceptor.getAppAttempts(null, appId.toString()); | ||
| Assert.assertNotNull(appAttemptsInfo); | ||
| Assert.assertTrue(!appAttemptsInfo.getAttempts().isEmpty()); | ||
|
||
| } | ||
|
|
||
| @Test | ||
| public void testGetAppAttempt() | ||
| throws IOException, InterruptedException, YarnException { | ||
| // Submit application to multiSubCluster | ||
| ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); | ||
| ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo(); | ||
| context.setApplicationId(appId.toString()); | ||
| Assert.assertNotNull(interceptor.submitApplication(context, null)); | ||
| ApplicationAttemptId appAttemptIdExcept = ApplicationAttemptId.newInstance(appId, 1); | ||
|
||
|
|
||
| org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo | ||
| appAttemptInfo = interceptor.getAppAttempt(null, null, appId.toString(), | ||
| appAttemptIdExcept.toString()); | ||
| Assert.assertNotNull(appAttemptInfo); | ||
|
|
||
| Assert.assertEquals(appAttemptIdExcept, appAttemptInfo.getAppAttemptId()); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I should've proposed this before but we keep doing this string to ApplicationId.
We could just have this getHomeSubClusterInfoByAppId() to take a string and do the transformation there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your suggestion, I will modify the code.