Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -302,110 +302,123 @@ void initScheduler(Configuration configuration) throws
IOException, YarnException {
writeLock.lock();
try {
String confProviderStr = configuration.get(
YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
switch (confProviderStr) {
case YarnConfiguration.FILE_CONFIGURATION_STORE:
this.csConfProvider =
new FileBasedCSConfigurationProvider(rmContext);
break;
case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
case YarnConfiguration.ZK_CONFIGURATION_STORE:
case YarnConfiguration.FS_CONFIGURATION_STORE:
this.csConfProvider = new MutableCSConfigurationProvider(rmContext);
break;
default:
throw new IOException("Invalid configuration store class: " +
confProviderStr);
}
this.csConfProvider = getCsConfProvider(configuration);
this.csConfProvider.init(configuration);
this.conf = this.csConfProvider.loadConfiguration(configuration);
validateConf(this.conf);
this.minimumAllocation = super.getMinimumAllocation();
initMaximumResourceCapability(super.getMaximumAllocation());
this.calculator = this.conf.getResourceCalculator();
if (this.calculator instanceof DefaultResourceCalculator
&& ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
throw new YarnRuntimeException("RM uses DefaultResourceCalculator which"
+ " used only memory as resource-type but invalid resource-types"
+ " specified " + ResourceUtils.getResourceTypes() + ". Use"
+ " DominantResourceCalculator instead to make effective use of"
+ " these resource-types");
}
this.calculator = initResourceCalculator();
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications = new ConcurrentHashMap<>();
this.labelManager = rmContext.getNodeLabelManager();
this.appPriorityACLManager = new AppPriorityACLsManager(conf);
this.queueManager = new CapacitySchedulerQueueManager(yarnConf,
this.labelManager, this.appPriorityACLManager);
this.queueManager.setCapacitySchedulerContext(this);

this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager();

this.activitiesManager = new ActivitiesManager(rmContext);
activitiesManager.init(conf);
initializeQueues(this.conf);
this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();

scheduleAsynchronously = this.conf.getScheduleAynschronously();
asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
DEFAULT_ASYNC_SCHEDULER_INTERVAL);

this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled();
this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat();

this.appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast(
getConfig());

// number of threads for async scheduling
int maxAsyncSchedulingThreads = this.conf.getInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
1);
maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);

if (scheduleAsynchronously) {
asyncSchedulerThreads = new ArrayList<>();
for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
asyncSchedulerThreads.add(new AsyncScheduleThread(this));
}
resourceCommitterService = new ResourceCommitterService(this);
asyncMaxPendingBacklogs = this.conf.getInt(
CapacitySchedulerConfiguration.
SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
CapacitySchedulerConfiguration.
DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
}
this.appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast(getConfig());
initAsyncSchedulingProperties();

// Setup how many containers we can allocate for each round
offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();

// Register CS specific multi-node policies to common MultiNodeManager
// which will add to a MultiNodeSorter which gives a pre-sorted list of
// nodes to scheduler's allocation.
multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled();
if(rmContext.getMultiNodeSortingManager() != null) {
rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames(
multiNodePlacementEnabled,
this.conf.getMultiNodePlacementPolicies());
}

LOG.info("Initialized CapacityScheduler with " + "calculator="
+ getResourceCalculator().getClass() + ", " + "minimumAllocation="
+ getMinimumResourceCapability() + ", " + "maximumAllocation="
+ getMaximumResourceCapability() + ", " + "asynchronousScheduling="
+ scheduleAsynchronously + ", " + "asyncScheduleInterval="
+ asyncScheduleInterval + "ms" + ",multiNodePlacementEnabled="
+ multiNodePlacementEnabled + ", " + "assignMultipleEnabled="
+ assignMultipleEnabled + ", " + "maxAssignPerHeartbeat="
+ maxAssignPerHeartbeat + ", " + "offswitchPerHeartbeatLimit="
+ offswitchPerHeartbeatLimit);
initMultiNodePlacement();
printSchedulerInitialized();
} finally {
writeLock.unlock();
}
}

private CSConfigurationProvider getCsConfProvider(Configuration configuration)
throws IOException {
String confProviderStr = configuration.get(
YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
switch (confProviderStr) {
case YarnConfiguration.FILE_CONFIGURATION_STORE:
return new FileBasedCSConfigurationProvider(rmContext);
case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
case YarnConfiguration.ZK_CONFIGURATION_STORE:
case YarnConfiguration.FS_CONFIGURATION_STORE:
return new MutableCSConfigurationProvider(rmContext);
default:
throw new IOException("Invalid configuration store class: " + confProviderStr);
}
}

private ResourceCalculator initResourceCalculator() {
ResourceCalculator resourceCalculator = this.conf.getResourceCalculator();
if (resourceCalculator instanceof DefaultResourceCalculator
&& ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
throw new YarnRuntimeException("RM uses DefaultResourceCalculator which"
+ " used only memory as resource-type but invalid resource-types"
+ " specified " + ResourceUtils.getResourceTypes() + ". Use"
+ " DominantResourceCalculator instead to make effective use of"
+ " these resource-types");
}
return resourceCalculator;
}

private void initAsyncSchedulingProperties() {
scheduleAsynchronously = this.conf.getScheduleAynschronously();
asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
DEFAULT_ASYNC_SCHEDULER_INTERVAL);

// number of threads for async scheduling
int maxAsyncSchedulingThreads = this.conf.getInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, 1);
maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);

if (scheduleAsynchronously) {
asyncSchedulerThreads = new ArrayList<>();
for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
asyncSchedulerThreads.add(new AsyncScheduleThread(this));
}
resourceCommitterService = new ResourceCommitterService(this);
asyncMaxPendingBacklogs = this.conf.getInt(
CapacitySchedulerConfiguration.
SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
CapacitySchedulerConfiguration.
DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
}
}

private void initMultiNodePlacement() {
// Register CS specific multi-node policies to common MultiNodeManager
// which will add to a MultiNodeSorter which gives a pre-sorted list of
// nodes to scheduler's allocation.
multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled();
if (rmContext.getMultiNodeSortingManager() != null) {
rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames(
multiNodePlacementEnabled,
this.conf.getMultiNodePlacementPolicies());
}
}

private void printSchedulerInitialized() {
LOG.info("Initialized CapacityScheduler with calculator={}, minimumAllocation={}, " +
"maximumAllocation={}, asynchronousScheduling={}, asyncScheduleInterval={} ms, " +
"multiNodePlacementEnabled={}, assignMultipleEnabled={}, maxAssignPerHeartbeat={}, " +
"offswitchPerHeartbeatLimit={}",
getResourceCalculator().getClass(),
getMinimumResourceCapability(),
getMaximumResourceCapability(),
scheduleAsynchronously,
asyncScheduleInterval,
multiNodePlacementEnabled,
assignMultipleEnabled,
maxAssignPerHeartbeat,
offswitchPerHeartbeatLimit);
}

private void startSchedulerThreads() {
writeLock.lock();
try {
Expand Down