Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void setEntitlement(String nodeLabel, QueueEntitlement entitlement)
}
setCapacity(nodeLabel, capacity);
setAbsoluteCapacity(nodeLabel,
getParent().getQueueCapacities().
this.getParent().getQueueCapacities().
getAbsoluteCapacity(nodeLabel)
* getQueueCapacities().getCapacity(nodeLabel));
// note: we currently set maxCapacity to capacity
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class CSQueuePreemption {
public class CSQueuePreemptionSettings {
private final boolean preemptionDisabled;
// Indicates if the in-queue preemption setting is ever disabled within the
// hierarchy of this queue.
private final boolean intraQueuePreemptionDisabledInHierarchy;

public CSQueuePreemption(
public CSQueuePreemptionSettings(
CSQueue queue,
CapacitySchedulerContext csContext,
CapacitySchedulerConfiguration configuration) {
Expand Down Expand Up @@ -109,6 +109,10 @@ private boolean isIntraQueueHierarchyPreemptionDisabled(CSQueue q,
parentQ.getIntraQueuePreemptionDisabledInHierarchy());
}

/**
 * Reports whether intra-queue preemption is disabled for this queue.
 *
 * @return {@code true} when either the queue's own preemption-disabled flag
 *         or the hierarchy-level intra-queue flag is set
 */
public boolean getIntraQueuePreemptionDisabled() {
  if (preemptionDisabled) {
    return true;
  }
  return intraQueuePreemptionDisabledInHierarchy;
}

/**
 * Returns the flag recording whether the in-queue preemption setting is
 * disabled within the hierarchy of this queue.
 *
 * @return the stored hierarchy-level intra-queue preemption flag
 */
public boolean isIntraQueuePreemptionDisabledInHierarchy() {
  return this.intraQueuePreemptionDisabledInHierarchy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,14 @@ protected void setupQueueConfigs(Resource clusterResource,
priorityAcls = conf.getPriorityAcls(getQueuePath(),
csContext.getMaxClusterLevelAppPriority());

if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
this.defaultLabelExpression, null)) {
Set<String> accessibleNodeLabels = this.queueNodeLabelsSettings.getAccessibleNodeLabels();
if (!SchedulerUtils.checkQueueLabelExpression(accessibleNodeLabels,
this.queueNodeLabelsSettings.getDefaultLabelExpression(), null)) {
throw new IOException(
"Invalid default label expression of " + " queue=" + getQueuePath()
+ " doesn't have permission to access all labels "
+ "in default label expression. labelExpression of resource request="
+ (this.defaultLabelExpression == null ?
"" :
this.defaultLabelExpression) + ". Queue labels=" + (
+ getDefaultNodeLabelExpressionStr() + ". Queue labels=" + (
getAccessibleNodeLabels() == null ?
"" :
StringUtils
Expand All @@ -238,19 +237,20 @@ protected void setupQueueConfigs(Resource clusterResource,

// re-init this since max allocation could have changed
this.minimumAllocationFactor = Resources.ratio(resourceCalculator,
Resources.subtract(maximumAllocation, minimumAllocation),
maximumAllocation);
Resources.subtract(
queueAllocationSettings.getMaximumAllocation(),
queueAllocationSettings.getMinimumAllocation()),
queueAllocationSettings.getMaximumAllocation());

StringBuilder aclsString = new StringBuilder();
for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
}

StringBuilder labelStrBuilder = new StringBuilder();
if (accessibleLabels != null) {
for (String s : accessibleLabels) {
labelStrBuilder.append(s)
.append(",");
if (accessibleNodeLabels != null) {
for (String nodeLabel : accessibleNodeLabels) {
labelStrBuilder.append(nodeLabel).append(",");
}
}

Expand Down Expand Up @@ -297,7 +297,8 @@ protected void setupQueueConfigs(Resource clusterResource,
+ "minimumAllocationFactor = " + minimumAllocationFactor
+ " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / "
+ "maximumAllocationMemory ]" + "\n" + "maximumAllocation = "
+ maximumAllocation + " [= configuredMaxAllocation ]" + "\n"
+ queueAllocationSettings.getMaximumAllocation() +
" [= configuredMaxAllocation ]" + "\n"
+ "numContainers = " + usageTracker.getNumContainers()
+ " [= currentNumContainers ]" + "\n" + "state = " + getState()
+ " [= configuredState ]" + "\n" + "acls = " + aclsString
Expand All @@ -318,6 +319,11 @@ protected void setupQueueConfigs(Resource clusterResource,
}
}

/**
 * Returns the queue's default node label expression in a form that is safe
 * to concatenate into messages.
 *
 * @return the configured default label expression, or an empty string when
 *         none is set
 */
private String getDefaultNodeLabelExpressionStr() {
  final String expression = queueNodeLabelsSettings.getDefaultLabelExpression();
  if (expression == null) {
    return "";
  }
  return expression;
}

/**
* Used only by tests.
*/
Expand Down Expand Up @@ -602,7 +608,7 @@ public void submitApplicationAttempt(FiCaSchedulerApp application,
usageTracker.getMetrics().submitAppAttempt(userName, unmanagedAM);
}

getParent().submitApplicationAttempt(application, userName);
parent.submitApplicationAttempt(application, userName);
}

@Override
Expand All @@ -616,10 +622,10 @@ public void submitApplication(ApplicationId applicationId, String userName,

// Inform the parent queue
try {
getParent().submitApplication(applicationId, userName, queue);
parent.submitApplication(applicationId, userName, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
getParent().getQueuePath(), ace);
parent.getQueuePath(), ace);
throw ace;
}

Expand Down Expand Up @@ -664,10 +670,10 @@ public void validateSubmitApplication(ApplicationId applicationId,
}

try {
getParent().validateSubmitApplication(applicationId, userName, queue);
parent.validateSubmitApplication(applicationId, userName, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
getParent().getQueuePath(), ace);
parent.getQueuePath(), ace);
throw ace;
}
}
Expand Down Expand Up @@ -714,6 +720,8 @@ public Resource getUserAMResourceLimitPerPartition(

Resource queuePartitionResource = getEffectiveCapacity(nodePartition);

Resource minimumAllocation = queueAllocationSettings.getMinimumAllocation();

Resource userAMLimit = Resources.multiplyAndNormalizeUp(
resourceCalculator, queuePartitionResource,
queueCapacities.getMaxAMResourcePercentage(nodePartition)
Expand Down Expand Up @@ -800,7 +808,7 @@ public Resource calculateAndGetAMResourceLimitPerPartition(

Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
resourceCalculator, queuePartitionUsableResource, amResourcePercent,
minimumAllocation);
queueAllocationSettings.getMinimumAllocation());

usageTracker.getMetrics().setAMResouceLimit(nodePartition, amResouceLimit);
usageTracker.getQueueUsage().setAMLimit(nodePartition, amResouceLimit);
Expand Down Expand Up @@ -987,14 +995,14 @@ public void finishApplication(ApplicationId application, String user) {
appFinished();

// Inform the parent queue
getParent().finishApplication(application, user);
parent.finishApplication(application, user);
}

/**
 * Removes the given application attempt from this queue and then forwards
 * the notification to the parent queue.
 *
 * NOTE(review): both a {@code getParent()} call and a {@code parent} call
 * appear below; this looks like the removed and added lines of a rendered
 * diff, so presumably only one of them belongs in the real file — confirm.
 *
 * @param application the application attempt that has finished
 * @param queue value passed through unchanged to the parent queue
 */
@Override
public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
// Careful! Locking order is important!
removeApplicationAttempt(application, application.getUser());
getParent().finishApplicationAttempt(application, queue);
parent.finishApplicationAttempt(application, queue);
}

private void removeApplicationAttempt(
Expand Down Expand Up @@ -1165,9 +1173,9 @@ public CSAssignment assignContainers(Resource clusterResource,

// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(candidates.getPartition())) {
&& !queueNodeLabelsSettings.isAccessibleToPartition(candidates.getPartition())) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueuePath(), getQueuePath(), ActivityState.REJECTED,
parent.getQueuePath(), getQueuePath(), ActivityState.REJECTED,
ActivityDiagnosticConstant.QUEUE_NOT_ABLE_TO_ACCESS_PARTITION);
return CSAssignment.NULL_ASSIGNMENT;
}
Expand All @@ -1183,7 +1191,7 @@ public CSAssignment assignContainers(Resource clusterResource,
.getPartition());
}
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
return CSAssignment.NULL_ASSIGNMENT;
}
Expand Down Expand Up @@ -1211,7 +1219,7 @@ public CSAssignment assignContainers(Resource clusterResource,
activitiesManager, node, application, application.getPriority(),
ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueuePath(), getQueuePath(),
parent.getQueuePath(), getQueuePath(),
ActivityState.REJECTED,
ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
return CSAssignment.NULL_ASSIGNMENT;
Expand Down Expand Up @@ -1280,7 +1288,7 @@ public CSAssignment assignContainers(Resource clusterResource,
if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
Resources.none())) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueuePath(), getQueuePath(),
parent.getQueuePath(), getQueuePath(),
ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
return assignment;
} else if (assignment.getSkippedType()
Expand All @@ -1292,15 +1300,15 @@ public CSAssignment assignContainers(Resource clusterResource,
} else if (assignment.getSkippedType()
== CSAssignment.SkippedType.QUEUE_LIMIT) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueuePath(), getQueuePath(), ActivityState.REJECTED,
parent.getQueuePath(), getQueuePath(), ActivityState.REJECTED,
() -> ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM
+ " from " + application.getApplicationId());
return assignment;
} else{
// If we don't allocate anything, and it is not skipped by application,
// we will return to respect FIFO of applications
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.QUEUE_SKIPPED_TO_RESPECT_FIFO);
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
Expand All @@ -1309,7 +1317,7 @@ public CSAssignment assignContainers(Resource clusterResource,
}
}
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.EMPTY);

