Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,9 @@ private static QueuePath createQueuePath(CSQueue parent, String queueName) {
return new QueuePath(parent.getQueuePath(), queueName);
}

protected void setupConfigurableCapacities(
CapacitySchedulerConfiguration configuration) {
protected void setupConfigurableCapacities() {
CSQueueUtils.loadCapacitiesByLabelsFromConf(queuePath, queueCapacities,
configuration, this.queueNodeLabelsSettings.getConfiguredNodeLabels());
queueContext.getConfiguration(), this.queueNodeLabelsSettings.getConfiguredNodeLabels());
}

@Override
Expand Down Expand Up @@ -329,22 +328,22 @@ public String getDefaultNodeLabelExpression() {
return this.queueNodeLabelsSettings.getDefaultLabelExpression();
}

protected void setupQueueConfigs(Resource clusterResource,
CapacitySchedulerConfiguration configuration) throws
protected void setupQueueConfigs(Resource clusterResource) throws
IOException {

writeLock.lock();
try {
CapacitySchedulerConfiguration configuration = queueContext.getConfiguration();
if (isDynamicQueue() || this instanceof AbstractAutoCreatedLeafQueue) {
setDynamicQueueProperties(configuration);
setDynamicQueueProperties();
}

// Collect and set the Node label configuration
this.queueNodeLabelsSettings = new QueueNodeLabelsSettings(configuration, parent,
getQueuePath(), queueContext.getQueueManager().getConfiguredNodeLabelsForAllQueues());

// Initialize the queue capacities
setupConfigurableCapacities(configuration);
setupConfigurableCapacities();
updateAbsoluteCapacities();
updateCapacityConfigType();

Expand All @@ -354,26 +353,23 @@ protected void setupQueueConfigs(Resource clusterResource,

// Setup queue's maximumAllocation respecting the global
// and the queue settings
// TODO remove the getConfiguration() param after the AQC configuration duplication
// removal is resolved
this.queueAllocationSettings.setupMaximumAllocation(configuration,
queueContext.getConfiguration(), getQueuePath(),
this.queueAllocationSettings.setupMaximumAllocation(configuration, getQueuePath(),
parent);

// Initialize the queue state based on previous state, configured state
// and its parent state
initializeQueueState(configuration);
initializeQueueState();

authorizer = YarnAuthorizationProvider.getInstance(configuration);

this.acls = configuration.getAcls(getQueuePath());

this.userWeights = getUserWeightsFromHierarchy(configuration);
this.userWeights = getUserWeightsFromHierarchy();

this.reservationsContinueLooking =
configuration.getReservationContinueLook();

this.configuredCapacityVectors = queueContext.getConfiguration()
this.configuredCapacityVectors = configuration
.parseConfiguredResourceVector(queuePath.getFullPath(),
this.queueNodeLabelsSettings.getConfiguredNodeLabels());

Expand All @@ -382,10 +378,7 @@ protected void setupQueueConfigs(Resource clusterResource,
this, labelManager, null);

// Store preemption settings
// TODO remove the getConfiguration() param after the AQC configuration duplication
// removal is resolved
this.preemptionSettings = new CSQueuePreemptionSettings(this, configuration,
queueContext.getConfiguration());
this.preemptionSettings = new CSQueuePreemptionSettings(this, configuration);
this.priority = configuration.getQueuePriority(
getQueuePath());

Expand All @@ -403,14 +396,12 @@ protected void setupQueueConfigs(Resource clusterResource,

/**
* Set properties specific to dynamic queues.
* @param configuration configuration on which the properties are set
*/
protected void setDynamicQueueProperties(
CapacitySchedulerConfiguration configuration) {
protected void setDynamicQueueProperties() {
// Set properties from parent template
if (parent instanceof ParentQueue) {
((ParentQueue) parent).getAutoCreatedQueueTemplate()
.setTemplateEntriesForChild(configuration, getQueuePath());
.setTemplateEntriesForChild(queueContext.getConfiguration(), getQueuePath());

String parentTemplate = String.format("%s.%s", parent.getQueuePath(),
AutoCreatedQueueTemplate.AUTO_QUEUE_TEMPLATE_PREFIX);
Expand All @@ -428,8 +419,7 @@ protected void setDynamicQueueProperties(
}
}

private UserWeights getUserWeightsFromHierarchy(
CapacitySchedulerConfiguration configuration) {
private UserWeights getUserWeightsFromHierarchy() {
UserWeights unionInheritedWeights = UserWeights.createEmpty();
CSQueue parentQ = parent;
if (parentQ != null) {
Expand All @@ -439,7 +429,7 @@ private UserWeights getUserWeightsFromHierarchy(
// Insert this queue's userWeights, overriding parent's userWeights if
// there is an overlap.
unionInheritedWeights.addFrom(
configuration.getAllUserWeightsForQueue(getQueuePath()));
queueContext.getConfiguration().getAllUserWeightsForQueue(getQueuePath()));
return unionInheritedWeights;
}

Expand Down Expand Up @@ -572,9 +562,9 @@ public QueueCapacityVector getConfiguredCapacityVector(
return configuredCapacityVectors.get(label);
}

private void initializeQueueState(CapacitySchedulerConfiguration configuration) {
private void initializeQueueState() {
QueueState previousState = getState();
QueueState configuredState = configuration
QueueState configuredState = queueContext.getConfiguration()
.getConfiguredState(getQueuePath());
QueueState parentState = (parent == null) ? null : parent.getState();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,12 @@ public AbstractLeafQueue(CapacitySchedulerQueueContext queueContext,
}

@SuppressWarnings("checkstyle:nowhitespaceafter")
protected void setupQueueConfigs(Resource clusterResource,
CapacitySchedulerConfiguration conf) throws
protected void setupQueueConfigs(Resource clusterResource) throws
IOException {
writeLock.lock();
try {
// TODO conf parameter can be a modified configuration with template entries and missing
// some global configs. This config duplication needs to be removed.
CapacitySchedulerConfiguration originalConfiguration = queueContext.getConfiguration();
super.setupQueueConfigs(clusterResource, conf);
CapacitySchedulerConfiguration configuration = queueContext.getConfiguration();
super.setupQueueConfigs(clusterResource);

this.lastClusterResource = clusterResource;

Expand All @@ -189,26 +186,26 @@ protected void setupQueueConfigs(Resource clusterResource,
setQueueResourceLimitsInfo(clusterResource);

setOrderingPolicy(
conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
configuration.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));

usersManager.setUserLimit(conf.getUserLimit(getQueuePath()));
usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath()));
usersManager.setUserLimit(configuration.getUserLimit(getQueuePath()));
usersManager.setUserLimitFactor(configuration.getUserLimitFactor(getQueuePath()));

maxAMResourcePerQueuePercent =
conf.getMaximumApplicationMasterResourcePerQueuePercent(
configuration.getMaximumApplicationMasterResourcePerQueuePercent(
getQueuePath());

maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
maxApplications = configuration.getMaximumApplicationsPerQueue(getQueuePath());
if (maxApplications < 0) {
int maxGlobalPerQueueApps =
conf.getGlobalMaximumApplicationsPerQueue();
configuration.getGlobalMaximumApplicationsPerQueue();
if (maxGlobalPerQueueApps > 0) {
maxApplications = maxGlobalPerQueueApps;
}
}

priorityAcls = conf.getPriorityAcls(getQueuePath(),
originalConfiguration.getClusterLevelApplicationMaxPriority());
priorityAcls = configuration.getPriorityAcls(getQueuePath(),
configuration.getClusterLevelApplicationMaxPriority());

Set<String> accessibleNodeLabels = this.queueNodeLabelsSettings.getAccessibleNodeLabels();
if (!SchedulerUtils.checkQueueLabelExpression(accessibleNodeLabels,
Expand All @@ -224,10 +221,10 @@ protected void setupQueueConfigs(Resource clusterResource,
.join(getAccessibleNodeLabels().iterator(), ',')));
}

nodeLocalityDelay = originalConfiguration.getNodeLocalityDelay();
rackLocalityAdditionalDelay = originalConfiguration
nodeLocalityDelay = configuration.getNodeLocalityDelay();
rackLocalityAdditionalDelay = configuration
.getRackLocalityAdditionalDelay();
rackLocalityFullReset = originalConfiguration
rackLocalityFullReset = configuration
.getRackLocalityFullReset();

// re-init this since max allocation could have changed
Expand All @@ -250,10 +247,10 @@ protected void setupQueueConfigs(Resource clusterResource,
}

defaultAppPriorityPerQueue = Priority.newInstance(
conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
configuration.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));

// Validate leaf queue's user's weights.
float queueUserLimit = Math.min(100.0f, conf.getUserLimit(getQueuePath()));
float queueUserLimit = Math.min(100.0f, configuration.getUserLimit(getQueuePath()));
getUserWeights().validateForLeafQueue(queueUserLimit, getQueuePath());
usersManager.updateUserWeights();

Expand Down Expand Up @@ -529,9 +526,8 @@ public List<AppPriorityACLGroup> getPriorityACLs() {
}
}

protected void reinitialize(
CSQueue newlyParsedQueue, Resource clusterResource,
CapacitySchedulerConfiguration configuration) throws
@Override
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws
IOException {

writeLock.lock();
Expand Down Expand Up @@ -565,20 +561,12 @@ protected void reinitialize(
+ newMax);
}

setupQueueConfigs(clusterResource, configuration);
setupQueueConfigs(clusterResource);
} finally {
writeLock.unlock();
}
}

@Override
public void reinitialize(
CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
reinitialize(newlyParsedQueue, clusterResource,
queueContext.getConfiguration());
}

@Override
public void submitApplicationAttempt(FiCaSchedulerApp application,
String userName) {
Expand Down Expand Up @@ -1700,13 +1688,13 @@ protected boolean canAssignToUser(Resource clusterResource,
}

@Override
protected void setDynamicQueueProperties(CapacitySchedulerConfiguration configuration) {
protected void setDynamicQueueProperties() {
// set to -1, to disable it
configuration.setUserLimitFactor(getQueuePath(), -1);
queueContext.getConfiguration().setUserLimitFactor(getQueuePath(), -1);
// Set Max AM percentage to a higher value
configuration.setMaximumApplicationMasterResourcePerQueuePercent(
queueContext.getConfiguration().setMaximumApplicationMasterResourcePerQueuePercent(
getQueuePath(), 1f);
super.setDynamicQueueProperties(configuration);
super.setDynamicQueueProperties();
}

private void updateSchedulerHealthForCompletedContainer(
Expand Down Expand Up @@ -1948,7 +1936,7 @@ public void updateClusterResource(Resource clusterResource,
super.updateEffectiveResources(clusterResource);

// Update maximum applications for the queue and for users
updateMaximumApplications(queueContext.getConfiguration());
updateMaximumApplications();

updateCurrentResourceLimits(currentResourceLimits, clusterResource);

Expand Down Expand Up @@ -2342,11 +2330,12 @@ public void stopQueue() {
}
}

void updateMaximumApplications(CapacitySchedulerConfiguration conf) {
int maxAppsForQueue = conf.getMaximumApplicationsPerQueue(getQueuePath());
void updateMaximumApplications() {
CapacitySchedulerConfiguration configuration = queueContext.getConfiguration();
int maxAppsForQueue = configuration.getMaximumApplicationsPerQueue(getQueuePath());

int maxDefaultPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
int maxSystemApps = conf.getMaximumSystemApplications();
int maxDefaultPerQueueApps = configuration.getGlobalMaximumApplicationsPerQueue();
int maxSystemApps = configuration.getMaximumSystemApplications();
int baseMaxApplications = maxDefaultPerQueueApps > 0 ?
Math.min(maxDefaultPerQueueApps, maxSystemApps)
: maxSystemApps;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common
Expand Down Expand Up @@ -55,7 +54,7 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
writeLock.lock();
try {
// Set new configs
setupQueueConfigs(clusterResource, queueContext.getConfiguration());
setupQueueConfigs(clusterResource);

} finally {
writeLock.unlock();
Expand Down Expand Up @@ -175,22 +174,12 @@ protected CapacitySchedulerConfiguration initializeLeafQueueConfigs(String
CapacitySchedulerConfiguration leafQueueConfigs = new
CapacitySchedulerConfiguration(new Configuration(false), false);

Map<String, String> rtProps = queueContext
.getConfiguration().getConfigurationProperties()
.getPropertiesWithPrefix(YarnConfiguration.RESOURCE_TYPES + ".", true);
for (Map.Entry<String, String> entry : rtProps.entrySet()) {
leafQueueConfigs.set(entry.getKey(), entry.getValue());
}

Map<String, String> templateConfigs = queueContext
.getConfiguration().getConfigurationProperties()
.getPropertiesWithPrefix(configPrefix, true);

for (final Iterator<Map.Entry<String, String>> iterator =
templateConfigs.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, String> confKeyValuePair = iterator.next();
leafQueueConfigs.set(confKeyValuePair.getKey(),
confKeyValuePair.getValue());
for (Map.Entry<String, String> confKeyValuePair : templateConfigs.entrySet()) {
leafQueueConfigs.set(confKeyValuePair.getKey(), confKeyValuePair.getValue());
}

return leafQueueConfigs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
public AutoCreatedLeafQueue(CapacitySchedulerQueueContext queueContext, String queueName,
ManagedParentQueue parent) throws IOException {
super(queueContext, queueName, parent, null);
super.setupQueueConfigs(queueContext.getClusterResource(), parent.getLeafQueueConfigs(queueName));
parent.setLeafQueueConfigs(queueName);
super.setupQueueConfigs(queueContext.getClusterResource());

updateCapacitiesToZero();
}
Expand All @@ -56,8 +57,8 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)

ManagedParentQueue managedParentQueue = (ManagedParentQueue) parent;

super.reinitialize(newlyParsedQueue, clusterResource, managedParentQueue
.getLeafQueueConfigs(newlyParsedQueue.getQueueShortName()));
managedParentQueue.setLeafQueueConfigs(newlyParsedQueue.getQueueShortName());
super.reinitialize(newlyParsedQueue, clusterResource);

//Reset capacities to 0 since reinitialize above
// queueCapacities to initialize to configured capacity which might
Expand Down Expand Up @@ -122,8 +123,7 @@ public void validateConfigurations(AutoCreatedLeafQueueConfig template)
}

@Override
protected void setDynamicQueueProperties(
CapacitySchedulerConfiguration configuration) {
protected void setDynamicQueueProperties() {
String parentTemplate = String.format("%s.%s", getParent().getQueuePath(),
CapacitySchedulerConfiguration
.AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX);
Expand Down
Loading