Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2163,6 +2163,17 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
return null;
}

// skip if source datanodes for reconstructing ec block are not enough
if (block.isStriped()) {
BlockInfoStriped stripedBlock = (BlockInfoStriped) block;
int cellsNum = (int) ((stripedBlock.getNumBytes() - 1) / stripedBlock.getCellSize() + 1);
int minRequiredSources = Math.min(cellsNum, stripedBlock.getDataBlockNum());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this logic same as BlockInfoStriped.getRealDataBlockNum() can we use or extract the logic from there? or do some refactoring there, just trying if we can keep the logic at one place, in case there is some issue in the logic changing at one places fixes all the places..

if (minRequiredSources > srcNodes.length) {
LOG.debug("Block {} cannot be reconstructed due to shortage of source datanodes ", block);
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we increment the metrics before returning null

    NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();

}
}

// liveReplicaNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,101 @@ public void testChooseSrcDNWithDupECInDecommissioningNode() throws Exception {
0, numReplicas.redundantInternalBlocks());
}

@Test
public void testSkipReconstructionWithManyBusyNodes() {
long blockId = -9223372036854775776L; // real ec block id
// RS-3-2 EC policy
ErasureCodingPolicy ecPolicy =
SystemErasureCodingPolicies.getPolicies().get(1);

// striped blockInfo: 3 data blocks + 2 parity blocks
Block aBlock = new Block(blockId, ecPolicy.getCellSize() * ecPolicy.getNumDataUnits(), 0);
BlockInfoStriped aBlockInfoStriped = new BlockInfoStriped(aBlock, ecPolicy);

// create 4 storageInfo, which means 1 block is missing
DatanodeStorageInfo ds1 = DFSTestUtil.createDatanodeStorageInfo(
"storage1", "1.1.1.1", "rack1", "host1");
DatanodeStorageInfo ds2 = DFSTestUtil.createDatanodeStorageInfo(
"storage2", "2.2.2.2", "rack2", "host2");
DatanodeStorageInfo ds3 = DFSTestUtil.createDatanodeStorageInfo(
"storage3", "3.3.3.3", "rack3", "host3");
DatanodeStorageInfo ds4 = DFSTestUtil.createDatanodeStorageInfo(
"storage4", "4.4.4.4", "rack4", "host4");

// link block with storage
aBlockInfoStriped.addStorage(ds1, aBlock);
aBlockInfoStriped.addStorage(ds2, new Block(blockId + 1, 0, 0));
aBlockInfoStriped.addStorage(ds3, new Block(blockId + 2, 0, 0));
aBlockInfoStriped.addStorage(ds4, new Block(blockId + 3, 0, 0));

addEcBlockToBM(blockId, ecPolicy);
aBlockInfoStriped.setBlockCollectionId(mockINodeId);

// reconstruction should be scheduled
BlockReconstructionWork work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
assertNotNull(work);

// simulate the 2 nodes reach maxReplicationStreams
for(int i = 0; i < bm.maxReplicationStreams; i++){
ds3.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
ds4.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
}

// reconstruction should be skipped since the number of non-busy nodes are not enough
work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
assertNull(work);
}

@Test
public void testSkipReconstructionWithManyBusyNodes2() {
long blockId = -9223372036854775776L; // real ec block id
// RS-3-2 EC policy
ErasureCodingPolicy ecPolicy =
SystemErasureCodingPolicies.getPolicies().get(1);

// striped blockInfo: 2 data blocks + 2 paritys
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo paritys

Block aBlock = new Block(blockId, ecPolicy.getCellSize() * (ecPolicy.getNumDataUnits() - 1), 0);
BlockInfoStriped aBlockInfoStriped = new BlockInfoStriped(aBlock, ecPolicy);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can you use a better variable name, couldn't decode what does a stands for, or drop a comment above.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the variable name. I want to keep the comment to clarify the difference between testSkipReconstructionWithManyBusyNodes and testSkipReconstructionWithManyBusyNodes2.


// create 3 storageInfo, which means 1 block is missing
DatanodeStorageInfo ds1 = DFSTestUtil.createDatanodeStorageInfo(
"storage1", "1.1.1.1", "rack1", "host1");
DatanodeStorageInfo ds2 = DFSTestUtil.createDatanodeStorageInfo(
"storage2", "2.2.2.2", "rack2", "host2");
DatanodeStorageInfo ds3 = DFSTestUtil.createDatanodeStorageInfo(
"storage3", "3.3.3.3", "rack3", "host3");

// link block with storage
aBlockInfoStriped.addStorage(ds1, aBlock);
aBlockInfoStriped.addStorage(ds2, new Block(blockId + 1, 0, 0));
aBlockInfoStriped.addStorage(ds3, new Block(blockId + 2, 0, 0));

addEcBlockToBM(blockId, ecPolicy);
aBlockInfoStriped.setBlockCollectionId(mockINodeId);

// reconstruction should be scheduled
BlockReconstructionWork work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
assertNotNull(work);

// simulate the 1 node reaches maxReplicationStreams
for(int i = 0; i < bm.maxReplicationStreams; i++){
ds2.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
}

// reconstruction should still be scheduled since there are 2 source nodes to create 2 blocks
work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
assertNotNull(work);

// simulate the 1 more node reaches maxReplicationStreams
for(int i = 0; i < bm.maxReplicationStreams; i++){
ds3.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
}

// reconstruction should be skipped since the number of non-busy nodes are not enough
work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
assertNull(work);
}

@Test
public void testFavorDecomUntilHardLimit() throws Exception {
bm.maxReplicationStreams = 0;
Expand Down