Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Calculate how much resources need to be preempted for each queue,
* will be used by {@link PreemptionCandidatesSelector}.
*/
public class AbstractPreemptableResourceCalculator {
private static final Logger LOG = LoggerFactory.getLogger(
AbstractPreemptableResourceCalculator.class);

protected final CapacitySchedulerPreemptionContext context;
protected final ResourceCalculator rc;
Expand Down Expand Up @@ -76,6 +80,34 @@ private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
}
}

private static class NormalizationTuple {
private Resource numerator;
private Resource denominator;

NormalizationTuple(Resource numer, Resource denom) {
this.numerator = numer;
this.denominator = denom;
}

long getNumeratorValue(int i) {
return numerator.getResourceInformation(i).getValue();
}

long getDenominatorValue(int i) {
String nUnits = numerator.getResourceInformation(i).getUnits();
ResourceInformation dResourceInformation = denominator
.getResourceInformation(i);
return UnitsConversionUtil.convert(
dResourceInformation.getUnits(), nUnits, dResourceInformation.getValue());
}

float getNormalizedValue(int i) {
long nValue = getNumeratorValue(i);
long dValue = getDenominatorValue(i);
return dValue == 0 ? 0.0f : (float) nValue / dValue;
}
}

/**
* PreemptableResourceCalculator constructor.
*
Expand Down Expand Up @@ -175,7 +207,7 @@ protected void computeFixpointAllocation(Resource totGuarant,
unassigned, Resources.none())) {
// we compute normalizedGuarantees capacity based on currently active
// queues
resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
resetCapacity(orderedByNeed, ignoreGuarantee);

// For each underserved queue (or set of queues if multiple are equally
// underserved), offer its share of the unassigned resources based on its
Expand Down Expand Up @@ -252,47 +284,146 @@ protected void initIdealAssignment(Resource totGuarant,
/**
* Computes a normalizedGuaranteed capacity based on active queues.
*
* @param clusterResource
* the total amount of resources in the cluster
* @param queues
* the list of queues to consider
* @param ignoreGuar
* ignore guarantee.
*/
private void resetCapacity(Resource clusterResource,
Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
private void resetCapacity(Collection<TempQueuePerPartition> queues,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For testing, I think we can make this method static package-private and add @VisibleForTesting annotation. That way we can call this method directly from test class. Note that the package of the annotation must be "org.apache.hadoop.thirdparty.com.google.common.annotations".

boolean ignoreGuar) {
Resource activeCap = Resource.newInstance(0, 0);
float activeTotalAbsCap = 0.0f;
int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();

if (ignoreGuar) {
for (TempQueuePerPartition q : queues) {
for (int i = 0; i < maxLength; i++) {
q.normalizedGuarantee[i] = 1.0f / queues.size();
for (int i = 0; i < maxLength; i++) {
for (TempQueuePerPartition q : queues) {
computeNormGuarEvenly(q, queues.size(), i);
}
}
} else {
for (TempQueuePerPartition q : queues) {
Resources.addTo(activeCap, q.getGuaranteed());
activeTotalAbsCap += q.getAbsCapacity();
}
for (TempQueuePerPartition q : queues) {
for (int i = 0; i < maxLength; i++) {
ResourceInformation nResourceInformation = q.getGuaranteed()
.getResourceInformation(i);
ResourceInformation dResourceInformation = activeCap
.getResourceInformation(i);

long nValue = nResourceInformation.getValue();
long dValue = UnitsConversionUtil.convert(
dResourceInformation.getUnits(), nResourceInformation.getUnits(),
dResourceInformation.getValue());
if (dValue != 0) {
q.normalizedGuarantee[i] = (float) nValue / dValue;

// loop through all resource types and normalize guaranteed capacity for all queues
for (int i = 0; i < maxLength; i++) {
boolean useAbsCapBasedNorm = false;
// if the sum of absolute capacity of all queues involved is 0,
// we should normalize evenly
boolean useEvenlyDistNorm = activeTotalAbsCap == 0;

// loop through all the queues once to determine the
// right normalization strategy for current processing resource type
for (TempQueuePerPartition q : queues) {
NormalizationTuple normTuple = new NormalizationTuple(
q.getGuaranteed(), activeCap);
long queueGuaranValue = normTuple.getNumeratorValue(i);
long totalActiveGuaranValue = normTuple.getDenominatorValue(i);

if (queueGuaranValue == 0 && q.getAbsCapacity() != 0 && totalActiveGuaranValue != 0) {
// when the rounded value of a resource type is 0 but its absolute capacity is not 0,
// we should consider taking the normalized guarantee based on absolute capacity
useAbsCapBasedNorm = true;
break;
}

if (totalActiveGuaranValue == 0) {
// If totalActiveGuaranValue from activeCap is zero, that means the guaranteed capacity
// of this resource dimension for all active queues is tiny (close to 0).
// For example, if a queue has 1% of minCapacity on a cluster with a totalVcores of 48,
// then the idealAssigned Vcores for this queue is (48 * 0.01)=0.48 which then
// get rounded/casted into 0 (double -> long)
// In this scenario where the denominator is 0, we can just spread resources across
// all tiny queues evenly since their absoluteCapacity are roughly the same
useEvenlyDistNorm = true;
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("Queue normalization strategy: " +
"absoluteCapacityBasedNormalization(" + useAbsCapBasedNorm +
"), evenlyDistributedNormalization(" + useEvenlyDistNorm +
"), defaultNormalization(" + !(useAbsCapBasedNorm || useEvenlyDistNorm) + ")");
}

// loop through all the queues again to apply normalization strategy
for (TempQueuePerPartition q : queues) {
if (useAbsCapBasedNorm) {
computeNormGuarFromAbsCapacity(q, activeTotalAbsCap, i);
} else if (useEvenlyDistNorm) {
computeNormGuarEvenly(q, queues.size(), i);
} else {
computeDefaultNormGuar(q, activeCap, i);
}
}
}
}
}

/**
* Computes the normalized guaranteed capacity based on the weight of a queue's abs capacity.
*
* Example:
* There are two active queues: queueA & queueB, and
* their configured absolute minimum capacity is 1% and 3% respectively.
*
* Then their normalized guaranteed capacity are:
* normalized_guar_queueA = 0.01 / (0.01 + 0.03) = 0.25
* normalized_guar_queueB = 0.03 / (0.01 + 0.03) = 0.75
*
* @param q
* the queue to consider
* @param activeTotalAbsCap
* the sum of absolute capacity of all active queues
* @param resourceTypeIdx
* index of the processing resource type
*/
private static void computeNormGuarFromAbsCapacity(TempQueuePerPartition q,
float activeTotalAbsCap,
int resourceTypeIdx) {
if (activeTotalAbsCap != 0) {
q.normalizedGuarantee[resourceTypeIdx] = q.getAbsCapacity() / activeTotalAbsCap;
}
}

/**
* Computes the normalized guaranteed capacity evenly based on num of active queues.
*
* @param q
* the queue to consider
* @param numOfActiveQueues
* number of active queues
* @param resourceTypeIdx
* index of the processing resource type
*/
private static void computeNormGuarEvenly(TempQueuePerPartition q,
int numOfActiveQueues,
int resourceTypeIdx) {
q.normalizedGuarantee[resourceTypeIdx] = 1.0f / numOfActiveQueues;
}

/**
* The default way to compute a queue's normalized guaranteed capacity.
*
* For each resource type, divide a queue's configured guaranteed amount (MBs/Vcores) by
* the total amount of guaranteed resource of all active queues
*
* @param q
* the queue to consider
* @param activeCap
* total guaranteed resources of all active queues
* @param resourceTypeIdx
* index of the processing resource type
*/
private static void computeDefaultNormGuar(TempQueuePerPartition q,
Resource activeCap,
int resourceTypeIdx) {
NormalizationTuple normTuple = new NormalizationTuple(q.getGuaranteed(), activeCap);
q.normalizedGuarantee[resourceTypeIdx] = normTuple.getNormalizedValue(resourceTypeIdx);
}

// Take the most underserved TempQueue (the one on the head). Collect and
// return the list of all queues that have the same idealAssigned
// percentage of guaranteed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ Resource offer(Resource avail, ResourceCalculator rc,
return remain;
}

public float getAbsCapacity() {
return absCapacity;
}

public Resource getGuaranteed() {
if(!effMinRes.equals(Resources.none())) {
return Resources.clone(effMinRes);
Expand Down