diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 48ce704e0d11e..b3c401453fd24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -20,7 +20,6 @@ import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -63,6 +62,7 @@ import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -131,8 +131,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase { /** Bytes in a GigaByte. */ private static final long GB = 1024L * 1024L * 1024L; - volatile int heartBeatID = 0; - volatile Throwable nmStartError = null; + private volatile Throwable nmStartError = null; + private AtomicInteger heartBeatID = new AtomicInteger(0); private final List registeredNodes = new ArrayList(); private boolean triggered = false; private NodeManager nm; @@ -147,8 +147,12 @@ public void before() { @After public void tearDown() { this.registeredNodes.clear(); - heartBeatID = 0; - ServiceOperations.stop(nm); + heartBeatID.set(0); + if (nm != null) { + ServiceOperations.stop(nm); + nm.waitForServiceToStop(10000); + } + assertionFailedInThread.set(false); DefaultMetricsSystem.shutdown(); } @@ -220,7 +224,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) EventHandler mockEventHandler = mock(EventHandler.class); when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler); NMStateStoreService stateStore = new NMNullStateStoreService(); - nodeStatus.setResponseId(heartBeatID++); + nodeStatus.setResponseId(heartBeatID.getAndIncrement()); Map> appToContainers = getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); List containersToSignal = null; @@ -229,14 +233,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) ApplicationId appId2 = ApplicationId.newInstance(0, 2); ContainerId firstContainerID = null; - if (heartBeatID == 1) { + if (heartBeatID.get() == 1) { Assert.assertEquals(0, nodeStatus.getContainersStatuses().size()); // Give a container to the NM. ApplicationAttemptId appAttemptID = ApplicationAttemptId.newInstance(appId1, 0); firstContainerID = - ContainerId.newContainerId(appAttemptID, heartBeatID); + ContainerId.newContainerId(appAttemptID, heartBeatID.get()); ContainerLaunchContext launchContext = recordFactory .newRecordInstance(ContainerLaunchContext.class); Resource resource = BuilderUtils.newResource(2, 1); @@ -252,7 +256,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) Container container = new ContainerImpl(conf, mockDispatcher, launchContext, null, mockMetrics, containerToken, context); this.context.getContainers().put(firstContainerID, container); - } else if (heartBeatID == 2) { + } else if (heartBeatID.get() == 2) { // Checks on the RM end Assert.assertEquals("Number of applications should only be one!", 1, nodeStatus.getContainersStatuses().size()); @@ -277,7 +281,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) ApplicationAttemptId appAttemptID = ApplicationAttemptId.newInstance(appId2, 0); ContainerId secondContainerID = - ContainerId.newContainerId(appAttemptID, heartBeatID); + ContainerId.newContainerId(appAttemptID, heartBeatID.get()); ContainerLaunchContext launchContext = recordFactory .newRecordInstance(ContainerLaunchContext.class); long currentTime = System.currentTimeMillis(); @@ -293,7 +297,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) Container container = new ContainerImpl(conf, mockDispatcher, launchContext, null, mockMetrics, containerToken, context); this.context.getContainers().put(secondContainerID, container); - } else if (heartBeatID == 3) { + } else if (heartBeatID.get() == 3) { // Checks on the RM end Assert.assertEquals("Number of applications should have two!", 2, appToContainers.size()); @@ -309,8 +313,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. - newNodeHeartbeatResponse(heartBeatID, null, null, null, null, null, - 1000L); + newNodeHeartbeatResponse(heartBeatID.get(), null, null, null, null, + null, 1000L); if (containersToSignal != null) { nhResponse.addAllContainersToSignal(containersToSignal); } @@ -576,10 +580,10 @@ public RegisterNodeManagerResponse registerNodeManager( public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { NodeStatus nodeStatus = request.getNodeStatus(); - nodeStatus.setResponseId(heartBeatID++); + nodeStatus.setResponseId(heartBeatID.getAndIncrement()); NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. - newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, + newNodeHeartbeatResponse(heartBeatID.get(), heartBeatNodeAction, null, null, null, null, 1000L); nhResponse.setDiagnosticsMessage(shutDownMessage); return nhResponse; @@ -623,9 +627,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { LOG.info("Got heartBeatId: [" + heartBeatID +"]"); NodeStatus nodeStatus = request.getNodeStatus(); - nodeStatus.setResponseId(heartBeatID++); + nodeStatus.setResponseId(heartBeatID.getAndIncrement()); NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. - newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, + newNodeHeartbeatResponse(heartBeatID.get(), heartBeatNodeAction, null, null, null, null, 1000L); if (nodeStatus.getKeepAliveApplications() != null @@ -639,7 +643,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) list.add(System.currentTimeMillis()); } } - if (heartBeatID == 2) { + if (heartBeatID.get() == 2) { LOG.info("Sending FINISH_APP for application: [" + appId + "]"); this.context.getApplications().put(appId, mock(Application.class)); nhResponse.addAllApplicationsToCleanup(Collections.singletonList(appId)); @@ -698,11 +702,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) List finishedContainersPulledByAM = new ArrayList (); try { - if (heartBeatID == 0) { + if (heartBeatID.get() == 0) { Assert.assertEquals(0, request.getNodeStatus().getContainersStatuses() .size()); Assert.assertEquals(0, context.getContainers().size()); - } else if (heartBeatID == 1) { + } else if (heartBeatID.get() == 1) { List statuses = request.getNodeStatus().getContainersStatuses(); Assert.assertEquals(2, statuses.size()); @@ -712,14 +716,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) for (ContainerStatus status : statuses) { if (status.getContainerId().equals( containerStatus2.getContainerId())) { - Assert.assertTrue(status.getState().equals( - containerStatus2.getState())); + Assert.assertEquals(containerStatus2.getState(), + status.getState()); container2Exist = true; } if (status.getContainerId().equals( containerStatus3.getContainerId())) { - Assert.assertTrue(status.getState().equals( - containerStatus3.getState())); + Assert.assertEquals(containerStatus3.getState(), + status.getState()); container3Exist = true; } } @@ -729,7 +733,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // nodeStatusUpdaterRunnable, otherwise nm just shuts down and the // test passes. throw new YarnRuntimeException("Lost the heartbeat response"); - } else if (heartBeatID == 2 || heartBeatID == 3) { + } else if (heartBeatID.get() == 2 || heartBeatID.get() == 3) { List statuses = request.getNodeStatus().getContainersStatuses(); // NM should send completed containers on heartbeat 2, @@ -744,36 +748,36 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) for (ContainerStatus status : statuses) { if (status.getContainerId().equals( containerStatus2.getContainerId())) { - Assert.assertTrue(status.getState().equals( - containerStatus2.getState())); + Assert.assertEquals(containerStatus2.getState(), + status.getState()); container2Exist = true; } if (status.getContainerId().equals( containerStatus3.getContainerId())) { - Assert.assertTrue(status.getState().equals( - containerStatus3.getState())); + Assert.assertEquals(containerStatus3.getState(), + status.getState()); container3Exist = true; } if (status.getContainerId().equals( containerStatus4.getContainerId())) { - Assert.assertTrue(status.getState().equals( - containerStatus4.getState())); + Assert.assertEquals(containerStatus4.getState(), + status.getState()); container4Exist = true; } if (status.getContainerId().equals( containerStatus5.getContainerId())) { - Assert.assertTrue(status.getState().equals( - containerStatus5.getState())); + Assert.assertEquals(containerStatus5.getState(), + status.getState()); container5Exist = true; } } Assert.assertTrue(container2Exist && container3Exist && container4Exist && container5Exist); - if (heartBeatID == 3) { + if (heartBeatID.get() == 3) { finishedContainersPulledByAM.add(containerStatus3.getContainerId()); } - } else if (heartBeatID == 4) { + } else if (heartBeatID.get() == 4) { List statuses = request.getNodeStatus().getContainersStatuses(); Assert.assertEquals(2, statuses.size()); @@ -793,12 +797,12 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) error.printStackTrace(); assertionFailedInThread.set(true); } finally { - heartBeatID++; + heartBeatID.incrementAndGet(); } NodeStatus nodeStatus = request.getNodeStatus(); - nodeStatus.setResponseId(heartBeatID); + nodeStatus.setResponseId(heartBeatID.get()); NodeHeartbeatResponse nhResponse = - YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, + YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID.get(), heartBeatNodeAction, null, null, null, null, 1000L); nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM); Map appCredentials = @@ -839,8 +843,7 @@ public RegisterNodeManagerResponse registerNodeManager( @Override public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { - heartBeatID++; - if(heartBeatID == 1) { + if (heartBeatID.incrementAndGet() == 1) { // EOFException should be retried as well. throw new EOFException("NodeHeartbeat exception"); } @@ -909,10 +912,10 @@ public RegisterNodeManagerResponse registerNodeManager( public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { NodeStatus nodeStatus = request.getNodeStatus(); - nodeStatus.setResponseId(heartBeatID++); + nodeStatus.setResponseId(heartBeatID.getAndIncrement()); NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. - newNodeHeartbeatResponse(heartBeatID, NodeAction.NORMAL, null, + newNodeHeartbeatResponse(heartBeatID.get(), NodeAction.NORMAL, null, null, null, null, 1000L); return nhResponse; } @@ -1141,7 +1144,7 @@ public ContainerState getCurrentState() { } @Test - public void testNMRegistration() throws InterruptedException, IOException { + public void testNMRegistration() throws Exception { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, @@ -1161,43 +1164,32 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, Assert.assertTrue("last service is NOT the node status updater", lastService instanceof NodeStatusUpdater); - new Thread() { - public void run() { - try { - nm.start(); - } catch (Throwable e) { - TestNodeStatusUpdater.this.nmStartError = e; - throw new YarnRuntimeException(e); - } + Thread starterThread = new Thread(() -> { + try { + nm.start(); + } catch (Throwable e) { + TestNodeStatusUpdater.this.nmStartError = e; + throw new YarnRuntimeException(e); } - }.start(); + }); + starterThread.start(); - System.out.println(" ----- thread already started.." - + nm.getServiceState()); + LOG.info(" ----- thread already started..{}", nm.getServiceState()); - int waitCount = 0; - while (nm.getServiceState() == STATE.INITED && waitCount++ != 50) { - LOG.info("Waiting for NM to start.."); - if (nmStartError != null) { - LOG.error("Error during startup. ", nmStartError); - Assert.fail(nmStartError.getCause().getMessage()); - } - Thread.sleep(2000); - } - if (nm.getServiceState() != STATE.STARTED) { - // NM could have failed. - Assert.fail("NodeManager failed to start"); - } + starterThread.join(100000); - waitCount = 0; - while (heartBeatID <= 3 && waitCount++ != 200) { - Thread.sleep(1000); + if (nmStartError != null) { + LOG.error("Error during startup. ", nmStartError); + Assert.fail(nmStartError.getCause().getMessage()); } - Assert.assertFalse(heartBeatID <= 3); - Assert.assertEquals("Number of registered NMs is wrong!!", 1, - this.registeredNodes.size()); - nm.stop(); + GenericTestUtils.waitFor( + () -> nm.getServiceState() != STATE.STARTED || heartBeatID.get() > 3, + 50, 20000); + + Assert.assertTrue(heartBeatID.get() > 3); + Assert.assertEquals("Number of registered NMs is wrong!!", + 1, this.registeredNodes.size()); } @Test @@ -1236,31 +1228,23 @@ public void cleanUpApplicationsOnNMShutDown() { YarnConfiguration conf = createNMConfig(); nm.init(conf); nm.start(); - - int waitCount = 0; - while (heartBeatID < 1 && waitCount++ != 200) { - Thread.sleep(500); - } - Assert.assertFalse(heartBeatID < 1); + GenericTestUtils.waitFor(() -> nm.getServiceState() == STATE.STARTED, + 20, 10000); + GenericTestUtils.waitFor( + () -> nm.getServiceState() != STATE.STARTED || heartBeatID.get() >= 1, + 50, 20000); + Assert.assertTrue(heartBeatID.get() >= 1); // Meanwhile call stop directly as the shutdown hook would nm.stop(); // NM takes a while to reach the STOPPED state. - waitCount = 0; - while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { - LOG.info("Waiting for NM to stop.."); - Thread.sleep(1000); - } + nm.waitForServiceToStop(20000); Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); // It further takes a while after NM reached the STOPPED state. - waitCount = 0; - while (numCleanups.get() == 0 && waitCount++ != 20) { - LOG.info("Waiting for NM shutdown.."); - Thread.sleep(1000); - } + GenericTestUtils.waitFor(() -> numCleanups.get() > 0, 20, 20000); Assert.assertEquals(1, numCleanups.get()); } @@ -1271,20 +1255,22 @@ public void testNodeDecommision() throws Exception { nm.init(conf); Assert.assertEquals(STATE.INITED, nm.getServiceState()); nm.start(); - - int waitCount = 0; - while (heartBeatID < 1 && waitCount++ != 200) { - Thread.sleep(500); - } - Assert.assertFalse(heartBeatID < 1); + GenericTestUtils.waitFor(() -> nm.getServiceState() == STATE.STARTED, + 20, 10000); + GenericTestUtils.waitFor( + () -> { + if (nm.getServiceState() == STATE.STARTED) { + return (heartBeatID.get() >= 1 + && nm.getNMContext().getDecommissioned()); + } + return true; + }, + 50, 200000); + Assert.assertTrue(heartBeatID.get() >= 1); Assert.assertTrue(nm.getNMContext().getDecommissioned()); // NM takes a while to reach the STOPPED state. - waitCount = 0; - while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { - LOG.info("Waiting for NM to stop.."); - Thread.sleep(1000); - } + nm.waitForServiceToStop(20000); Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); } @@ -1529,9 +1515,14 @@ public void testApplicationKeepAlive() throws Exception { nm.init(conf); nm.start(); // HB 2 -> app cancelled by RM. - while (heartBeatID < 12) { - Thread.sleep(1000l); - } + GenericTestUtils.waitFor(() -> nm.getServiceState() == STATE.STARTED, 20, + 10000); + GenericTestUtils.waitFor( + () -> nm.getServiceState() != STATE.STARTED + || heartBeatID.get() >= 12, + 100L, 60000000); + + Assert.assertTrue(heartBeatID.get() >= 12); MyResourceTracker3 rt = (MyResourceTracker3) nm.getNodeStatusUpdater().getRMClient(); rt.context.getApplications().remove(rt.appId); @@ -1539,14 +1530,18 @@ public void testApplicationKeepAlive() throws Exception { int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size(); LOG.info("Number of Keep Alive Requests: [" + numKeepAliveRequests + "]"); Assert.assertTrue(numKeepAliveRequests == 2 || numKeepAliveRequests == 3); - while (heartBeatID < 20) { - Thread.sleep(1000l); - } + GenericTestUtils.waitFor( + () -> nm.getServiceState() != STATE.STARTED + || heartBeatID.get() >= 20, + 100L, 60000000); + Assert.assertTrue(heartBeatID.get() >= 20); int numKeepAliveRequests2 = rt.keepAliveRequests.get(rt.appId).size(); Assert.assertEquals(numKeepAliveRequests, numKeepAliveRequests2); } finally { - if (nm.getServiceState() == STATE.STARTED) + if (nm != null) { nm.stop(); + nm.waitForServiceToStop(10000); + } } } @@ -1581,20 +1576,19 @@ protected NMContext createNMContext( nm.init(conf); nm.start(); - int waitCount = 0; - while (heartBeatID <= 4 && waitCount++ != 20) { - Thread.sleep(500); - } - if (heartBeatID <= 4) { - Assert.fail("Failed to get all heartbeats in time, " + - "heartbeatID:" + heartBeatID); - } - if(assertionFailedInThread.get()) { - Assert.fail("ContainerStatus Backup failed"); - } + GenericTestUtils.waitFor(() -> nm.getServiceState() == STATE.STARTED, + 20, 10000); + + GenericTestUtils.waitFor( + () -> nm.getServiceState() != STATE.STARTED || heartBeatID.get() > 4, + 50, 20000); + int hbID = heartBeatID.get(); + Assert.assertFalse("Failed to get all heartbeats in time, " + + "heartbeatID:" + hbID, hbID <= 4); + Assert.assertFalse("ContainerStatus Backup failed", + assertionFailedInThread.get()); Assert.assertNotNull(nm.getNMContext().getSystemCredentialsForApps() .get(ApplicationId.newInstance(1234, 1)).getToken(new Text("token1"))); - nm.stop(); } @Test(timeout = 200000) @@ -1631,13 +1625,12 @@ public void testNodeStatusUpdaterRetryAndNMShutdown() Assert.assertFalse("Containers not cleaned up when NM stopped", assertionFailedInThread.get()); Assert.assertTrue(((MyNodeManager2) nm).isStopped); - Assert.assertTrue("calculate heartBeatCount based on" + - " connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2); + Assert.assertEquals("calculate heartBeatCount based on" + + " connectionWaitSecs and RetryIntervalSecs", 2, heartBeatID.get()); } @Test - public void testRMVersionLessThanMinimum() throws InterruptedException, - IOException { + public void testRMVersionLessThanMinimum() throws Exception { final AtomicInteger numCleanups = new AtomicInteger(0); YarnConfiguration conf = createNMConfig(); conf.set(YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION, "3.0.0"); @@ -1674,15 +1667,9 @@ public void cleanUpApplicationsOnNMShutDown() { nm.init(conf); nm.start(); - // NM takes a while to reach the STARTED state. - int waitCount = 0; - while (nm.getServiceState() != STATE.STARTED && waitCount++ != 20) { - LOG.info("Waiting for NM to stop.."); - Thread.sleep(1000); - } - Assert.assertTrue(nm.getServiceState() == STATE.STARTED); - nm.stop(); + GenericTestUtils.waitFor(() -> nm.getServiceState() == STATE.STARTED, + 20, 200000); } @@ -1712,37 +1699,20 @@ protected ContainerManagerImpl createContainerManager(Context context, YarnConfiguration conf = createNMConfig(); nm.init(conf); nm.start(); + GenericTestUtils.waitFor(() -> nm.getServiceState() == STATE.STARTED, + 20, 20000); - System.out.println(" ----- thread already started.." - + nm.getServiceState()); - - int waitCount = 0; - while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) { - LOG.info("Waiting for NM to start.."); - if (nmStartError != null) { - LOG.error("Error during startup. ", nmStartError); - Assert.fail(nmStartError.getCause().getMessage()); - } - Thread.sleep(1000); - } - if (nm.getServiceState() != STATE.STARTED) { - // NM could have failed. - Assert.fail("NodeManager failed to start"); - } - - waitCount = 0; - while (heartBeatID <= 3 && waitCount++ != 20) { - Thread.sleep(500); - } - Assert.assertFalse(heartBeatID <= 3); + GenericTestUtils.waitFor( + () -> nm.getServiceState() != STATE.STARTED + || heartBeatID.get() > 3, + 50, 20000); + Assert.assertTrue(heartBeatID.get() > 3); Assert.assertEquals("Number of registered NMs is wrong!!", 1, this.registeredNodes.size()); MyContainerManager containerManager = (MyContainerManager)nm.getContainerManager(); Assert.assertTrue(containerManager.signaled); - - nm.stop(); } @Test @@ -1823,38 +1793,48 @@ public void testUpdateNMResources() throws Exception { LOG.info("Start the Node Manager"); NodeManager nodeManager = new NodeManager(); YarnConfiguration nmConf = new YarnConfiguration(); - nmConf.setSocketAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, - resourceTracker.getListenerAddress()); - nmConf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0"); - nodeManager.init(nmConf); - nodeManager.start(); - - LOG.info("Initially the Node Manager should have the default resources"); - ContainerManager containerManager = nodeManager.getContainerManager(); - ContainersMonitor containerMonitor = - containerManager.getContainersMonitor(); - assertEquals(8, containerMonitor.getVCoresAllocatedForContainers()); - assertEquals(8 * GB, containerMonitor.getPmemAllocatedForContainers()); - - LOG.info("The first heartbeat should trigger a resource change to {}", - resource); - GenericTestUtils.waitFor( - () -> containerMonitor.getVCoresAllocatedForContainers() == 1, - 100, 2 * 1000); - assertEquals(8 * GB, containerMonitor.getPmemAllocatedForContainers()); - - resource.setVirtualCores(5); - resource.setMemorySize(4 * 1024); - LOG.info("Change the resources to {}", resource); - GenericTestUtils.waitFor( - () -> containerMonitor.getVCoresAllocatedForContainers() == 5, - 100, 2 * 1000); - assertEquals(4 * GB, containerMonitor.getPmemAllocatedForContainers()); - - LOG.info("Cleanup"); - nodeManager.stop(); - nodeManager.close(); - resourceTracker.stop(); + try { + nmConf.setSocketAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + resourceTracker.getListenerAddress()); + nmConf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0"); + nodeManager.init(nmConf); + nodeManager.start(); + + LOG.info("Initially the Node Manager should have the default resources"); + ContainerManager containerManager = nodeManager.getContainerManager(); + ContainersMonitor containerMonitor = + containerManager.getContainersMonitor(); + Assert.assertEquals(8, + containerMonitor.getVCoresAllocatedForContainers()); + Assert.assertEquals(8 * GB, + containerMonitor.getPmemAllocatedForContainers()); + + LOG.info("The first heartbeat should trigger a resource change to {}", + resource); + GenericTestUtils.waitFor( + () -> containerMonitor.getVCoresAllocatedForContainers() == 1, + 100, 2 * 1000); + Assert.assertEquals(8 * GB, + containerMonitor.getPmemAllocatedForContainers()); + + resource.setVirtualCores(5); + resource.setMemorySize(4 * 1024); + LOG.info("Change the resources to {}", resource); + GenericTestUtils.waitFor( + () -> containerMonitor.getVCoresAllocatedForContainers() == 5, + 100, 2 * 1000); + Assert.assertEquals(4 * GB, + containerMonitor.getPmemAllocatedForContainers()); + } finally { + LOG.info("Cleanup"); + nodeManager.stop(); + try { + nodeManager.close(); + } catch (IOException ex) { + LOG.error("Could not close the node manager", ex); + } + resourceTracker.stop(); + } } /** @@ -1908,9 +1888,9 @@ public MyNMContext( @Override public ConcurrentMap getContainers() { - if (heartBeatID == 0) { + if (heartBeatID.get() == 0) { return containers; - } else if (heartBeatID == 1) { + } else if (heartBeatID.get() == 1) { ContainerStatus containerStatus2 = createContainerStatus(2, ContainerState.RUNNING); putMockContainer(containerStatus2); @@ -1919,7 +1899,7 @@ public ConcurrentMap getContainers() { createContainerStatus(3, ContainerState.COMPLETE); putMockContainer(containerStatus3); return containers; - } else if (heartBeatID == 2) { + } else if (heartBeatID.get() == 2) { ContainerStatus containerStatus4 = createContainerStatus(4, ContainerState.RUNNING); putMockContainer(containerStatus4); @@ -1928,7 +1908,7 @@ public ConcurrentMap getContainers() { createContainerStatus(5, ContainerState.COMPLETE); putMockContainer(containerStatus5); return containers; - } else if (heartBeatID == 3 || heartBeatID == 4) { + } else if (heartBeatID.get() == 3 || heartBeatID.get() == 4) { return containers; } else { containers.clear(); @@ -1978,22 +1958,16 @@ private void verifyNodeStartFailure(String errMessage) throws Exception { Assert.assertNotNull("nm is null", nm); YarnConfiguration conf = createNMConfig(); nm.init(conf); - try { - nm.start(); - Assert.fail("NM should have failed to start. Didn't get exception!!"); - } catch (Exception e) { - //the version in trunk looked in the cause for equality - // and assumed failures were nested. - //this version assumes that error strings propagate to the base and - //use a contains() test only. It should be less brittle - if(!e.getMessage().contains(errMessage)) { - throw e; - } - } + + //the version in trunk looked in the cause for equality + // and assumed failures were nested. + //this version assumes that error strings propagate to the base and + //use a contains() test only. It should be less brittle + LambdaTestUtils.intercept(Exception.class, errMessage, () -> nm.start()); // the service should be stopped - Assert.assertEquals("NM state is wrong!", STATE.STOPPED, nm - .getServiceState()); + Assert.assertEquals("NM state is wrong!", STATE.STOPPED, + nm.getServiceState()); Assert.assertEquals("Number of registered nodes is wrong!", 0, this.registeredNodes.size());