Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.yarn.api.protocolrecords;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience.Private;
Expand Down Expand Up @@ -52,6 +54,16 @@ public static GetApplicationsResponse newInstance(
return response;
}

/**
 * Builds a {@code GetApplicationsResponse} from an arbitrary collection of
 * application reports. The reports are copied into a fresh
 * {@code ArrayList}, and that copy is what is handed to
 * {@code setApplicationList}.
 *
 * @param applications the reports to place in the response
 * @return a new response record populated with a copy of the given reports
 */
@Private
@Unstable
public static GetApplicationsResponse newInstance(
    Collection<ApplicationReport> applications) {
  GetApplicationsResponse result =
      Records.newRecord(GetApplicationsResponse.class);
  // Copy so the record receives a List regardless of the input collection type.
  List<ApplicationReport> copiedReports = new ArrayList<>(applications);
  result.setApplicationList(copiedReports);
  return result;
}

/**
* Get <code>ApplicationReport</code> for applications.
* @return <code>ApplicationReport</code> for applications
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3991,6 +3991,15 @@ public static boolean isAclEnabled(Configuration conf) {
ROUTER_PREFIX + "submit.retry";
public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3;

/**
 * Configuration key controlling whether the Router's
 * FederationClientInterceptor may return partial ApplicationReports when it
 * merges the applications reported by the sub-clusters.
 */
public static final String ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED =
ROUTER_PREFIX + "partial-result.enabled";
/** Partial results are disabled unless explicitly enabled. */
public static final boolean DEFAULT_ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED =
false;

public static final String ROUTER_WEBAPP_PREFIX = ROUTER_PREFIX + "webapp.";

public static final String ROUTER_USER_CLIENT_THREADS_SIZE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ public void initializeMemberVariables() {

configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY);
configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED);
configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
configurationPrefixToSkipCompare
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand Down Expand Up @@ -161,6 +162,7 @@ public class FederationClientInterceptor
private RouterMetrics routerMetrics;
private ThreadPoolExecutor executorService;
private final Clock clock = new MonotonicClock();
// Whether merged application listings may include partial reports; loaded
// in init() from YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED.
private boolean returnPartialReport;

