@@ -3751,9 +3751,24 @@ private void invalidateCorruptReplicas(BlockInfo blk, Block reported,
// ConcurrentModificationException, when the block is removed from the node
DatanodeDescriptor[] nodesCopy =
nodes.toArray(new DatanodeDescriptor[nodes.size()]);

DatanodeStorageInfo[] storages = null;
if (blk.isStriped()) {
storages = getStorages(blk);
}

for (DatanodeDescriptor node : nodesCopy) {
Block blockToInvalidate = reported;
if (storages != null && blk.isStriped()) {
for (DatanodeStorageInfo s : storages) {
if (s.getDatanodeDescriptor().equals(node)) {
blockToInvalidate = getBlockOnStorage(blk, s);
break;
}
}
}
try {
if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
if (!invalidateBlock(new BlockToMarkCorrupt(blockToInvalidate, blk, null,
Reason.ANY), node, numberReplicas)) {
removedFromBlocksMap = false;
}
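The hunk above is the core of the change: for a striped block group, each DataNode stores a different internal block, so the block handed to invalidateBlock has to be resolved per node instead of reusing the single reported block for every replica. Below is a minimal, self-contained sketch of that per-node resolution under simplified assumptions; the Node and InternalBlock types and the blockToInvalidate helper are hypothetical stand-ins for illustration, not Hadoop classes.

    import java.util.List;
    import java.util.Map;

    // Simplified model: a striped group maps each node to the internal block it stores.
    final class PerNodeInvalidationSketch {

      record Node(String uuid) { }

      record InternalBlock(long id) { }

      // For a striped group, pick the internal block actually stored on this node;
      // for a contiguous block, the reported block is the right one for every node.
      static InternalBlock blockToInvalidate(boolean striped, InternalBlock reported,
          Node node, Map<Node, InternalBlock> internalBlockByNode) {
        if (!striped) {
          return reported;
        }
        return internalBlockByNode.getOrDefault(node, reported);
      }

      public static void main(String[] args) {
        Node dn1 = new Node("dn-1");
        Node dn2 = new Node("dn-2");
        // Two internal blocks of the same group, stored on different nodes.
        Map<Node, InternalBlock> internalBlockByNode = Map.of(
            dn1, new InternalBlock(-9223372036845119810L),
            dn2, new InternalBlock(-9223372036845119809L));
        InternalBlock reported = new InternalBlock(-9223372036845119810L);
        for (Node node : List.of(dn1, dn2)) {
          System.out.println(node.uuid() + " -> invalidate "
              + blockToInvalidate(true, reported, node, internalBlockByNode).id());
        }
      }
    }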
@@ -73,11 +73,11 @@ abstract public class ReadStripedFileWithDecodingHelper {
public static MiniDFSCluster initializeCluster() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
0);
2);
MiniDFSCluster myCluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_DATANODES)
.numDataNodes(NUM_DATANODES + 3)
.build();
myCluster.getFileSystem().enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
@@ -108,6 +108,22 @@ public static int findFirstDataNode(MiniDFSCluster cluster,
return -1;
}

/**
 * Find the DataNode that hosts the replica at the given position among the first
 * block's locations and return its index in cluster.getDataNodes(), or -1 if no
 * DataNode matches. The index begins from 1.
 */
public static int findDataNodeAtIndex(MiniDFSCluster cluster,
DistributedFileSystem dfs, Path file, long length, int index) throws IOException {
BlockLocation[] locs = dfs.getFileBlockLocations(file, 0, length);
String name = (locs[0].getNames())[index - 1];
int dnIndex = 0;
for (DataNode dn : cluster.getDataNodes()) {
int port = dn.getXferPort();
if (name.contains(Integer.toString(port))) {
return dnIndex;
}
dnIndex++;
}
return -1;
}

/**
* Cross product of FILE_LENGTHS, NUM_PARITY_UNITS+1, NUM_PARITY_UNITS.
* Input for parameterized tests classes.
@@ -32,6 +32,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -50,6 +51,7 @@
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.CELL_SIZE;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_DATA_UNITS;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_PARITY_UNITS;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.findDataNodeAtIndex;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.findFirstDataNode;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;
@@ -96,7 +98,7 @@ public void testReportBadBlock() throws IOException {
.get(0);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
// find the first block file
// Find the first block file.
File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
Assert.assertTrue("Block file does not exist", blkFile.exists());
@@ -169,6 +171,129 @@ public void testInvalidateBlock() throws IOException, InterruptedException {
}
}

/**
 * This unit test covers the following situation:
 * Suppose we have an EC file with an RS(d,p) policy whose block group id
 * is blk_-9223372036845119810_1920002.
 * If the first and second data blocks in this EC block group are corrupted
 * while we read the file, the read triggers a reportBadBlock RPC and adds
 * blk_-9223372036845119810_1920002 and blk_-9223372036845119809_1920002
 * to corruptReplicas. It also reconstructs the two blocks and sends IBRs to
 * the NameNode, which then executes BlockManager#addStoredBlock and the
 * invalidateCorruptReplicas method. Suppose we first receive the IBR of
 * blk_-9223372036845119810_1920002; then, in the invalidateCorruptReplicas
 * method, only blk_-9223372036845119809_1920002 is invalidated on the two
 * datanodes that contain the two corrupt blocks.
 * (The internal-block id convention behind these ids is sketched after this
 * test method.)
 *
 * @throws Exception
 */
@Test
Review comment (Contributor):
Just a suggestion: add some Javadoc for the new unit test describing the case you want to cover.

Reply (Member Author):
@Hexiaoqiao, Sir, I have added some Javadoc. Thanks for your suggestions.

public void testCorruptionECBlockInvalidate() throws Exception {

final Path file = new Path("/invalidate_corrupted");
final int length = BLOCK_SIZE * NUM_DATA_UNITS;
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
DFSTestUtil.writeFile(dfs, file, bytes);

int dnIndex = findFirstDataNode(cluster, dfs, file,
CELL_SIZE * NUM_DATA_UNITS);
int dnIndex2 = findDataNodeAtIndex(cluster, dfs, file,
CELL_SIZE * NUM_DATA_UNITS, 2);
Assert.assertNotEquals(-1, dnIndex);
Assert.assertNotEquals(-1, dnIndex2);

LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
.getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
.get(0);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);

final Block b = blks[0].getBlock().getLocalBlock();
final Block b2 = blks[1].getBlock().getLocalBlock();

// Find the first block file.
File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
Assert.assertTrue("Block file does not exist", blkFile.exists());
// Corrupt the block file.
LOG.info("Deliberately corrupting file " + blkFile.getName());
try (FileOutputStream out = new FileOutputStream(blkFile)) {
out.write("corruption".getBytes());
out.flush();
}

// Find the second block file.
File storageDir2 = cluster.getInstanceStorageDir(dnIndex2, 0);
File blkFile2 = MiniDFSCluster.getBlockFile(storageDir2, blks[1].getBlock());
Assert.assertTrue("Block file does not exist", blkFile2.exists());
// Corrupt the second block file.
LOG.info("Deliberately corrupting file " + blkFile2.getName());
try (FileOutputStream out = new FileOutputStream(blkFile2)) {
out.write("corruption".getBytes());
out.flush();
}

// Disable heartbeats from the DataNodes so that the corrupted block records are
// kept in the NameNode.
for (DataNode dataNode : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
}
try {
// Do stateful read.
StripedFileTestUtil.verifyStatefulRead(dfs, file, length, bytes,
ByteBuffer.allocate(1024));

// Check whether the corruption has been reported to the NameNode.
final FSNamesystem ns = cluster.getNamesystem();
final BlockManager bm = ns.getBlockManager();
BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
.asFile().getBlocks())[0];
GenericTestUtils.waitFor(() -> {
if (bm.getCorruptReplicas(blockInfo) == null) {
return false;
}
return bm.getCorruptReplicas(blockInfo).size() == 2;
}, 250, 60000);
// Double check.
Assert.assertEquals(2, bm.getCorruptReplicas(blockInfo).size());

DatanodeDescriptor dnd =
NameNodeAdapter.getDatanode(ns, cluster.getDataNodes().get(dnIndex).getDatanodeId());

DatanodeDescriptor dnd2 =
NameNodeAdapter.getDatanode(ns, cluster.getDataNodes().get(dnIndex2).getDatanodeId());

for (DataNode datanode : cluster.getDataNodes()) {
if (!datanode.getDatanodeUuid().equals(dnd.getDatanodeUuid()) &&
!datanode.getDatanodeUuid().equals(dnd2.getDatanodeUuid())) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(datanode, false);
}
}

GenericTestUtils.waitFor(() -> {
return bm.containsInvalidateBlock(
blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b);
}, 250, 60000);
Assert.assertTrue(bm.containsInvalidateBlock(
blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b));

GenericTestUtils.waitFor(() -> {
return bm.containsInvalidateBlock(
blks[1].getLocations()[0], b2) || dnd2.containsInvalidateBlock(b2);
}, 250, 60000);

Assert.assertTrue(bm.containsInvalidateBlock(
blks[1].getLocations()[0], b2) || dnd2.containsInvalidateBlock(b2));

} finally {
for (DataNode datanode : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(datanode, false);
}
}
}
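
// Illustrative, hypothetical helper (a sketch, not an existing Hadoop API): the
// Javadoc of testCorruptionECBlockInvalidate above assumes that internal block i of
// a striped block group has an id equal to the block group id plus i, which is why
// blk_-9223372036845119810 (index 0) and blk_-9223372036845119809 (index 1)
// belong to the same group.
private static long assumedInternalBlockId(long blockGroupId, int blockIndex) {
// Internal block id = group id offset by the block index within the group.
return blockGroupId + blockIndex;
}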

@Test
public void testMoreThanOneCorruptedBlock() throws IOException {
final Path file = new Path("/corrupted");