diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index ad1b3595cbd5b..bc4960a5a006f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.ReflectionUtils;

 import com.google.common.annotations.VisibleForTesting;
@@ -118,6 +119,19 @@ public class ClientContext {
   private NodeBase clientNode;
   private boolean topologyResolutionEnabled;

+  private Daemon deadNodeDetectorThr = null;
+
+  /**
+   * The switch to DeadNodeDetector.
+   */
+  private boolean sharedDeadNodesEnabled = false;
+
+  /**
+   * Detect the dead datanodes in advance, and share this information among all
+   * the DFSInputStreams in the same client.
+   */
+  private DeadNodeDetector deadNodeDetector = null;
+
   private ClientContext(String name, DfsClientConf conf,
       Configuration config) {
     final ShortCircuitConf scConf = conf.getShortCircuitConf();
@@ -134,6 +148,12 @@ private ClientContext(String name, DfsClientConf conf,
     this.byteArrayManager = ByteArrayManager.newInstance(
         conf.getWriteByteArrayManagerConf());

+    this.sharedDeadNodesEnabled = conf.isSharedDeadNodesEnabled();
+    if (sharedDeadNodesEnabled && deadNodeDetector == null) {
+      deadNodeDetector = new DeadNodeDetector(name);
+      deadNodeDetectorThr = new Daemon(deadNodeDetector);
+      deadNodeDetectorThr.start();
+    }
     initTopologyResolution(config);
   }
@@ -251,4 +271,12 @@ public int getNetworkDistance(DatanodeInfo datanodeInfo) throws IOException {
         datanodeInfo.getNetworkLocation());
     return NetworkTopology.getDistanceByPath(clientNode, node);
   }
+
+  public boolean isSharedDeadNodesEnabled() {
+    return sharedDeadNodesEnabled;
+  }
+
+  public DeadNodeDetector getDeadNodeDetector() {
+    return deadNodeDetector;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 56280f3a8bcff..61434a82dee06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -44,6 +44,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -3226,4 +3228,95 @@ public HAServiceProtocol.HAServiceState getHAServiceState()
       throws IOException {
     return namenode.getHAServiceState();
   }
+
+  /**
+   * If sharedDeadNodesEnabled is true, return the dead nodes detected by all
+   * the DFSInputStreams in the same client. Otherwise return the dead nodes
+   * detected by this DFSInputStream.
+   */
+  public ConcurrentHashMap<DatanodeInfo, DatanodeInfo> getDeadNodes(
+      DFSInputStream dfsInputStream) {
+    if (clientContext.isSharedDeadNodesEnabled()) {
+      ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
+          new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
+      if (dfsInputStream != null) {
+        deadNodes.putAll(dfsInputStream.getLocalDeadNodes());
+      }
+
+      Set<DatanodeInfo> detectDeadNodes =
+          clientContext.getDeadNodeDetector().getDeadNodesToDetect();
+      for (DatanodeInfo detectDeadNode : detectDeadNodes) {
+        deadNodes.put(detectDeadNode, detectDeadNode);
+      }
+      return deadNodes;
+    } else {
+      return dfsInputStream.getLocalDeadNodes();
+    }
+  }
+
+  /**
+   * If sharedDeadNodesEnabled is true, the judgment is based on whether the
+   * datanode is included in DeadNodeDetector#deadNodes. Otherwise it is based
+   * on whether the datanode is included in DFSInputStream#deadNodes.
+   */
+  public boolean isDeadNode(DFSInputStream dfsInputStream,
+      DatanodeInfo datanodeInfo) {
+    if (isSharedDeadNodesEnabled()) {
+      boolean isDeadNode =
+          clientContext.getDeadNodeDetector().isDeadNode(datanodeInfo);
+      if (dfsInputStream != null) {
+        isDeadNode = isDeadNode
+            || dfsInputStream.getLocalDeadNodes().containsKey(datanodeInfo);
+      }
+      return isDeadNode;
+    } else {
+      return dfsInputStream.getLocalDeadNodes().containsKey(datanodeInfo);
+    }
+  }
+
+  /**
+   * If sharedDeadNodesEnabled is true, add the datanode to
+   * DeadNodeDetector#deadNodes and DeadNodeDetector#dfsInputStreamNodes.
+   */
+  public void addNodeToDetect(DFSInputStream dfsInputStream,
+      DatanodeInfo datanodeInfo) {
+    if (!isSharedDeadNodesEnabled()) {
+      return;
+    }
+    clientContext.getDeadNodeDetector().addNodeToDetect(dfsInputStream,
+        datanodeInfo);
+  }
+
+  /**
+   * If sharedDeadNodesEnabled is true, remove the datanode from
+   * DeadNodeDetector#dfsInputStreamNodes.
+   */
+  public void removeNodeFromDetectByDFSInputStream(
+      DFSInputStream dfsInputStream, DatanodeInfo datanodeInfo) {
+    if (!isSharedDeadNodesEnabled()) {
+      return;
+    }
+    clientContext.getDeadNodeDetector()
+        .removeNodeFromDetectByDFSInputStream(dfsInputStream, datanodeInfo);
+  }
+
+  /**
+   * If sharedDeadNodesEnabled is true and locatedBlocks is not null, remove
+   * locatedBlocks#datanodeInfos from DeadNodeDetector#dfsInputStreamNodes.
+   */
+  public void removeNodeFromDetectByDFSInputStream(
+      DFSInputStream dfsInputStream, LocatedBlocks locatedBlocks) {
+    if (!isSharedDeadNodesEnabled() || locatedBlocks == null) {
+      return;
+    }
+    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+      for (DatanodeInfo datanodeInfo : locatedBlock.getLocations()) {
+        removeNodeFromDetectByDFSInputStream(dfsInputStream, datanodeInfo);
+      }
+    }
+  }
+
+  private boolean isSharedDeadNodesEnabled() {
+    return clientContext.isSharedDeadNodesEnabled();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 22622956cc151..5fec6217498e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -170,10 +170,26 @@ public class DFSInputStream extends FSInputStream
   private byte[] oneByteBuf; // used for 'int read()'

-  void addToDeadNodes(DatanodeInfo dnInfo) {
+  void addToLocalDeadNodes(DatanodeInfo dnInfo) {
     deadNodes.put(dnInfo, dnInfo);
   }

+  public void removeFromLocalDeadNodes(DatanodeInfo dnInfo) {
+    deadNodes.remove(dnInfo);
+  }
+
+  public ConcurrentHashMap<DatanodeInfo, DatanodeInfo> getLocalDeadNodes() {
+    return deadNodes;
+  }
+
+  public void clearLocalDeadNodes() {
+    deadNodes.clear();
+  }
+
+  public DFSClient getDfsClient() {
+    return dfsClient;
+  }
+
   DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
       LocatedBlocks locatedBlocks) throws IOException {
     this.dfsClient = dfsClient;
@@ -599,7 +615,8 @@ private synchronized DatanodeInfo blockSeekTo(long target)
             + "{}, add to deadNodes and continue. ", targetAddr, src,
             targetBlock.getBlock(), ex);
         // Put chosen node into dead list, continue
-        addToDeadNodes(chosenNode);
+        addToLocalDeadNodes(chosenNode);
+        dfsClient.addNodeToDetect(this, chosenNode);
       }
     }
   }
@@ -650,28 +667,40 @@ protected BlockReader getBlockReader(LocatedBlock targetBlock,
    */
   @Override
   public synchronized void close() throws IOException {
-    if (!closed.compareAndSet(false, true)) {
-      DFSClient.LOG.debug("DFSInputStream has been closed already");
-      return;
-    }
-    dfsClient.checkOpen();
-
-    if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) {
-      final StringBuilder builder = new StringBuilder();
-      extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
-        private String prefix = "";
-        @Override
-        public void accept(ByteBuffer k, Object v) {
-          builder.append(prefix).append(k);
-          prefix = ", ";
-        }
-      });
-      DFSClient.LOG.warn("closing file " + src + ", but there are still " +
-          "unreleased ByteBuffers allocated by read(). " +
-          "Please release " + builder.toString() + ".");
+    try {
+      if (!closed.compareAndSet(false, true)) {
+        DFSClient.LOG.debug("DFSInputStream has been closed already");
+        return;
+      }
+      dfsClient.checkOpen();
+
+      if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) {
+        final StringBuilder builder = new StringBuilder();
+        extendedReadBuffers
+            .visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
+              private String prefix = "";
+
+              @Override
+              public void accept(ByteBuffer k, Object v) {
+                builder.append(prefix).append(k);
+                prefix = ", ";
+              }
+            });
+        DFSClient.LOG.warn("closing file " + src + ", but there are still " +
+            "unreleased ByteBuffers allocated by read(). " +
+            "Please release " + builder.toString() + ".");
+      }
+      closeCurrentBlockReaders();
+      super.close();
+    } finally {
+      /**
+       * If the dfsInputStream is closed and the datanode is in
+       * DeadNodeDetector#dfsInputStreamNodes, we need to remove the datanode
+       * from DeadNodeDetector#dfsInputStreamNodes, since the user should not
+       * use this dfsInputStream anymore.
+       */
+      dfsClient.removeNodeFromDetectByDFSInputStream(this, locatedBlocks);
     }
-    closeCurrentBlockReaders();
-    super.close();
   }

   @Override
@@ -728,7 +757,8 @@ private synchronized int readBuffer(ReaderStrategy reader, int len,
          */
         sourceFound = seekToBlockSource(pos);
       } else {
-        addToDeadNodes(currentNode);
+        addToLocalDeadNodes(currentNode);
+        dfsClient.addNodeToDetect(this, currentNode);
         sourceFound = seekToNewSource(pos);
       }
       if (!sourceFound) {
@@ -788,7 +818,8 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
         }
         blockEnd = -1;
         if (currentNode != null) {
-          addToDeadNodes(currentNode);
+          addToLocalDeadNodes(currentNode);
+          dfsClient.addNodeToDetect(this, currentNode);
         }
         if (--retries == 0) {
           throw e;
@@ -870,7 +901,7 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
   private LocatedBlock refetchLocations(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
     String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
-        deadNodes, ignoredNodes);
+        dfsClient.getDeadNodes(this), ignoredNodes);
     String blockInfo = block.getBlock() + " file=" + src;
     if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
       String description = "Could not obtain block: " + blockInfo;
@@ -911,7 +942,7 @@ private LocatedBlock refetchLocations(LocatedBlock block,
       throw new InterruptedIOException(
           "Interrupted while choosing DataNode for read.");
     }
-    deadNodes.clear(); //2nd option is to remove only nodes[blockId]
+    clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
     openInfo(true);
     block = refreshLocatedBlock(block);
     failures++;
@@ -932,7 +963,7 @@ protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
     StorageType storageType = null;
     if (nodes != null) {
       for (int i = 0; i < nodes.length; i++) {
-        if (!deadNodes.containsKey(nodes[i])
+        if (!dfsClient.getDeadNodes(this).containsKey(nodes[i])
             && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
           chosenNode = nodes[i];
           // Storage types are ordered to correspond with nodes, so use the same
@@ -1084,7 +1115,7 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
           DFSClient.LOG.warn(msg);
           // we want to remember what we have tried
           corruptedBlocks.addCorruptedBlock(block.getBlock(), datanode.info);
-          addToDeadNodes(datanode.info);
+          addToLocalDeadNodes(datanode.info);
           throw new IOException(msg);
         } catch (IOException e) {
           checkInterrupted(e);
@@ -1106,7 +1137,8 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
           String msg = "Failed to connect to " + datanode.addr + " for file "
               + src + " for block " + block.getBlock() + ":" + e;
           DFSClient.LOG.warn("Connection failure: " + msg, e);
-          addToDeadNodes(datanode.info);
+          addToLocalDeadNodes(datanode.info);
+          dfsClient.addNodeToDetect(this, datanode.info);
           throw new IOException(msg);
         }
         // Refresh the block for updated tokens in case of token failures or
@@ -1509,14 +1541,14 @@ public synchronized boolean seekToNewSource(long targetPos)
     if (currentNode == null) {
       return seekToBlockSource(targetPos);
     }
-    boolean markedDead = deadNodes.containsKey(currentNode);
-    addToDeadNodes(currentNode);
+    boolean markedDead = dfsClient.isDeadNode(this, currentNode);
+    addToLocalDeadNodes(currentNode);
     DatanodeInfo oldNode = currentNode;
     DatanodeInfo newNode = blockSeekTo(targetPos);
     if (!markedDead) {
       /* remove it from deadNodes. blockSeekTo could have cleared
        * deadNodes and added currentNode again. Thats ok. */
-      deadNodes.remove(oldNode);
+      removeFromLocalDeadNodes(oldNode);
     }
     if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) {
       currentNode = newNode;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 3f688d410be7e..9e0eed0613f78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -146,10 +146,6 @@ protected String getSrc() {
     return src;
   }

-  protected DFSClient getDFSClient() {
-    return dfsClient;
-  }
-
   protected LocatedBlocks getLocatedBlocks() {
     return locatedBlocks;
   }
@@ -282,7 +278,7 @@ boolean createBlockReader(LocatedBlock block, long offsetInBlock,
             "block" + block.getBlock(), e);
         // re-fetch the block in case the block has been moved
         fetchBlockAt(block.getStartOffset());
-        addToDeadNodes(dnInfo.info);
+        addToLocalDeadNodes(dnInfo.info);
       }
     }
     if (reader != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
new file mode 100644
index 0000000000000..f9dddde78b539
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Detect the dead nodes in advance, and share this information among all the
+ * DFSInputStreams in the same client.
+ */
+public class DeadNodeDetector implements Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DeadNodeDetector.class);
+
+  private static final long ERROR_SLEEP_MS = 5000;
+  private static final long IDLE_SLEEP_MS = 10000;
+
+  private String name;
+
+  /**
+   * Dead nodes shared by all the DFSInputStreams of the client.
+   */
+  private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes;
+
+  /**
+   * Record the dead nodes used by each DFSInputStream. When a dead node is no
+   * longer used by a DFSInputStream, remove it from
+   * dfsInputStreamNodes#DFSInputStream. If a DFSInputStream does not include
+   * any dead node, remove the DFSInputStream from dfsInputStreamNodes.
+   */
+  private final ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>
+      dfsInputStreamNodes;
+
+  /**
+   * The state of DeadNodeDetector.
+   */
+  private enum State {
+    INIT, CHECK_DEAD, IDLE, ERROR
+  }
+
+  private State state;
+
+  public DeadNodeDetector(String name) {
+    this.deadNodes = new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
+    this.dfsInputStreamNodes =
+        new ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>();
+    this.name = name;
+
+    LOG.info("Start dead node detector for DFSClient " + this.name);
+    state = State.INIT;
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      LOG.debug("state " + state);
+      switch (state) {
+      case INIT:
+        init();
+        break;
+      case IDLE:
+        idle();
+        break;
+      case ERROR:
+        try {
+          Thread.sleep(ERROR_SLEEP_MS);
+        } catch (InterruptedException e) {
+        }
+        return;
+      default:
+        break;
+      }
+    }
+  }
+
+  private void idle() {
+    try {
+      Thread.sleep(IDLE_SLEEP_MS);
+    } catch (InterruptedException e) {
+
+    }
+
+    state = State.IDLE;
+  }
+
+  private void init() {
+    state = State.IDLE;
+  }
+
+  private void addToDead(DatanodeInfo datanodeInfo) {
+    deadNodes.put(datanodeInfo, datanodeInfo);
+  }
+
+  public boolean isDeadNode(DatanodeInfo datanodeInfo) {
+    return deadNodes.containsKey(datanodeInfo);
+  }
+
+  /**
+   * Add a datanode to deadNodes and dfsInputStreamNodes. The node is
+   * considered a dead node and is shared by all the DFSInputStreams in the
+   * same client.
+   */
+  public synchronized void addNodeToDetect(DFSInputStream dfsInputStream,
+      DatanodeInfo datanodeInfo) {
+    HashSet<DatanodeInfo> datanodeInfos =
+        dfsInputStreamNodes.get(dfsInputStream);
+    if (datanodeInfos == null) {
+      datanodeInfos = new HashSet<DatanodeInfo>();
+      datanodeInfos.add(datanodeInfo);
+      dfsInputStreamNodes.putIfAbsent(dfsInputStream, datanodeInfos);
+    } else {
+      datanodeInfos.add(datanodeInfo);
+    }
+
+    addToDead(datanodeInfo);
+  }
+
+  /**
+   * Remove dead nodes that are not used by any DFSInputStream from deadNodes.
+   * @return new dead nodes shared by all DFSInputStreams.
+   */
+  public synchronized Set<DatanodeInfo> getDeadNodesToDetect() {
+    // Remove the dead nodes which are not referenced by any input stream
+    // first.
+    Set<DatanodeInfo> newDeadNodes = new HashSet<DatanodeInfo>();
+    for (HashSet<DatanodeInfo> datanodeInfos : dfsInputStreamNodes.values()) {
+      newDeadNodes.addAll(datanodeInfos);
+    }
+
+    newDeadNodes.retainAll(deadNodes.values());
+
+    for (DatanodeInfo datanodeInfo : deadNodes.values()) {
+      if (!newDeadNodes.contains(datanodeInfo)) {
+        deadNodes.remove(datanodeInfo);
+      }
+    }
+    return newDeadNodes;
+  }
+
+  /**
+   * Remove a dead node from dfsInputStreamNodes#dfsInputStream. If
+   * dfsInputStreamNodes#dfsInputStream no longer contains any dead node,
+   * remove it from dfsInputStreamNodes.
+   */
+  public synchronized void removeNodeFromDetectByDFSInputStream(
+      DFSInputStream dfsInputStream, DatanodeInfo datanodeInfo) {
+    Set<DatanodeInfo> datanodeInfos = dfsInputStreamNodes.get(dfsInputStream);
+    if (datanodeInfos != null) {
+      datanodeInfos.remove(datanodeInfo);
+      if (datanodeInfos.isEmpty()) {
+        dfsInputStreamNodes.remove(dfsInputStream);
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index e90af846d7d50..547b8d70e3384 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -215,7 +215,7 @@ private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
     if (chunk.useByteBuffer()) {
       ByteBufferStrategy strategy = new ByteBufferStrategy(
           chunk.getByteBuffer(), dfsStripedInputStream.getReadStatistics(),
-          dfsStripedInputStream.getDFSClient());
+          dfsStripedInputStream.getDfsClient());
       return new ByteBufferStrategy[]{strategy};
     }

@@ -225,7 +225,7 @@ private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
       ByteBuffer buffer = chunk.getChunkBuffer().getSlice(i);
       strategies[i] = new ByteBufferStrategy(buffer,
           dfsStripedInputStream.getReadStatistics(),
-          dfsStripedInputStream.getDFSClient());
+          dfsStripedInputStream.getDfsClient());
     }
     return strategies;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 2e2e4a6df4cf5..839901b0e5810 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -149,6 +149,10 @@ public interface HdfsClientConfigKeys {
   long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
       TimeUnit.DAYS.toMillis(10); // 10 days

+  String DFS_CLIENT_DEAD_NODE_DETECT_ENABLED_KEY =
+      "dfs.client.deadnode.detect.enabled";
+  boolean DFS_CLIENT_DEAD_NODE_DETECT_ENABLED_DEFAULT = false;
+
   String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
       "dfs.datanode.kerberos.principal";
   String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 04bdfe47960f1..7275deaa1f53b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -26,14 +26,19 @@
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.util.ByteArrayManager;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
@@ -47,6 +52,8 @@
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECT_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECT_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
@@ -61,8 +68,6 @@
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
@@ -71,6 +76,8 @@
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT;
@@ -87,11 +94,6 @@
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write;

-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 /**
  * DFSClient configuration.
  */
@@ -145,6 +147,8 @@ public class DfsClientConf {

   private final boolean dataTransferTcpNoDelay;

+  private final boolean sharedDeadNodesEnabled;
+
   public DfsClientConf(Configuration conf) {
     // The hdfsTimeout is currently the same as the ipc timeout
     hdfsTimeout = Client.getRpcTimeout(conf);
@@ -262,6 +266,10 @@ public DfsClientConf(Configuration conf) {
         HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
         HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);

+    sharedDeadNodesEnabled =
+        conf.getBoolean(DFS_CLIENT_DEAD_NODE_DETECT_ENABLED_KEY,
+            DFS_CLIENT_DEAD_NODE_DETECT_ENABLED_DEFAULT);
+
     stripedReadThreadpoolSize = conf.getInt(
         HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY,
         HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_DEFAULT);
@@ -595,6 +603,13 @@ public int getStripedReadThreadpoolSize() {
     return stripedReadThreadpoolSize;
   }

+  /**
+   * @return the sharedDeadNodesEnabled
+   */
+  public boolean isSharedDeadNodesEnabled() {
+    return sharedDeadNodesEnabled;
+  }
+
   /**
    * @return the replicaAccessorBuilderClasses
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index e4e7a159600e2..f1ace4c4610c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2963,6 +2963,14 @@
   </property>

+  <property>
+    <name>dfs.client.deadnode.detect.enabled</name>
+    <value>false</value>
+    <description>
+      Set to true to enable sharing of dead node information among all the DFSInputStreams of the same client.
+    </description>
+  </property>
+
   <property>
     <name>dfs.namenode.lease-recheck-interval-ms</name>
     <value>2000</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientDetectDeadNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientDetectDeadNodes.java
new file mode 100644
index 0000000000000..dc032d9eb2282
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientDetectDeadNodes.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.util.ThreadUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECT_ENABLED_KEY;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * These tests make sure that dead nodes detected by one DFSInputStream are
+ * shared among all the DFSInputStreams of the same DFSClient.
+ */
+public class TestDFSClientDetectDeadNodes {
+
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+
+  @Before
+  public void setUp() {
+    cluster = null;
+    conf = new HdfsConfiguration();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 60000000)
+  public void testDetectDeadNodeInBackground() throws IOException {
+    conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECT_ENABLED_KEY, true);
+    // We'll be using a 512 bytes block size just for tests
+    // so making sure the checksum chunk size matches it too.
+    conf.setInt("io.bytes.per.checksum", 512);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+    ThreadUtil.sleepAtLeastIgnoreInterrupts(10 * 1000L);
+
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testNodeBecomeDead");
+
+    // 256 bytes data chunk for writes
+    byte[] bytes = new byte[256];
+    for (int index = 0; index < bytes.length; index++) {
+      bytes[index] = '0';
+    }
+
+    // File with a 512 bytes block size
+    FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
+
+    // Write a block to all 3 DNs (2x256bytes).
+    out.write(bytes);
+    out.write(bytes);
+    out.hflush();
+    out.close();
+
+    // Stop all three DNs.
+    cluster.stopDataNode(0);
+    cluster.stopDataNode(0);
+    cluster.stopDataNode(0);
+
+    FSDataInputStream in = fs.open(filePath);
+    try {
+      try {
+        in.read();
+      } catch (BlockMissingException e) {
+      }
+
+      DFSInputStream din = (DFSInputStream) in.getWrappedStream();
+      assertTrue(din.getDfsClient().getDeadNodes(din).size() == 3);
+    } finally {
+      in.close();
+    }
+  }
+
+  @Test(timeout = 60000000)
+  public void testDeadNodeMultipleDFSInputStream() throws IOException {
+    conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECT_ENABLED_KEY, true);
+    // We'll be using a 512 bytes block size just for tests
+    // so making sure the checksum chunk size matches it too.
+    conf.setInt("io.bytes.per.checksum", 512);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    ThreadUtil.sleepAtLeastIgnoreInterrupts(10 * 1000L);
+
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testNodeBecomeDead");
+
+    // 256 bytes data chunk for writes
+    byte[] bytes = new byte[256];
+    for (int index = 0; index < bytes.length; index++) {
+      bytes[index] = '0';
+    }
+
+    // File with a 512 bytes block size
+    FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
+
+    // Write a block to the DN (2x256bytes).
+    out.write(bytes);
+    out.write(bytes);
+    out.hflush();
+    out.close();
+
+    FSDataInputStream in1 = fs.open(filePath);
+    DFSInputStream din1 = (DFSInputStream) in1.getWrappedStream();
+    cluster.stopDataNode(0);
+
+    FSDataInputStream in2 = fs.open(filePath);
+    try {
+      try {
+        in1.read();
+      } catch (BlockMissingException e) {
+      }
+
+      DFSInputStream din2 = (DFSInputStream) in2.getWrappedStream();
+      assertTrue(din1.getDfsClient().getDeadNodes(din1).size() == 1);
+      assertTrue(din2.getDfsClient().getDeadNodes(din2).size() == 1);
+    } finally {
+      in1.close();
+      in2.close();
+    }
+  }
+}
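
Reviewer note (not part of the patch): the sketch below shows how a client application could exercise the shared dead node detection added above. It is a minimal, hypothetical example under the assumption that the patch is applied and that fs.defaultFS points at an HDFS cluster; the class name SharedDeadNodesExample and the /tmp paths are illustrative only. It reuses the dfs.client.deadnode.detect.enabled key, DFSInputStream#getDfsClient and DFSClient#getDeadNodes introduced by this change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class SharedDeadNodesExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Enable the DeadNodeDetector introduced by this patch; the default is false.
    conf.setBoolean("dfs.client.deadnode.detect.enabled", true);

    FileSystem fs = FileSystem.get(conf);
    // Two streams opened by the same client share one ClientContext and
    // therefore one DeadNodeDetector.
    try (FSDataInputStream in1 = fs.open(new Path("/tmp/file1"));
         FSDataInputStream in2 = fs.open(new Path("/tmp/file2"))) {
      // If this read hits an unreachable datanode, the node is recorded as dead.
      in1.read();

      DFSInputStream din2 = (DFSInputStream) in2.getWrappedStream();
      // Dead nodes detected through in1 are also visible to in2.
      System.out.println(din2.getDfsClient().getDeadNodes(din2).keySet());
    }
  }
}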