diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index b786de53ffa2b..d45bb46247646 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -117,6 +117,7 @@
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
@@ -1858,17 +1859,26 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
         expectedRedundancies;
     boolean corruptedDuringWrite = minReplicationSatisfied &&
         b.isCorruptedDuringWrite();
+
+    int countNumOfAvailableNodes = getDatanodeManager()
+        .getNetworkTopology().countNumOfAvailableNodes(NodeBase.ROOT, new HashSet<>());
+    boolean noEnoughNodes = minReplicationSatisfied &&
+        (numberOfReplicas.liveReplicas() +
+        numberOfReplicas.corruptReplicas()) == countNumOfAvailableNodes;
     // case 1: have enough number of usable replicas
     // case 2: corrupted replicas + usable replicas > Replication factor
     // case 3: Block is marked corrupt due to failure while writing. In this
     // case genstamp will be different than that of valid block.
+    // case 4: Block is marked corrupt, but not due to a write failure, and
+    // the number of replicas equals the number of available nodes. No node
+    // is left for reconstruction, so the corrupt replica should be deleted.
     // In all these cases we can delete the replica.
     // In case 3, rbw block will be deleted and valid block can be replicated.
     // Note NN only becomes aware of corrupt blocks when the block report is sent,
     // this means that by default it can take up to 6 hours for a corrupt block to
     // be invalidated, after which the valid block can be replicated.
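+    // Illustrative example (mirrors the test added below): in a 3-DataNode
+    // cluster where every node already holds a replica and one of them is
+    // corrupt, the live and corrupt replicas together cover every available
+    // node, so there is no spare node for reconstruction and the corrupt
+    // replica can be invalidated immediately.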
     if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
-        || corruptedDuringWrite) {
+        || corruptedDuringWrite || noEnoughNodes) {
       if (b.getStored().isStriped()) {
         // If the block is an EC block, the whole block group is marked
         // corrupted, so if this block is getting deleted, remove the block
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 72792e3173df8..8db7475dc09b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.thirdparty.com.google.common.collect.LinkedListMultimap;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@@ -94,8 +95,11 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -144,6 +148,7 @@ public class TestBlockManager {
    */
   private static final int NUM_TEST_ITERS = 30;
   private static final int BLOCK_SIZE = 64*1024;
+  private static final int DN_DIRECTORYSCAN_INTERVAL = 10;
   private static final org.slf4j.Logger LOG =
       LoggerFactory.getLogger(TestBlockManager.class);
@@ -452,6 +457,71 @@ private void doTestSingleRackClusterHasSufficientRedundancy(int testIndex,
     assertFalse(bm.isNeededReconstruction(block,
         bm.countNodes(block, fsn.isInStartupSafeMode())));
   }
+
+  @Test(timeout = 60000)
+  public void testMiniClusterCannotReconstructionWhileReplicaAnomaly()
+      throws IOException, InterruptedException, TimeoutException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt("dfs.datanode.directoryscan.interval", DN_DIRECTORYSCAN_INTERVAL);
+    conf.setInt("dfs.namenode.replication.interval", 1);
+    conf.setInt("dfs.heartbeat.interval", 1);
+    String src = "/test-reconstruction";
+    Path file = new Path(src);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    try {
+      cluster.waitActive();
+      FSNamesystem fsn = cluster.getNamesystem();
+      BlockManager bm = fsn.getBlockManager();
+      FSDataOutputStream out = null;
+      FileSystem fs = cluster.getFileSystem();
+      try {
+        out = fs.create(file);
+        for (int i = 0; i < 1024 * 1; i++) {
+          out.write(i);
+        }
+        out.hflush();
+      } finally {
+        IOUtils.closeStream(out);
+      }
+      FSDataInputStream in = null;
+      ExtendedBlock oldBlock = null;
+      try {
+        in = fs.open(file);
+        oldBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
+      } finally {
+        IOUtils.closeStream(in);
+      }
+      DataNode dn = cluster.getDataNodes().get(0);
+      String blockPath = dn.getFSDataset().getBlockLocalPathInfo(oldBlock).getBlockPath();
+      String metaBlockPath = dn.getFSDataset().getBlockLocalPathInfo(oldBlock).getMetaPath();
+      Files.write(Paths.get(blockPath), Collections.emptyList());
+      Files.write(Paths.get(metaBlockPath), Collections.emptyList());
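+      // Restart the DataNode so it rescans its storage and, after the
+      // directory scan interval, reports the truncated replica, which the
+      // NameNode should then mark as corrupt.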
+      cluster.restartDataNode(0, true);
+      cluster.waitDatanodeConnectedToActive(dn, 60000);
+      while (!dn.isDatanodeFullyStarted()) {
+        Thread.sleep(1000);
+      }
+      Thread.sleep(DN_DIRECTORYSCAN_INTERVAL * 1000);
+      cluster.triggerBlockReports();
+      BlockInfo bi = bm.getStoredBlock(oldBlock.getLocalBlock());
+      boolean isNeededReconstruction = bm.isNeededReconstruction(bi,
+          bm.countNodes(bi, cluster.getNamesystem().isInStartupSafeMode()));
+      if (isNeededReconstruction) {
+        BlockReconstructionWork reconstructionWork = null;
+        fsn.readLock();
+        try {
+          reconstructionWork = bm.scheduleReconstruction(bi, 3);
+        } finally {
+          fsn.readUnlock();
+        }
+        assertNull(reconstructionWork);
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 
   @Test(timeout = 60000)
   public void testNeededReconstructionWhileAppending() throws IOException {