return CSAssignment.NULL_ASSIGNMENT;
Expand Down Expand Up @@ -1516,7 +1524,8 @@ private Resource getHeadroom(User user,
usageTracker.getQueueUsage().getUsed(partition)));
// Normalize it before return
headroom =
Resources.roundDown(resourceCalculator, headroom, minimumAllocation);
Resources.roundDown(resourceCalculator, headroom,
queueAllocationSettings.getMinimumAllocation());

//headroom = min (unused resourcelimit of a label, calculated headroom )
Resource clusterPartitionResource =
Expand Down Expand Up @@ -1795,7 +1804,7 @@ public void completedContainer(Resource clusterResource,

if (removed) {
// Inform the parent queue _outside_ of the leaf-queue lock
getParent().completedContainer(clusterResource, application, node,
parent.completedContainer(clusterResource, application, node,
rmContainer, null, event, this, sortQueues);
}
}
Expand Down Expand Up @@ -1922,7 +1931,7 @@ private void updateCurrentResourceLimits(
this.cachedResourceLimitsForHeadroom =
new ResourceLimits(currentResourceLimits.getLimit());
Resource queueMaxResource = getEffectiveMaxCapacityDown(
RMNodeLabelsManager.NO_LABEL, minimumAllocation);
RMNodeLabelsManager.NO_LABEL, queueAllocationSettings.getMinimumAllocation());
this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
resourceCalculator, clusterResource, queueMaxResource,
currentResourceLimits.getLimit()));
Expand Down Expand Up @@ -2033,7 +2042,7 @@ public void recoverContainer(Resource clusterResource,
writeLock.unlock();
}

