Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public RMContainer preemptContainer() {
RMContainer toBePreempted = null;

// If this queue is not over its fair share, reject
if (!preemptContainerPreCheck()) {
if (!canBePreempted()) {
return toBePreempted;
}

Expand Down Expand Up @@ -533,16 +533,6 @@ public void setWeights(float weight) {
new ResourceWeights(weight));
}

/**
* Helper method to check if the queue should preempt containers
*
* @return true if check passes (can preempt) or false otherwise
*/
private boolean preemptContainerPreCheck() {
return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(),
getFairShare());
}

/**
* Is a queue being starved for its min share.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,16 @@ public RMContainer preemptContainer() {
readLock.lock();
try {
for (FSQueue queue : childQueues) {
// Skip selection for non-preemptable queue
if (!queue.canBePreempted()) {
if (LOG.isDebugEnabled()) {
LOG.debug("skipping from queue=" + getName()
+ " because it's a non-preemptable queue or there is no"
+ " sub-queues whose resource usage exceeds fair share.");
}
continue;
}

if (candidateQueue == null ||
comparator.compare(queue, candidateQueue) > 0) {
candidateQueue = queue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.util.resource.Resources;

import com.google.common.base.Preconditions;

@Private
@Unstable
public abstract class FSQueue implements Queue, Schedulable {
Expand Down Expand Up @@ -235,6 +237,28 @@ public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold)
this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
}

/**
* Recursively check if the queue can be preempted based on whether the
* resource usage is greater than fair share.
*
* @return true if the queue can be preempted
*/
public boolean canBePreempted() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the check of the allowPreemptionFrom flag also be part of this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be, but allowPreemptionFrom is introduced after 2.8.x.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aah, I keep forgetting branch-2.8 was cut years ago. :(

if (parent == null || parent.policy.checkIfUsageOverFairShare(
getResourceUsage(), getFairShare())) {
return true;
} else {
// recursively find one queue which can be preempted
for (FSQueue queue: getChildQueues()) {
if (queue.canBePreempted()) {
return true;
}
}
}

return false;
}

/**
* Recomputes the shares for all child queues and applications based on this
* queue's current share
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2036,10 +2036,10 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception {
.getLeafQueue("queueA.queueA2", false), clock.getTime());
assertEquals(3277, toPreempt.getMemorySize());

// verify if the 3 containers required by queueA2 are preempted in the same
// verify if the 4 containers required by queueA2 are preempted in the same
// round
scheduler.preemptResources(toPreempt);
assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
assertEquals(4, scheduler.getSchedulerApp(app1).getPreemptionContainers()
.size());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a new test that verifies the exact scenario in the JIRA description?


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
Expand All @@ -39,9 +40,9 @@
import java.io.PrintWriter;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
private final int GB = 1024;
private final static String ALLOC_FILE = new File(TEST_DIR,
TestFairSchedulerPreemption.class.getName() + ".xml").getAbsolutePath();

Expand Down Expand Up @@ -90,8 +91,6 @@ private void startResourceManager(float utilizationThreshold) {
resourceManager = new MockRM(conf);
resourceManager.start();

assertTrue(
resourceManager.getResourceScheduler() instanceof StubbedFairScheduler);
scheduler = (FairScheduler)resourceManager.getResourceScheduler();

scheduler.setClock(clock);
Expand Down Expand Up @@ -189,4 +188,67 @@ public void testPreemptionWithFreeResources() throws Exception {
assertEquals("preemptResources() should have been called", 1024,
((StubbedFairScheduler) scheduler).lastPreemptMemory);
}

@Test
public void testPreemptionFilterOutNonPreemptableQueues() throws Exception {
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println(" <queue name=\"queueA1\" />");
out.println(" <queue name=\"queueA2\" />");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("</queue>");
out.println("<defaultFairSharePreemptionTimeout>5</defaultFairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();

conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
startResourceManager(0.8f);

// Add a node of 8 GB
RMNode node1 = MockNodes.newNodeInfo(1,
Resources.createResource(8 * GB, 8), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);

// Run apps in queueA.A1 and queueB
ApplicationAttemptId app1 = createSchedulingRequest(1 * GB, 1,
"queueA.queueA1", "user1", 4, 1);
ApplicationAttemptId app2 = createSchedulingRequest(1 * GB, 1, "queueB",
"user2", 4, 1);

scheduler.update();

NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
for (int i = 0; i < 8; i++) {
scheduler.handle(nodeUpdate1);
}

// verify if the apps got the containers they requested
assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());

// Now submit an app in queueA.queueA2
createSchedulingRequest(GB, 1, "queueA.queueA2", "user3", 2, 1);
scheduler.update();

// Let 6 sec pass
clock.tickSec(6);

scheduler.update();
Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager()
.getLeafQueue("queueA.queueA2", false), clock.getTime());
assertEquals(2 * GB, toPreempt.getMemorySize());

// Verify if containers required by queueA2 are preempted from queueA1
// instead of queueB
scheduler.preemptResources(toPreempt);
assertEquals(2, scheduler.getSchedulerApp(app1).getPreemptionContainers()
.size());
assertEquals(0, scheduler.getSchedulerApp(app2).getPreemptionContainers()
.size());
}
}