Skip to content
Closed
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
86b32bd
YARN-10930. Introduce universal capacity resource vector
9uapaw Aug 31, 2021
b935b29
YARN-10930. Cleanup for QueueCapacityVector
9uapaw Sep 21, 2021
5f3c0da
YARN-10965. Introduce enhanced queue calculation
9uapaw Sep 23, 2021
6ee8dc2
YARN-10930. Cover additional test cases
9uapaw Oct 14, 2021
ca8016c
YARN-10965. Extend
9uapaw Oct 14, 2021
4fb353c
YARN-10965. Extend 2
9uapaw Oct 19, 2021
8df3901
YARN-10930. Fix review feedbacks
9uapaw Oct 20, 2021
8c6559f
YARN-10965. Simplify calculation
9uapaw Oct 21, 2021
0b7adc8
YARN-10965. Simplify calculators
9uapaw Oct 29, 2021
2bcb962
Merge remote-tracking branch 'origin/trunk' into YARN-10965
9uapaw Oct 29, 2021
83aabd0
YARN-10965. Simplify logic in QueueHandler
9uapaw Nov 15, 2021
3dae1dc
YARN-10965. Implement strict resource iteration order
9uapaw Nov 16, 2021
e549acc
YARN-10965. Introduce driver concept to simplify the logic and encaps…
9uapaw Nov 18, 2021
6c629ea
YARN-10965. Fix root driver child queue setting
9uapaw Nov 18, 2021
27e6344
YARN-10965. Fix warnings and nits
9uapaw Nov 22, 2021
e6609f2
YARN-10965. Make Absolute calculator use remaining resource ratio as …
9uapaw Nov 22, 2021
784c1be
YARN-10965. Create node label test
9uapaw Nov 23, 2021
0793322
YARN-10965. Make rounding strategies more flexible with regards to ca…
9uapaw Nov 30, 2021
75bca46
YARN-10965. Simplify test API
9uapaw Dec 9, 2021
513555c
YARN-10965. Fix review feedbacks
9uapaw Dec 9, 2021
01c3d5b
Merge branch 'trunk' into YARN-10965
9uapaw Dec 15, 2021
ec56a1c
YARN-10965. Fix checkstyle issues
9uapaw Dec 15, 2021
4deb72a
YARN-10965. Introduce calculation iteration context
9uapaw Feb 1, 2022
044cf3b
YARN-10965. Fix review feedbacks
9uapaw Feb 2, 2022
c648a82
Merge branch 'trunk' into YARN-10965
9uapaw Feb 2, 2022
faa9a9a
YARN-10965. Remove unnecessary change in LeafQueue
9uapaw Feb 2, 2022
c16c335
YARN-10965. Fix review feedback
9uapaw Feb 17, 2022
415342d
Merge branch 'trunk' into YARN-10965
9uapaw Mar 22, 2022
785f2fa
YARN-10965. Fix checkstyle issues and nomenclature nits
9uapaw Mar 22, 2022
edbf3da
Merge branch 'trunk' into YARN-10965
9uapaw Aug 30, 2022
673a8c0
YARN-10965. Fix javac and spotbugs errors
9uapaw Aug 30, 2022
2958698
Merge branch 'apache:trunk' into YARN-10965
9uapaw Jan 25, 2023
ce2618d
Fix checkstyle
szilard-nemeth Jan 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,28 @@ public static Resource createResourceWithSameValue(long value) {
return res;
}

public static Resource multiply(Resource resource, float multiplier) {
Resource newResource = Resource.newInstance(0, 0);

for (ResourceInformation resourceInformation : resource.getResources()) {
newResource.setResourceValue(resourceInformation.getName(),
(long) Math.floor(resourceInformation.getValue() * multiplier));
}

return newResource;
}

