Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ public Configuration getConf() {
private ResourceCalculator calculator;
private boolean usePortForNodeName;

private AsyncSchedulingConfiguration asyncSchedulingConf;
private boolean scheduleAsynchronously;
@VisibleForTesting
protected List<AsyncScheduleThread> asyncSchedulerThreads;
Expand All @@ -235,16 +236,6 @@ public Configuration getConf() {

private boolean printedVerboseLoggingForAsyncScheduling;

/**
* EXPERT
*/
private long asyncScheduleInterval;
private static final String ASYNC_SCHEDULER_INTERVAL =
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
private long asyncMaxPendingBacklogs;

private CSMaxRunningAppsEnforcer maxRunningEnforcer;

public CapacityScheduler() {
Expand Down Expand Up @@ -348,30 +339,17 @@ void initScheduler(Configuration configuration) throws
initializeQueues(this.conf);
this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();

scheduleAsynchronously = this.conf.getScheduleAynschronously();
asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
DEFAULT_ASYNC_SCHEDULER_INTERVAL);

this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled();
this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat();

// number of threads for async scheduling
int maxAsyncSchedulingThreads = this.conf.getInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
1);
maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);

this.asyncSchedulingConf = new AsyncSchedulingConfiguration(conf);
this.scheduleAsynchronously = this.asyncSchedulingConf.isScheduleAsynchronously();
if (scheduleAsynchronously) {
asyncSchedulerThreads = new ArrayList<>();
for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
for (int i = 0; i < asyncSchedulingConf.getMaxAsyncSchedulingThreads(); i++) {
asyncSchedulerThreads.add(new AsyncScheduleThread(this));
}
resourceCommitterService = new ResourceCommitterService(this);
asyncMaxPendingBacklogs = this.conf.getInt(
CapacitySchedulerConfiguration.
SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
CapacitySchedulerConfiguration.
DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
}

// Setup how many containers we can allocate for each round
Expand All @@ -391,8 +369,8 @@ void initScheduler(Configuration configuration) throws
+ getResourceCalculator().getClass() + ", " + "minimumAllocation="
+ getMinimumResourceCapability() + ", " + "maximumAllocation="
+ getMaximumResourceCapability() + ", " + "asynchronousScheduling="
+ scheduleAsynchronously + ", " + "asyncScheduleInterval="
+ asyncScheduleInterval + "ms" + ",multiNodePlacementEnabled="
+ asyncSchedulingConf.isScheduleAsynchronously() + ", " + "asyncScheduleInterval="
+ asyncSchedulingConf.getAsyncScheduleInterval() + "ms" + ",multiNodePlacementEnabled="
+ multiNodePlacementEnabled + ", " + "assignMultipleEnabled="
+ assignMultipleEnabled + ", " + "maxAssignPerHeartbeat="
+ maxAssignPerHeartbeat + ", " + "offswitchPerHeartbeatLimit="
Expand Down Expand Up @@ -511,10 +489,6 @@ public void reinitialize(Configuration newConf, RMContext rmContext)
reinitialize(newConf, rmContext, false);
}

long getAsyncScheduleInterval() {
return asyncScheduleInterval;
}

private final static Random random = new Random(System.currentTimeMillis());

@VisibleForTesting
Expand Down Expand Up @@ -641,7 +615,7 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{
}

}
Thread.sleep(cs.getAsyncScheduleInterval());
Thread.sleep(cs.asyncSchedulingConf.getAsyncScheduleInterval());
}

static class AsyncScheduleThread extends Thread {
Expand All @@ -665,7 +639,7 @@ public void run() {
} else {
// Don't run schedule if we have some pending backlogs already
if (cs.getAsyncSchedulingPendingBacklogs()
> cs.asyncMaxPendingBacklogs) {
> cs.asyncSchedulingConf.getAsyncMaxPendingBacklogs()) {
Thread.sleep(1);
} else{
schedule(cs);
Expand Down Expand Up @@ -3447,4 +3421,48 @@ public boolean placementConstraintEnabled() {
public void setQueueManager(CapacitySchedulerQueueManager qm) {
this.queueManager = qm;
}

private static class AsyncSchedulingConfiguration {
private final boolean scheduleAsynchronously;
private long asyncScheduleInterval;
private long asyncMaxPendingBacklogs;
private int maxAsyncSchedulingThreads;

AsyncSchedulingConfiguration(CapacitySchedulerConfiguration conf) {
this.scheduleAsynchronously = conf.getScheduleAynschronously();
if (this.scheduleAsynchronously) {
this.asyncScheduleInterval = conf.getLong(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_INTERVAL,
CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_INTERVAL);

// number of threads for async scheduling
this.maxAsyncSchedulingThreads = conf.getInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
1);
this.maxAsyncSchedulingThreads = Math.max(this.maxAsyncSchedulingThreads, 1);

this.asyncMaxPendingBacklogs = conf.getInt(
CapacitySchedulerConfiguration.
SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
CapacitySchedulerConfiguration.
DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
}
}

public boolean isScheduleAsynchronously() {
return scheduleAsynchronously;
}

public long getAsyncScheduleInterval() {
return asyncScheduleInterval;
}

public long getAsyncMaxPendingBacklogs() {
return asyncMaxPendingBacklogs;
}

public int getMaxAsyncSchedulingThreads() {
return maxAsyncSchedulingThreads;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS =
SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".maximum-pending-backlogs";

@Private
public static final String SCHEDULE_ASYNCHRONOUSLY_INTERVAL =
SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".scheduling-interval-ms";

@Private
public static final long DEFAULT_SCHEDULE_ASYNCHRONOUSLY_INTERVAL = 5;

@Private
public static final String APP_FAIL_FAST = PREFIX + "application.fail-fast";

Expand Down