getParent().recoverContainer(clusterResource, attempt, rmContainer);
parent.recoverContainer(clusterResource, attempt, rmContainer);
}

/**
Expand Down Expand Up @@ -2161,7 +2170,7 @@ public void attachContainer(Resource clusterResource,
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
+ usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource);
// Inform the parent queue
getParent().attachContainer(clusterResource, application, rmContainer);
parent.attachContainer(clusterResource, application, rmContainer);
}
}

Expand All @@ -2181,7 +2190,7 @@ public void detachContainer(Resource clusterResource,
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
+ usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource);
// Inform the parent queue
getParent().detachContainer(clusterResource, application, rmContainer);
parent.detachContainer(clusterResource, application, rmContainer);
}
}

Expand Down Expand Up @@ -2341,7 +2350,7 @@ void updateMaximumApplications(CapacitySchedulerConfiguration conf) {
!= CapacityConfigType.ABSOLUTE_RESOURCE) {
maxAppsForQueue = baseMaxApplications;
} else {
for (String label : configuredNodeLabels) {
for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
int maxApplicationsByLabel = (int) (baseMaxApplications
* queueCapacities.getAbsoluteCapacity(label));
if (maxApplicationsByLabel > maxAppsForQueue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,9 @@ protected void setupQueueConfigs(Resource clusterResource,
}

StringBuilder labelStrBuilder = new StringBuilder();
if (accessibleLabels != null) {
for (String s : accessibleLabels) {
labelStrBuilder.append(s)
.append(",");
if (queueNodeLabelsSettings.getAccessibleNodeLabels() != null) {
for (String nodeLabel : queueNodeLabelsSettings.getAccessibleNodeLabels()) {
labelStrBuilder.append(nodeLabel).append(",");
}
}

Expand Down Expand Up @@ -757,7 +756,7 @@ public void submitApplication(ApplicationId applicationId, String user,
try {
parent.submitApplication(applicationId, user, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
LOG.info("Failed to submit application to parent-queue: " +
parent.getQueuePath(), ace);
removeApplication(applicationId, user);
throw ace;
Expand Down Expand Up @@ -846,7 +845,7 @@ private void removeApplication(ApplicationId applicationId,
}

/**
 * Returns the queue path of the parent queue, or an empty string when this
 * queue has no parent.
 *
 * NOTE(review): two return statements appear below (one via
 * {@code getParent()}, one via the {@code parent} field); this looks like the
 * removed and added lines of a rendered diff, so presumably only one of them
 * belongs in the real file — confirm.
 *
 * @return the parent's queue path, or {@code ""} if the parent is null
 */
