Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,6 @@ public class CapacityScheduler extends

private WorkflowPriorityMappingsManager workflowPriorityMappingsMgr;

// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;

private PreemptionManager preemptionManager = new PreemptionManager();

private volatile boolean isLazyPreemptionEnabled = false;
Expand Down Expand Up @@ -227,27 +224,14 @@ public Configuration getConf() {
private ResourceCalculator calculator;
private boolean usePortForNodeName;

private boolean scheduleAsynchronously;
@VisibleForTesting
protected List<AsyncScheduleThread> asyncSchedulerThreads;
private ResourceCommitterService resourceCommitterService;
private AsyncSchedulingConfiguration asyncSchedulingConf;
private RMNodeLabelsManager labelManager;
private AppPriorityACLsManager appPriorityACLManager;
private boolean multiNodePlacementEnabled;

private boolean printedVerboseLoggingForAsyncScheduling;
private boolean appShouldFailFast;

/**
* EXPERT
*/
private long asyncScheduleInterval;
private static final String ASYNC_SCHEDULER_INTERVAL =
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
private long asyncMaxPendingBacklogs;

private CSMaxRunningAppsEnforcer maxRunningEnforcer;

public CapacityScheduler() {
Expand Down Expand Up @@ -376,27 +360,7 @@ private ResourceCalculator initResourceCalculator() {
}

private void initAsyncSchedulingProperties() {
scheduleAsynchronously = this.conf.getScheduleAynschronously();
asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
DEFAULT_ASYNC_SCHEDULER_INTERVAL);

// number of threads for async scheduling
int maxAsyncSchedulingThreads = this.conf.getInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, 1);
maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);

if (scheduleAsynchronously) {
asyncSchedulerThreads = new ArrayList<>();
for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
asyncSchedulerThreads.add(new AsyncScheduleThread(this));
}
resourceCommitterService = new ResourceCommitterService(this);
asyncMaxPendingBacklogs = this.conf.getInt(
CapacitySchedulerConfiguration.
SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
CapacitySchedulerConfiguration.
DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
}
this.asyncSchedulingConf = new AsyncSchedulingConfiguration(conf, this);
}

