Skip to content
Closed
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
86b32bd
YARN-10930. Introduce universal capacity resource vector
9uapaw Aug 31, 2021
b935b29
YARN-10930. Cleanup for QueueCapacityVector
9uapaw Sep 21, 2021
5f3c0da
YARN-10965. Introduce enhanced queue calculation
9uapaw Sep 23, 2021
6ee8dc2
YARN-10930. Cover additional test cases
9uapaw Oct 14, 2021
ca8016c
YARN-10965. Extend
9uapaw Oct 14, 2021
4fb353c
YARN-10965. Extend 2
9uapaw Oct 19, 2021
8df3901
YARN-10930. Fix review feedbacks
9uapaw Oct 20, 2021
8c6559f
YARN-10965. Simplify calculation
9uapaw Oct 21, 2021
0b7adc8
YARN-10965. Simplify calculators
9uapaw Oct 29, 2021
2bcb962
Merge remote-tracking branch 'origin/trunk' into YARN-10965
9uapaw Oct 29, 2021
83aabd0
YARN-10965. Simplify logic in QueueHandler
9uapaw Nov 15, 2021
3dae1dc
YARN-10965. Implement strict resource iteration order
9uapaw Nov 16, 2021
e549acc
YARN-10965. Introduce driver concept to simplify the logic and encaps…
9uapaw Nov 18, 2021
6c629ea
YARN-10965. Fix root driver child queue setting
9uapaw Nov 18, 2021
27e6344
YARN-10965. Fix warnings and nits
9uapaw Nov 22, 2021
e6609f2
YARN-10965. Make Absolute calculator use remaining resource ratio as …
9uapaw Nov 22, 2021
784c1be
YARN-10965. Create node label test
9uapaw Nov 23, 2021
0793322
YARN-10965. Make rounding strategies more flexible with regards to ca…
9uapaw Nov 30, 2021
75bca46
YARN-10965. Simplify test API
9uapaw Dec 9, 2021
513555c
YARN-10965. Fix review feedbacks
9uapaw Dec 9, 2021
01c3d5b
Merge branch 'trunk' into YARN-10965
9uapaw Dec 15, 2021
ec56a1c
YARN-10965. Fix checkstyle issues
9uapaw Dec 15, 2021
4deb72a
YARN-10965. Introduce calculation iteration context
9uapaw Feb 1, 2022
044cf3b
YARN-10965. Fix review feedbacks
9uapaw Feb 2, 2022
c648a82
Merge branch 'trunk' into YARN-10965
9uapaw Feb 2, 2022
faa9a9a
YARN-10965. Remove unnecessary change in LeafQueue
9uapaw Feb 2, 2022
c16c335
YARN-10965. Fix review feedback
9uapaw Feb 17, 2022
415342d
Merge branch 'trunk' into YARN-10965
9uapaw Mar 22, 2022
785f2fa
YARN-10965. Fix checkstyle issues and nomenclature nits
9uapaw Mar 22, 2022
edbf3da
Merge branch 'trunk' into YARN-10965
9uapaw Aug 30, 2022
673a8c0
YARN-10965. Fix javac and spotbugs errors
9uapaw Aug 30, 2022
2958698
Merge branch 'apache:trunk' into YARN-10965
9uapaw Jan 25, 2023
ce2618d
Fix checkstyle
szilard-nemeth Jan 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,28 @@ public static Resource createResourceWithSameValue(long value) {
return res;
}

public static Resource multiplyFloor(Resource resource, double multiplier) {
Resource newResource = Resource.newInstance(0, 0);

for (ResourceInformation resourceInformation : resource.getResources()) {
newResource.setResourceValue(resourceInformation.getName(),
(long) Math.floor(resourceInformation.getValue() * multiplier));
}

return newResource;
}

public static Resource multiplyRound(Resource resource, double multiplier) {
Resource newResource = Resource.newInstance(0, 0);

for (ResourceInformation resourceInformation : resource.getResources()) {
newResource.setResourceValue(resourceInformation.getName(),
Math.round(resourceInformation.getValue() * multiplier));
}

return newResource;
}

