Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -3991,6 +3991,15 @@ public static boolean isAclEnabled(Configuration conf) {
ROUTER_PREFIX + "submit.retry";
public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3;

/**
* The interceptor class used in FederationClientInterceptor should return
* partial ApplicationReports.
*/
public static final String ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED =
ROUTER_PREFIX + "partial-result.enabled";
public static final boolean DEFAULT_ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED =
false;

public static final String ROUTER_WEBAPP_PREFIX = ROUTER_PREFIX + "webapp.";

public static final String ROUTER_USER_CLIENT_THREADS_SIZE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ public void initializeMemberVariables() {

configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY);
configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Match the indentation with the previous one.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
configurationPrefixToSkipCompare
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public class FederationClientInterceptor
private RouterMetrics routerMetrics;
private ThreadPoolExecutor executorService;
private final Clock clock = new MonotonicClock();
private boolean returnPartialReport;

@Override
public void init(String userName) {
Expand Down Expand Up @@ -196,6 +197,13 @@ public void init(String userName) {
clientRMProxies =
new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
routerMetrics = RouterMetrics.getMetrics();

returnPartialReport =

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lines could be split in a more friendly way:

Suggested change
returnPartialReport =
returnPartialReport = conf.getBoolean(
YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

conf.getBoolean(
YarnConfiguration
.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED,
YarnConfiguration
.DEFAULT_ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED);
}

@Override
Expand Down Expand Up @@ -599,10 +607,45 @@ public GetApplicationReportResponse getApplicationReport(
return response;
}

/**
* The Yarn Router will forward the request to all the Yarn RMs in parallel,
* after that it will group all the ApplicationReports by the ApplicationId.
*
* Possible failure:
*
* Client: identical behavior as {@code ClientRMService}.
*
* Router: the Client will timeout and resubmit the request.
*
* ResourceManager: the Router calls each Yarn RM in parallel. In case a
* Yarn RM fails, a single call will timeout. However the Router will
* merge the ApplicationReports it got, and provides a partial list to
* the client.
*
* State Store: the Router will timeout and it will retry depending on the
* FederationFacade settings - if the failure happened before the select
* operation.
*/
@Override
public GetApplicationsResponse getApplications(GetApplicationsRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
if (request == null) {
RouterServerUtil.logAndThrowException(
"Missing getApplications request.",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the spacing follows the right convention.
The checkstyle should flag all this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There were no flags in checkstyle. I have applied Hadoop formatter and ensure that checkstyle plugin is not giving any errors.

null);
}
Map<SubClusterId, SubClusterInfo> subclusters =
federationFacade.getSubClusters(true);
ClientMethod remoteMethod = new ClientMethod("getApplications",
new Class[] {GetApplicationsRequest.class}, new Object[] {request});
ArrayList<SubClusterId> clusterIds = new ArrayList<>(subclusters.keySet());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure we are doing this conversion to list a bunch of times, having one that takes Collection would remove some noise.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overloaded the invokeConcurrent method to take Collection. Will update the other usage in a follow up PR once this is merged.

Map<SubClusterId, GetApplicationsResponse> applications =
invokeConcurrent(clusterIds, remoteMethod,
GetApplicationsResponse.class);

//Merge the Application Reports

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//Merge the Application Reports
// Merge the Application Reports

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

return RouterYarnClientUtils.mergeApplications(applications.values(),
returnPartialReport);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,28 @@
*/
package org.apache.hadoop.yarn.server.router.clientrm;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
* Util class for Router Yarn client API calls.
*/
public final class RouterYarnClientUtils {

private final static String PARTIAL_REPORT = "Partial Report ";

private RouterYarnClientUtils() {

}
Expand All @@ -52,4 +65,110 @@ public static GetClusterMetricsResponse merge(
}
return GetClusterMetricsResponse.newInstance(tmp);
}

/**
* Merges a list of ApplicationReports grouping by ApplicationId.
* Our current policy is to merge the application reports from the reachable
* SubClusters.
* @param responses a list of ApplicationResponse to merge
* @param returnPartialResult if the merge ApplicationReports should contain
* partial result or not
* @return the merged ApplicationsResponse
*/
public static GetApplicationsResponse mergeApplications(
Comment thread
goiri marked this conversation as resolved.
Collection<GetApplicationsResponse> responses,
boolean returnPartialResult){
Map<ApplicationId, ApplicationReport> federationAM = new HashMap<>();
Map<ApplicationId, ApplicationReport> federationUAMSum = new HashMap<>();

for(GetApplicationsResponse appResponse : responses){

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for(GetApplicationsResponse appResponse : responses){
for (GetApplicationsResponse appResponse : responses){

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

for(ApplicationReport appReport : appResponse.getApplicationList()){
ApplicationId appId = appReport.getApplicationId();
// Check if this ApplicationReport is an AM
if (appReport.getHost() != null) {
// Insert in the list of AM
federationAM.put(appId, appReport);
// Check if there are any UAM found before
if (federationUAMSum.containsKey(appId)) {
// Merge the current AM with the found UAM
mergeAMWithUAM(appReport, federationUAMSum.get(appId));
// Remove the sum of the UAMs
federationUAMSum.remove(appId);
}
// This ApplicationReport is an UAM
} else {
if (federationAM.containsKey(appId)) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (federationAM.containsKey(appId)) {
} else if (federationAM.containsKey(appId)) {

And then the rest of the elses would go with this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// Merge the current UAM with its own AM
mergeAMWithUAM(federationAM.get(appId), appReport);
} else if (federationUAMSum.containsKey(appId)) {
// Merge the current UAM with its own UAM and update the list of UAM
federationUAMSum.put(appId,
mergeUAMWithUAM(federationUAMSum.get(appId), appReport));
} else {
// Insert in the list of UAM
federationUAMSum.put(appId, appReport);
}
}
}
}
// Check the remaining UAMs are depending or not from federation
for (ApplicationReport appReport : federationUAMSum.values()) {
if (returnPartialResult || appReport.getName() != null

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if is a little convoluted.
Probably making it a static method would help readability.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to a static method with Java doc.

&& !(appReport.getName()
.startsWith(UnmanagedApplicationManager.APP_NAME)
|| appReport.getName()
.startsWith(PARTIAL_REPORT))) {
federationAM.put(appReport.getApplicationId(), appReport);
}
}

List<ApplicationReport> appList = new ArrayList<>(federationAM.values());
return GetApplicationsResponse.newInstance(appList);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably you could extend GetApplicationsResponse.newInstance() to take a Collection and do the new ArrayList there.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

private static ApplicationReport mergeUAMWithUAM(ApplicationReport uam1,
Comment thread
bibinchundatt marked this conversation as resolved.
ApplicationReport uam2){
uam1.setName(PARTIAL_REPORT + uam1.getApplicationId());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we test for this kind of name?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test for checking UAM merge.

mergeAMWithUAM(uam1, uam1);
mergeAMWithUAM(uam1, uam2);
return uam1;
}

private static void mergeAMWithUAM(ApplicationReport am,
ApplicationReport uam){
ApplicationResourceUsageReport resourceUsageReport =
am.getApplicationResourceUsageReport();
resourceUsageReport.setNumUsedContainers(
resourceUsageReport.getNumUsedContainers() +
uam.getApplicationResourceUsageReport()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The readability is not very good here.
Maybe extracting the report for uam would help.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extracted uamresourceReport to make it more readable.

.getNumUsedContainers());
resourceUsageReport.setNumReservedContainers(
resourceUsageReport.getNumReservedContainers() +
uam.getApplicationResourceUsageReport()
.getNumReservedContainers());
resourceUsageReport.setUsedResources(Resources.add(
resourceUsageReport.getUsedResources(),
uam.getApplicationResourceUsageReport().getUsedResources()));
resourceUsageReport.setReservedResources(Resources.add(
resourceUsageReport.getReservedResources(),
uam.getApplicationResourceUsageReport().getReservedResources()));
resourceUsageReport.setNeededResources(Resources.add(
resourceUsageReport.getNeededResources(),
uam.getApplicationResourceUsageReport().getNeededResources()));
resourceUsageReport.setMemorySeconds(
resourceUsageReport.getMemorySeconds() +
uam.getApplicationResourceUsageReport().getMemorySeconds());
resourceUsageReport.setVcoreSeconds(
resourceUsageReport.getVcoreSeconds() +
uam.getApplicationResourceUsageReport().getVcoreSeconds());
resourceUsageReport.setQueueUsagePercentage(
resourceUsageReport.getQueueUsagePercentage() +
uam.getApplicationResourceUsageReport()
.getQueueUsagePercentage());
resourceUsageReport.setClusterUsagePercentage(
resourceUsageReport.getClusterUsagePercentage() +
uam.getApplicationResourceUsageReport()
.getClusterUsagePercentage());
am.getApplicationTags().addAll(uam.getApplicationTags());
Comment thread
akshatb1 marked this conversation as resolved.
Outdated
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;

import java.util.Map;
import java.util.Set;

import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
Expand All @@ -40,11 +44,12 @@
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
Expand Down Expand Up @@ -531,4 +536,107 @@ public void testGetClusterMetricsRequest() throws YarnException, IOException {
GetClusterMetricsResponse.class);
Assert.assertEquals(true, clusterMetrics.isEmpty());
}

/**
* This test validates the correctness of
* GetApplicationsResponse in case the
* application exists in the cluster.
*/
@Test
public void testGetApplicationsResponse()
throws YarnException, IOException, InterruptedException {
LOG.info("Test FederationClientInterceptor: " +

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this fit in 80 chars?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, no errors in checkstyle.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean that putting the whole thing would fit in 80:

LOG.info("Test FederationClientInterceptor: Get Applications Response");

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

"Get Applications Response");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);

SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
SubmitApplicationResponse response = interceptor.submitApplication(request);

Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));

Set<String> appTypes = Collections.singleton("MockApp");
GetApplicationsRequest requestGet =
GetApplicationsRequest.newInstance(appTypes);

GetApplicationsResponse responseGet =
interceptor.getApplications(requestGet);

Assert.assertNotNull(responseGet);
}

/**
* This test validates
* the correctness of GetApplicationsResponse in case of
* empty request.
*/
@Test
public void testGetApplicationsNullRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Applications request");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOG.info("Test FederationClientInterceptor : Get Applications request");
LOG.info("Test FederationClientInterceptor: Get Applications request");

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

LambdaTestUtils.intercept(YarnException.class,
"Missing getApplications request.",
() -> interceptor.getApplications(null));
Comment thread
goiri marked this conversation as resolved.
}

/**
* This test validates
* the correctness of GetApplicationsResponse in case applications
* with given type does not exist.
*/
@Test
public void testGetApplicationsApplicationTypeNotExists() throws Exception{
LOG.info("Test FederationClientInterceptor :" +

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOG.info("Test FederationClientInterceptor :" +
LOG.info("Test FederationClientInterceptor: Application with type does not exist");

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed space before ":". Could not move it to single line since it exceeds 80 characters.

" Application with type does not exist");

ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);

SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
SubmitApplicationResponse response = interceptor.submitApplication(request);

Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));

Set<String> appTypes = Collections.singleton("SPARK");

GetApplicationsResponse responseGet =
interceptor.getApplications(
GetApplicationsRequest.
newInstance(appTypes));

Assert.assertNotNull(responseGet);
Assert.assertEquals(0, responseGet.getApplicationList().size());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Assert.assertEquals(0, responseGet.getApplicationList().size());
Assert.assertTrue(responseGet.getApplicationList().isEmpty());

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

/**
* This test validates
* the correctness of GetApplicationsResponse in case applications
* with given YarnApplicationState does not exist.
*/
@Test
public void testGetApplicationsApplicationStateNotExists() throws Exception{
LOG.info("Test FederationClientInterceptor :" +
" Application with state does not exist");

ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indentation should be 4 spaces after break line.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed here and at the other places in the same method.


SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
SubmitApplicationResponse response = interceptor.submitApplication(request);

Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));

EnumSet<YarnApplicationState> applicationStates = EnumSet.noneOf(
YarnApplicationState.class);
applicationStates.add(YarnApplicationState.KILLED);

GetApplicationsResponse responseGet =
interceptor.getApplications(
GetApplicationsRequest.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extract

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

newInstance(applicationStates));
Assert.assertNotNull(responseGet);
Assert.assertEquals(0, responseGet.getApplicationList().size());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isEmpty()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
}
Loading