private void initMultiNodePlacement() {
Expand All @@ -419,8 +383,8 @@ private void printSchedulerInitialized() {
getResourceCalculator().getClass(),
getMinimumResourceCapability(),
getMaximumResourceCapability(),
scheduleAsynchronously,
asyncScheduleInterval,
asyncSchedulingConf.isScheduleAsynchronously(),
asyncSchedulingConf.getAsyncScheduleInterval(),
multiNodePlacementEnabled,
assignMultipleEnabled,
maxAssignPerHeartbeat,
Expand All @@ -431,15 +395,7 @@ private void startSchedulerThreads() {
writeLock.lock();
try {
activitiesManager.start();
if (scheduleAsynchronously) {
Preconditions.checkNotNull(asyncSchedulerThreads,
"asyncSchedulerThreads is null");
for (Thread t : asyncSchedulerThreads) {
t.start();
}

resourceCommitterService.start();
}
asyncSchedulingConf.startThreads();
} finally {
writeLock.unlock();
}
Expand All @@ -465,14 +421,7 @@ public void serviceStop() throws Exception {
writeLock.lock();
try {
this.activitiesManager.stop();
if (scheduleAsynchronously && asyncSchedulerThreads != null) {
for (Thread t : asyncSchedulerThreads) {
t.interrupt();
t.join(THREAD_JOIN_TIMEOUT_MS);
}
resourceCommitterService.interrupt();
resourceCommitterService.join(THREAD_JOIN_TIMEOUT_MS);
}
asyncSchedulingConf.serviceStopInvoked();
} finally {
writeLock.unlock();
}
Expand Down Expand Up @@ -539,7 +488,7 @@ public void reinitialize(Configuration newConf, RMContext rmContext)
}

long getAsyncScheduleInterval() {
return asyncScheduleInterval;
return asyncSchedulingConf.getAsyncScheduleInterval();
}

private final static Random random = new Random(System.currentTimeMillis());
Expand Down Expand Up @@ -671,6 +620,11 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{
Thread.sleep(cs.getAsyncScheduleInterval());
}

@VisibleForTesting
public void setAsyncSchedulingConf(AsyncSchedulingConfiguration conf) {
this.asyncSchedulingConf = conf;
}

static class AsyncScheduleThread extends Thread {

private final CapacityScheduler cs;
Expand All @@ -692,7 +646,7 @@ public void run() {
} else {
// Don't run schedule if we have some pending backlogs already
if (cs.getAsyncSchedulingPendingBacklogs()
> cs.asyncMaxPendingBacklogs) {
> cs.asyncSchedulingConf.getAsyncMaxPendingBacklogs()) {
Thread.sleep(1);
} else{
schedule(cs);
Expand Down Expand Up @@ -1479,7 +1433,7 @@ protected void nodeUpdate(RMNode rmNode) {
}

// Try to do scheduling
if (!scheduleAsynchronously) {
if (!asyncSchedulingConf.isScheduleAsynchronously()) {
writeLock.lock();
try {
// reset allocation and reservation stats before we start doing any
Expand Down Expand Up @@ -2291,8 +2245,8 @@ private void addNode(RMNode nodeManager) {
"Added node " + nodeManager.getNodeAddress() + " clusterResource: "
+ clusterResource);

if (scheduleAsynchronously && getNumClusterNodes() == 1) {
for (AsyncScheduleThread t : asyncSchedulerThreads) {
if (asyncSchedulingConf.isScheduleAsynchronously() && getNumClusterNodes() == 1) {
for (AsyncScheduleThread t : asyncSchedulingConf.asyncSchedulerThreads) {
t.beginSchedule();
}
}
Expand Down Expand Up @@ -2340,11 +2294,7 @@ private void removeNode(RMNode nodeInfo) {
new ResourceLimits(clusterResource));
int numNodes = nodeTracker.nodeCount();

if (scheduleAsynchronously && numNodes == 0) {
for (AsyncScheduleThread t : asyncSchedulerThreads) {
t.suspendSchedule();
}
}
asyncSchedulingConf.nodeRemoved(numNodes);

LOG.info(
"Removed node " + nodeInfo.getNodeAddress() + " clusterResource: "
Expand Down Expand Up @@ -3092,9 +3042,9 @@ public void submitResourceCommitRequest(Resource cluster,
return;
}

if (scheduleAsynchronously) {
if (asyncSchedulingConf.isScheduleAsynchronously()) {
// Submit to a commit thread and commit it async-ly
resourceCommitterService.addNewCommitRequest(request);
asyncSchedulingConf.resourceCommitterService.addNewCommitRequest(request);
} else{
// Otherwise do it sync-ly.
tryCommit(cluster, request, true);
Expand Down Expand Up @@ -3339,10 +3289,7 @@ public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
}

public int getAsyncSchedulingPendingBacklogs() {
if (scheduleAsynchronously) {
return resourceCommitterService.getPendingBacklogs();
}
return 0;
return asyncSchedulingConf.getPendingBacklogs();
}

@Override
Expand Down Expand Up @@ -3483,7 +3430,7 @@ public boolean isMultiNodePlacementEnabled() {
}

public int getNumAsyncSchedulerThreads() {
return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size();
return asyncSchedulingConf.getNumAsyncSchedulerThreads();
}

@VisibleForTesting
Expand All @@ -3503,4 +3450,109 @@ public boolean placementConstraintEnabled() {
public void setQueueManager(CapacitySchedulerQueueManager qm) {
this.queueManager = qm;
}

@VisibleForTesting
public List<AsyncScheduleThread> getAsyncSchedulerThreads() {
return asyncSchedulingConf.getAsyncSchedulerThreads();
}

static class AsyncSchedulingConfiguration {
// timeout to join when we stop this service
private static final long THREAD_JOIN_TIMEOUT_MS = 1000;

@VisibleForTesting
protected List<AsyncScheduleThread> asyncSchedulerThreads;
private ResourceCommitterService resourceCommitterService;

private long asyncScheduleInterval;
private static final String ASYNC_SCHEDULER_INTERVAL =
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
private long asyncMaxPendingBacklogs;

private final boolean scheduleAsynchronously;

AsyncSchedulingConfiguration(CapacitySchedulerConfiguration conf,
CapacityScheduler cs) {
this.scheduleAsynchronously = conf.getScheduleAynschronously();
if (this.scheduleAsynchronously) {
this.asyncScheduleInterval = conf.getLong(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_INTERVAL,
CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_INTERVAL);
// number of threads for async scheduling
int maxAsyncSchedulingThreads = conf.getInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
1);
maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
this.asyncMaxPendingBacklogs = conf.getInt(
CapacitySchedulerConfiguration.
SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
CapacitySchedulerConfiguration.
DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);

this.asyncSchedulerThreads = new ArrayList<>();
for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
asyncSchedulerThreads.add(new AsyncScheduleThread(cs));
}
this.resourceCommitterService = new ResourceCommitterService(cs);
}
}
public boolean isScheduleAsynchronously() {
return scheduleAsynchronously;
}
public long getAsyncScheduleInterval() {
return asyncScheduleInterval;
}
public long getAsyncMaxPendingBacklogs() {
return asyncMaxPendingBacklogs;
}

public void startThreads() {
if (scheduleAsynchronously) {
Preconditions.checkNotNull(asyncSchedulerThreads,
"asyncSchedulerThreads is null");
for (Thread t : asyncSchedulerThreads) {
t.start();
}

resourceCommitterService.start();
}
}

public void serviceStopInvoked() throws InterruptedException {
if (scheduleAsynchronously && asyncSchedulerThreads != null) {
for (Thread t : asyncSchedulerThreads) {
t.interrupt();
t.join(THREAD_JOIN_TIMEOUT_MS);
}
resourceCommitterService.interrupt();
resourceCommitterService.join(THREAD_JOIN_TIMEOUT_MS);
}
}

public void nodeRemoved(int numNodes) {
if (scheduleAsynchronously && numNodes == 0) {
for (AsyncScheduleThread t : asyncSchedulerThreads) {
t.suspendSchedule();
}
}
}

public int getPendingBacklogs() {
if (scheduleAsynchronously) {
return resourceCommitterService.getPendingBacklogs();
}
return 0;
}

public int getNumAsyncSchedulerThreads() {
return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size();
}

@VisibleForTesting
public List<AsyncScheduleThread> getAsyncSchedulerThreads() {
return asyncSchedulerThreads;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS =
SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".maximum-pending-backlogs";

@Private
public static final String SCHEDULE_ASYNCHRONOUSLY_INTERVAL =
SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".scheduling-interval-ms";
@Private
public static final long DEFAULT_SCHEDULE_ASYNCHRONOUSLY_INTERVAL = 5;

@Private
public static final String APP_FAIL_FAST = PREFIX + "application.fail-fast";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public RMNodeLabelsManager createNodeLabelManager() {

CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
for (CapacityScheduler.AsyncScheduleThread thread :
cs.asyncSchedulerThreads) {
cs.getAsyncSchedulerThreads()) {
Assert.assertTrue(thread.getName()
.startsWith("AsyncCapacitySchedulerThread"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(

if (numThreads > 0) {
// disable async scheduling threads
for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
for (CapacityScheduler.AsyncScheduleThread t : cs.getAsyncSchedulerThreads()) {
t.suspendSchedule();
}
}
Expand Down Expand Up @@ -268,7 +268,7 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(

if (numThreads > 0) {
// enable async scheduling threads
for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
for (CapacityScheduler.AsyncScheduleThread t : cs.getAsyncSchedulerThreads()) {
t.beginSchedule();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,7 @@ private void applyCSAssignment(Resource clusterResource, CSAssignment assign,
LeafQueue q, final Map<NodeId, FiCaSchedulerNode> nodes,
final Map<ApplicationAttemptId, FiCaSchedulerApp> apps)
throws IOException {
TestUtils.applyResourceCommitRequest(clusterResource, assign, nodes, apps);
TestUtils.applyResourceCommitRequest(clusterResource, assign, nodes, apps, csConf);
}

@Test
Expand Down
Loading