diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java index 9b795b0507bd2..fc8cc5bd464b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java @@ -20,7 +20,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.List; +import java.util.Collection; import java.util.Random; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -188,8 +188,8 @@ public static FederationAMRMProxyPolicy loadAMRMPolicy(String queue, * @throws FederationPolicyException if there are no usable subclusters. */ public static void validateSubClusterAvailability( - List activeSubClusters, - List blackListSubClusters) + Collection activeSubClusters, + Collection blackListSubClusters) throws FederationPolicyException { if (activeSubClusters != null && !activeSubClusters.isEmpty()) { if (blackListSubClusters == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java index 0cbda314f287e..c4fc7322a1eca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; @@ -136,7 +137,7 @@ public SubClusterId getHomeSubcluster( if (appSubmissionContext == null) { throw new FederationPolicyException( - "The ApplicationSubmissionContext " + "cannot be null."); + "The ApplicationSubmissionContext cannot be null."); } String queue = appSubmissionContext.getQueue(); @@ -148,51 +149,7 @@ public SubClusterId getHomeSubcluster( queue = YarnConfiguration.DEFAULT_QUEUE_NAME; } - // the facade might cache this request, based on its parameterization - SubClusterPolicyConfiguration configuration = null; - - try { - configuration = federationFacade.getPolicyConfiguration(queue); - } catch (YarnException e) { - String errMsg = "There is no policy configured for the queue: " + queue - + ", falling back to defaults."; - LOG.warn(errMsg, e); - } - - // If there is no policy configured for this queue, fallback to the baseline - // policy that is configured either in the store or via XML config (and - // cached) - if (configuration == null) { - LOG.warn("There is no policies configured for queue: " + queue + " we" - + " fallback to default policy for: " - + YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); - - queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; - try { - configuration = federationFacade.getPolicyConfiguration(queue); - } catch (YarnException e) { - String errMsg = "Cannot retrieve policy configured for the queue: " - + queue + ", falling back to defaults."; - LOG.warn(errMsg, e); - - } - } - - // the fallback is not configure via store, but via XML, using - // previously loaded configuration. - if (configuration == null) { - configuration = - cachedConfs.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); - } - - // if the configuration has changed since last loaded, reinit the policy - // based on current configuration - if (!cachedConfs.containsKey(queue) - || !cachedConfs.get(queue).equals(configuration)) { - singlePolicyReinit(policyMap, cachedConfs, queue, configuration); - } - - FederationRouterPolicy policy = policyMap.get(queue); + FederationRouterPolicy policy = getFederationRouterPolicy(cachedConfs, policyMap, queue); if (policy == null) { // this should never happen, as the to maps are updated together throw new FederationPolicyException("No FederationRouterPolicy found " @@ -262,4 +219,92 @@ public synchronized void reset() { } + /** + * This method provides a wrapper of all policy functionalities for routing a + * reservation. Internally it manages configuration changes, and policy + * init/reinit. + * + * @param request the reservation to route. + * + * @return the id of the subcluster that will be the "home" for this + * reservation. + * + * @throws YarnException if there are issues initializing policies, or no + * valid sub-cluster id could be found for this reservation. + */ + public SubClusterId getReservationHomeSubCluster( + ReservationSubmissionRequest request) throws YarnException { + + // the maps are concurrent, but we need to protect from reset() + // reinitialization mid-execution by creating a new reference local to this + // method. + Map cachedConfs = globalConfMap; + Map policyMap = globalPolicyMap; + + if (request == null) { + throw new FederationPolicyException( + "The ReservationSubmissionRequest cannot be null."); + } + + String queue = request.getQueue(); + FederationRouterPolicy policy = getFederationRouterPolicy(cachedConfs, policyMap, queue); + + if (policy == null) { + // this should never happen, as the to maps are updated together + throw new FederationPolicyException("No FederationRouterPolicy found " + + "for queue: " + request.getQueue() + " (while routing " + + "reservation: " + request.getReservationId() + ") " + + "and no default specified."); + } + + return policy.getReservationHomeSubcluster(request); + } + + private FederationRouterPolicy getFederationRouterPolicy( + Map cachedConfiguration, + Map policyMap, String queue) + throws FederationPolicyInitializationException { + + // the facade might cache this request, based on its parameterization + SubClusterPolicyConfiguration configuration = null; + String copyQueue = queue; + + try { + configuration = federationFacade.getPolicyConfiguration(copyQueue); + } catch (YarnException e) { + LOG.warn("There is no policy configured for the queue: {}, falling back to defaults.", + copyQueue, e); + } + + // If there is no policy configured for this queue, fallback to the baseline + // policy that is configured either in the store or via XML config (and cached) + if (configuration == null) { + final String policyKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; + LOG.warn("There is no policies configured for queue: {} " + + "we fallback to default policy for: {}. ", copyQueue, policyKey); + copyQueue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; + try { + configuration = federationFacade.getPolicyConfiguration(copyQueue); + } catch (YarnException e) { + LOG.warn("Cannot retrieve policy configured for the queue: {}, falling back to defaults.", + copyQueue, e); + } + } + + // the fallback is not configure via store, but via XML, using + // previously loaded configuration. + if (configuration == null) { + configuration = cachedConfiguration.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); + } + + // if the configuration has changed since last loaded, reinit the policy + // based on current configuration + SubClusterPolicyConfiguration policyConfiguration = + cachedConfiguration.getOrDefault(copyQueue, null); + if (policyConfiguration == null || !policyConfiguration.equals(configuration)) { + singlePolicyReinit(policyMap, cachedConfiguration, copyQueue, configuration); + } + + return policyMap.get(copyQueue); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java index 730fb417f883d..dddc5384fc49d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java @@ -18,15 +18,22 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.List; import java.util.Map; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; /** * Base abstract class for {@link FederationRouterPolicy} implementations, that @@ -63,4 +70,107 @@ public void validate(ApplicationSubmissionContext appSubmissionContext) } } + /** + * This method is implemented by the specific policy, and it is used to route + * both reservations, and applications among a given set of + * sub-clusters. + * + * @param queue the queue for this application/reservation + * @param preSelectSubClusters a pre-filter set of sub-clusters + * @return the chosen sub-cluster + * + * @throws YarnException if the policy fails to choose a sub-cluster + */ + protected abstract SubClusterId chooseSubCluster(String queue, + Map preSelectSubClusters) throws YarnException; + + /** + * Filter chosen SubCluster based on reservationId. + * + * @param reservationId the globally unique identifier for a reservation. + * @param activeSubClusters the map of ids to info for all active subclusters. + * @return the chosen sub-cluster + * @throws YarnException if the policy fails to choose a sub-cluster + */ + protected Map prefilterSubClusters( + ReservationId reservationId, Map activeSubClusters) + throws YarnException { + + // if a reservation exists limit scope to the sub-cluster this + // reservation is mapped to + // TODO: Implemented in YARN-11236 + return activeSubClusters; + } + + /** + * Simply picks from alphabetically-sorted active subclusters based on the + * hash of quey name. Jobs of the same queue will all be routed to the same + * sub-cluster, as far as the number of active sub-cluster and their names + * remain the same. + * + * @param appContext the {@link ApplicationSubmissionContext} that + * has to be routed to an appropriate subCluster for execution. + * + * @param blackLists the list of subClusters as identified by + * {@link SubClusterId} to blackList from the selection of the home + * subCluster. + * + * @return a hash-based chosen {@link SubClusterId} that will be the "home" + * for this application. + * + * @throws YarnException if there are no active subclusters. + */ + @Override + public SubClusterId getHomeSubcluster(ApplicationSubmissionContext appContext, + List blackLists) throws YarnException { + + // null checks and default-queue behavior + validate(appContext); + + // apply filtering based on reservation location and active sub-clusters + Map filteredSubClusters = prefilterSubClusters( + appContext.getReservationID(), getActiveSubclusters()); + + FederationPolicyUtils.validateSubClusterAvailability(filteredSubClusters.keySet(), blackLists); + + // remove black SubCluster + if (blackLists != null) { + blackLists.forEach(filteredSubClusters::remove); + } + + // pick the chosen subCluster from the active ones + return chooseSubCluster(appContext.getQueue(), filteredSubClusters); + } + + /** + * This method provides a wrapper of all policy functionalities for routing a + * reservation. Internally it manages configuration changes, and policy + * init/reinit. + * + * @param request the reservation to route. + * + * @return the id of the subcluster that will be the "home" for this + * reservation. + * + * @throws YarnException if there are issues initializing policies, or no + * valid sub-cluster id could be found for this reservation. + */ + @Override + public SubClusterId getReservationHomeSubcluster(ReservationSubmissionRequest request) + throws YarnException { + if (request == null) { + throw new FederationPolicyException("The ReservationSubmissionRequest cannot be null."); + } + + if (request.getQueue() == null) { + request.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); + } + + // apply filtering based on reservation location and active sub-clusters + Map filteredSubClusters = prefilterSubClusters( + request.getReservationId(), getActiveSubclusters()); + + // pick the chosen subCluster from the active ones + return chooseSubCluster(request.getQueue(), filteredSubClusters); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java index 9325bd8ca2a15..af5810665913c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java @@ -19,6 +19,7 @@ import java.util.List; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy; @@ -49,4 +50,16 @@ public interface FederationRouterPolicy extends ConfigurableFederationPolicy { SubClusterId getHomeSubcluster( ApplicationSubmissionContext appSubmissionContext, List blackListSubClusters) throws YarnException; + + /** + * Determines the sub-cluster where a ReservationSubmissionRequest should be + * sent to. + * + * @param request the original request + * @return a mapping of sub-clusters and the requests + * + * @throws YarnException if the policy fails to choose a sub-cluster + */ + SubClusterId getReservationHomeSubcluster( + ReservationSubmissionRequest request) throws YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java index cc11880665335..5ac2d1cce0720 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java @@ -22,11 +22,9 @@ import java.util.List; import java.util.Map; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; -import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -50,53 +48,12 @@ public void reinitialize( setPolicyContext(federationPolicyContext); } - /** - * Simply picks from alphabetically-sorted active subclusters based on the - * hash of quey name. Jobs of the same queue will all be routed to the same - * sub-cluster, as far as the number of active sub-cluster and their names - * remain the same. - * - * @param appSubmissionContext the {@link ApplicationSubmissionContext} that - * has to be routed to an appropriate subCluster for execution. - * - * @param blackListSubClusters the list of subClusters as identified by - * {@link SubClusterId} to blackList from the selection of the home - * subCluster. - * - * @return a hash-based chosen {@link SubClusterId} that will be the "home" - * for this application. - * - * @throws YarnException if there are no active subclusters. - */ @Override - public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext, - List blackListSubClusters) throws YarnException { - - // throws if no active subclusters available - Map activeSubclusters = - getActiveSubclusters(); - - FederationPolicyUtils.validateSubClusterAvailability( - new ArrayList(activeSubclusters.keySet()), - blackListSubClusters); - - if (blackListSubClusters != null) { - - // Remove from the active SubClusters from StateStore the blacklisted ones - for (SubClusterId scId : blackListSubClusters) { - activeSubclusters.remove(scId); - } - } - - validate(appSubmissionContext); - - int chosenPosition = Math.abs( - appSubmissionContext.getQueue().hashCode() % activeSubclusters.size()); - - List list = new ArrayList<>(activeSubclusters.keySet()); + protected SubClusterId chooseSubCluster(String queue, + Map preSelectSubclusters) throws YarnException { + int chosenPosition = Math.abs(queue.hashCode() % preSelectSubclusters.size()); + List list = new ArrayList<>(preSelectSubclusters.keySet()); Collections.sort(list); return list.get(chosenPosition); } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java index fa5eb4be2cfd5..a86a43a213de0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java @@ -17,14 +17,10 @@ package org.apache.hadoop.yarn.server.federation.policies.router; -import java.util.ArrayList; -import java.util.List; import java.util.Map; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; -import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; @@ -65,28 +61,12 @@ public void reinitialize(FederationPolicyInitializationContext policyContext) } @Override - public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext, - List blacklist) throws YarnException { - - // null checks and default-queue behavior - validate(appSubmissionContext); - - Map activeSubclusters = - getActiveSubclusters(); - - FederationPolicyUtils.validateSubClusterAvailability( - new ArrayList(activeSubclusters.keySet()), blacklist); - - Map weights = - getPolicyInfo().getRouterPolicyWeights(); + protected SubClusterId chooseSubCluster( + String queue, Map preSelectSubclusters) throws YarnException { + Map weights = getPolicyInfo().getRouterPolicyWeights(); SubClusterIdInfo chosen = null; long currBestMem = -1; - for (Map.Entry entry : activeSubclusters - .entrySet()) { - if (blacklist != null && blacklist.contains(entry.getKey())) { - continue; - } + for (Map.Entry entry : preSelectSubclusters.entrySet()) { SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey()); if (weights.containsKey(id) && weights.get(id) > 0) { long availableMemory = getAvailableMemory(entry.getValue()); @@ -110,7 +90,7 @@ private long getAvailableMemory(SubClusterInfo value) throws YarnException { mem = obj.getJSONObject("clusterMetrics").getLong("availableMB"); return mem; } catch (JSONException j) { - throw new YarnException("FederationSubCluserInfo cannot be parsed", j); + throw new YarnException("FederationSubClusterInfo cannot be parsed", j); } } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java index 469240af518d9..3abcf6fa378e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Collections; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -78,7 +79,7 @@ public void reinitialize(FederationPolicyInitializationContext policyContext) resolver = policyContext.getFederationSubclusterResolver(); Map weights = getPolicyInfo().getRouterPolicyWeights(); - enabledSCs = new ArrayList(); + enabledSCs = new ArrayList<>(); for (Map.Entry entry : weights.entrySet()) { if (entry != null && entry.getValue() > 0) { enabledSCs.add(entry.getKey().toId()); @@ -100,8 +101,7 @@ public SubClusterId getHomeSubcluster( // Fast path for FailForward to WeightedRandomRouterPolicy if (rrList == null || rrList.isEmpty() || (rrList.size() == 1 && ResourceRequest.isAnyLocation(rrList.get(0).getResourceName()))) { - return super - .getHomeSubcluster(appSubmissionContext, blackListSubClusters); + return super.getHomeSubcluster(appSubmissionContext, blackListSubClusters); } if (rrList.size() != 3) { @@ -109,12 +109,11 @@ public SubClusterId getHomeSubcluster( "Invalid number of resource requests: " + rrList.size()); } - Map activeSubClusters = - getActiveSubclusters(); - List validSubClusters = - new ArrayList<>(activeSubClusters.keySet()); - FederationPolicyUtils - .validateSubClusterAvailability(validSubClusters, blackListSubClusters); + Map activeSubClusters = getActiveSubclusters(); + Set validSubClusters = activeSubClusters.keySet(); + FederationPolicyUtils.validateSubClusterAvailability(activeSubClusters.keySet(), + blackListSubClusters); + if (blackListSubClusters != null) { // Remove from the active SubClusters from StateStore the blacklisted ones validSubClusters.removeAll(blackListSubClusters); @@ -128,20 +127,21 @@ public SubClusterId getHomeSubcluster( ResourceRequest nodeRequest = null; ResourceRequest rackRequest = null; ResourceRequest anyRequest = null; + for (ResourceRequest rr : rrList) { // Handle "node" requests try { targetId = resolver.getSubClusterForNode(rr.getResourceName()); nodeRequest = rr; } catch (YarnException e) { - LOG.error("Cannot resolve node : {}", e.getLocalizedMessage()); + LOG.error("Cannot resolve node : {}.", e.getMessage()); } // Handle "rack" requests try { resolver.getSubClustersForRack(rr.getResourceName()); rackRequest = rr; } catch (YarnException e) { - LOG.error("Cannot resolve rack : {}", e.getLocalizedMessage()); + LOG.error("Cannot resolve rack : {}.", e.getMessage()); } // Handle "ANY" requests if (ResourceRequest.isAnyLocation(rr.getResourceName())) { @@ -149,32 +149,33 @@ public SubClusterId getHomeSubcluster( continue; } } + if (nodeRequest == null) { - throw new YarnException("Missing node request"); + throw new YarnException("Missing node request."); } if (rackRequest == null) { - throw new YarnException("Missing rack request"); + throw new YarnException("Missing rack request."); } if (anyRequest == null) { - throw new YarnException("Missing any request"); + throw new YarnException("Missing any request."); } - LOG.info( - "Node request: " + nodeRequest.getResourceName() + ", Rack request: " - + rackRequest.getResourceName() + ", Any request: " + anyRequest - .getResourceName()); + + LOG.info("Node request: {} , Rack request: {} , Any request: {}.", + nodeRequest.getResourceName(), rackRequest.getResourceName(), + anyRequest.getResourceName()); + // Handle "node" requests if (validSubClusters.contains(targetId) && enabledSCs .contains(targetId)) { - LOG.info("Node {} is in SubCluster: {}", nodeRequest.getResourceName(), - targetId); + LOG.info("Node {} is in SubCluster: {}.", nodeRequest.getResourceName(), targetId); return targetId; } else { throw new YarnException("The node " + nodeRequest.getResourceName() + " is in a blacklist SubCluster or not active. "); } } catch (YarnException e) { - LOG.error("Validating resource requests failed, Falling back to " - + "WeightedRandomRouterPolicy placement: " + e.getMessage()); + LOG.error("Validating resource requests failed, " + + "Falling back to WeightedRandomRouterPolicy placement : {}.", e.getMessage()); // FailForward to WeightedRandomRouterPolicy // Overwrite request to use a default ANY ResourceRequest amReq = Records.newRecord(ResourceRequest.class); @@ -183,14 +184,10 @@ public SubClusterId getHomeSubcluster( amReq.setCapability(appSubmissionContext.getResource()); amReq.setNumContainers(1); amReq.setRelaxLocality(true); - amReq.setNodeLabelExpression( - appSubmissionContext.getNodeLabelExpression()); - amReq.setExecutionTypeRequest( - ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)); - appSubmissionContext - .setAMContainerResourceRequests(Collections.singletonList(amReq)); - return super - .getHomeSubcluster(appSubmissionContext, blackListSubClusters); + amReq.setNodeLabelExpression(appSubmissionContext.getNodeLabelExpression()); + amReq.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)); + appSubmissionContext.setAMContainerResourceRequests(Collections.singletonList(amReq)); + return super.getHomeSubcluster(appSubmissionContext, blackListSubClusters); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java index b81ca07b42ad8..7d50d3814a0dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java @@ -17,13 +17,9 @@ package org.apache.hadoop.yarn.server.federation.policies.router; -import java.util.ArrayList; -import java.util.List; import java.util.Map; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; @@ -37,30 +33,15 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy { @Override - public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext, - List blacklist) throws YarnException { - - // null checks and default-queue behavior - validate(appSubmissionContext); - - Map activeSubclusters = - getActiveSubclusters(); - - FederationPolicyUtils.validateSubClusterAvailability( - new ArrayList(activeSubclusters.keySet()), blacklist); - + protected SubClusterId chooseSubCluster( + String queue, Map preSelectSubclusters) throws YarnException { // This finds the sub-cluster with the highest weight among the // currently active ones. - Map weights = - getPolicyInfo().getRouterPolicyWeights(); + Map weights = getPolicyInfo().getRouterPolicyWeights(); SubClusterId chosen = null; Float currentBest = Float.MIN_VALUE; - for (SubClusterId id : activeSubclusters.keySet()) { + for (SubClusterId id : preSelectSubclusters.keySet()) { SubClusterIdInfo idInfo = new SubClusterIdInfo(id); - if (blacklist != null && blacklist.contains(id)) { - continue; - } if (weights.containsKey(idInfo) && weights.get(idInfo) > currentBest) { currentBest = weights.get(idInfo); chosen = id; @@ -68,10 +49,8 @@ public SubClusterId getHomeSubcluster( } if (chosen == null) { throw new FederationPolicyException( - "No Active Subcluster with weight vector greater than zero"); + "No Active Subcluster with weight vector greater than zero."); } - return chosen; } - } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java index b4c019270249c..32e31ebfec712 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java @@ -17,15 +17,15 @@ package org.apache.hadoop.yarn.server.federation.policies.router; -import java.util.List; +import java.util.Map; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; /** * This {@link FederationRouterPolicy} simply rejects all incoming requests. @@ -43,34 +43,12 @@ public void reinitialize( setPolicyContext(federationPolicyContext); } - /** - * The policy always reject requests. - * - * @param appSubmissionContext the {@link ApplicationSubmissionContext} that - * has to be routed to an appropriate subCluster for execution. - * - * @param blackListSubClusters the list of subClusters as identified by - * {@link SubClusterId} to blackList from the selection of the home - * subCluster. - * - * @return (never). - * - * @throws YarnException (always) to prevent applications in this queue to be - * run anywhere in the federated cluster. - */ @Override - public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext, - List blackListSubClusters) throws YarnException { - - // run standard validation, as error might differ - validate(appSubmissionContext); - - throw new FederationPolicyException("The policy configured for this queue" - + " (" + appSubmissionContext.getQueue() + ") reject all routing " - + "requests by construction. Application " - + appSubmissionContext.getApplicationId() - + " cannot be routed to any RM."); + protected SubClusterId chooseSubCluster( + String queue, Map preSelectSubclusters) throws YarnException { + throw new FederationPolicyException( + "The policy configured for this queue (" + queue + ") " + + "reject all routing requests by construction. Application in " + + queue + " cannot be routed to any RM."); } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java index 7a8be91fcd0f6..353329613ab97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java @@ -22,11 +22,10 @@ import java.util.Map; import java.util.Random; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; -import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -55,50 +54,16 @@ public void reinitialize(FederationPolicyInitializationContext policyContext) this.getClass().getCanonicalName()); // note: this overrides AbstractRouterPolicy and ignores the weights - setPolicyContext(policyContext); } - /** - * Simply picks a random active subCluster to start the AM (this does NOT - * depend on the weights in the policy). - * - * @param appSubmissionContext the {@link ApplicationSubmissionContext} that - * has to be routed to an appropriate subCluster for execution. - * - * @param blackListSubClusters the list of subClusters as identified by - * {@link SubClusterId} to blackList from the selection of the home - * subCluster. - * - * @return a randomly chosen subcluster. - * - * @throws YarnException if there are no active subclusters. - */ @Override - public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext, - List blackListSubClusters) throws YarnException { - - // null checks and default-queue behavior - validate(appSubmissionContext); - - Map activeSubclusters = - getActiveSubclusters(); - - List list = new ArrayList<>(activeSubclusters.keySet()); - - FederationPolicyUtils.validateSubClusterAvailability(list, - blackListSubClusters); - - if (blackListSubClusters != null) { - - // Remove from the active SubClusters from StateStore the blacklisted ones - for (SubClusterId scId : blackListSubClusters) { - list.remove(scId); - } + protected SubClusterId chooseSubCluster( + String queue, Map preSelectSubclusters) throws YarnException { + if (preSelectSubclusters == null || preSelectSubclusters.isEmpty()) { + throw new FederationPolicyException("No available subcluster to choose from."); } - + List list = new ArrayList<>(preSelectSubclusters.keySet()); return list.get(rand.nextInt(list.size())); } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java index b1434104836c0..f2acf663603f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java @@ -19,10 +19,8 @@ package org.apache.hadoop.yarn.server.federation.policies.router; import java.util.ArrayList; -import java.util.List; import java.util.Map; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; @@ -35,47 +33,30 @@ * sub-clusters. */ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy { - @Override - public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext, - List blacklist) throws YarnException { - - // null checks and default-queue behavior - validate(appSubmissionContext); - - Map activeSubclusters = - getActiveSubclusters(); + protected SubClusterId chooseSubCluster( + String queue, Map preSelectSubclusters) throws YarnException { - FederationPolicyUtils.validateSubClusterAvailability( - new ArrayList(activeSubclusters.keySet()), blacklist); - - // note: we cannot pre-compute the weights, as the set of activeSubcluster + // note: we cannot pre-compute the weights, as the set of activeSubCluster // changes dynamically (and this would unfairly spread the load to // sub-clusters adjacent to an inactive one), hence we need to count/scan // the list and based on weight pick the next sub-cluster. - Map weights = - getPolicyInfo().getRouterPolicyWeights(); + Map weights = getPolicyInfo().getRouterPolicyWeights(); ArrayList weightList = new ArrayList<>(); ArrayList scIdList = new ArrayList<>(); for (Map.Entry entry : weights.entrySet()) { - if (blacklist != null && blacklist.contains(entry.getKey().toId())) { - continue; - } - if (entry.getKey() != null - && activeSubclusters.containsKey(entry.getKey().toId())) { + SubClusterIdInfo key = entry.getKey(); + if (key != null && preSelectSubclusters.containsKey(key.toId())) { weightList.add(entry.getValue()); - scIdList.add(entry.getKey().toId()); + scIdList.add(key.toId()); } } int pickedIndex = FederationPolicyUtils.getWeightedRandom(weightList); if (pickedIndex == -1) { - throw new FederationPolicyException( - "No positive weight found on active subclusters"); + throw new FederationPolicyException("No positive weight found on active subclusters"); } return scIdList.get(pickedIndex); } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClustersInfoResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClustersInfoResponse.java index bcf75aba1aef0..0ffe4ae28a355 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClustersInfoResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClustersInfoResponse.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.federation.store.records; import java.util.List; +import java.util.Collection; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -36,7 +37,7 @@ public abstract class GetSubClustersInfoResponse { @Public @Unstable public static GetSubClustersInfoResponse newInstance( - List subClusters) { + Collection subClusters) { GetSubClustersInfoResponse subClusterInfos = Records.newRecord(GetSubClustersInfoResponse.class); subClusterInfos.setSubClusters(subClusters); @@ -61,6 +62,5 @@ public static GetSubClustersInfoResponse newInstance( */ @Private @Unstable - public abstract void setSubClusters(List subClusters); - + public abstract void setSubClusters(Collection subClusters); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetSubClustersInfoResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetSubClustersInfoResponsePBImpl.java index 5ecc3e249b2fc..271570882f922 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetSubClustersInfoResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetSubClustersInfoResponsePBImpl.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.federation.store.records.impl.pb; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.stream.Collectors; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -93,12 +95,12 @@ public List getSubClusters() { } @Override - public void setSubClusters(List subClusters) { + public void setSubClusters(Collection subClusters) { if (subClusters == null) { builder.clearSubClusterInfos(); return; } - this.subClusterInfos = subClusters; + this.subClusterInfos = subClusters.stream().collect(Collectors.toList()); } private void initSubClustersInfoList() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java index 249efd324b4a0..d9ebd2f1c5e92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.federation.policies; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import java.nio.ByteBuffer; import java.util.HashMap; @@ -28,20 +27,26 @@ import java.util.Map; import java.util.Random; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.junit.Test; /** @@ -58,6 +63,9 @@ public abstract class BaseFederationPoliciesTest { private Random rand = new Random(); private SubClusterId homeSubCluster; + private ReservationSubmissionRequest reservationSubmissionRequest = + mock(ReservationSubmissionRequest.class); + @Test public void testReinitilialize() throws YarnException { FederationPolicyInitializationContext fpc = @@ -177,11 +185,60 @@ public void setHomeSubCluster(SubClusterId homeSubCluster) { public void setMockActiveSubclusters(int numSubclusters) { for (int i = 1; i <= numSubclusters; i++) { SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i); - SubClusterInfo sci = mock(SubClusterInfo.class); - when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); - when(sci.getSubClusterId()).thenReturn(sc.toId()); + SubClusterInfo sci = SubClusterInfo.newInstance( + sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83", SubClusterState.SC_RUNNING, + System.currentTimeMillis(), "something"); getActiveSubclusters().put(sc.toId(), sci); } } + public String generateClusterMetricsInfo(int id) { + long mem = 1024 * getRand().nextInt(277 * 100 - 1); + // plant a best cluster + if (id == 5) { + mem = 1024 * 277 * 100; + } + String clusterMetrics = + "{\"clusterMetrics\":{\"appsSubmitted\":65, \"appsCompleted\":64,\"appsPending\":0," + + "\"appsRunning\":0, \"appsFailed\":0, \"appsKilled\":1,\"reservedMB\":0,\"availableMB\":" + + mem + ", \"allocatedMB\":0,\"reservedVirtualCores\":0, \"availableVirtualCores\":2216," + + "\"allocatedVirtualCores\":0, \"containersAllocated\":0,\"containersReserved\":0," + + "\"containersPending\":0,\"totalMB\":28364800, \"totalVirtualCores\":2216," + + "\"totalNodes\":278, \"lostNodes\":1,\"unhealthyNodes\":0,\"decommissionedNodes\":0, " + + "\"rebootedNodes\":0, \"activeNodes\":277}}"; + return clusterMetrics; + } + + public FederationStateStoreFacade getMemoryFacade() throws YarnException { + + // setting up a store and its facade (with caching off) + FederationStateStoreFacade fedFacade = FederationStateStoreFacade.getInstance(); + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); + FederationStateStore store = new MemoryFederationStateStore(); + store.init(conf); + fedFacade.reinitialize(store, conf); + + for (SubClusterInfo sinfo : getActiveSubclusters().values()) { + store.registerSubCluster(SubClusterRegisterRequest.newInstance(sinfo)); + } + + return fedFacade; + } + + public ReservationSubmissionRequest getReservationSubmissionRequest() { + return reservationSubmissionRequest; + } + + public void setReservationSubmissionRequest( + ReservationSubmissionRequest reservationSubmissionRequest) { + this.reservationSubmissionRequest = reservationSubmissionRequest; + } + + public void setupContext() throws YarnException { + FederationPolicyInitializationContext context = + FederationPoliciesTestUtil.initializePolicyContext2(getPolicy(), + getPolicyInfo(), getActiveSubclusters(), getMemoryFacade()); + this.setFederationPolicyContext(context); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java index d09ba754d55a7..afa46b358cf92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Random; +import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; @@ -115,4 +116,14 @@ public void testAllBlacklistSubcluster() throws YarnException { } } } + + @Test + public void testNullReservationContext() throws Exception { + FederationRouterPolicy policy = ((FederationRouterPolicy) getPolicy()); + + LambdaTestUtils.intercept(FederationPolicyException.class, + "The ReservationSubmissionRequest cannot be null.", + () -> policy.getReservationHomeSubcluster(null)); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java index ee3e09d2b93ed..57f0b59ffe5fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java @@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -50,8 +49,7 @@ public void setUp() throws Exception { setMockActiveSubclusters(numSubclusters); // initialize policy with context - FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), - getPolicyInfo(), getActiveSubclusters()); + setupContext(); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java index 58f1b9947bd81..3f6c9d9577854 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java @@ -17,12 +17,13 @@ package org.apache.hadoop.yarn.server.federation.policies.router; -import static org.junit.Assert.fail; - import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; @@ -46,12 +47,14 @@ public void setUp() throws Exception { Map routerWeights = new HashMap<>(); Map amrmWeights = new HashMap<>(); + long now = Time.now(); + // simulate 20 active subclusters for (int i = 0; i < 20; i++) { SubClusterIdInfo sc = new SubClusterIdInfo(String.format("sc%02d", i)); - SubClusterInfo federationSubClusterInfo = - SubClusterInfo.newInstance(sc.toId(), null, null, null, null, -1, - SubClusterState.SC_RUNNING, -1, generateClusterMetricsInfo(i)); + SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance( + sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83", + now - 1000, SubClusterState.SC_RUNNING, now - 2000, generateClusterMetricsInfo(i)); getActiveSubclusters().put(sc.toId(), federationSubClusterInfo); float weight = getRand().nextInt(2); if (i == 5) { @@ -67,12 +70,11 @@ public void setUp() throws Exception { getPolicyInfo().setRouterPolicyWeights(routerWeights); getPolicyInfo().setAMRMPolicyWeights(amrmWeights); - FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), - getPolicyInfo(), getActiveSubclusters()); - + // initialize policy with context + setupContext(); } - private String generateClusterMetricsInfo(int id) { + public String generateClusterMetricsInfo(int id) { long mem = 1024 * getRand().nextInt(277 * 100 - 1); // plant a best cluster @@ -106,7 +108,7 @@ public void testLoadIsRespected() throws YarnException { } @Test - public void testIfNoSubclustersWithWeightOne() { + public void testIfNoSubclustersWithWeightOne() throws Exception { setPolicy(new LoadBasedRouterPolicy()); setPolicyInfo(new WeightedPolicyInfo()); Map routerWeights = new HashMap<>(); @@ -123,15 +125,13 @@ public void testIfNoSubclustersWithWeightOne() { getPolicyInfo().setRouterPolicyWeights(routerWeights); getPolicyInfo().setAMRMPolicyWeights(amrmWeights); - try { - FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), - getPolicyInfo(), getActiveSubclusters()); - ((FederationRouterPolicy) getPolicy()) - .getHomeSubcluster(getApplicationSubmissionContext(), null); - fail(); - } catch (YarnException ex) { - Assert.assertTrue( - ex.getMessage().contains("Zero Active Subcluster with weight 1")); - } + + ConfigurableFederationPolicy policy = getPolicy(); + FederationPoliciesTestUtil.initializePolicyContext(policy, + getPolicyInfo(), getActiveSubclusters()); + + LambdaTestUtils.intercept(YarnException.class, "Zero Active Subcluster with weight 1.", + () -> ((FederationRouterPolicy) policy). + getHomeSubcluster(getApplicationSubmissionContext(), null)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java index 05939329a0681..3af0d037fcae6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java @@ -73,6 +73,7 @@ public void setUp() throws Exception { configureWeights(4); + // initialize policy with context initializePolicy(new YarnConfiguration()); } @@ -86,9 +87,7 @@ private void initializePolicy(Configuration conf) throws YarnException { .newInstance("queue1", getPolicy().getClass().getCanonicalName(), buf)); getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster()); - FederationPoliciesTestUtil - .initializePolicyContext(getFederationPolicyContext(), getPolicy(), - getPolicyInfo(), getActiveSubclusters(), conf); + setupContext(); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java index e1799d321083c..ea03905110273 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; @@ -54,10 +55,11 @@ public void setUp() throws Exception { // with 5% omit a subcluster if (getRand().nextFloat() < 0.95f || i == 5) { - SubClusterInfo sci = mock(SubClusterInfo.class); - when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); - when(sci.getSubClusterId()).thenReturn(sc.toId()); - getActiveSubclusters().put(sc.toId(), sci); + long now = Time.now(); + SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance( + sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83", + now - 1000, SubClusterState.SC_RUNNING, now - 2000, generateClusterMetricsInfo(i)); + getActiveSubclusters().put(sc.toId(), federationSubClusterInfo); } float weight = getRand().nextFloat(); if (i == 5) { @@ -105,7 +107,7 @@ public void testZeroSubClustersWithPositiveWeight() throws Exception { getPolicyInfo(), getActiveSubclusters()); intercept(FederationPolicyException.class, - "No Active Subcluster with weight vector greater than zero", + "No Active Subcluster with weight vector greater than zero.", () -> ((FederationRouterPolicy) getPolicy()) .getHomeSubcluster(getApplicationSubmissionContext(), null)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java index 1747f73715cda..a3816b6d08777 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java @@ -20,7 +20,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; -import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; import org.junit.Test; @@ -39,8 +38,7 @@ public void setUp() throws Exception { setMockActiveSubclusters(2); // initialize policy with context - FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), - getPolicyInfo(), getActiveSubclusters()); + setupContext(); } @@ -59,5 +57,4 @@ public void testNullQueueRouting() throws YarnException { false, false, 0, Resources.none(), null, false, null, null); localPolicy.getHomeSubcluster(applicationSubmissionContext, null); } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java index 05490aba67247..8346277505ecb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java @@ -18,15 +18,14 @@ package org.apache.hadoop.yarn.server.federation.policies.router; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; -import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -44,14 +43,14 @@ public void setUp() throws Exception { setPolicyInfo(mock(WeightedPolicyInfo.class)); for (int i = 1; i <= 2; i++) { SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i); - SubClusterInfo sci = mock(SubClusterInfo.class); - when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); - when(sci.getSubClusterId()).thenReturn(sc.toId()); - getActiveSubclusters().put(sc.toId(), sci); + long now = Time.now(); + SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance( + sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83", + now - 1000, SubClusterState.SC_RUNNING, now - 2000, generateClusterMetricsInfo(i)); + getActiveSubclusters().put(sc.toId(), federationSubClusterInfo); } - FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), - mock(WeightedPolicyInfo.class), getActiveSubclusters()); + setupContext(); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java index d549250f07256..8121ce282cd6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; @@ -32,7 +33,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; -import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -51,8 +51,7 @@ public void setUp() throws Exception { configureWeights(20); - FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), - getPolicyInfo(), getActiveSubclusters()); + setupContext(); } public void configureWeights(float numSubClusters) { @@ -68,10 +67,11 @@ public void configureWeights(float numSubClusters) { SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i); // with 5% omit a subcluster if (getRand().nextFloat() < 0.95f) { - SubClusterInfo sci = mock(SubClusterInfo.class); - when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); - when(sci.getSubClusterId()).thenReturn(sc.toId()); - getActiveSubclusters().put(sc.toId(), sci); + long now = Time.now(); + SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance( + sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83", + now - 1000, SubClusterState.SC_RUNNING, now - 2000, generateClusterMetricsInfo(i)); + getActiveSubclusters().put(sc.toId(), federationSubClusterInfo); } // 80% of the weight is evenly spread, 20% is randomly generated diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java index a9b9029b25732..6ae64d555b7e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java @@ -159,6 +159,51 @@ public static void initializePolicyContext( new Configuration()); } + public static FederationPolicyInitializationContext initializePolicyContext2( + ConfigurableFederationPolicy policy, WeightedPolicyInfo policyInfo, + Map activeSubClusters, + FederationStateStoreFacade facade) throws YarnException { + FederationPolicyInitializationContext context = + new FederationPolicyInitializationContext(null, initResolver(), facade, + SubClusterId.newInstance("homesubcluster")); + return initializePolicyContext2(context, policy, policyInfo, activeSubClusters); + } + + public static FederationPolicyInitializationContext initializePolicyContext2( + ConfigurableFederationPolicy policy, WeightedPolicyInfo policyInfo, + Map activeSubClusters) + throws YarnException { + return initializePolicyContext2(policy, policyInfo, activeSubClusters, initFacade()); + } + + public static FederationPolicyInitializationContext initializePolicyContext2( + FederationPolicyInitializationContext fpc, + ConfigurableFederationPolicy policy, WeightedPolicyInfo policyInfo, + Map activeSubClusters) + throws YarnException { + ByteBuffer buf = policyInfo.toByteBuffer(); + fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration + .newInstance("queue1", policy.getClass().getCanonicalName(), buf)); + + if (fpc.getFederationStateStoreFacade() == null) { + FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance(); + FederationStateStore fss = mock(FederationStateStore.class); + + if (activeSubClusters == null) { + activeSubClusters = new HashMap<>(); + } + + GetSubClustersInfoResponse response = GetSubClustersInfoResponse.newInstance( + activeSubClusters.values()); + + when(fss.getSubClusters(any())).thenReturn(response); + facade.reinitialize(fss, new Configuration()); + fpc.setFederationStateStoreFacade(facade); + } + policy.reinitialize(fpc); + return fpc; + } + /** * Initialize a {@link SubClusterResolver}. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index a1094de749e71..2be7d844beff2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.stream.Collectors; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequestWrapper; @@ -173,19 +174,13 @@ private SubClusterId getRandomActiveSubCluster( RouterServerUtil.logAndThrowException( FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); } - List list = new ArrayList<>(activeSubclusters.keySet()); - - FederationPolicyUtils.validateSubClusterAvailability( - list, blackListSubClusters); - + Collection keySet = activeSubclusters.keySet(); + FederationPolicyUtils.validateSubClusterAvailability(keySet, blackListSubClusters); if (blackListSubClusters != null) { - - // Remove from the active SubClusters from StateStore the blacklisted ones - for (SubClusterId scId : blackListSubClusters) { - list.remove(scId); - } + keySet.removeAll(blackListSubClusters); } + List list = keySet.stream().collect(Collectors.toList()); return list.get(rand.nextInt(list.size())); }