diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java index cdfa76a9d1f88..7d8d258192586 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.balancer; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY; @@ -237,6 +238,11 @@ private void testBalancerWithObserver(boolean withObserverFailure) // Avoid the same FS being reused between tests conf.setBoolean("fs.hdfs.impl.disable.cache", true); + // Reduce datanode retry so cluster shutdown won't be blocked. + if (withObserverFailure) { + conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2); + } + MiniQJMHACluster qjmhaCluster = null; try { qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 2, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index a34401150574c..bf919e16fcf31 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -34,6 +34,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; @@ -43,16 +44,20 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.metrics2.MetricsRecordBuilder; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; @@ -107,8 +112,6 @@ public class TestBPOfferService { private long firstLeaseId = 0; private long secondLeaseId = 0; private long nextFullBlockReportLeaseId = 1L; - private int fullBlockReportCount = 0; - private int incrBlockReportCount = 0; static { GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL); @@ -231,14 +234,6 @@ public HeartbeatResponse answer(InvocationOnMock invocation) } } - private void setBlockReportCount(int count) { - fullBlockReportCount = count; - } - - private void setIncreaseBlockReportCount(int count) { - incrBlockReportCount += count; - } - /** * Test that the BPOS can register to talk to two different NNs, * sends block reports to both, etc. @@ -285,26 +280,24 @@ public void testMissBlocksWhenReregister() throws Exception { int totalTestBlocks = 4000; Thread addNewBlockThread = null; final AtomicInteger count = new AtomicInteger(0); - + DataNodeFaultInjector prevDNFaultInjector = null; + Set blocks = new TreeSet<>(); try { waitForBothActors(bpos); waitForInitialization(bpos); + prevDNFaultInjector = DataNodeFaultInjector.get(); DataNodeFaultInjector.set(new DataNodeFaultInjector() { public void blockUtilSendFullBlockReport() { try { - GenericTestUtils.waitFor(() -> { - if(count.get() > 2000) { - return true; - } - return false; - }, 100, 1000); + GenericTestUtils.waitFor(() -> count.get() > 2000, + 100, 1000); } catch (Exception e) { - e.printStackTrace(); + LOG.error("error DataNodeFaultInjector", e); } } }); - countBlockReportItems(FAKE_BLOCK, mockNN1); + countBlockReportItems(FAKE_BLOCK, mockNN1, blocks); addNewBlockThread = new Thread(() -> { for (int i = 0; i < totalTestBlocks; i++) { SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset; @@ -318,45 +311,39 @@ public void blockUtilSendFullBlockReport() { count.addAndGet(1); Thread.sleep(1); } catch (Exception e) { - e.printStackTrace(); + LOG.error("error addNewBlockThread", e); } } }); addNewBlockThread.start(); // Make sure that generate blocks for DataNode and IBR not empty now. - GenericTestUtils.waitFor(() -> { - if(count.get() > 0) { - return true; - } - return false; - }, 100, 1000); + GenericTestUtils.waitFor(() -> count.get() > 0, 100, 1000); // Trigger re-register using DataNode Command. datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER}; - bpos.triggerHeartbeatForTests(); + bpos.triggerHeartbeatForTests(); + addNewBlockThread.join(); + addNewBlockThread = null; + // Verify FBR/IBR count is equal to generate number. try { - GenericTestUtils.waitFor(() -> { - if(fullBlockReportCount == totalTestBlocks || - incrBlockReportCount == totalTestBlocks) { - return true; - } - return false; - }, 1000, 15000); - } catch (Exception e) {} + GenericTestUtils.waitFor(() -> blocks.size() == totalTestBlocks, + 1000, 15000); + } catch (Exception e) { + fail(String.format("Timed out waiting for blocks count. " + + "reported = %d, expected = %d. Exception: %s", + blocks.size(), totalTestBlocks, e.getMessage())); + } - // Verify FBR/IBR count is equal to generate number. - assertTrue(fullBlockReportCount == totalTestBlocks || - incrBlockReportCount == totalTestBlocks); } finally { - addNewBlockThread.join(); + if (addNewBlockThread != null) { + addNewBlockThread.interrupt(); + } bpos.stop(); bpos.join(); - DataNodeFaultInjector.set(new DataNodeFaultInjector() { - public void blockUtilSendFullBlockReport() {} - }); + DataNodeFaultInjector.set(prevDNFaultInjector); } } @@ -716,7 +703,8 @@ private void setTimeForSynchronousBPOSCalls() { * which assume no deleting blocks here. */ private void countBlockReportItems(final ExtendedBlock fakeBlock, - final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception { + final DatanodeProtocolClientSideTranslatorPB mockNN, + final Set blocks) throws Exception { final String fakeBlockPoolId = fakeBlock.getBlockPoolId(); final ArgumentCaptor captor = ArgumentCaptor.forClass(StorageBlockReport[].class); @@ -725,7 +713,9 @@ private void countBlockReportItems(final ExtendedBlock fakeBlock, Mockito.doAnswer((Answer) invocation -> { Object[] arguments = invocation.getArguments(); StorageBlockReport[] list = (StorageBlockReport[])arguments[2]; - setBlockReportCount(list[0].getBlocks().getNumberOfBlocks()); + for (BlockReportReplica brr : list[0].getBlocks()) { + blocks.add(brr.getBlockId()); + } return null; }).when(mockNN).blockReport( Mockito.any(), @@ -739,7 +729,9 @@ private void countBlockReportItems(final ExtendedBlock fakeBlock, Object[] arguments = invocation.getArguments(); StorageReceivedDeletedBlocks[] list = (StorageReceivedDeletedBlocks[])arguments[2]; - setIncreaseBlockReportCount(list[0].getBlocks().length); + for (ReceivedDeletedBlockInfo rdbi : list[0].getBlocks()) { + blocks.add(rdbi.getBlock().getBlockId()); + } return null; }).when(mockNN).blockReceivedAndDeleted( Mockito.any(),