Skip to content

Commit

Permalink
YARN-11620. [Federation] Improve FederationClientInterceptor To Return Partial Results of subClusters. (apache#6289) Contributed by Shilun Fan.
Browse files Browse the repository at this point in the history

Reviewed-by: Inigo Goiri <[email protected]>
Signed-off-by: Shilun Fan <[email protected]>
  • Loading branch information
slfan1989 authored Nov 28, 2023
1 parent d72cdf7 commit 478c4ce
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ public class FederationClientInterceptor
private final Clock clock = new MonotonicClock();
private boolean returnPartialReport;
private long submitIntervalTime;
private boolean allowPartialResult;

@Override
public void init(String userName) {
Expand Down Expand Up @@ -263,6 +264,10 @@ public void init(String userName) {
returnPartialReport = conf.getBoolean(
YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED);

allowPartialResult = conf.getBoolean(
YarnConfiguration.ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED,
YarnConfiguration.DEFAULT_ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED);
}

@Override
Expand Down Expand Up @@ -895,8 +900,10 @@ <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
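// Unless allowPartialResult is enabled, every sub-cluster must return a result,
// otherwise an exception will be thrown. With allowPartialResult enabled, an
// exception is thrown only when the number of failures equals the number of sub-clusters.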
if (exceptions != null && !exceptions.isEmpty()) {
throw new YarnException("invokeConcurrent Failed = " +
StringUtils.join(exceptions.values(), ","));
if (!allowPartialResult || exceptions.keySet().size() == subClusterIds.size()) {
throw new YarnException("invokeConcurrent Failed = " +
StringUtils.join(exceptions.values(), ","));
}
}

// return result
Expand Down Expand Up @@ -2350,4 +2357,9 @@ protected int getNumMaxThreads(Configuration conf) {
public void setNumSubmitRetries(final int numSubmitRetries) {
  // Replaces the interceptor's numSubmitRetries field with the supplied value.
  this.numSubmitRetries = numSubmitRetries;
}

/**
 * Overrides the allowPartialResult flag that init() reads from the configuration.
 *
 * @param allowPartialResult when false, invokeConcurrent throws a YarnException
 *     as soon as any sub-cluster call has failed; when true, it throws only if
 *     the number of failures equals the number of sub-clusters.
 */
@VisibleForTesting
public void setAllowPartialResult(final boolean allowPartialResult) {
  this.allowPartialResult = allowPartialResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
Expand Down Expand Up @@ -410,4 +411,21 @@ public void testGetClusterMetricsOneBadNodeWithRealError() throws Exception {
"subClusterId 1 exec getClusterMetrics error RM is stopped.",
() -> interceptor.getClusterMetrics(request));
}

/**
 * Exercises getClusterMetrics against one failing and one healthy sub-cluster.
 *
 * <p>The first call must succeed and return a non-null response; after partial
 * results are disallowed, the same request must fail with a YarnException
 * carrying the failing sub-cluster's error message.
 *
 * @throws Exception if the cluster setup or an unexpected invocation failure occurs.
 */
@Test
public void testGetClusterMetricsOneBadOneGoodNodeWithRealError() throws Exception {
  LOG.info("Test getClusterMetrics with one bad and one good SubCluster.");
  setupCluster(Arrays.asList(bad1, good));
  GetClusterMetricsRequest request = GetClusterMetricsRequest.newInstance();

  // NOTE(review): this call only succeeds if partial results are already enabled
  // on the interceptor - presumably via the test configuration; confirm.
  GetClusterMetricsResponse clusterMetrics = interceptor.getClusterMetrics(request);
  Assert.assertNotNull(clusterMetrics);

  // If partial results are not allowed to be returned, an exception will be thrown.
  interceptor.setAllowPartialResult(false);
  try {
    LambdaTestUtils.intercept(YarnException.class,
        "subClusterId 1 exec getClusterMetrics error RM is stopped.",
        () -> interceptor.getClusterMetrics(request));
  } finally {
    // Restore the flag even when the assertion above fails, so the interceptor
    // is never left with partial results disabled.
    interceptor.setAllowPartialResult(true);
  }
}
}

0 comments on commit 478c4ce

Please sign in to comment.