public static Resource multiplyRound(Resource resource, float multiplier) {
Resource newResource = Resource.newInstance(0, 0);

for (ResourceInformation resourceInformation : resource.getResources()) {
newResource.setResourceValue(resourceInformation.getName(),
Math.round(resourceInformation.getValue() * multiplier));
}

return newResource;
}

@InterfaceAudience.Private
@InterfaceStability.Unstable
public static Resource createResourceFromString(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;

public class AbsoluteResourceCapacityCalculator extends AbstractQueueCapacityCalculator {

@Override
public float calculateMinimumResource(
ResourceCalculationDriver resourceCalculationDriver, String label) {
String resourceName = resourceCalculationDriver.getCurrentResourceName();
float normalizedRatio = resourceCalculationDriver.getNormalizedResourceRatios().getOrDefault(
label, ResourceVector.of(1)).getValue(resourceName);
float remainingResourceRatio = resourceCalculationDriver.getRemainingRatioOfResource(
label, resourceName);

return normalizedRatio * remainingResourceRatio * resourceCalculationDriver
.getCurrentMinimumCapacityEntry(label).getResourceValue();
}

@Override
public float calculateMaximumResource(
ResourceCalculationDriver resourceCalculationDriver, String label) {
return resourceCalculationDriver.getCurrentMaximumCapacityEntry(label).getResourceValue();
}

@Override
public void updateCapacitiesAfterCalculation(
ResourceCalculationDriver resourceCalculationDriver, String label) {
setQueueCapacities(resourceCalculationDriver.getUpdateContext().getUpdatedClusterResource(
label), resourceCalculationDriver.getCurrentChild(), label);
}

@Override
public ResourceUnitCapacityType getCapacityType() {
return ResourceUnitCapacityType.ABSOLUTE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public enum CapacityConfigType {
CapacityConfigType.NONE;

protected Map<String, QueueCapacityVector> configuredCapacityVectors;
protected Map<String, QueueCapacityVector> configuredMaxCapacityVectors;

private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
Expand Down Expand Up @@ -199,6 +200,20 @@ public float getAbsoluteCapacity() {
return queueCapacities.getAbsoluteCapacity();
}

@Override
public ResourceVector getOrCreateAbsoluteMinCapacityVector(String label) {
usageTracker.getAbsoluteMinCapacityVector().putIfAbsent(label, ResourceVector.newInstance());

return usageTracker.getAbsoluteMinCapacityVector().get(label);
}

@Override
public ResourceVector getOrCreateAbsoluteMaxCapacityVector(String label) {
usageTracker.getAbsoluteMaxCapacityVector().putIfAbsent(label, ResourceVector.newInstance());

return usageTracker.getAbsoluteMaxCapacityVector().get(label);
}

@Override
public float getAbsoluteMaximumCapacity() {
return queueCapacities.getAbsoluteMaximumCapacity();
Expand Down Expand Up @@ -373,7 +388,8 @@ protected void setupQueueConfigs(Resource clusterResource) throws
this.configuredCapacityVectors = configuration
.parseConfiguredResourceVector(queuePath.getFullPath(),
this.queueNodeLabelsSettings.getConfiguredNodeLabels());

this.configuredMaxCapacityVectors = configuration
.parseConfiguredMaximumCapacityVector(queuePath.getFullPath(), this.queueNodeLabelsSettings.getConfiguredNodeLabels(), QueueCapacityVector.newInstance());
// Update metrics
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, null);
Expand Down Expand Up @@ -519,7 +535,8 @@ private void validateMinResourceIsNotGreaterThanMaxResource(Resource minResource
private void validateAbsoluteVsPercentageCapacityConfig(
CapacityConfigType localType) {
if (!queuePath.isRoot()
&& !this.capacityConfigType.equals(localType)) {
&& !this.capacityConfigType.equals(localType) &&
queueContext.getConfiguration().isLegacyQueueMode()) {
throw new IllegalArgumentException("Queue '" + getQueuePath()
+ "' should use either percentage based capacity"
+ " configuration or absolute resource.");
Expand Down Expand Up @@ -558,11 +575,25 @@ public Resource getEffectiveMaxCapacityDown(String label, Resource factor) {
}

@Override
public QueueCapacityVector getConfiguredCapacityVector(
String label) {
public QueueCapacityVector getConfiguredCapacityVector(String label) {
return configuredCapacityVectors.get(label);
}

@Override
public QueueCapacityVector getConfiguredMaxCapacityVector(String label) {
return configuredMaxCapacityVectors.get(label);
}

@Override
public void setConfiguredMinCapacityVector(String label, QueueCapacityVector minCapacityVector) {
configuredCapacityVectors.put(label, minCapacityVector);
}

@Override
public void setConfiguredMaxCapacityVector(String label, QueueCapacityVector maxCapacityVector) {
configuredMaxCapacityVectors.put(label, maxCapacityVector);
}

private void initializeQueueState() {
QueueState previousState = getState();
QueueState configuredState = queueContext.getConfiguration()
Expand Down Expand Up @@ -789,6 +820,11 @@ public ReentrantReadWriteLock.ReadLock getReadLock() {
return readLock;
}

@Override
public ReentrantReadWriteLock.WriteLock getWriteLock() {
return writeLock;
}

private Resource getCurrentLimitResource(String nodePartition,
Resource clusterResource, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode) {
Expand Down Expand Up @@ -916,6 +952,11 @@ boolean canAssignToThisQueue(Resource clusterResource,

}

@Override
public Set<String> getConfiguredNodeLabels() {
return queueNodeLabelsSettings.getConfiguredNodeLabels();
}

private static String ensurePartition(String partition) {
return Optional.ofNullable(partition).orElse(RMNodeLabelsManager.NO_LABEL);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType.PERCENTAGE;

public class AbstractLeafQueue extends AbstractCSQueue {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractLeafQueue.class);
Expand Down Expand Up @@ -1919,6 +1922,48 @@ private void updateCurrentResourceLimits(
currentResourceLimits.getLimit()));
}

@Override
public void refreshAfterResourceCalculation(Resource clusterResource, ResourceLimits resourceLimits) {
lastClusterResource = clusterResource;
// Update maximum applications for the queue and for users
updateMaximumApplications();

updateCurrentResourceLimits(resourceLimits, clusterResource);

// Update headroom info based on new cluster resource value
// absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
// during allocation
setQueueResourceLimitsInfo(clusterResource);

// Update user consumedRatios
recalculateQueueUsageRatio(clusterResource, null);

// Update metrics
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, null);
// Update configured capacity/max-capacity for default partition only
CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
labelManager.getResourceByLabel(null, clusterResource),
NO_LABEL, this);

// queue metrics are updated, more resource may be available
// activate the pending applications if possible
activateApplications();

// In case of any resource change, invalidate recalculateULCount to clear
// the computed user-limit.
usersManager.userLimitNeedsRecompute();

// Update application properties
for (FiCaSchedulerApp application : orderingPolicy
.getSchedulableEntities()) {
computeUserLimitAndSetHeadroom(application, clusterResource,
NO_LABEL,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);

}
}

@Override
public void updateClusterResource(Resource clusterResource,
ResourceLimits currentResourceLimits) {
Expand Down Expand Up @@ -2207,11 +2252,14 @@ public Map<String, TreeSet<RMContainer>> getIgnoreExclusivityRMContainers() {
}
}


public void setCapacity(float capacity) {
configuredCapacityVectors.put(NO_LABEL, QueueCapacityVector.of(capacity * 100, PERCENTAGE));
queueCapacities.setCapacity(capacity);
}

public void setCapacity(String nodeLabel, float capacity) {
configuredCapacityVectors.put(nodeLabel, QueueCapacityVector.of(capacity * 100, PERCENTAGE));
queueCapacities.setCapacity(nodeLabel, capacity);
}

Expand Down
Loading