From 5691fdff2b400571bb4669daf6e25791aaaa0db9 Mon Sep 17 00:00:00 2001 From: Yufei Gu Date: Mon, 6 Feb 2017 12:57:05 -0800 Subject: [PATCH 1/3] YARN-6151. FS Preemption doesn't filter out queues which cannot be preempted. --- .../scheduler/fair/FSLeafQueue.java | 12 +--------- .../scheduler/fair/FSParentQueue.java | 10 ++++++++ .../scheduler/fair/FSQueue.java | 23 +++++++++++++++++++ .../scheduler/fair/TestFairScheduler.java | 4 ++-- 4 files changed, 36 insertions(+), 13 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 51849f8a747c6..87a5448a9d702 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -351,7 +351,7 @@ public RMContainer preemptContainer() { RMContainer toBePreempted = null; // If this queue is not over its fair share, reject - if (!preemptContainerPreCheck()) { + if (!canBePreempted()) { return toBePreempted; } @@ -533,16 +533,6 @@ public void setWeights(float weight) { new ResourceWeights(weight)); } - /** - * Helper method to check if the queue should preempt containers - * - * @return true if check passes (can preempt) or false otherwise - */ - private boolean preemptContainerPreCheck() { - return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(), - getFairShare()); - } - /** * Is a queue being starved for its min share. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index fe0e3e241fd8e..e08b38681e114 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -260,6 +260,16 @@ public RMContainer preemptContainer() { readLock.lock(); try { for (FSQueue queue : childQueues) { + // Skip selection for non-preemptable queue + if (!queue.canBePreempted()) { + if (LOG.isDebugEnabled()) { + LOG.debug("skipping from queue=" + getName() + + " because it's a non-preemptable queue or there is no" + + " sub-queues whose resource usage exceeds fair share."); + } + continue; + } + if (candidateQueue == null || comparator.compare(queue, candidateQueue) > 0) { candidateQueue = queue; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index e4a21978cb1c7..f7a11d076cb8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -235,6 +235,29 @@ public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) this.fairSharePreemptionThreshold = fairSharePreemptionThreshold; } + /** + * Recursively check if the queue can be preempted based on whether the + * resource usage is greater than fair share. + * + * @return true if the queue can be preempted + */ + public boolean canBePreempted() { + assert parent != null; + if (parent.policy.checkIfUsageOverFairShare( + getResourceUsage(), getFairShare())) { + return true; + } else { + // recursively find one queue which can be preempted + for (FSQueue queue: getChildQueues()) { + if (queue.canBePreempted()) { + return true; + } + } + } + + return false; + } + /** * Recomputes the shares for all child queues and applications based on this * queue's current share diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 10f6c2b449aaa..9497a044a1bd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2036,10 +2036,10 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception { .getLeafQueue("queueA.queueA2", false), clock.getTime()); assertEquals(3277, toPreempt.getMemorySize()); - // verify if the 3 containers required by queueA2 are preempted in the same + // verify if the 4 containers required by queueA2 are preempted in the same // round scheduler.preemptResources(toPreempt); - assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers() + assertEquals(4, scheduler.getSchedulerApp(app1).getPreemptionContainers() .size()); } From 6581248d0e8f1ed71ad9cead6a7656cad75825a7 Mon Sep 17 00:00:00 2001 From: Yufei Gu Date: Tue, 7 Feb 2017 15:43:38 -0800 Subject: [PATCH 2/3] YARN-6151. FS Preemption doesn't filter out queues which cannot be preempted. --- .../scheduler/fair/FSQueue.java | 6 +- .../scheduler/fair/TestFairScheduler.java | 72 +++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index f7a11d076cb8b..ee07c5b128919 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.base.Preconditions; + @Private @Unstable public abstract class FSQueue implements Queue, Schedulable { @@ -242,7 +244,9 @@ public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) * @return true if the queue can be preempted */ public boolean canBePreempted() { - assert parent != null; + Preconditions.checkNotNull(parent, "Parent queue can't be null since" + + " parent's policy is needed for preemptable checking."); + if (parent.policy.checkIfUsageOverFairShare( getResourceUsage(), getFairShare())) { return true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 9497a044a1bd9..b69b23a716143 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2043,6 +2043,78 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception { .size()); } + @Test + public void testPreemptionFilterOutNonPreemptableQueues() throws Exception { + conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); + conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); + + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(" "); + out.println(" "); + out.println(""); + out.println(""); + out.println(""); + out.println("10"); + out.println(".5"); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node of 8 GB + RMNode node1 = MockNodes.newNodeInfo(1, + Resources.createResource(8 * GB, 8), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Run apps in queueA.A1 and queueB + ApplicationAttemptId app1 = createSchedulingRequest(1 * GB, 1, + "queueA.queueA1", "user1", 4, 1); + ApplicationAttemptId app2 = createSchedulingRequest(1 * GB, 1, "queueB", + "user2", 4, 1); + + scheduler.update(); + + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + for (int i = 0; i < 8; i++) { + scheduler.handle(nodeUpdate1); + } + + // verify if the apps got the containers they requested + assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + + // Now submit an app in queueA.queueA2 + createSchedulingRequest(GB, 1, "queueA.queueA2", "user3", 2, 1); + scheduler.update(); + + // Let 11 sec pass + clock.tickSec(11); + + scheduler.update(); + Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager() + .getLeafQueue("queueA.queueA2", false), clock.getTime()); + assertEquals(2 * GB, toPreempt.getMemorySize()); + + // Verify if containers required by queueA2 are preempted from queueA1 + // instead of queueB + scheduler.preemptResources(toPreempt); + assertEquals(2, scheduler.getSchedulerApp(app1).getPreemptionContainers() + .size()); + assertEquals(0, scheduler.getSchedulerApp(app2).getPreemptionContainers() + .size()); + } + @Test (timeout = 5000) /** * Tests the timing of decision to preempt tasks. From fd192a8f31a9ce4842a23ecc5daac346ca161544 Mon Sep 17 00:00:00 2001 From: Yufei Gu Date: Tue, 7 Feb 2017 17:54:10 -0800 Subject: [PATCH 3/3] YARN-6151. FS Preemption doesn't filter out queues which cannot be preempted. --- .../scheduler/fair/FSQueue.java | 5 +- .../scheduler/fair/TestFairScheduler.java | 72 ------------------- .../fair/TestFairSchedulerPreemption.java | 68 +++++++++++++++++- 3 files changed, 66 insertions(+), 79 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index ee07c5b128919..011d9d6a49eb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -244,10 +244,7 @@ public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) * @return true if the queue can be preempted */ public boolean canBePreempted() { - Preconditions.checkNotNull(parent, "Parent queue can't be null since" - + " parent's policy is needed for preemptable checking."); - - if (parent.policy.checkIfUsageOverFairShare( + if (parent == null || parent.policy.checkIfUsageOverFairShare( getResourceUsage(), getFairShare())) { return true; } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index b69b23a716143..9497a044a1bd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2043,78 +2043,6 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception { .size()); } - @Test - public void testPreemptionFilterOutNonPreemptableQueues() throws Exception { - conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); - conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); - - ControlledClock clock = new ControlledClock(); - scheduler.setClock(clock); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println(" "); - out.println(" "); - out.println(""); - out.println(""); - out.println(""); - out.println("10"); - out.println(".5"); - out.println(""); - out.close(); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - // Add a node of 8 GB - RMNode node1 = MockNodes.newNodeInfo(1, - Resources.createResource(8 * GB, 8), 1, "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - // Run apps in queueA.A1 and queueB - ApplicationAttemptId app1 = createSchedulingRequest(1 * GB, 1, - "queueA.queueA1", "user1", 4, 1); - ApplicationAttemptId app2 = createSchedulingRequest(1 * GB, 1, "queueB", - "user2", 4, 1); - - scheduler.update(); - - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); - for (int i = 0; i < 8; i++) { - scheduler.handle(nodeUpdate1); - } - - // verify if the apps got the containers they requested - assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - - // Now submit an app in queueA.queueA2 - createSchedulingRequest(GB, 1, "queueA.queueA2", "user3", 2, 1); - scheduler.update(); - - // Let 11 sec pass - clock.tickSec(11); - - scheduler.update(); - Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager() - .getLeafQueue("queueA.queueA2", false), clock.getTime()); - assertEquals(2 * GB, toPreempt.getMemorySize()); - - // Verify if containers required by queueA2 are preempted from queueA1 - // instead of queueB - scheduler.preemptResources(toPreempt); - assertEquals(2, scheduler.getSchedulerApp(app1).getPreemptionContainers() - .size()); - assertEquals(0, scheduler.getSchedulerApp(app2).getPreemptionContainers() - .size()); - } - @Test (timeout = 5000) /** * Tests the timing of decision to preempt tasks. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 1d5a70fcdee85..8367cf9a897e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -39,9 +40,9 @@ import java.io.PrintWriter; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; public class TestFairSchedulerPreemption extends FairSchedulerTestBase { + private final int GB = 1024; private final static String ALLOC_FILE = new File(TEST_DIR, TestFairSchedulerPreemption.class.getName() + ".xml").getAbsolutePath(); @@ -90,8 +91,6 @@ private void startResourceManager(float utilizationThreshold) { resourceManager = new MockRM(conf); resourceManager.start(); - assertTrue( - resourceManager.getResourceScheduler() instanceof StubbedFairScheduler); scheduler = (FairScheduler)resourceManager.getResourceScheduler(); scheduler.setClock(clock); @@ -189,4 +188,67 @@ public void testPreemptionWithFreeResources() throws Exception { assertEquals("preemptResources() should have been called", 1024, ((StubbedFairScheduler) scheduler).lastPreemptMemory); } + + @Test + public void testPreemptionFilterOutNonPreemptableQueues() throws Exception { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(" "); + out.println(" "); + out.println(""); + out.println(""); + out.println(""); + out.println("5"); + out.println(""); + out.close(); + + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + startResourceManager(0.8f); + + // Add a node of 8 GB + RMNode node1 = MockNodes.newNodeInfo(1, + Resources.createResource(8 * GB, 8), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Run apps in queueA.A1 and queueB + ApplicationAttemptId app1 = createSchedulingRequest(1 * GB, 1, + "queueA.queueA1", "user1", 4, 1); + ApplicationAttemptId app2 = createSchedulingRequest(1 * GB, 1, "queueB", + "user2", 4, 1); + + scheduler.update(); + + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + for (int i = 0; i < 8; i++) { + scheduler.handle(nodeUpdate1); + } + + // verify if the apps got the containers they requested + assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + + // Now submit an app in queueA.queueA2 + createSchedulingRequest(GB, 1, "queueA.queueA2", "user3", 2, 1); + scheduler.update(); + + // Let 6 sec pass + clock.tickSec(6); + + scheduler.update(); + Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager() + .getLeafQueue("queueA.queueA2", false), clock.getTime()); + assertEquals(2 * GB, toPreempt.getMemorySize()); + + // Verify if containers required by queueA2 are preempted from queueA1 + // instead of queueB + scheduler.preemptResources(toPreempt); + assertEquals(2, scheduler.getSchedulerApp(app1).getPreemptionContainers() + .size()); + assertEquals(0, scheduler.getSchedulerApp(app2).getPreemptionContainers() + .size()); + } }