Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
Expand Down Expand Up @@ -181,9 +180,9 @@ public String getName() {

private static void setupQueueConfiguration(
CapacitySchedulerConfiguration config) {
config.setQueues(new QueuePath(CapacitySchedulerConfiguration.ROOT),
config.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[]{"testqueue"});
QueuePath a = new QueuePath(CapacitySchedulerConfiguration.ROOT + ".testqueue");
String a = CapacitySchedulerConfiguration.ROOT + ".testqueue";
config.setCapacity(a, 100f);
config.setMaximumCapacity(a, 100f);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
Expand Down Expand Up @@ -251,13 +250,13 @@ public void testCallRM() {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();

final QueuePath a = new QueuePath(CapacitySchedulerConfiguration.ROOT + ".a");
final QueuePath b = new QueuePath(CapacitySchedulerConfiguration.ROOT + ".b");
final QueuePath a1 = new QueuePath(a + ".a1");
final QueuePath a2 = new QueuePath(a + ".a2");
final QueuePath b1 = new QueuePath(b + ".b1");
final QueuePath b2 = new QueuePath(b + ".b2");
final QueuePath b3 = new QueuePath(b + ".b3");
final String a = CapacitySchedulerConfiguration.ROOT + ".a";
final String b = CapacitySchedulerConfiguration.ROOT + ".b";
final String a1 = a + ".a1";
final String a2 = a + ".a2";
final String b1 = b + ".b1";
final String b2 = b + ".b2";
final String b3 = b + ".b3";
float aCapacity = 10.5f;
float bCapacity = 89.5f;
float a1Capacity = 30;
Expand All @@ -267,7 +266,7 @@ public void testCallRM() {
float b3Capacity = 20;

// Define top-level queues
csConf.setQueues(new QueuePath(CapacitySchedulerConfiguration.ROOT),
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] {"a", "b"});

csConf.setCapacity(a, aCapacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.CapacityReservationsACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.FairReservationsACLsManager;
Expand Down Expand Up @@ -431,7 +430,7 @@ protected Plan initializePlan(String planQueueName) throws YarnException {
Plan plan = new InMemoryPlan(getRootQueueMetrics(), adPolicy,
getAgent(planQueuePath), totCap, planStepSize, rescCalc, minAllocation,
maxAllocation, planQueueName, getReplanner(planQueuePath),
getReservationSchedulerConfiguration().getMoveOnExpiry(new QueuePath(planQueuePath)),
getReservationSchedulerConfiguration().getMoveOnExpiry(planQueuePath),
maxPeriodicity, rmContext);
LOG.info("Initialized plan {} based on reservable queue {}",
plan.toString(), planQueueName);
Expand All @@ -441,7 +440,7 @@ maxAllocation, planQueueName, getReplanner(planQueuePath),
protected Planner getReplanner(String planQueueName) {
ReservationSchedulerConfiguration reservationConfig =
getReservationSchedulerConfiguration();
String plannerClassName = reservationConfig.getReplanner(new QueuePath(planQueueName));
String plannerClassName = reservationConfig.getReplanner(planQueueName);
LOG.info("Using Replanner: " + plannerClassName + " for queue: "
+ planQueueName);
try {
Expand All @@ -464,7 +463,7 @@ protected Planner getReplanner(String planQueueName) {
protected ReservationAgent getAgent(String queueName) {
ReservationSchedulerConfiguration reservationConfig =
getReservationSchedulerConfiguration();
String agentClassName = reservationConfig.getReservationAgent(new QueuePath(queueName));
String agentClassName = reservationConfig.getReservationAgent(queueName);
LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName);
try {
Class<?> agentClazz = conf.getClassByName(agentClassName);
Expand All @@ -488,7 +487,7 @@ protected SharingPolicy getAdmissionPolicy(String queueName) {
ReservationSchedulerConfiguration reservationConfig =
getReservationSchedulerConfiguration();
String admissionPolicyClassName =
reservationConfig.getReservationAdmissionPolicy(new QueuePath(queueName));
reservationConfig.getReservationAdmissionPolicy(queueName);
LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName
+ " for queue: " + queueName);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.util.resource.Resources;

import java.util.Map;
Expand Down Expand Up @@ -58,10 +57,9 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
private float maxAvg;

@Override
public void init(String reservationQueue,
public void init(String reservationQueuePath,
ReservationSchedulerConfiguration conf) {
this.conf = conf;
QueuePath reservationQueuePath = new QueuePath(reservationQueue);
validWindow = this.conf.getReservationWindow(reservationQueuePath);
maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100;
maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;

import java.util.Map;

Expand Down Expand Up @@ -70,7 +69,7 @@ public ReservationSchedulerConfiguration(
* @param queue name of the queue
* @return true if the queue participates in reservation based scheduling
*/
public abstract boolean isReservable(QueuePath queue);
public abstract boolean isReservable(String queue);

/**
* Gets a map containing the {@link AccessControlList} of users for each
Expand All @@ -81,7 +80,7 @@ public ReservationSchedulerConfiguration(
* which contains a list of users that have the specified permission level.
*/
public abstract Map<ReservationACL, AccessControlList> getReservationAcls(
QueuePath queue);
String queue);

/**
* Gets the length of time in milliseconds for which the {@link SharingPolicy}
Expand All @@ -90,7 +89,7 @@ public abstract Map<ReservationACL, AccessControlList> getReservationAcls(
* @return length in time in milliseconds for which to check the
* {@link SharingPolicy}
*/
public long getReservationWindow(QueuePath queue) {
public long getReservationWindow(String queue) {
return DEFAULT_RESERVATION_WINDOW;
}

Expand All @@ -101,7 +100,7 @@ public long getReservationWindow(QueuePath queue) {
* @param queue name of the queue
* @return average capacity allowed by the {@link SharingPolicy}
*/
public float getAverageCapacity(QueuePath queue) {
public float getAverageCapacity(String queue) {
return DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER;
}

Expand All @@ -110,7 +109,7 @@ public float getAverageCapacity(QueuePath queue) {
* @param queue name of the queue
* @return maximum allowed capacity at any time
*/
public float getInstantaneousMaxCapacity(QueuePath queue) {
public float getInstantaneousMaxCapacity(String queue) {
return DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER;
}

Expand All @@ -119,7 +118,7 @@ public float getInstantaneousMaxCapacity(QueuePath queue) {
* @param queue name of the queue
* @return the class name of the {@link SharingPolicy}
*/
public String getReservationAdmissionPolicy(QueuePath queue) {
public String getReservationAdmissionPolicy(String queue) {
return DEFAULT_RESERVATION_ADMISSION_POLICY;
}

Expand All @@ -129,7 +128,7 @@ public String getReservationAdmissionPolicy(QueuePath queue) {
* @param queue name of the queue
* @return the class name of the {@code ReservationAgent}
*/
public String getReservationAgent(QueuePath queue) {
public String getReservationAgent(String queue) {
return DEFAULT_RESERVATION_AGENT_NAME;
}

Expand All @@ -138,7 +137,7 @@ public String getReservationAgent(QueuePath queue) {
* @param queuePath name of the queue
* @return true if reservation queues should be visible
*/
public boolean getShowReservationAsQueues(QueuePath queuePath) {
public boolean getShowReservationAsQueues(String queuePath) {
return DEFAULT_SHOW_RESERVATIONS_AS_QUEUES;
}

Expand All @@ -148,7 +147,7 @@ public boolean getShowReservationAsQueues(QueuePath queuePath) {
* @param queue name of the queue
* @return the class name of the {@code Planner}
*/
public String getReplanner(QueuePath queue) {
public String getReplanner(String queue) {
return DEFAULT_RESERVATION_PLANNER_NAME;
}

Expand All @@ -159,7 +158,7 @@ public String getReplanner(QueuePath queue) {
* @return true if application should be moved, false if they need to be
* killed
*/
public boolean getMoveOnExpiry(QueuePath queue) {
public boolean getMoveOnExpiry(String queue) {
return DEFAULT_RESERVATION_MOVE_ON_EXPIRY;
}

Expand All @@ -169,7 +168,7 @@ public boolean getMoveOnExpiry(QueuePath queue) {
* @param queue name of the queue
* @return the time in milliseconds for which to check constraints
*/
public long getEnforcementWindow(QueuePath queue) {
public long getEnforcementWindow(String queue) {
return DEFAULT_RESERVATION_ENFORCEMENT_WINDOW;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
Expand Down Expand Up @@ -73,7 +72,7 @@ public SimpleCapacityReplanner() {
@Override
public void init(String planQueueName,
ReservationSchedulerConfiguration conf) {
this.lengthOfCheckZone = conf.getEnforcementWindow(new QueuePath(planQueueName));
this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,26 @@ public void setEntitlement(QueueEntitlement entitlement)
}

@Override
protected Resource getMinimumAbsoluteResource(QueuePath queuePath,
protected Resource getMinimumAbsoluteResource(String queuePath,
String label) {
return super.getMinimumAbsoluteResource(QueuePrefixes
.getAutoCreatedQueueObjectTemplateConfPrefix(this.getParent().getQueuePathObject()),
return super.getMinimumAbsoluteResource(queueContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()),
label);
}

@Override
protected Resource getMaximumAbsoluteResource(QueuePath queuePath,
protected Resource getMaximumAbsoluteResource(String queuePath,
String label) {
return super.getMaximumAbsoluteResource(QueuePrefixes
.getAutoCreatedQueueObjectTemplateConfPrefix(this.getParent().getQueuePathObject()),
return super.getMaximumAbsoluteResource(queueContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()),
label);
}

@Override
protected boolean checkConfigTypeIsAbsoluteResource(QueuePath queuePath,
protected boolean checkConfigTypeIsAbsoluteResource(String queuePath,
String label) {
return super.checkConfigTypeIsAbsoluteResource(QueuePrefixes
.getAutoCreatedQueueObjectTemplateConfPrefix(this.getParent().getQueuePathObject()),
return super.checkConfigTypeIsAbsoluteResource(queueContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()),
label);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ protected void setupQueueConfigs(Resource clusterResource) throws
writeLock.lock();
try {
CapacitySchedulerConfiguration configuration = queueContext.getConfiguration();
this.acls = configuration.getAcls(getQueuePathObject());
this.acls = configuration.getAcls(getQueuePath());

if (isDynamicQueue() || this instanceof AbstractAutoCreatedLeafQueue) {
parseAndSetDynamicTemplates();
Expand All @@ -367,7 +367,7 @@ protected void setupQueueConfigs(Resource clusterResource) throws

// Setup queue's maximumAllocation respecting the global
// and the queue settings
this.queueAllocationSettings.setupMaximumAllocation(configuration, getQueuePathObject(),
this.queueAllocationSettings.setupMaximumAllocation(configuration, getQueuePath(),
parent);

// Initialize the queue state based on previous state, configured state
Expand All @@ -382,10 +382,10 @@ protected void setupQueueConfigs(Resource clusterResource) throws
configuration.getReservationContinueLook();

this.configuredCapacityVectors = configuration
.parseConfiguredResourceVector(queuePath,
.parseConfiguredResourceVector(queuePath.getFullPath(),
this.queueNodeLabelsSettings.getConfiguredNodeLabels());
this.configuredMaxCapacityVectors = configuration
.parseConfiguredMaximumCapacityVector(queuePath,
.parseConfiguredMaximumCapacityVector(queuePath.getFullPath(),
this.queueNodeLabelsSettings.getConfiguredNodeLabels(),
QueueCapacityVector.newInstance());

Expand Down Expand Up @@ -420,11 +420,11 @@ protected void setupQueueConfigs(Resource clusterResource) throws
// Store preemption settings
this.preemptionSettings = new CSQueuePreemptionSettings(this, configuration);
this.priority = configuration.getQueuePriority(
getQueuePathObject());
getQueuePath());

// Update multi-node sorting algorithm for scheduling as configured.
setMultiNodeSortingPolicyName(
configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePathObject()));
configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePath()));

// Setup application related limits
this.queueAppLifetimeSettings = new QueueAppLifetimeAndLimitSettings(configuration,
Expand All @@ -440,7 +440,7 @@ protected void setupQueueConfigs(Resource clusterResource) throws
protected void parseAndSetDynamicTemplates() {
// Set the template properties from the parent to the queuepath of the child
((AbstractParentQueue) parent).getAutoCreatedQueueTemplate()
.setTemplateEntriesForChild(queueContext.getConfiguration(), getQueuePathObject(),
.setTemplateEntriesForChild(queueContext.getConfiguration(), getQueuePath(),
this instanceof AbstractLeafQueue);

String parentTemplate = String.format("%s.%s", parent.getQueuePath(),
Expand Down Expand Up @@ -488,21 +488,21 @@ private UserWeights getUserWeightsFromHierarchy() {
// Insert this queue's userWeights, overriding parent's userWeights if
// there is an overlap.
unionInheritedWeights.addFrom(
queueContext.getConfiguration().getAllUserWeightsForQueue(getQueuePathObject()));
queueContext.getConfiguration().getAllUserWeightsForQueue(getQueuePath()));
return unionInheritedWeights;
}

protected Resource getMinimumAbsoluteResource(QueuePath queuePath, String label) {
protected Resource getMinimumAbsoluteResource(String queuePath, String label) {
return queueContext.getConfiguration()
.getMinimumResourceRequirement(label, queuePath, resourceTypes);
}

protected Resource getMaximumAbsoluteResource(QueuePath queuePath, String label) {
protected Resource getMaximumAbsoluteResource(String queuePath, String label) {
return queueContext.getConfiguration()
.getMaximumResourceRequirement(label, queuePath, resourceTypes);
}

protected boolean checkConfigTypeIsAbsoluteResource(QueuePath queuePath,
protected boolean checkConfigTypeIsAbsoluteResource(String queuePath,
String label) {
return queueContext.getConfiguration().checkConfigTypeIsAbsoluteResource(label,
queuePath, resourceTypes);
Expand All @@ -518,7 +518,7 @@ protected void updateCapacityConfigType() {

if (queueContext.getConfiguration().isLegacyQueueMode()) {
localType = checkConfigTypeIsAbsoluteResource(
getQueuePathObject(), label) ? CapacityConfigType.ABSOLUTE_RESOURCE
getQueuePath(), label) ? CapacityConfigType.ABSOLUTE_RESOURCE
: CapacityConfigType.PERCENTAGE;
} else {
// TODO: revisit this later
Expand Down Expand Up @@ -556,8 +556,8 @@ protected void updateCapacityConfigType() {
*/
protected void updateConfigurableResourceLimits(Resource clusterResource) {
for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
final Resource minResource = getMinimumAbsoluteResource(getQueuePathObject(), label);
Resource maxResource = getMaximumAbsoluteResource(getQueuePathObject(), label);
final Resource minResource = getMinimumAbsoluteResource(getQueuePath(), label);
Resource maxResource = getMaximumAbsoluteResource(getQueuePath(), label);

if (parent != null) {
final Resource parentMax = parent.getQueueResourceQuotas()
Expand Down
Loading