Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.balancer;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
Expand Down Expand Up @@ -237,6 +238,11 @@ private void testBalancerWithObserver(boolean withObserverFailure)
// Avoid the same FS being reused between tests
conf.setBoolean("fs.hdfs.impl.disable.cache", true);

// Reduce datanode retry so cluster shutdown won't be blocked.
if (withObserverFailure) {
conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2);
}

MiniQJMHACluster qjmhaCluster = null;
try {
qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
Expand All @@ -43,16 +44,20 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.metrics2.MetricsRecordBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
Expand Down Expand Up @@ -107,8 +112,6 @@ public class TestBPOfferService {
private long firstLeaseId = 0;
private long secondLeaseId = 0;
private long nextFullBlockReportLeaseId = 1L;
private int fullBlockReportCount = 0;
private int incrBlockReportCount = 0;

static {
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
Expand Down Expand Up @@ -231,14 +234,6 @@ public HeartbeatResponse answer(InvocationOnMock invocation)
}
}

private void setBlockReportCount(int count) {
fullBlockReportCount = count;
}

private void setIncreaseBlockReportCount(int count) {
incrBlockReportCount += count;
}

/**
* Test that the BPOS can register to talk to two different NNs,
* sends block reports to both, etc.
Expand Down Expand Up @@ -285,26 +280,24 @@ public void testMissBlocksWhenReregister() throws Exception {
int totalTestBlocks = 4000;
Thread addNewBlockThread = null;
final AtomicInteger count = new AtomicInteger(0);

DataNodeFaultInjector prevDNFaultInjector = null;
Set<Long> blocks = new TreeSet<>();
try {
waitForBothActors(bpos);
waitForInitialization(bpos);
prevDNFaultInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
public void blockUtilSendFullBlockReport() {
try {
GenericTestUtils.waitFor(() -> {
if(count.get() > 2000) {
return true;
}
return false;
}, 100, 1000);
GenericTestUtils.waitFor(() -> count.get() > 2000,
100, 1000);
} catch (Exception e) {
e.printStackTrace();
LOG.error("error DataNodeFaultInjector", e);
}
}
});

countBlockReportItems(FAKE_BLOCK, mockNN1);
countBlockReportItems(FAKE_BLOCK, mockNN1, blocks);
addNewBlockThread = new Thread(() -> {
for (int i = 0; i < totalTestBlocks; i++) {
SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
Expand All @@ -318,45 +311,39 @@ public void blockUtilSendFullBlockReport() {
count.addAndGet(1);
Thread.sleep(1);
} catch (Exception e) {
e.printStackTrace();
LOG.error("error addNewBlockThread", e);
}
}
});
addNewBlockThread.start();

// Make sure that blocks have been generated for the DataNode and the IBR is not empty now.
GenericTestUtils.waitFor(() -> {
if(count.get() > 0) {
return true;
}
return false;
}, 100, 1000);
GenericTestUtils.waitFor(() -> count.get() > 0, 100, 1000);

// Trigger re-register using DataNode Command.
datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER};
bpos.triggerHeartbeatForTests();

bpos.triggerHeartbeatForTests();
addNewBlockThread.join();
addNewBlockThread = null;
// Verify FBR/IBR count is equal to the generated number.
try {
GenericTestUtils.waitFor(() -> {
if(fullBlockReportCount == totalTestBlocks ||
incrBlockReportCount == totalTestBlocks) {
return true;
}
return false;
}, 1000, 15000);
} catch (Exception e) {}
GenericTestUtils.waitFor(() -> blocks.size() == totalTestBlocks,
1000, 15000);
} catch (Exception e) {
fail(String.format("Timed out waiting for blocks count. "
+ "reported = %d, expected = %d. Exception: %s",
blocks.size(), totalTestBlocks, e.getMessage()));
}

// Verify FBR/IBR count is equal to the generated number.
assertTrue(fullBlockReportCount == totalTestBlocks ||
incrBlockReportCount == totalTestBlocks);
} finally {
addNewBlockThread.join();
if (addNewBlockThread != null) {
addNewBlockThread.interrupt();
}
bpos.stop();
bpos.join();

DataNodeFaultInjector.set(new DataNodeFaultInjector() {
public void blockUtilSendFullBlockReport() {}
});
DataNodeFaultInjector.set(prevDNFaultInjector);
}
}

Expand Down Expand Up @@ -716,7 +703,8 @@ private void setTimeForSynchronousBPOSCalls() {
* which assumes no blocks are deleted here.
*/
private void countBlockReportItems(final ExtendedBlock fakeBlock,
final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
final DatanodeProtocolClientSideTranslatorPB mockNN,
final Set<Long> blocks) throws Exception {
final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
final ArgumentCaptor<StorageBlockReport[]> captor =
ArgumentCaptor.forClass(StorageBlockReport[].class);
Expand All @@ -725,7 +713,9 @@ private void countBlockReportItems(final ExtendedBlock fakeBlock,
Mockito.doAnswer((Answer<Object>) invocation -> {
Object[] arguments = invocation.getArguments();
StorageBlockReport[] list = (StorageBlockReport[])arguments[2];
setBlockReportCount(list[0].getBlocks().getNumberOfBlocks());
for (BlockReportReplica brr : list[0].getBlocks()) {
blocks.add(brr.getBlockId());
}
return null;
}).when(mockNN).blockReport(
Mockito.any(),
Expand All @@ -739,7 +729,9 @@ private void countBlockReportItems(final ExtendedBlock fakeBlock,
Object[] arguments = invocation.getArguments();
StorageReceivedDeletedBlocks[] list =
(StorageReceivedDeletedBlocks[])arguments[2];
setIncreaseBlockReportCount(list[0].getBlocks().length);
for (ReceivedDeletedBlockInfo rdbi : list[0].getBlocks()) {
blocks.add(rdbi.getBlock().getBlockId());
}
return null;
}).when(mockNN).blockReceivedAndDeleted(
Mockito.any(),
Expand Down