Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -55,6 +57,7 @@
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
Expand Down Expand Up @@ -109,8 +112,6 @@ public class TestBPOfferService {
private long firstLeaseId = 0;
private long secondLeaseId = 0;
private long nextFullBlockReportLeaseId = 1L;
private int fullBlockReportCount = 0;
private int incrBlockReportCount = 0;

static {
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
Expand Down Expand Up @@ -233,14 +234,6 @@ public HeartbeatResponse answer(InvocationOnMock invocation)
}
}

private void setBlockReportCount(int count) {
fullBlockReportCount = count;
}

private void setIncreaseBlockReportCount(int count) {
incrBlockReportCount += count;
}

/**
* Test that the BPOS can register to talk to two different NNs,
* sends block reports to both, etc.
Expand Down Expand Up @@ -288,6 +281,7 @@ public void testMissBlocksWhenReregister() throws Exception {
Thread addNewBlockThread = null;
final AtomicInteger count = new AtomicInteger(0);
DataNodeFaultInjector prevDNFaultInjector = null;
Set<Long> blocks = new TreeSet<>();
try {
waitForBothActors(bpos);
waitForInitialization(bpos);
Expand All @@ -303,7 +297,7 @@ public void blockUtilSendFullBlockReport() {
}
});

countBlockReportItems(FAKE_BLOCK, mockNN1);
countBlockReportItems(FAKE_BLOCK, mockNN1, blocks);
addNewBlockThread = new Thread(() -> {
for (int i = 0; i < totalTestBlocks; i++) {
SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
Expand Down Expand Up @@ -334,14 +328,12 @@ public void blockUtilSendFullBlockReport() {
addNewBlockThread = null;
// Verify FBR/IBR count is equal to generate number.
try {
GenericTestUtils.waitFor(() ->
(fullBlockReportCount == totalTestBlocks ||
incrBlockReportCount == totalTestBlocks), 1000, 15000);
GenericTestUtils.waitFor(() -> blocks.size() == totalTestBlocks,
1000, 15000);
} catch (Exception e) {
fail(String.format("Timed out wait for IBR counts FBRCount = %d,"
+ " IBRCount = %d; expected = %d. Exception: %s",
fullBlockReportCount, incrBlockReportCount, totalTestBlocks,
e.getMessage()));
fail(String.format("Timed out waiting for blocks count. "
+ "reported = %d, expected = %d. Exception: %s",
blocks.size(), totalTestBlocks, e.getMessage()));
}

} finally {
Expand Down Expand Up @@ -711,7 +703,8 @@ private void setTimeForSynchronousBPOSCalls() {
* which assume no deleting blocks here.
*/
private void countBlockReportItems(final ExtendedBlock fakeBlock,
final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
final DatanodeProtocolClientSideTranslatorPB mockNN,
final Set<Long> blocks) throws Exception {
final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
final ArgumentCaptor<StorageBlockReport[]> captor =
ArgumentCaptor.forClass(StorageBlockReport[].class);
Expand All @@ -720,7 +713,9 @@ private void countBlockReportItems(final ExtendedBlock fakeBlock,
Mockito.doAnswer((Answer<Object>) invocation -> {
Object[] arguments = invocation.getArguments();
StorageBlockReport[] list = (StorageBlockReport[])arguments[2];
setBlockReportCount(list[0].getBlocks().getNumberOfBlocks());
for (BlockReportReplica brr : list[0].getBlocks()) {
blocks.add(brr.getBlockId());
}
return null;
}).when(mockNN).blockReport(
Mockito.any(),
Expand All @@ -734,7 +729,9 @@ private void countBlockReportItems(final ExtendedBlock fakeBlock,
Object[] arguments = invocation.getArguments();
StorageReceivedDeletedBlocks[] list =
(StorageReceivedDeletedBlocks[])arguments[2];
setIncreaseBlockReportCount(list[0].getBlocks().length);
for (ReceivedDeletedBlockInfo rdbi : list[0].getBlocks()) {
blocks.add(rdbi.getBlock().getBlockId());
}
return null;
}).when(mockNN).blockReceivedAndDeleted(
Mockito.any(),
Expand Down Expand Up @@ -1233,4 +1230,4 @@ public void testCommandProcessingThreadExit() throws Exception {
}
}
}
}
}