diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index bf9df5b694f1f..4a1f2e9042b87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -193,10 +193,9 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf) CSQueue newRoot = parseQueue(this.csContext.getQueueContext(), newConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP); - // When failing over, if using configuration store, don't validate queue + // When failing over, don't validate queue // hierarchy since queues can be removed without being STOPPED. - if (!csContext.isConfigurationMutable() || - csContext.getRMContext().getHAServiceState() + if (csContext.getRMContext().getHAServiceState() != HAServiceProtocol.HAServiceState.STANDBY) { // Ensure queue hierarchy in the new XML file is proper. CapacitySchedulerConfigValidator diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java index 143142b3efd80..61b4abaf89b24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java @@ -25,16 +25,24 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.resource.Resources; @@ -48,6 +56,7 @@ public class TestQueueStateManager { private static final String Q1 = "q1"; private static final String Q2 = "q2"; private static final String Q3 = "q3"; + private static final String Q4 = "q4"; private final static String Q1_PATH = CapacitySchedulerConfiguration.ROOT + "." + Q1; @@ -55,11 +64,14 @@ public class TestQueueStateManager { Q1_PATH + "." + Q2; private final static String Q3_PATH = Q1_PATH + "." + Q3; + private final static String Q4_PATH = + CapacitySchedulerConfiguration.ROOT + "." + Q4; private final static QueuePath ROOT_QUEUE_PATH = new QueuePath(CapacitySchedulerConfiguration.ROOT); private final static QueuePath Q1_QUEUE_PATH = new QueuePath(Q1_PATH); private final static QueuePath Q2_QUEUE_PATH = new QueuePath(Q2_PATH); private final static QueuePath Q3_QUEUE_PATH = new QueuePath(Q3_PATH); + private final static QueuePath Q4_QUEUE_PATH = new QueuePath(Q4_PATH); private CapacityScheduler cs; private YarnConfiguration conf; @@ -147,6 +159,46 @@ public void testQueueStateManager() throws AccessControlException, assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); } + @Test + public void testRemoveQueueWithoutStopWhenFailover() throws IOException { + Configuration configuration = new Configuration(); + configuration.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + CapacityScheduler.class); + configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); + for (String confKey : YarnConfiguration + .getServiceAddressConfKeys(configuration)) { + configuration.set(HAUtil.addSuffix(confKey, "rm1"), "1.1.1.1:1"); + configuration.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:0"); + } + ResourceManager rm = new MockRM(configuration); + rm.start(); + assertEquals(HAServiceProtocol.HAServiceState.STANDBY, + rm.getRMContext().getRMAdminService().getServiceStatus().getState()); + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setQueues(ROOT_QUEUE_PATH, new String[] {Q1, Q4}); + csConf.setCapacity(Q1_QUEUE_PATH, 100); + csConf.setCapacity(Q4_QUEUE_PATH, 0); + + conf = new YarnConfiguration(csConf); + rm.getResourceScheduler().reinitialize(conf, rm.getRMContext()); + + assertTrue(Arrays.asList(((CapacityScheduler) rm.getResourceScheduler()).getConfiguration() + .get("yarn.scheduler.capacity.root.queues").split(",")).contains(Q1)); + assertTrue(Arrays.asList(((CapacityScheduler) rm.getResourceScheduler()).getConfiguration() + .get("yarn.scheduler.capacity.root.queues").split(",")).contains(Q4)); + + csConf.setQueues(ROOT_QUEUE_PATH, new String[] {Q1}); + conf = new YarnConfiguration(csConf); + rm.getResourceScheduler().reinitialize(conf, rm.getRMContext()); + + assertEquals(Q1, ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration() + .get("yarn.scheduler.capacity.root.queues")); + } + private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user, Resource amResource) { FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);