diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index bb3a863a26fdf..1e91a996de731 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.SettableFuture; +import java.io.Serializable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; @@ -103,6 +104,8 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -188,8 +191,7 @@ public class FairScheduler extends @Deprecated protected volatile int continuousSchedulingSleepMs; // Node available resource comparator - private Comparator nodeAvailableResourceComparator = - new NodeAvailableResourceComparator(); + private Comparator nodeAvailableResourceComparator; protected double nodeLocalityThreshold; // Cluster threshold for node locality protected double rackLocalityThreshold; // Cluster threshold for rack locality @Deprecated @@ -295,6 +297,33 @@ private void validateConf(FairSchedulerConfiguration config) { } } + private Comparator initNodeComparator( + FairSchedulerConfiguration config) { + Class nodeComparatorClass = config.getNodeComparatorClass(); + if (!Comparator.class.isAssignableFrom(nodeComparatorClass)) { + throw new YarnRuntimeException("Class: " + + nodeComparatorClass.getCanonicalName() + " not instance of " + + Comparator.class.getCanonicalName() + ""); + } + + try { + try { + Constructor constructor = + nodeComparatorClass.getDeclaredConstructor(FairScheduler.class); + constructor.setAccessible(true); + return (Comparator) constructor.newInstance(this); + } catch (NoSuchMethodException e) { + // constructor doesn't exist, use default + return (Comparator) nodeComparatorClass.newInstance(); + } + } catch (ClassCastException | InstantiationException | + IllegalAccessException | InvocationTargetException e) { + throw new YarnRuntimeException( + "Could not create instance of class: " + + nodeComparatorClass.getCanonicalName(), e); + } + } + public FairSchedulerConfiguration getConf() { return conf; } @@ -1054,12 +1083,13 @@ void continuousSchedulingAttempt() throws InterruptedException { } /** Sort nodes by available resource */ - private class NodeAvailableResourceComparator - implements Comparator { + protected static class NodeAvailableResourceComparator + implements Comparator, Serializable { @Override public int compare(FSSchedulerNode n1, FSSchedulerNode n2) { - return RESOURCE_CALCULATOR.compare(getClusterResource(), + // clusterResource unused by DefaultResourceCalculator + return RESOURCE_CALCULATOR.compare(null, n2.getUnallocatedResource(), n1.getUnallocatedResource()); } @@ -1432,6 +1462,7 @@ private void initScheduler(Configuration conf) throws IOException { sizeBasedWeight = this.conf.getSizeBasedWeight(); usePortForNodeName = this.conf.getUsePortForNodeName(); reservableNodesRatio = this.conf.getReservableNodes(); + nodeAvailableResourceComparator = initNodeComparator(this.conf); updateInterval = this.conf.getUpdateInterval(); if (updateInterval < 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index f9424f79c8913..d7209879bd38c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -26,6 +26,7 @@ import java.util.regex.Pattern; import com.google.common.collect.Lists; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -251,6 +252,9 @@ public class FairSchedulerConfiguration extends Configuration { private static final String RESOURCES_WITH_SPACES_PATTERN = "-?\\d+(?:\\.\\d*)?\\s*[a-z]+\\s*"; + public static final String NODE_COMPARATOR_CLASS = + CONF_PREFIX + "node.comparator.class"; + public FairSchedulerConfiguration() { super(); } @@ -757,4 +761,14 @@ private static String findResourceFromValues(String[] resourceValues, } throw new AllocationConfigurationException("Missing resource: " + resource); } + + protected Class getNodeComparatorClass() { + try { + return getClassByName(get(NODE_COMPARATOR_CLASS, + FairScheduler.NodeAvailableResourceComparator.class.getName())); + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not find class for " + NODE_COMPARATOR_CLASS); + } + } }