Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -810,9 +810,9 @@ public GetClusterMetricsResponse getClusterMetrics(
clusterMetrics = invokeConcurrent(remoteMethod, GetClusterMetricsResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetClusterMetricsFailedRetrieved();
String msg = "Unable to get cluster metrics due to exception.";
String msg = "Unable to get cluster metrics due to exception. ";
RouterAuditLogger.logFailure(user.getShortUserName(), GET_CLUSTERMETRICS, UNKNOWN,
TARGET_CLIENT_RM_SERVICE, msg);
TARGET_CLIENT_RM_SERVICE, msg + ex.getMessage());
RouterServerUtil.logAndThrowException(msg, ex);
}
long stopTime = clock.getTime();
Expand All @@ -831,18 +831,28 @@ <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)

List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>();
List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>();
Map<SubClusterId, Exception> exceptions = new TreeMap<>();
List<String> exceptions = new ArrayList<>();

// Generate parallel Callable tasks
for (SubClusterId subClusterId : subClusterIds) {
callables.add(() -> {
ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId);
String methodName = request.getMethodName();
Class<?>[] types = request.getTypes();
Object[] params = request.getParams();
Method method = ApplicationClientProtocol.class.getMethod(methodName, types);
Object result = method.invoke(protocol, params);
return Pair.of(subClusterId, result);
try {
ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId);
String methodName = request.getMethodName();
Class<?>[] types = request.getTypes();
Object[] params = request.getParams();
Method method = ApplicationClientProtocol.class.getMethod(methodName, types);
Object result = method.invoke(protocol, params);
return Pair.of(subClusterId, result);
} catch (Exception e) {
Throwable cause = e.getCause();
if (cause != null && cause instanceof InvocationTargetException) {
cause = cause.getCause();
}
String errMsg = (cause.getMessage() != null) ? cause.getMessage() : "UNKNOWN";
throw new YarnException(String.format("subClusterId %s exec %s error %s.",
subClusterId, request.getMethodName(), errMsg), e);
}
});
}

Expand All @@ -859,9 +869,8 @@ <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
results.put(subClusterId, clazz.cast(result));
} catch (InterruptedException | ExecutionException e) {
Throwable cause = e.getCause();
LOG.error("Cannot execute {} on {} : {}", request.getMethodName(),
subClusterId.getId(), cause.getMessage());
exceptions.put(subClusterId, e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think is good to keep the map even though we just output the values.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much for your help in reviewing the code! I will improve this part of the code.

LOG.error(cause.getMessage(), e);
exceptions.add(cause.getMessage());
}
});
} catch (InterruptedException e) {
Expand All @@ -871,9 +880,7 @@ <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
// All sub-clusters return results to be considered successful,
// otherwise an exception will be thrown.
if (exceptions != null && !exceptions.isEmpty()) {
Set<SubClusterId> subClusterIdSets = exceptions.keySet();
throw new YarnException("invokeConcurrent Failed, An exception occurred in subClusterIds = " +
StringUtils.join(subClusterIdSets, ","));
throw new YarnException("invokeConcurrent Failed = " + StringUtils.join(exceptions, ","));
}

// return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
Expand Down Expand Up @@ -353,4 +354,48 @@ private void checkSubmitSubCluster(ApplicationId appId, SubClusterId expectSubCl
SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster();
Assert.assertEquals(expectSubCluster, respSubClusterId);
}

@Test
public void testSubmitApplicationTwoBadNodeWithRealError() throws Exception {
LOG.info("Test submitApplication with two bad SubClusters.");
setupCluster(Arrays.asList(bad1, bad2));
interceptor.setNumSubmitRetries(1);

final ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 5);

final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);

LambdaTestUtils.intercept(YarnException.class, "RM is stopped",
() -> interceptor.submitApplication(request));
}

@Test
public void testSubmitApplicationOneBadNodeWithRealError() throws Exception {
LOG.info("Test submitApplication with two bad SubClusters.");
setupCluster(Arrays.asList(bad1));
interceptor.setNumSubmitRetries(0);

final ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 6);

final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);

LambdaTestUtils.intercept(YarnException.class, "RM is stopped",
() -> interceptor.submitApplication(request));
}

@Test
public void testGetClusterMetricsTwoBadNode() throws Exception {
setupCluster(Arrays.asList(bad1, bad2));
GetClusterMetricsRequest request = GetClusterMetricsRequest.newInstance();

LambdaTestUtils.intercept(YarnException.class,
"subClusterId 1 exec getClusterMetrics error RM is stopped.",
() -> interceptor.getClusterMetrics(request));

LambdaTestUtils.intercept(YarnException.class,
"subClusterId 2 exec getClusterMetrics error RM is stopped.",
() -> interceptor.getClusterMetrics(request));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
Expand Down Expand Up @@ -126,6 +128,11 @@ public SubmitApplicationResponse submitApplication(
throw new ConnectException("RM is stopped");
}

@Override
public GetClusterMetricsResponse getClusterMetrics(GetClusterMetricsRequest request)
throws YarnException {
throw new YarnException("RM is stopped");
}
}

/**
Expand Down