@@ -197,6 +197,15 @@ private void clearLocalDeadNodes() {
deadNodes.clear();
}

Review comment: Nit. Param documentation.

/**
* Clear the list of ignored nodes used for hedged reads.
*
* @param ignoredNodes the collection of ignored datanodes to clear; may be null
*/
private void clearIgnoredNodes(Collection<DatanodeInfo> ignoredNodes) {
if (ignoredNodes != null) {
ignoredNodes.clear();
}
}

protected DFSClient getDFSClient() {
return dfsClient;
}
@@ -940,7 +949,8 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
* @return the chosen DNAddrPair; can be null if refetchIfRequired is
* false.
*/
-private DNAddrPair chooseDataNode(LocatedBlock block,
+@VisibleForTesting
+DNAddrPair chooseDataNode(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes, boolean refetchIfRequired)
throws IOException {
while (true) {
@@ -955,6 +965,10 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
}
}

/**
* refetchLocations should only be called when there are no active requests
* to datanodes. In the hedged read case, this means futures should be empty.
*/
private LocatedBlock refetchLocations(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
@@ -1000,6 +1014,7 @@ private LocatedBlock refetchLocations(LocatedBlock block,
"Interrupted while choosing DataNode for read.");
}
clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
clearIgnoredNodes(ignoredNodes);
openInfo(true);
block = refreshLocatedBlock(block);
failures++;
@@ -1337,7 +1352,11 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
} catch (InterruptedException ie) {
// Ignore and retry
}
Review comment on the new comment block below, from @simbadzina (Member), Feb 15, 2023:

Could you add punctuation here and start new sentences with caps? That will make the comment easier to follow.

What is the deadlist?

Reply from the Contributor Author:

Fixed the comments. The deadlist is actually deadNodes (I fixed that comment as well). When connections fail (in both the hedged and non-hedged code paths), nodes are added to the deadNodes collection so that other nodes are tried. Once chooseDataNode returns null (or, more accurately, getBestNodeDNAddrPair does), it calls refetchLocations, which clears deadNodes via clearLocalDeadNodes() and now, with my change, also clears the ignore list.

Note that we have added an assumption to refetchLocations; the comment I added to it:

/**
* refetchLocations should only be called when there are no active requests
* to datanodes. In the hedged read case, this means futures should be empty.
*/

-if (refetch) {
+// If refetch is true then all nodes are in deadNodes or the ignored list.
+// We should loop through all futures and remove them so we do not
+// have concurrent requests to the same node.
+// Once all futures are cleared we can clear the ignored list and retry.
+if (refetch && futures.isEmpty()) {
refetchLocations(block, ignored);
}
// We got here if exception. Ignore this node on next go around IFF
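To make that invariant concrete, here is a minimal, self-contained sketch of the idea. This is not the actual DFSInputStream implementation: the node names, timings, pool size, and the slowRead helper are invented for illustration. A node joins the ignore list while it has a request in flight, and the ignore list is cleared only once the futures list has drained, so no datanode ever sees two concurrent hedged requests:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class HedgedReadSketch {

  public static void main(String[] args) throws Exception {
    List<String> replicas = List.of("dn0", "dn1", "dn2");
    ExecutorService pool = Executors.newFixedThreadPool(replicas.size());
    CompletionService<String> done = new ExecutorCompletionService<>(pool);

    Set<String> ignored = new HashSet<>();      // nodes with a hedge in flight
    List<Future<String>> futures = new ArrayList<>();

    String winner = null;
    while (winner == null) {
      // Choose a replica that is not already being tried.
      String candidate = replicas.stream()
          .filter(n -> !ignored.contains(n))
          .findFirst().orElse(null);

      if (candidate != null) {
        ignored.add(candidate);                 // at most one request per node
        futures.add(done.submit(() -> slowRead(candidate)));
      } else if (futures.isEmpty()) {
        // Every replica is ignored AND nothing is outstanding. Only now is
        // it safe to clear the ignore list and retry: this mirrors the
        // "refetch && futures.isEmpty()" condition in the patch.
        ignored.clear();
        continue;
      }

      // Wait briefly for any hedge to finish; hedge again on timeout.
      Future<String> finished = done.poll(50, TimeUnit.MILLISECONDS);
      if (finished != null) {
        futures.remove(finished);
        try {
          winner = finished.get();              // first successful read wins
        } catch (Exception e) {
          // A failed hedge: the node stays ignored and the loop continues.
        }
      }
    }
    System.out.println("read satisfied by " + winner);
    pool.shutdownNow();
  }

  // Stand-in for a datanode read: dn2 answers, the others hang and then fail.
  private static String slowRead(String node)
      throws IOException, InterruptedException {
    if ("dn2".equals(node)) {
      Thread.sleep(20);
      return node;
    }
    Thread.sleep(100);
    throw new IOException("read from " + node + " failed");
  }
}

The patch expresses the same rule inside hedgedFetchBlockByteRange: refetchLocations, which clears both deadNodes and the ignore list, runs only once futures is empty.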
@@ -27,6 +27,7 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -35,11 +36,14 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -200,6 +204,25 @@ public void testDeferredRegistrationGetAllBlocks() throws IOException {
testWithRegistrationMethod(DFSInputStream::getAllBlocks);
}

/**
* If the ignore list contains all datanodes, it should be cleared to take
* advantage of the retries built into chooseDataNode. This is needed for
* hedged reads.
* @throws IOException if the test file cannot be created or opened
*/
@Test
public void testClearIgnoreListChooseDataNode() throws IOException {
final String fileName = "/test_cache_locations";
filePath = createFile(fileName);

try (DFSInputStream fin = dfsClient.open(fileName)) {
LocatedBlocks existing = fin.locatedBlocks;
LocatedBlock block = existing.getLastLocatedBlock();
ArrayList<DatanodeInfo> ignoreList = new ArrayList<>(Arrays.asList(block.getLocations()));
Assert.assertNotNull(fin.chooseDataNode(block, ignoreList, true));
Assert.assertEquals(0, ignoreList.size());
}
}

@FunctionalInterface
interface ThrowingConsumer {
void accept(DFSInputStream fin) throws IOException;
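For completeness, here is a sketch of how a client would enable hedged reads so that this code path, and the scenario the new test covers, is exercised. The configuration keys are the hedged-read keys from HdfsClientConfigKeys; the file path (reused from the test above) and buffer size are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class HedgedReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // A thread pool size greater than zero turns hedged reads on.
    conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 5);
    // Start a hedged request if the first datanode is silent for 50 ms.
    conf.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 50);

    try (FileSystem fs = FileSystem.get(conf);
         FSDataInputStream in = fs.open(new Path("/test_cache_locations"))) {
      byte[] buf = new byte[4096];
      // Positional reads take the hedgedFetchBlockByteRange path when
      // hedged reads are enabled, which is where the cleared ignore
      // list from this patch matters.
      int n = in.read(0L, buf, 0, buf.length);
      System.out.println("read " + n + " bytes");
    }
  }
}

With the thread pool size left at zero (the default), hedged reads are disabled and positional reads take the ordinary fetchBlockByteRange path instead.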