Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4294,6 +4294,11 @@ public static boolean isAclEnabled(Configuration conf) {
ROUTER_PREFIX + "webapp.cross-origin.enabled";
public static final boolean DEFAULT_ROUTER_WEBAPP_ENABLE_CORS_FILTER = false;

/** Router Interceptor Allow Partial Result Enable. **/
public static final String ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED =
ROUTER_PREFIX + "interceptor.allow-partial-result.enable";
public static final boolean DEFAULT_ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the other comments, I think this needs to be false


////////////////////////////////
// CSI Volume configs
////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5085,4 +5085,18 @@
</description>
</property>

<property>
<name>yarn.router.interceptor.allow-partial-result.enable</name>
<value>true</value>
<description>
This configuration represents whether to allow the interceptor to
return partial SubCluster results.
If true, we will ignore the exception of some subClusters during the calling process,
and return result.
If false, if an exception occurs in a subCluster during the calling process,
an exception will be thrown directly.
Default is true.
</description>
</property>

</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
private boolean returnPartialReport;
private boolean appInfosCacheEnabled;
private int appInfosCacheCount;
private boolean allowPartialResult;

private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
private LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> appInfosCaches;
Expand Down Expand Up @@ -194,6 +195,10 @@ public void init(String user) {
YarnConfiguration.DEFAULT_ROUTER_APPSINFO_CACHED_COUNT);
appInfosCaches = new LRUCacheHashMap<>(appInfosCacheCount, true);
}

allowPartialResult = conf.getBoolean(
YarnConfiguration.ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED,
YarnConfiguration.DEFAULT_ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED);
}

private SubClusterId getRandomActiveSubCluster(
Expand Down Expand Up @@ -2104,9 +2109,10 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c

Map<SubClusterInfo, R> results = new HashMap<>();

// Send the requests in parallel
CompletionService<Pair<R, Exception>> compSvc =
new ExecutorCompletionService<>(this.threadpool);
// If there is a sub-cluster access error,
// we should choose whether to throw exception information according to user configuration.
// Send the requests in parallel.
CompletionService<Pair<R, Exception>> compSvc = new ExecutorCompletionService<>(threadpool);

// This part of the code should be able to expose the accessed Exception information.
// We use Pair to store related information. The left value of the Pair is the response,
Expand Down Expand Up @@ -2139,9 +2145,10 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
if (response != null) {
results.put(clusterId, response);
}

Exception exception = pair.getRight();
if (exception != null) {
Exception exception = pair.getValue();
// If allowPartialResult=false, it means that if an exception occurs in a subCluster,
// an exception will be thrown directly.
if (!allowPartialResult && exception != null) {
throw exception;
}
} catch (Throwable e) {
Expand Down Expand Up @@ -2212,4 +2219,9 @@ private SubClusterInfo getHomeSubClusterInfoByReservationId(String resId)
public LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> getAppInfosCaches() {
return appInfosCaches;
}

@VisibleForTesting
public void setAllowPartialResult(boolean allowPartialResult) {
this.allowPartialResult = allowPartialResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,8 @@ public void testWebAddressWithScheme() {
@Test
public void testCheckUserAccessToQueue() throws Exception {

interceptor.setAllowPartialResult(false);

// Case 1: Only queue admin user can access other user's information
HttpServletRequest mockHsr = mockHttpServletRequestByUserName("non-admin");
String errorMsg1 = "User=non-admin doesn't haven access to queue=queue " +
Expand Down Expand Up @@ -1212,6 +1214,8 @@ public void testCheckUserAccessToQueue() throws Exception {
// Case 5: get OK only for SUBMIT_APP acl for "yarn" user
checkUserAccessToQueueFailed("queue", "yarn", QueueACL.ADMINISTER_QUEUE, "admin");
checkUserAccessToQueueSuccess("queue", "yarn", QueueACL.SUBMIT_APPLICATIONS, "admin");

interceptor.setAllowPartialResult(true);
}

private void checkUserAccessToQueueSuccess(String queue, String userName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@

import javax.ws.rs.core.Response;

import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
Expand All @@ -42,6 +41,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
import org.apache.hadoop.yarn.webapp.NotFoundException;
Expand Down Expand Up @@ -81,10 +81,15 @@ public class TestFederationInterceptorRESTRetry
@Override
public void setUp() {
super.setUpConfig();

Configuration conf = this.getConf();
// Allow partial results to be returned.
conf.setBoolean(YarnConfiguration.ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED, true);

interceptor = new TestableFederationInterceptorREST();

stateStore = new MemoryFederationStateStore();
stateStore.init(this.getConf());
stateStore.init(conf);
FederationStateStoreFacade.getInstance().reinitialize(stateStore,
getConf());
stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
Expand Down Expand Up @@ -380,10 +385,20 @@ public void testGetNodeOneBadOneGood()
@Test
public void testGetNodesOneBadSC() throws Exception {

// allowPartialResult = true,
// We tolerate exceptions and return normal results
interceptor.setAllowPartialResult(true);
setupCluster(Arrays.asList(bad2));

LambdaTestUtils.intercept(YarnRuntimeException.class, "RM is stopped",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we keep the old test and add a new one with the partial enabled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much for reviewing the code, I will add new unit tests.

() -> interceptor.getNodes(null));
NodesInfo response = interceptor.getNodes(null);
Assert.assertNotNull(response);
Assert.assertEquals(0, response.getNodes().size());

// allowPartialResult = false,
// We do not tolerate exceptions and will throw exceptions directly
interceptor.setAllowPartialResult(false);

setupCluster(Arrays.asList(bad2));
}

/**
Expand All @@ -393,10 +408,20 @@ public void testGetNodesOneBadSC() throws Exception {
@Test
public void testGetNodesTwoBadSCs() throws Exception {

// allowPartialResult = true,
// We tolerate exceptions and return normal results
interceptor.setAllowPartialResult(true);
setupCluster(Arrays.asList(bad1, bad2));

LambdaTestUtils.intercept(YarnRuntimeException.class, "RM is stopped",
() -> interceptor.getNodes(null));
NodesInfo response = interceptor.getNodes(null);
Assert.assertNotNull(response);
Assert.assertEquals(0, response.getNodes().size());

// allowPartialResult = false,
// We do not tolerate exceptions and will throw exceptions directly
interceptor.setAllowPartialResult(false);

setupCluster(Arrays.asList(bad1, bad2));
}

/**
Expand All @@ -405,10 +430,24 @@ public void testGetNodesTwoBadSCs() throws Exception {
*/
@Test
public void testGetNodesOneBadOneGood() throws Exception {

// allowPartialResult = true,
// We tolerate exceptions and return normal results
interceptor.setAllowPartialResult(true);
setupCluster(Arrays.asList(good, bad2));

LambdaTestUtils.intercept(YarnRuntimeException.class, "RM is stopped",
() -> interceptor.getNodes(null));
NodesInfo response = interceptor.getNodes(null);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getNodes().size());
// Check if the only node came from Good SubCluster
Assert.assertEquals(good.getId(),
Long.toString(response.getNodes().get(0).getLastHealthUpdate()));

// allowPartialResult = false,
// We do not tolerate exceptions and will throw exceptions directly
interceptor.setAllowPartialResult(false);

setupCluster(Arrays.asList(good, bad2));
}

/**
Expand Down