Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2658,7 +2658,7 @@ public long getProvidedCapacity() {
void updateHeartbeat(DatanodeDescriptor node, StorageReport[] reports,
long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {

BlockManagerFaultInjector.getInstance().mockAnException();
for (StorageReport report: reports) {
providedStorageMap.updateStorage(node, report.getStorage());
}
Expand All @@ -2670,6 +2670,7 @@ void updateHeartbeatState(DatanodeDescriptor node,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
BlockManagerFaultInjector.getInstance().mockAnException();
for (StorageReport report: reports) {
providedStorageMap.updateStorage(node, report.getStorage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,8 @@ public void requestBlockReportLease(DatanodeDescriptor node, long leaseId) {
@VisibleForTesting
public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) {
}

@VisibleForTesting
public void mockAnException() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,23 +256,29 @@ synchronized void updateHeartbeat(final DatanodeDescriptor node,
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
stats.subtract(node);
blockManager.updateHeartbeat(node, reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
stats.add(node);
try {
blockManager.updateHeartbeat(node, reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
} finally {
stats.add(node);
}
}

synchronized void updateLifeline(final DatanodeDescriptor node,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
stats.subtract(node);
// This intentionally calls updateHeartbeatState instead of
// updateHeartbeat, because we don't want to modify the
// heartbeatedSinceRegistration flag. Arrival of a lifeline message does
// not count as arrival of the first heartbeat.
blockManager.updateHeartbeatState(node, reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
stats.add(node);
try {
// This intentionally calls updateHeartbeatState instead of
// updateHeartbeat, because we don't want to modify the
// heartbeatedSinceRegistration flag. Arrival of a lifeline message does
// not count as arrival of the first heartbeat.
blockManager.updateHeartbeatState(node, reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
} finally {
stats.add(node);
}
}

synchronized void startDecommission(final DatanodeDescriptor node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3877,4 +3877,8 @@ boolean isSlownodeByBlockPoolId(String bpId) {
boolean isSlownode() {
return blockPoolManager.isSlownode();
}

BlockPoolManager getBlockPoolManager() {
return blockPoolManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
Expand Down Expand Up @@ -95,6 +98,12 @@ public class TestDataNodeLifeline {
private FSNamesystem namesystem;
private DataNode dn;
private BPServiceActor bpsa;
private final BlockManagerFaultInjector injector = new BlockManagerFaultInjector() {
@Override
public void mockAnException() {
throw new UnknownError("Unknown exception");
}
};

@Before
public void setup() throws Exception {
Expand Down Expand Up @@ -336,4 +345,49 @@ public T answer(InvocationOnMock invocation)
return result;
}
}

/**
* Mock an exception in HeartbeatManager#updateHeartbeat and HeartbeatManager#updateLifeline
* respectively, and trigger the heartbeat and lifeline in sequence. The capacityTotal obtained
* before and after this operation should be the same.
* @throws Exception
*/
@Test
public void testHeartbeatAndLifelineOnError() throws Exception {
final Configuration config = new HdfsConfiguration();
config.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, "0.0.0.0:0");

try(MiniDFSCluster cluster =
new MiniDFSCluster.Builder(config).numDataNodes(1).build()) {
cluster.waitActive();
final FSNamesystem fsNamesystem = cluster.getNamesystem();

// Get capacityTotal before triggering heartbeat and lifeline.
DatanodeStatistics datanodeStatistics =
fsNamesystem.getBlockManager().getDatanodeManager().getDatanodeStatistics();
long capacityTotalBefore = datanodeStatistics.getCapacityTotal();

// Mock an exception in HeartbeatManager#updateHeartbeat and HeartbeatManager#updateLifeline.
BlockManagerFaultInjector.instance = injector;
DataNode dataNode = cluster.getDataNodes().get(0);
BlockPoolManager blockPoolManager = dataNode.getBlockPoolManager();
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
if (bpos != null) {
for (BPServiceActor actor : bpos.getBPServiceActors()) {
try {
actor.triggerHeartbeatForTests();
actor.sendLifelineForTests();
} catch (Throwable e) {
assertTrue(e.getMessage().contains("Unknown exception"));
}
}
}
}

// Get capacityTotal after triggering heartbeat and lifeline.
long capacityTotalAfter = datanodeStatistics.getCapacityTotal();
// The capacityTotal should be same.
assertEquals(capacityTotalBefore, capacityTotalAfter);
}
}
}