@InterfaceAudience.Private
@InterfaceStability.Unstable
public static Resource createResourceFromString(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;

import java.util.Map;

import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueUpdateWarning.QueueUpdateWarningType.BRANCH_DOWNSCALED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ResourceCalculationDriver.MB_UNIT;

public class AbsoluteResourceCapacityCalculator extends AbstractQueueCapacityCalculator {

@Override
public void calculateResourcePrerequisites(ResourceCalculationDriver resourceCalculationDriver) {
setNormalizedResourceRatio(resourceCalculationDriver);
}

@Override
public double calculateMinimumResource(
ResourceCalculationDriver resourceCalculationDriver, CalculationContext context, String label) {
String resourceName = context.getResourceName();
double normalizedRatio = resourceCalculationDriver.getNormalizedResourceRatios().getOrDefault(
label, ResourceVector.of(1)).getValue(resourceName);
double remainingResourceRatio = resourceCalculationDriver.getRemainingRatioOfResource(
label, resourceName);

return normalizedRatio * remainingResourceRatio * context.getCurrentMinimumCapacityEntry(
label).getResourceValue();
}

@Override
public double calculateMaximumResource(
ResourceCalculationDriver resourceCalculationDriver, CalculationContext context, String label) {
return context.getCurrentMaximumCapacityEntry(label).getResourceValue();
}

@Override
public void updateCapacitiesAfterCalculation(
ResourceCalculationDriver resourceCalculationDriver, CSQueue queue, String label) {
CapacitySchedulerQueueCapacityHandler.setQueueCapacities(
resourceCalculationDriver.getUpdateContext().getUpdatedClusterResource(label), queue, label);
}

@Override
public ResourceUnitCapacityType getCapacityType() {
return ResourceUnitCapacityType.ABSOLUTE;
}

/**
* Calculates the normalized resource ratio of a parent queue, under which children are defined
* with absolute capacity type. If the effective resource of the parent is less, than the
* aggregated configured absolute resource of its children, the resource ratio will be less,
* than 1.
*
* @param calculationDriver the driver, which contains the parent queue that will form the base
* of the normalization calculation
*/
public static void setNormalizedResourceRatio(ResourceCalculationDriver calculationDriver) {
CSQueue queue = calculationDriver.getQueue();

for (String label : queue.getConfiguredNodeLabels()) {
// ManagedParents assign zero capacity to queues in case of overutilization, downscaling is
// turned off for their children
if (queue instanceof ManagedParentQueue) {
return;
}

for (String resourceName : queue.getConfiguredCapacityVector(label).getResourceNames()) {
long childrenConfiguredResource = 0;
long effectiveMinResource = queue.getQueueResourceQuotas().getEffectiveMinResource(
label).getResourceValue(resourceName);

// Total configured min resources of direct children of the queue
for (CSQueue childQueue : queue.getChildQueues()) {
if (!childQueue.getConfiguredNodeLabels().contains(label)) {
continue;
}
QueueCapacityVector capacityVector = childQueue.getConfiguredCapacityVector(label);
if (capacityVector.isResourceOfType(resourceName, ResourceUnitCapacityType.ABSOLUTE)) {
childrenConfiguredResource += capacityVector.getResource(resourceName)
.getResourceValue();
}
}
// If no children is using ABSOLUTE capacity type, normalization is not needed
if (childrenConfiguredResource == 0) {
continue;
}
// Factor to scale down effective resource: When cluster has sufficient
// resources, effective_min_resources will be same as configured
// min_resources.
float numeratorForMinRatio = childrenConfiguredResource;
if (effectiveMinResource < childrenConfiguredResource) {
numeratorForMinRatio = queue.getQueueResourceQuotas().getEffectiveMinResource(label)
.getResourceValue(resourceName);
calculationDriver.getUpdateContext().addUpdateWarning(BRANCH_DOWNSCALED.ofQueue(
queue.getQueuePath()));
}

String unit = resourceName.equals(MEMORY_URI) ? MB_UNIT : "";
long convertedValue = UnitsConversionUtil.convert(unit, calculationDriver.getUpdateContext()
.getUpdatedClusterResource(label).getResourceInformation(resourceName).getUnits(),
childrenConfiguredResource);

if (convertedValue != 0) {
Map<String, ResourceVector> normalizedResourceRatios =
calculationDriver.getNormalizedResourceRatios();
normalizedResourceRatios.putIfAbsent(label, ResourceVector.newInstance());
normalizedResourceRatios.get(label).setValue(resourceName, numeratorForMinRatio /
convertedValue);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public enum CapacityConfigType {
CapacityConfigType.NONE;

protected Map<String, QueueCapacityVector> configuredCapacityVectors;
protected Map<String, QueueCapacityVector> configuredMaxCapacityVectors;

private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
Expand Down Expand Up @@ -381,7 +382,8 @@ protected void setupQueueConfigs(Resource clusterResource) throws
this.configuredCapacityVectors = configuration
.parseConfiguredResourceVector(queuePath.getFullPath(),
this.queueNodeLabelsSettings.getConfiguredNodeLabels());

this.configuredMaxCapacityVectors = configuration
.parseConfiguredMaximumCapacityVector(queuePath.getFullPath(), this.queueNodeLabelsSettings.getConfiguredNodeLabels(), QueueCapacityVector.newInstance());
// Update metrics
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, null);
Expand Down Expand Up @@ -535,7 +537,8 @@ private void validateMinResourceIsNotGreaterThanMaxResource(Resource minResource
private void validateAbsoluteVsPercentageCapacityConfig(
CapacityConfigType localType) {
if (!queuePath.isRoot()
&& !this.capacityConfigType.equals(localType)) {
&& !this.capacityConfigType.equals(localType) &&
queueContext.getConfiguration().isLegacyQueueMode()) {
throw new IllegalArgumentException("Queue '" + getQueuePath()
+ "' should use either percentage based capacity"
+ " configuration or absolute resource.");
Expand Down Expand Up @@ -574,11 +577,25 @@ public Resource getEffectiveMaxCapacityDown(String label, Resource factor) {
}

@Override
public QueueCapacityVector getConfiguredCapacityVector(
String label) {
public QueueCapacityVector getConfiguredCapacityVector(String label) {
return configuredCapacityVectors.get(label);
}

@Override
public QueueCapacityVector getConfiguredMaxCapacityVector(String label) {
return configuredMaxCapacityVectors.get(label);
}

@Override
public void setConfiguredMinCapacityVector(String label, QueueCapacityVector minCapacityVector) {
configuredCapacityVectors.put(label, minCapacityVector);
}

@Override
public void setConfiguredMaxCapacityVector(String label, QueueCapacityVector maxCapacityVector) {
configuredMaxCapacityVectors.put(label, maxCapacityVector);
}

protected QueueInfo getQueueInfo() {
// Deliberately doesn't use lock here, because this method will be invoked
// from schedulerApplicationAttempt, to avoid deadlock, sacrifice
Expand Down Expand Up @@ -769,6 +786,11 @@ public ReentrantReadWriteLock.ReadLock getReadLock() {
return readLock;
}

@Override
public ReentrantReadWriteLock.WriteLock getWriteLock() {
return writeLock;
}

private Resource getCurrentLimitResource(String nodePartition,
Resource clusterResource, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode) {
Expand Down Expand Up @@ -905,6 +927,11 @@ boolean canAssignToThisQueue(Resource clusterResource,

}

@Override
public Set<String> getConfiguredNodeLabels() {
return queueNodeLabelsSettings.getConfiguredNodeLabels();
}

private static String ensurePartition(String partition) {
return Optional.ofNullable(partition).orElse(NO_LABEL);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getACLsForFlexibleAutoCreatedLeafQueue;

import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType.PERCENTAGE;

public class AbstractLeafQueue extends AbstractCSQueue {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractLeafQueue.class);
Expand Down Expand Up @@ -164,7 +167,7 @@ public AbstractLeafQueue(CapacitySchedulerQueueContext queueContext,
resourceCalculator);

// One time initialization is enough since it is static ordering policy
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps<>();
}

@SuppressWarnings("checkstyle:nowhitespaceafter")
Expand Down Expand Up @@ -1936,6 +1939,48 @@ private void updateCurrentResourceLimits(
currentResourceLimits.getLimit()));
}

@Override
public void refreshAfterResourceCalculation(Resource clusterResource, ResourceLimits resourceLimits) {
lastClusterResource = clusterResource;
// Update maximum applications for the queue and for users
updateMaximumApplications();

updateCurrentResourceLimits(resourceLimits, clusterResource);

// Update headroom info based on new cluster resource value
// absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
// during allocation
setQueueResourceLimitsInfo(clusterResource);

// Update user consumedRatios
recalculateQueueUsageRatio(clusterResource, null);

// Update metrics
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, null);
// Update configured capacity/max-capacity for default partition only
CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
labelManager.getResourceByLabel(null, clusterResource),
NO_LABEL, this);

// queue metrics are updated, more resource may be available
// activate the pending applications if possible
activateApplications();

// In case of any resource change, invalidate recalculateULCount to clear
// the computed user-limit.
usersManager.userLimitNeedsRecompute();

// Update application properties
for (FiCaSchedulerApp application : orderingPolicy
.getSchedulableEntities()) {
computeUserLimitAndSetHeadroom(application, clusterResource,
NO_LABEL,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);

}
}

@Override
public void updateClusterResource(Resource clusterResource,
ResourceLimits currentResourceLimits) {
Expand Down Expand Up @@ -2225,10 +2270,12 @@ public Map<String, TreeSet<RMContainer>> getIgnoreExclusivityRMContainers() {
}

public void setCapacity(float capacity) {
configuredCapacityVectors.put(NO_LABEL, QueueCapacityVector.of(capacity * 100, PERCENTAGE));
queueCapacities.setCapacity(capacity);
}

public void setCapacity(String nodeLabel, float capacity) {
configuredCapacityVectors.put(nodeLabel, QueueCapacityVector.of(capacity * 100, PERCENTAGE));
queueCapacities.setCapacity(nodeLabel, capacity);
}

Expand Down
Loading