@Override
public void init(String userName) {
Expand Down Expand Up @@ -196,6 +198,10 @@ public void init(String userName) {
clientRMProxies =
new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
routerMetrics = RouterMetrics.getMetrics();

returnPartialReport = conf.getBoolean(
YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED);
}

@Override
Expand Down Expand Up @@ -599,10 +605,44 @@ public GetApplicationReportResponse getApplicationReport(
return response;
}

/**
 * The Yarn Router will forward the request to all the Yarn RMs in parallel,
 * after that it will group all the ApplicationReports by the ApplicationId.
 *
 * <p>Possible failures:
 *
 * <p>Client: identical behavior as {@code ClientRMService}.
 *
 * <p>Router: the Client will timeout and resubmit the request.
 *
 * <p>ResourceManager: the Router calls each Yarn RM in parallel. In case a
 * Yarn RM fails, a single call will timeout. However the Router will
 * merge the ApplicationReports it got, and provides a partial list to
 * the client.
 *
 * <p>State Store: the Router will timeout and it will retry depending on the
 * FederationFacade settings - if the failure happened before the select
 * operation.
 *
 * @param request the getApplications request forwarded to every sub-cluster
 * @return the response obtained by merging the per-sub-cluster responses
 *         with {@code RouterYarnClientUtils.mergeApplications}
 * @throws YarnException declared by this method; a null request is reported
 *         through {@code RouterServerUtil.logAndThrowException}
 * @throws IOException declared by this method and by the concurrent
 *         invocation it delegates to
 */
@Override
public GetApplicationsResponse getApplications(GetApplicationsRequest request)
    throws YarnException, IOException {
  // The former "not implemented" throw has been removed: it preceded the
  // real implementation and made every statement below unreachable.
  if (request == null) {
    RouterServerUtil.logAndThrowException(
        "Missing getApplications request.",
        null);
  }
  Map<SubClusterId, SubClusterInfo> subclusters =
      federationFacade.getSubClusters(true);
  // Describe the remote call reflectively so it can be fanned out.
  ClientMethod remoteMethod = new ClientMethod("getApplications",
      new Class[] {GetApplicationsRequest.class}, new Object[] {request});
  Map<SubClusterId, GetApplicationsResponse> applications =
      invokeConcurrent(subclusters.keySet(), remoteMethod,
          GetApplicationsResponse.class);

  // Merge the Application Reports
  return RouterYarnClientUtils.mergeApplications(applications.values(),
      returnPartialReport);
}

@Override
Expand Down Expand Up @@ -676,6 +716,12 @@ public Object call() throws Exception {
return results;
}

<R> Map<SubClusterId, R> invokeConcurrent(Collection<SubClusterId> clusterIds,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add break line right before the method definition.

ClientMethod request, Class<R> clazz) throws YarnException, IOException {
ArrayList<SubClusterId> clusterIdList = new ArrayList<>(clusterIds);
return invokeConcurrent(clusterIdList, request, clazz);
}

@Override
public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
throws YarnException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,25 @@
package org.apache.hadoop.yarn.server.router.clientrm;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
* Util class for Router Yarn client API calls.
*/
public final class RouterYarnClientUtils {

// Name prefix given to a report that results from merging two UAM reports.
private final static String PARTIAL_REPORT = "Partial Report ";

// Utility class with only static members; the private constructor prevents
// instantiation.
private RouterYarnClientUtils() {

}
Expand All @@ -52,4 +63,130 @@ public static GetClusterMetricsResponse merge(
}
return GetClusterMetricsResponse.newInstance(tmp);
}

/**
 * Combines the application lists returned by several sub-clusters into one
 * response containing at most one report per ApplicationId.
 *
 * <p>A managed (AM) report is always kept, and the resource usage of any
 * unmanaged (UAM) report for the same application is added into it. UAM
 * reports seen before their AM are accumulated separately. The ones still
 * without an AM at the end are included only when
 * {@code mergeUamToReport} accepts them.
 *
 * @param responses a list of ApplicationResponse to merge
 * @param returnPartialResult if the merge ApplicationReports should contain
 *                            partial result or not
 * @return the merged ApplicationsResponse
 */
public static GetApplicationsResponse mergeApplications(
    Collection<GetApplicationsResponse> responses,
    boolean returnPartialResult) {
  // AM reports collected so far, one per application.
  Map<ApplicationId, ApplicationReport> managedById = new HashMap<>();
  // Accumulated UAM report for applications whose AM has not shown up yet.
  Map<ApplicationId, ApplicationReport> orphanUamById = new HashMap<>();

  for (GetApplicationsResponse response : responses) {
    for (ApplicationReport report : response.getApplicationList()) {
      ApplicationId id = report.getApplicationId();

      if (!report.isUnmanagedApp()) {
        // AM: record it and absorb any UAM total gathered earlier.
        managedById.put(id, report);
        ApplicationReport earlierUam = orphanUamById.remove(id);
        if (earlierUam != null) {
          mergeAMWithUAM(report, earlierUam);
        }
        continue;
      }

      // From here on the report is a UAM.
      ApplicationReport knownAm = managedById.get(id);
      if (knownAm != null) {
        // Its AM is already known: add this UAM's usage to the AM.
        mergeAMWithUAM(knownAm, report);
        continue;
      }

      ApplicationReport uamSoFar = orphanUamById.get(id);
      if (uamSoFar != null) {
        // Another UAM of the same application: keep a running total.
        orphanUamById.put(id, mergeUAMWithUAM(uamSoFar, report));
      } else {
        // First UAM seen for this application.
        orphanUamById.put(id, report);
      }
    }
  }

  // Decide, per leftover UAM, whether it belongs in the final listing.
  for (ApplicationReport leftover : orphanUamById.values()) {
    if (mergeUamToReport(leftover.getName(), returnPartialResult)) {
      managedById.put(leftover.getApplicationId(), leftover);
    }
  }

  return GetApplicationsResponse.newInstance(managedById.values());
}

private static ApplicationReport mergeUAMWithUAM(ApplicationReport uam1,
ApplicationReport uam2){
uam1.setName(PARTIAL_REPORT + uam1.getApplicationId());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we test for this kind of name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test for checking UAM merge.

mergeAMWithUAM(uam1, uam2);
return uam1;
}

/**
 * Adds the resource usage of a UAM report into an AM report, in place.
 * Container counts, used/reserved/needed resources, memory-seconds,
 * vcore-seconds and the queue/cluster usage percentages are summed.
 *
 * <p>Either report may lack a resource usage report. If the UAM has none,
 * the AM is left unchanged. If only the AM has none, it adopts the UAM's
 * usage report as is.
 *
 * @param am the report that receives the summed usage; it is modified
 * @param uam the report whose usage is added to {@code am}
 */
private static void mergeAMWithUAM(ApplicationReport am,
    ApplicationReport uam) {
  ApplicationResourceUsageReport amResourceReport =
      am.getApplicationResourceUsageReport();

  ApplicationResourceUsageReport uamResourceReport =
      uam.getApplicationResourceUsageReport();

  if (uamResourceReport == null) {
    // Nothing to contribute; avoid dereferencing a missing usage report.
    return;
  }
  if (amResourceReport == null) {
    // The AM carries no usage yet, so the UAM's usage becomes the total.
    am.setApplicationResourceUsageReport(uamResourceReport);
    return;
  }

  amResourceReport.setNumUsedContainers(
      amResourceReport.getNumUsedContainers() +
          uamResourceReport.getNumUsedContainers());

  amResourceReport.setNumReservedContainers(
      amResourceReport.getNumReservedContainers() +
          uamResourceReport.getNumReservedContainers());

  amResourceReport.setUsedResources(Resources.add(
      amResourceReport.getUsedResources(),
      uamResourceReport.getUsedResources()));

  amResourceReport.setReservedResources(Resources.add(
      amResourceReport.getReservedResources(),
      uamResourceReport.getReservedResources()));

  amResourceReport.setNeededResources(Resources.add(
      amResourceReport.getNeededResources(),
      uamResourceReport.getNeededResources()));

  amResourceReport.setMemorySeconds(
      amResourceReport.getMemorySeconds() +
          uamResourceReport.getMemorySeconds());

  amResourceReport.setVcoreSeconds(
      amResourceReport.getVcoreSeconds() +
          uamResourceReport.getVcoreSeconds());

  amResourceReport.setQueueUsagePercentage(
      amResourceReport.getQueueUsagePercentage() +
          uamResourceReport.getQueueUsagePercentage());

  amResourceReport.setClusterUsagePercentage(
      amResourceReport.getClusterUsagePercentage() +
          uamResourceReport.getClusterUsagePercentage());

  // Write the updated usage back onto the AM report.
  am.setApplicationResourceUsageReport(amResourceReport);
}

/**
 * Decides whether an unmanaged application that has no matching AM report
 * should appear in the merged listing.
 *
 * @param appName Application Name
 * @param returnPartialResult if the merge ApplicationReports should contain
 *                            partial result or not
 * @return {@code true} when partial results are enabled; otherwise
 *         {@code true} only for a non-null name that starts with neither
 *         {@code UnmanagedApplicationManager.APP_NAME} nor
 *         {@code PARTIAL_REPORT}
 */
private static boolean mergeUamToReport(String appName,
    boolean returnPartialResult) {
  // Short-circuit order matters: the partial-result flag wins, and the null
  // check must precede the prefix tests.
  return returnPartialResult
      || (appName != null
          && !appName.startsWith(UnmanagedApplicationManager.APP_NAME)
          && !appName.startsWith(PARTIAL_REPORT));
}
}
Loading