@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.webapp.dao;

import java.util.ArrayList;
import java.util.Collection;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -46,4 +47,7 @@ public ArrayList<ContainerInfo> getContainers() {
return container;
}

public void addAll(Collection<ContainerInfo> containersInfo) {
container.addAll(containersInfo);
}
}
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.router.webapp;

import java.io.IOException;
import java.lang.reflect.Method;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collection;
@@ -92,6 +93,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.RouterMetrics;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
@@ -1053,7 +1055,6 @@ public ClusterMetricsInfo call() {
}

// Collect all the responses in parallel

for (int i = 0; i < subClustersActive.size(); i++) {
try {
Future<ClusterMetricsInfo> future = compSvc.take();
@@ -1336,7 +1337,33 @@ public AppAttemptInfo getAppAttempt(HttpServletRequest req,
@Override
public ContainersInfo getContainers(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
throw new NotImplementedException("Code is not implemented");

ContainersInfo containersInfo = new ContainersInfo();

Map<SubClusterId, SubClusterInfo> subClustersActive;
try {
subClustersActive = getActiveSubclusters();
} catch (NotFoundException e) {
LOG.error("Get all active sub cluster(s) error.", e);
return containersInfo;
}

try {
Class[] argsClasses = new Class[]{
HttpServletRequest.class, HttpServletResponse.class, String.class, String.class};
Object[] args = new Object[]{req, res, appId, appAttemptId};
ClientMethod remoteMethod = new ClientMethod("getContainers", argsClasses, args);
Map<SubClusterInfo, ContainersInfo> containersInfoMap =
invokeConcurrent(subClustersActive.values(), remoteMethod, ContainersInfo.class);
if (containersInfoMap != null) {
containersInfoMap.values().forEach(containers ->
containersInfo.addAll(containers.getContainers()));
}
} catch (Exception ex) {
LOG.error("Failed to return GetContainers.", ex);
}

return containersInfo;
}

@Override
@@ -1366,4 +1393,44 @@ public void shutdown() {
threadpool.shutdown();
}
}

private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> clusterIds,
ClientMethod request, Class<R> clazz) {

Map<SubClusterInfo, R> results = new HashMap<>();

// Send the requests in parallel
CompletionService<R> compSvc = new ExecutorCompletionService<>(this.threadpool);

for (final SubClusterInfo info : clusterIds) {
compSvc.submit(() -> {
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
info.getSubClusterId(), info.getRMWebServiceAddress());
try {
Method method = DefaultRequestInterceptorREST.class.
getMethod(request.getMethodName(), request.getTypes());
Object retObj = method.invoke(interceptor, request.getParams());
R ret = clazz.cast(retObj);
return ret;
} catch (Exception e) {
LOG.error("SubCluster {} failed to call {} method.", info.getSubClusterId(),
request.getMethodName(), e);
return null;
}
});
}

clusterIds.stream().forEach(clusterId -> {
try {
Future<R> future = compSvc.take();
R response = future.get();
if (response != null) {
results.put(clusterId, response);

Review comment on this line:

Hi @slfan1989, I have a question.

  • Operation compSvc.take() is not guaranteed to return results in the same order as operation compSvc.submit(); in addition, clusterIds may be a Set. So the clusterId -> response mapping could be wrong?
  • Can we do this like FederationClientInterceptor#invokeConcurrent?

Reply from the PR author:

Thank you very much for your feedback, I agree with your description. The FederationInterceptorREST interface aggregates the results returned by all subClusters, so this part does not depend on the order in which the threads return. I've submitted a follow-up PR to fix this, thanks again!

}
} catch (Throwable e) {
LOG.warn("SubCluster {} failed to {} report.", clusterId, request.getMethodName(), e);
}
});
return results;
}
}
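
Regarding the ordering concern raised in the review comment above: the following is a minimal sketch of one way to pair each response with the sub-cluster that produced it, so that the order in which compSvc.take() hands back futures no longer matters. It assumes the same threadpool, LOG, ClientMethod, and getOrCreateInterceptorForSubCluster members shown in the diff, plus a java.util.AbstractMap import; the method name invokeConcurrentKeyed is made up for illustration, and this is only a sketch of the reviewer's suggestion, not necessarily what the follow-up PR does.

  private <R> Map<SubClusterInfo, R> invokeConcurrentKeyed(Collection<SubClusterInfo> clusterIds,
      ClientMethod request, Class<R> clazz) {

    Map<SubClusterInfo, R> results = new HashMap<>();

    // Each task returns the sub-cluster together with its result, so the
    // collection loop does not rely on take() matching submission order.
    CompletionService<Map.Entry<SubClusterInfo, R>> compSvc =
        new ExecutorCompletionService<>(this.threadpool);

    for (final SubClusterInfo info : clusterIds) {
      compSvc.submit(() -> {
        DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
            info.getSubClusterId(), info.getRMWebServiceAddress());
        try {
          Method method = DefaultRequestInterceptorREST.class
              .getMethod(request.getMethodName(), request.getTypes());
          R ret = clazz.cast(method.invoke(interceptor, request.getParams()));
          return new AbstractMap.SimpleEntry<>(info, ret);
        } catch (Exception e) {
          LOG.error("SubCluster {} failed to call {} method.",
              info.getSubClusterId(), request.getMethodName(), e);
          // A null value marks a failed sub-cluster; it is skipped below.
          return new AbstractMap.SimpleEntry<SubClusterInfo, R>(info, null);
        }
      });
    }

    // Take one completed future per submitted task; the entry already carries
    // the owning sub-cluster, so no ordering assumption is needed.
    for (int i = 0; i < clusterIds.size(); i++) {
      try {
        Map.Entry<SubClusterInfo, R> entry = compSvc.take().get();
        if (entry != null && entry.getValue() != null) {
          results.put(entry.getKey(), entry.getValue());
        }
      } catch (Throwable e) {
        LOG.warn("Failed to collect a {} response.", request.getMethodName(), e);
      }
    }
    return results;
  }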
@@ -25,13 +25,20 @@
import java.util.concurrent.atomic.AtomicInteger;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -45,6 +52,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -231,4 +240,43 @@ public boolean isRunning() {
public void setRunning(boolean runningMode) {
this.isRunning = runningMode;
}

@Override
public ContainersInfo getContainers(HttpServletRequest req, HttpServletResponse res,
String appId, String appAttemptId) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}

// We do not check whether the application exists in the system because we need
// to validate that each subCluster returns 1 container.
ContainersInfo containers = new ContainersInfo();

int subClusterId = Integer.valueOf(getSubClusterId().getId());

ContainerId containerId = ContainerId.newContainerId(
ApplicationAttemptId.fromString(appAttemptId), subClusterId);
Resource allocatedResource =
Resource.newInstance(subClusterId, subClusterId);

NodeId assignedNode = NodeId.newInstance("Node", subClusterId);
Priority priority = Priority.newInstance(subClusterId);
long creationTime = subClusterId;
long finishTime = subClusterId;
String diagnosticInfo = "Diagnostic " + subClusterId;
String logUrl = "Log " + subClusterId;
int containerExitStatus = subClusterId;
ContainerState containerState = ContainerState.COMPLETE;
String nodeHttpAddress = "HttpAddress " + subClusterId;

ContainerReport containerReport = ContainerReport.newInstance(
containerId, allocatedResource, assignedNode, priority,
creationTime, finishTime, diagnosticInfo, logUrl,
containerExitStatus, containerState, nodeHttpAddress);

ContainerInfo container = new ContainerInfo(containerReport);
containers.add(container);

return containers;
}
}
@@ -24,9 +24,11 @@

import javax.ws.rs.core.Response;

import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@@ -47,6 +49,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.junit.Assert;
import org.junit.Test;
@@ -160,7 +163,7 @@ public void testSubmitApplication()
throws YarnException, IOException, InterruptedException {

ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationId.newInstance(Time.now(), 1);

ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
@@ -187,7 +190,7 @@ public void testSubmitApplicationMultipleSubmission()
throws YarnException, IOException, InterruptedException {

ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
@@ -259,7 +262,7 @@ public void testForceKillApplication()
throws YarnException, IOException, InterruptedException {

ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
@@ -286,7 +289,7 @@ public void testForceKillApplicationNotExists()
throws YarnException, IOException, InterruptedException {

ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationId.newInstance(Time.now(), 1);
AppState appState = new AppState("KILLED");

Response response =
@@ -317,7 +320,7 @@ public void testForceKillApplicationWrongFormat()
public void testForceKillApplicationEmptyRequest()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationId.newInstance(Time.now(), 1);

ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
@@ -341,7 +344,7 @@ public void testGetApplicationReport()
throws YarnException, IOException, InterruptedException {

ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
@@ -478,7 +481,7 @@ public void testGetApplicationState()
throws YarnException, IOException, InterruptedException {

ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
@@ -505,7 +508,7 @@ public void testGetApplicationStateNotExists()
throws YarnException, IOException, InterruptedException {

ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationId.newInstance(Time.now(), 1);

AppState response = interceptor.getAppState(null, appId.toString());

@@ -560,4 +563,46 @@ SubClusterState.SC_RUNNING, new MonotonicClock().getTime(),
SubClusterRegisterRequest.newInstance(subClusterInfo));
}

@Test
public void testGetContainers()
throws YarnException, IOException, InterruptedException {

ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());

// Submit the application whose containers we will query later
Response response = interceptor.submitApplication(context, null);

Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));

ApplicationAttemptId appAttempt = ApplicationAttemptId.newInstance(appId, 1);

ContainersInfo responseGet = interceptor.getContainers(
null, null, appId.toString(), appAttempt.toString());

Assert.assertEquals(4, responseGet.getContainers().size());
}

@Test
public void testGetContainersNotExists() {
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ContainersInfo response = interceptor.getContainers(null, null, appId.toString(), null);
Assert.assertTrue(response.getContainers().isEmpty());
}

@Test
public void testGetContainersWrongFormat() {
ContainersInfo response = interceptor.getContainers(null, null, "Application_wrong_id", null);

Assert.assertNotNull(response);
Assert.assertTrue(response.getContainers().isEmpty());

ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
response = interceptor.getContainers(null, null, appId.toString(), "AppAttempt_wrong_id");

Assert.assertTrue(response.getContainers().isEmpty());
}
}