private String getParentName() {
return getParent() != null ? getParent().getQueuePath() : "";
return parent != null ? parent.getQueuePath() : "";
}

@Override
Expand All @@ -857,7 +856,7 @@ public CSAssignment assignContainers(Resource clusterResource,

// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(candidates.getPartition())) {
&& !queueNodeLabelsSettings.isAccessibleToPartition(candidates.getPartition())) {
if (LOG.isDebugEnabled()) {
long now = System.currentTimeMillis();
// Do logging every 1 sec to avoid excessive logging.
Expand Down Expand Up @@ -1038,7 +1037,7 @@ private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
// 1) Node doesn't have reserved container
// 2) Node's available-resource + killable-resource should > 0
boolean accept = node.getReservedContainer() == null &&
Resources.fitsIn(resourceCalculator, minimumAllocation,
Resources.fitsIn(resourceCalculator, queueAllocationSettings.getMinimumAllocation(),
Resources.add(node.getUnallocatedResource(), node.getTotalKillableResources()));
if (!accept) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
Expand Down Expand Up @@ -1085,7 +1084,8 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child,

// Normalize before return
childLimit =
Resources.roundDown(resourceCalculator, childLimit, minimumAllocation);
Resources.roundDown(resourceCalculator, childLimit,
queueAllocationSettings.getMinimumAllocation());

return new ResourceLimits(childLimit);
}
Expand Down Expand Up @@ -1270,7 +1270,7 @@ public void updateClusterResource(Resource clusterResource,
}

// Update effective capacity in all parent queue.
for (String label : configuredNodeLabels) {
for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
calculateEffectiveResourcesAndCapacity(label, clusterResource);
}

Expand Down
Loading