diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 30b00b139c030..7eac03c6ed130 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -225,6 +225,7 @@ public Configuration getConf() { private ResourceCalculator calculator; private boolean usePortForNodeName; + private AsyncSchedulingConfiguration asyncSchedulingConf; private boolean scheduleAsynchronously; @VisibleForTesting protected List asyncSchedulerThreads; @@ -235,16 +236,6 @@ public Configuration getConf() { private boolean printedVerboseLoggingForAsyncScheduling; - /** - * EXPERT - */ - private long asyncScheduleInterval; - private static final String ASYNC_SCHEDULER_INTERVAL = - CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX - + ".scheduling-interval-ms"; - private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5; - private long asyncMaxPendingBacklogs; - private CSMaxRunningAppsEnforcer maxRunningEnforcer; public CapacityScheduler() { @@ -348,30 +339,17 @@ void initScheduler(Configuration configuration) throws initializeQueues(this.conf); this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled(); - scheduleAsynchronously = this.conf.getScheduleAynschronously(); - asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, - DEFAULT_ASYNC_SCHEDULER_INTERVAL); - this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled(); this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat(); - // number of threads for async scheduling - int maxAsyncSchedulingThreads = this.conf.getInt( - CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, - 1); - maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1); - + this.asyncSchedulingConf = new AsyncSchedulingConfiguration(conf); + this.scheduleAsynchronously = this.asyncSchedulingConf.isScheduleAsynchronously(); if (scheduleAsynchronously) { asyncSchedulerThreads = new ArrayList<>(); - for (int i = 0; i < maxAsyncSchedulingThreads; i++) { + for (int i = 0; i < asyncSchedulingConf.getMaxAsyncSchedulingThreads(); i++) { asyncSchedulerThreads.add(new AsyncScheduleThread(this)); } resourceCommitterService = new ResourceCommitterService(this); - asyncMaxPendingBacklogs = this.conf.getInt( - CapacitySchedulerConfiguration. - SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS, - CapacitySchedulerConfiguration. - DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS); } // Setup how many containers we can allocate for each round @@ -391,8 +369,8 @@ void initScheduler(Configuration configuration) throws + getResourceCalculator().getClass() + ", " + "minimumAllocation=" + getMinimumResourceCapability() + ", " + "maximumAllocation=" + getMaximumResourceCapability() + ", " + "asynchronousScheduling=" - + scheduleAsynchronously + ", " + "asyncScheduleInterval=" - + asyncScheduleInterval + "ms" + ",multiNodePlacementEnabled=" + + asyncSchedulingConf.isScheduleAsynchronously() + ", " + "asyncScheduleInterval=" + + asyncSchedulingConf.getAsyncScheduleInterval() + "ms" + ",multiNodePlacementEnabled=" + multiNodePlacementEnabled + ", " + "assignMultipleEnabled=" + assignMultipleEnabled + ", " + "maxAssignPerHeartbeat=" + maxAssignPerHeartbeat + ", " + "offswitchPerHeartbeatLimit=" @@ -511,10 +489,6 @@ public void reinitialize(Configuration newConf, RMContext rmContext) reinitialize(newConf, rmContext, false); } - long getAsyncScheduleInterval() { - return asyncScheduleInterval; - } - private final static Random random = new Random(System.currentTimeMillis()); @VisibleForTesting @@ -641,7 +615,7 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{ } } - Thread.sleep(cs.getAsyncScheduleInterval()); + Thread.sleep(cs.asyncSchedulingConf.getAsyncScheduleInterval()); } static class AsyncScheduleThread extends Thread { @@ -665,7 +639,7 @@ public void run() { } else { // Don't run schedule if we have some pending backlogs already if (cs.getAsyncSchedulingPendingBacklogs() - > cs.asyncMaxPendingBacklogs) { + > cs.asyncSchedulingConf.getAsyncMaxPendingBacklogs()) { Thread.sleep(1); } else{ schedule(cs); @@ -3447,4 +3421,48 @@ public boolean placementConstraintEnabled() { public void setQueueManager(CapacitySchedulerQueueManager qm) { this.queueManager = qm; } + + private static class AsyncSchedulingConfiguration { + private final boolean scheduleAsynchronously; + private long asyncScheduleInterval; + private long asyncMaxPendingBacklogs; + private int maxAsyncSchedulingThreads; + + AsyncSchedulingConfiguration(CapacitySchedulerConfiguration conf) { + this.scheduleAsynchronously = conf.getScheduleAynschronously(); + if (this.scheduleAsynchronously) { + this.asyncScheduleInterval = conf.getLong( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_INTERVAL, + CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_INTERVAL); + + // number of threads for async scheduling + this.maxAsyncSchedulingThreads = conf.getInt( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, + 1); + this.maxAsyncSchedulingThreads = Math.max(this.maxAsyncSchedulingThreads, 1); + + this.asyncMaxPendingBacklogs = conf.getInt( + CapacitySchedulerConfiguration. + SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS, + CapacitySchedulerConfiguration. + DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS); + } + } + + public boolean isScheduleAsynchronously() { + return scheduleAsynchronously; + } + + public long getAsyncScheduleInterval() { + return asyncScheduleInterval; + } + + public long getAsyncMaxPendingBacklogs() { + return asyncMaxPendingBacklogs; + } + + public int getMaxAsyncSchedulingThreads() { + return maxAsyncSchedulingThreads; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 6e1b72feac0d6..ee576fe713c06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -272,6 +272,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public static final String SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS = SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".maximum-pending-backlogs"; + @Private + public static final String SCHEDULE_ASYNCHRONOUSLY_INTERVAL = + SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".scheduling-interval-ms"; + + @Private + public static final long DEFAULT_SCHEDULE_ASYNCHRONOUSLY_INTERVAL = 5; + @Private public static final String APP_FAIL_FAST = PREFIX + "application.fail-fast";