diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 2b511bfc2ebee..07568b034f96a 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -137,6 +137,9 @@ public interface HdfsClientConfigKeys { String DFS_CHECKSUM_COMBINE_MODE_DEFAULT = "MD5MD5CRC"; String DFS_CHECKSUM_EC_SOCKET_TIMEOUT_KEY = "dfs.checksum.ec.socket-timeout"; int DFS_CHECKSUM_EC_SOCKET_TIMEOUT_DEFAULT = 3000; + String DFS_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_KEY = + "dfs.copy.block.cross.namespace.socket-timeout"; + int DFS_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_DEFAULT = 5 * 60 * 1000; String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout"; String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index 384f1dc3507af..cb68933e5cdbe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -238,4 +238,17 @@ void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, Token<BlockTokenIdentifier> blockToken, long requestedNumBytes, BlockChecksumOptions blockChecksumOptions) throws IOException; + + /** + * Copy a block across namespaces. It is used for fast copy purposes. + * + * @param srcBlk the block to copy. + * @param srcBlockToken security token for accessing the source block. + * @param targetBlk the block to copy to. + * @param targetBlockToken security token for accessing the target block. + * @param targetDN the datanode that the target block belongs to. + */ + void copyBlockCrossNamespace(ExtendedBlock srcBlk, Token<BlockTokenIdentifier> srcBlockToken, + ExtendedBlock targetBlk, Token<BlockTokenIdentifier> targetBlockToken, DatanodeInfo targetDN) + throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java index 94250e5e7f622..991cf7f61b174 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java @@ -39,6 +39,7 @@ public enum Op { RELEASE_SHORT_CIRCUIT_FDS((byte)88), REQUEST_SHORT_CIRCUIT_SHM((byte)89), BLOCK_GROUP_CHECKSUM((byte)90), + COPY_BLOCK_CROSS_NAMESPACE((byte)91), CUSTOM((byte)127); /** The code for this operation.
*/ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index 3d81a62993efc..870e22739a700 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockCrossNamespaceProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto; @@ -303,4 +304,20 @@ public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, send(out, Op.BLOCK_GROUP_CHECKSUM, proto); } + + @Override + public void copyBlockCrossNamespace( + final ExtendedBlock srcBlk, + final Token srcBlockToken, + final ExtendedBlock targetBlk, + final Token targetBlockToken, + final DatanodeInfo targetDN) throws IOException { + OpCopyBlockCrossNamespaceProto proto = OpCopyBlockCrossNamespaceProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildBaseHeader(srcBlk, srcBlockToken)) + .setTargetBlock(PBHelperClient.convert(targetBlk)) + .setTargetBlockToken(PBHelperClient.convert(targetBlockToken)) + .setTargetDatanode(PBHelperClient.convert(targetDN)) + .build(); + send(out, Op.COPY_BLOCK_CROSS_NAMESPACE, proto); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto index 5356cd6961699..1eb07c64258e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto @@ -331,3 +331,10 @@ message OpBlockChecksumResponseProto { message OpCustomProto { required string customId = 1; } + +message OpCopyBlockCrossNamespaceProto { + required BaseHeaderProto header = 1; + required ExtendedBlockProto targetBlock = 2; + optional hadoop.common.TokenProto targetBlockToken = 3; + required DatanodeInfoProto targetDatanode = 4; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 8bcfb199ff5a9..2cfd253bfd52a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockCrossNamespaceProto; import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto; @@ -133,6 +134,9 @@ protected final void processOp(Op op) throws IOException { case REQUEST_SHORT_CIRCUIT_SHM: opRequestShortCircuitShm(in); break; + case COPY_BLOCK_CROSS_NAMESPACE: + opCopyBlockCrossNamespace(in); + break; default: throw new IOException("Unknown op " + op + " in data stream"); } @@ -339,4 +343,19 @@ private void opStripedBlockChecksum(DataInputStream dis) throws IOException { } } } + + /** Receive {@link Op#COPY_BLOCK_CROSS_NAMESPACE}. */ + private void opCopyBlockCrossNamespace(DataInputStream inputStream) throws IOException { + OpCopyBlockCrossNamespaceProto proto = + OpCopyBlockCrossNamespaceProto.parseFrom(vintPrefixed(inputStream)); + try (TraceScope ignored = continueTraceSpan(proto.getHeader(), + proto.getClass().getSimpleName())) { + copyBlockCrossNamespace( + PBHelperClient.convert(proto.getHeader().getBlock()), + PBHelperClient.convert(proto.getHeader().getToken()), + PBHelperClient.convert(proto.getTargetBlock()), + PBHelperClient.convert(proto.getTargetBlockToken()), + PBHelperClient.convert(proto.getTargetDatanode())); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 9b5343321d30b..96b4f715ecdcb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -64,6 +64,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_EC_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_EC_SOCKET_TIMEOUT_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_DEFAULT; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -88,6 +90,7 @@ public class DNConf { final int socketWriteTimeout; final int socketKeepaliveTimeout; final int ecChecksumSocketTimeout; + private final int copyBlockCrossNamespaceSocketTimeout; private final int transferSocketSendBufferSize; private final int transferSocketRecvBufferSize; private final boolean tcpNoDelay; @@ -152,6 +155,9 @@ public DNConf(final Configurable dn) { ecChecksumSocketTimeout = getConf().getInt( DFS_CHECKSUM_EC_SOCKET_TIMEOUT_KEY, DFS_CHECKSUM_EC_SOCKET_TIMEOUT_DEFAULT); + this.copyBlockCrossNamespaceSocketTimeout = getConf().getInt( + DFS_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_KEY, + DFS_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_DEFAULT); this.transferSocketSendBufferSize = getConf().getInt( DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY, DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT); @@ -389,6 +395,15 @@ public int getEcChecksumSocketTimeout() { return ecChecksumSocketTimeout; } + /** + * Returns socket timeout for cross-namespace block copying. 
+ * + * @return the socket timeout in milliseconds. + */ + public int getCopyBlockCrossNamespaceSocketTimeout() { + return copyBlockCrossNamespaceSocketTimeout; + } + /** * Returns the SaslPropertiesResolver configured for use with * DataTransferProtocol, or null if not configured. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 41088cf59ce4b..6c9483dd3c282 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -453,6 +453,9 @@ public static InetSocketAddress createSocketAddr(String target) { private final ExecutorService xferService; + /** One executor for copying blocks across namespaces. **/ + private final ExecutorService blockCopyExecutor; + @Nullable private final StorageLocationChecker storageLocationChecker; @@ -500,6 +503,7 @@ private static Tracer createTracer(Configuration conf) { volumeChecker = new DatasetVolumeChecker(conf, new Timer()); this.xferService = HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory()); + this.blockCopyExecutor = HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory()); } /** @@ -599,6 +603,7 @@ public Map load(String key) { new DataTransferThrottler(100, ecReconstuctReadBandwidth) : null; this.ecReconstuctWriteThrottler = ecReconstuctWriteBandwidth > 0 ? new DataTransferThrottler(100, ecReconstuctWriteBandwidth) : null; + this.blockCopyExecutor = HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory()); } @Override // ReconfigurableBase @@ -2465,9 +2470,12 @@ public void shutdown() { // wait reconfiguration thread, if any, to exit shutdownReconfigurationTask(); - LOG.info("Waiting up to 30 seconds for transfer threads to complete"); + LOG.info("Waiting up to 15 seconds for transfer threads to complete"); HadoopExecutors.shutdown(this.xferService, LOG, 15L, TimeUnit.SECONDS); + LOG.info("Waiting up to 15 seconds for copy block cross namespace threads to complete"); + HadoopExecutors.shutdown(this.blockCopyExecutor, LOG, 15L, TimeUnit.SECONDS); + // wait for all data receiver threads to exit if (this.threadGroup != null) { int sleepMs = 2; @@ -2755,7 +2763,7 @@ void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets, xferTargetsString); final DataTransfer dataTransferTask = new DataTransfer(xferTargets, - xferTargetStorageTypes, xferTargetStorageIDs, block, + xferTargetStorageTypes, xferTargetStorageIDs, block, block, BlockConstructionStage.PIPELINE_SETUP_CREATE, ""); this.xferService.execute(dataTransferTask); @@ -2865,6 +2873,85 @@ CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32) ************************************************************************ */ + /** + * Copy the source block to the target block from one namespace to another. + * If the target DN is the local DN, the copy is done via hard link. + * If the target DN is a remote DN, the copy is done via DataTransfer. + * + * @param srcBlock Block to copy. + * @param destBlock Block to copy to. + * @param targetDatanodeInfo The datanode that the target block belongs to. + * @return the future of the submitted task. + */ + public Future internalCopyBlockCrossNamespace( + final ExtendedBlock srcBlock, final ExtendedBlock destBlock, + final DatanodeInfo targetDatanodeInfo) throws IOException { + // Do some verification for the source block.
+ if (!data.isValidBlock(srcBlock)) { + // block does not exist or is under-construction + String msg = "CopyBlockCrossNamespace: can't copy block " + srcBlock + + " because it is invalid, r = " + data.getReplicaString(srcBlock.getBlockPoolId(), + srcBlock.getBlockId()); + LOG.error(msg); + throw new IOException(msg); + } + long onDiskLength = data.getLength(srcBlock); + if (srcBlock.getNumBytes() > onDiskLength) { + String msg = "CopyBlockCrossNamespace: can't copy block " + srcBlock + + " because on-disk length " + onDiskLength + + " is shorter than provided length " + srcBlock.getNumBytes(); + LOG.error(msg); + throw new IOException(msg); + } + + boolean isLocal = getDatanodeUuid().equals(targetDatanodeInfo.getDatanodeUuid()); + if (isLocal) { + // Local copy via hard link. + return blockCopyExecutor.submit(new BlockCopyViaHardLink(srcBlock, destBlock)); + } else { + // Remote copy via DataTransfer. + DatanodeInfo[] xferTargets = new DatanodeInfo[] {targetDatanodeInfo}; + StorageType[] storageTypes = new StorageType[] {StorageType.DEFAULT}; + return blockCopyExecutor.submit(new DataTransfer( + xferTargets, storageTypes, new String[0], srcBlock, destBlock, + BlockConstructionStage.PIPELINE_SETUP_CREATE, "")); + } + } + + /** + * Copy a block via hard link. + */ + class BlockCopyViaHardLink implements Callable { + private final ExtendedBlock srcBlock; + private final ExtendedBlock targetBlock; + + BlockCopyViaHardLink(ExtendedBlock sourceBlock, ExtendedBlock targetBlock) { + this.srcBlock = sourceBlock; + this.targetBlock = targetBlock; + } + + public Boolean call() throws IOException { + incrementXmitsInProgress(); + try { + // Hard link the block and finalize the target block. + data.hardLinkOneBlock(srcBlock, targetBlock); + + LOG.info("{}: HardLinked source block {} to target block {}.", + getClass().getSimpleName(), srcBlock, targetBlock); + + metrics.incrBlocksWritten(); + metrics.incrBlocksReplicatedViaHardlink(); + } catch (IOException ie) { + handleBadBlock(srcBlock, ie, false); + LOG.error("Failed to copy block via hard link from {} to {}.", srcBlock, targetBlock, ie); + throw ie; + } finally { + decrementXmitsInProgress(); + } + return true; + } + } + /** * Used for transferring a block of data. This class * sends a piece of data to another DataNode. @@ -2873,7 +2960,8 @@ private class DataTransfer implements Runnable { final DatanodeInfo[] targets; final StorageType[] targetStorageTypes; final private String[] targetStorageIds; - final ExtendedBlock b; + final private ExtendedBlock sourceBlock; + final private ExtendedBlock targetBlock; final BlockConstructionStage stage; final private DatanodeRegistration bpReg; final String clientname; @@ -2887,21 +2975,22 @@ private class DataTransfer implements Runnable { * entire target list, the block, and the data.
*/ DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes, - String[] targetStorageIds, ExtendedBlock b, + String[] targetStorageIds, ExtendedBlock sourceBlock, ExtendedBlock targetBlock, BlockConstructionStage stage, final String clientname) { DataTransferProtocol.LOG.debug("{}: {} (numBytes={}), stage={}, " + - "clientname={}, targets={}, target storage types={}, " + - "target storage IDs={}", getClass().getSimpleName(), b, - b.getNumBytes(), stage, clientname, Arrays.asList(targets), + "clientname={}, targetBlock={}, targets={}, target storage types={}, " + + "target storage IDs={}", getClass().getSimpleName(), sourceBlock, + sourceBlock.getNumBytes(), stage, clientname, targetBlock, Arrays.asList(targets), targetStorageTypes == null ? "[]" : Arrays.asList(targetStorageTypes), targetStorageIds == null ? "[]" : Arrays.asList(targetStorageIds)); this.targets = targets; this.targetStorageTypes = targetStorageTypes; this.targetStorageIds = targetStorageIds; - this.b = b; + this.sourceBlock = sourceBlock; + this.targetBlock = targetBlock; this.stage = stage; - BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId()); + BPOfferService bpos = blockPoolManager.get(sourceBlock.getBlockPoolId()); bpReg = bpos.bpRegistration; this.clientname = clientname; this.cachingStrategy = @@ -2937,7 +3026,7 @@ public void run() { // // Header info // - Token accessToken = getBlockAccessToken(b, + Token accessToken = getBlockAccessToken(targetBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), targetStorageTypes, targetStorageIds); @@ -2945,24 +3034,23 @@ public void run() { HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(sock); - DataEncryptionKeyFactory keyFactory = - getDataEncryptionKeyFactoryForBlock(b); + DataEncryptionKeyFactory keyFactory = getDataEncryptionKeyFactoryForBlock(targetBlock); IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, - unbufIn, keyFactory, accessToken, bpReg); + unbufIn, keyFactory, accessToken, targets[0]); unbufOut = saslStreams.out; unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, DFSUtilClient.getSmallBufferSize(getConf()))); in = new DataInputStream(unbufIn); - blockSender = new BlockSender(b, 0, b.getNumBytes(), + blockSender = new BlockSender(sourceBlock, 0, sourceBlock.getNumBytes(), false, false, true, DataNode.this, null, cachingStrategy); DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg) .build(); String storageId = targetStorageIds.length > 0 ? 
targetStorageIds[0] : null; - new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken, + new Sender(out).writeBlock(targetBlock, targetStorageTypes[0], accessToken, clientname, targets, targetStorageTypes, srcNode, stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy, false, false, null, storageId, @@ -2972,9 +3060,9 @@ public void run() { blockSender.sendBlock(out, unbufOut, throttler); // no response necessary - LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}", + LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {} to {}", getClass().getSimpleName(), DataNode.this.getDisplayName(), - b, b.getNumBytes(), curTarget); + sourceBlock, sourceBlock.getNumBytes(), targetBlock, curTarget); // read ack if (isClient) { @@ -2995,11 +3083,11 @@ public void run() { metrics.incrBlocksReplicated(); } } catch (IOException ie) { - handleBadBlock(b, ie, false); - LOG.warn("{}:Failed to transfer {} to {} got", - bpReg, b, targets[0], ie); + handleBadBlock(sourceBlock, ie, false); + LOG.warn("{}: Failed to transfer {} to {} to {} got", + bpReg, sourceBlock, targetBlock, targets[0], ie); } catch (Throwable t) { - LOG.error("Failed to transfer block {}", b, t); + LOG.error("Failed to transfer block {} to {}", sourceBlock, targetBlock, t); } finally { decrementXmitsInProgress(); IOUtils.closeStream(blockSender); @@ -3011,7 +3099,7 @@ public void run() { @Override public String toString() { - return "DataTransfer " + b + " to " + Arrays.asList(targets); + return "DataTransfer " + sourceBlock + " to " + targetBlock + " to " + Arrays.asList(targets); } } @@ -3506,7 +3594,7 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b, } final DataTransfer dataTransferTask = new DataTransfer(targets, - targetStorageTypes, targetStorageIds, b, stage, client); + targetStorageTypes, targetStorageIds, b, b, stage, client); @SuppressWarnings("unchecked") Future f = (Future) this.xferService.submit(dataTransferTask); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index ab706fb173179..86f2f08d25594 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -87,6 +87,7 @@ import java.net.SocketTimeoutException; import java.nio.channels.ClosedChannelException; import java.util.Arrays; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION; @@ -1078,6 +1079,44 @@ public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo, datanode.metrics.addBlockChecksumOp(elapsed()); } + @Override + public void copyBlockCrossNamespace( + final ExtendedBlock srcBlock, final Token srcBlockToken, + final ExtendedBlock targetBlock, final Token targetBlockToken, + final DatanodeInfo targetDN) throws IOException { + updateCurrentThreadName("Copying block cross namespace " + srcBlock); + final DataOutputStream reply = getBufferedOutputStream(); + try { + // Check access + checkAccess(reply, true, srcBlock, srcBlockToken, + Op.COPY_BLOCK_CROSS_NAMESPACE, BlockTokenIdentifier.AccessMode.COPY); + checkAccess(reply, true, targetBlock, targetBlockToken, + Op.COPY_BLOCK_CROSS_NAMESPACE, BlockTokenIdentifier.AccessMode.WRITE); + + // 
Async copy the block + Future result = datanode.internalCopyBlockCrossNamespace(srcBlock, targetBlock, targetDN); + try { + result.get(dnConf.getCopyBlockCrossNamespaceSocketTimeout(), TimeUnit.MILLISECONDS); + } catch (Exception e) { + LOG.error("CopyBlockCrossNamespace from {} to {} to {} failed,", srcBlock, targetBlock, + targetDN, e); + throw new IOException(e); + } + + writeResponse(Status.SUCCESS, null, reply); + } catch (IOException ioe) { + LOG.warn("CopyBlockCrossNamespace from {} to {} to {} received exception,", + srcBlock, targetBlock, targetDN, ioe); + incrDatanodeNetworkErrors(); + throw ioe; + } finally { + IOUtils.closeStream(reply); + } + + //update metrics + datanode.metrics.addCopyBlockCrossNamespaceOp(elapsed()); + } + @Override public void copyBlock(final ExtendedBlock block, final Token blockToken) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 4cad7aa4d36bc..1924a92fe8178 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -673,6 +673,14 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block, */ DataNodeLockManager acquireDatasetLockManager(); + /** + * Copies over a block from a block file via hard link technology. + * @param srcBlock the block to copy. + * @param targetBlock the block to copy to. + * @throws IOException + */ + void hardLinkOneBlock(ExtendedBlock srcBlock, ExtendedBlock targetBlock) throws IOException; + /** * Deep copy the replica info belonging to given block pool. * @param bpid Specified block pool id. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index d81b5411c531b..875a80cdb63bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.LocalReplica; +import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.protocol.Block; @@ -207,6 +208,46 @@ public Block getStoredBlock(String bpid, long blkid) } } + @Override + public void hardLinkOneBlock(ExtendedBlock srcBlock, ExtendedBlock targetBlock) + throws IOException { + ReplicaInfo srcReplicaInfo = getReplicaInfo(srcBlock); + FsVolumeImpl volume = (FsVolumeImpl) srcReplicaInfo.getVolume(); + try (FsVolumeReference ref = volume.obtainReference()) { + if (volume.getAvailable() < targetBlock.getNumBytes()) { + throw new DiskOutOfSpaceException("Insufficient space for hardlink block " + srcBlock); + } + + BlockPoolSlice targetBP = volume.getBlockPoolSlice(targetBlock.getBlockPoolId()); + targetBlock.setNumBytes(srcBlock.getNumBytes()); + + // Create one LocalReplicaInPipeline for target block. + LocalReplicaInPipeline replica = new ReplicaBuilder(ReplicaState.TEMPORARY) + .setBlockId(targetBlock.getBlockId()) + .setGenerationStamp(targetBlock.getGenerationStamp()) + .setDirectoryToUse(targetBP.getTmpDir()) + .setBytesToReserve(targetBlock.getLocalBlock().getNumBytes()) + .setFsVolume(volume) + .buildLocalReplicaInPipeline(); + + // hard link source block path to tmp dest block path. + // hard link source block meta path to tmp dest block meta path. + hardLinkBlockFiles(srcReplicaInfo, replica.getMetaFile(), replica.getBlockFile()); + + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, + targetBlock.getBlockPoolId(), replica.getStorageUuid())) { + // Finalize the target block. + File targetBlockFile = targetBP.addFinalizedBlock(targetBlock.getLocalBlock(), replica); + ReplicaInfo finalizedReplica = new FinalizedReplica(targetBlock.getLocalBlock(), + volume, targetBlockFile.getParentFile()); + volumeMap.add(targetBlock.getBlockPoolId(), finalizedReplica); + } + + datanode.notifyNamenodeReceivedBlock(targetBlock, null, + replica.getStorageUuid(), replica.isOnTransientStorage()); + } + } + @Override public Set deepCopyReplica(String bpid) throws IOException { @@ -1046,7 +1087,7 @@ static File[] hardLinkBlockFiles(ReplicaInfo srcReplica, File dstMeta, + srcReplica + " metadata to " + dstMeta, e); } - LOG.debug("Linked {} to {} . 
Dest meta file: {}", srcReplicaUri, dstFile, + LOG.debug("Linked {} to dest block file: {}, dest meta file: {}", srcReplicaUri, dstFile, dstMeta); return new File[]{dstMeta, dstFile}; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index c3aa3c3a4540b..3192888ae39dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -66,6 +66,7 @@ public class DataNodeMetrics { @Metric MutableCounterLong blocksWritten; @Metric MutableCounterLong blocksRead; @Metric MutableCounterLong blocksReplicated; + @Metric private MutableCounterLong blocksReplicatedViaHardlink; @Metric MutableCounterLong blocksRemoved; @Metric MutableCounterLong blocksVerified; @Metric MutableCounterLong blockVerificationFailures; @@ -121,6 +122,7 @@ public class DataNodeMetrics { @Metric MutableRate writeBlockOp; @Metric MutableRate blockChecksumOp; @Metric MutableRate copyBlockOp; + @Metric private MutableRate copyBlockCrossNamespaceOp; @Metric MutableRate replaceBlockOp; @Metric MutableRate heartbeats; @Metric MutableRate heartbeatsTotal; @@ -338,6 +340,14 @@ public void incrBlocksReplicated() { blocksReplicated.incr(); } + public void incrBlocksReplicatedViaHardlink() { + blocksReplicatedViaHardlink.incr(); + } + + public long getBlocksReplicatedViaHardlink() { + return blocksReplicatedViaHardlink.value(); + } + public void incrBlocksWritten() { blocksWritten.incr(); } @@ -383,6 +393,10 @@ public void addCopyBlockOp(long latency) { copyBlockOp.add(latency); } + public void addCopyBlockCrossNamespaceOp(long latency) { + copyBlockCrossNamespaceOp.add(latency); + } + public void addBlockChecksumOp(long latency) { blockChecksumOp.add(latency); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 8e6ef99040a6e..41c7c9829f318 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4326,6 +4326,14 @@ + + dfs.copy.block.cross.namespace.socket-timeout + 300000 + + Default timeout value in milliseconds for cross-namespace block copying. 
+ + + dfs.client.block.write.locateFollowingBlock.retries 5 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 594b46f6cacb0..f991a0a4eeb69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -70,6 +70,8 @@ import java.util.UUID; import java.util.concurrent.TimeoutException; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.thirdparty.com.google.common.base.Charsets; import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; import org.apache.hadoop.util.Preconditions; @@ -2258,6 +2260,37 @@ public static boolean replaceBlock(ExtendedBlock block, DatanodeInfo source, } } + /** + * Copy the source block of the sourceDN across the namespace to + * the target block of the target DN. + * @param sourceBlock the block to copy. + * @param sourceDN the datanode that source block belongs to. + * @param targetBlock the block to copy to. + * @param targetDN the datanode that target block belongs to. + * @throws IOException + */ + public static void copyBlockCrossNamespace( + ExtendedBlock sourceBlock, DatanodeInfo sourceDN, + ExtendedBlock targetBlock, DatanodeInfo targetDN) throws IOException { + try (Socket sock = new Socket()) { + sock.connect(NetUtils.createSocketAddr(sourceDN.getXferAddr()), + HdfsConstants.READ_TIMEOUT); + sock.setKeepAlive(true); + // sendRequest + DataOutputStream out = new DataOutputStream(sock.getOutputStream()); + new Sender(out).copyBlockCrossNamespace(sourceBlock, + BlockTokenSecretManager.DUMMY_TOKEN, + targetBlock, BlockTokenSecretManager.DUMMY_TOKEN, targetDN); + out.flush(); + // receiveResponse + DataInputStream reply = new DataInputStream(sock.getInputStream()); + BlockOpResponseProto proto = BlockOpResponseProto.parseFrom( + PBHelperClient.vintPrefixed(reply)); + DataTransferProtoUtil.checkBlockOpStatus(proto, "copyBlockCrossNamespace " + + sourceBlock + " to " + targetBlock + " from " + sourceDN + " to " + targetDN); + } + } + /** * Because currently DFSStripedOutputStream does not support hflush/hsync, * tests can use this method to flush all the buffered data to DataNodes. 
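Reviewer note: the new dfs.copy.block.cross.namespace.socket-timeout key documented above (default 300000 ms, i.e. 5 minutes) bounds how long the receiving DataXceiver waits on the copy future in copyBlockCrossNamespace. A minimal fragment for overriding it in the DataNode configuration (for example the conf handed to a MiniDFSCluster, as in the test below); the 10-minute value is purely illustrative:

  // Override the cross-namespace copy timeout introduced by this patch.
  // Key and default come from HdfsClientConfigKeys; 10 minutes is an arbitrary example.
  Configuration conf = new HdfsConfiguration();
  conf.setInt(HdfsClientConfigKeys.DFS_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_KEY,
      10 * 60 * 1000);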
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java index 98556c4fd15ff..e64cc823c7d01 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -31,11 +33,23 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; import java.util.Random; - +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; +import org.apache.hadoop.io.EnumSetWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -650,4 +664,87 @@ public void testReleaseVolumeRefIfExceptionThrown() cluster.shutdown(); } } + + @Test(timeout = 30000) + public void testCopyBlockCrossNamespace() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024 * 1024); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) + .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2)) + .build(); + try { + cluster.waitActive(); + + // Create one file with one block with one replica in Namespace0. + Path ns0Path = new Path("/testCopyBlockCrossNamespace_0.txt"); + DistributedFileSystem ns0FS = cluster.getFileSystem(0); + DFSTestUtil.createFile(ns0FS, ns0Path, 1024, (short) 1, 0); + DFSTestUtil.waitReplication(ns0FS, ns0Path, (short) 1); + + LocatedBlocks locatedBlocks = ns0FS.getClient() + .getLocatedBlocks(ns0Path.toUri().getPath(), 0, Long.MAX_VALUE); + assertEquals(1, locatedBlocks.getLocatedBlocks().size()); + assertTrue(locatedBlocks.isLastBlockComplete()); + LocatedBlock locatedBlockNS0 = locatedBlocks.get(0); + + // Create one similar file with two replicas in Namespace1. + Path ns1Path = new Path("/testCopyBlockCrossNamespace_1.txt"); + DistributedFileSystem ns1FS = cluster.getFileSystem(1); + HdfsFileStatus ns1FileStatus = ns1FS.getClient().getNamenode().create( + ns1Path.toUri().getPath(), FsPermission.getCachePoolDefault(), + ns1FS.getClient().getClientName(), new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)), + true, (short) 2, locatedBlocks.get(0).getBlockSize(), + CryptoProtocolVersion.supported(), null, null); + + // Add one new block with the favored nodes for the file in Namespace1. 
+ List<String> favoredNodes = new ArrayList<>(); + for (DatanodeInfo dn : locatedBlockNS0.getLocations()) { + favoredNodes.add(dn.getXferAddr()); + } + + LocatedBlock locatedBlockNS1 = ns1FS.getClient().getNamenode().addBlock( + ns1Path.toUri().getPath(), ns1FS.getClient().getClientName(), null, null, + ns1FileStatus.getFileId(), favoredNodes.toArray(new String[0]), + EnumSet.noneOf(AddBlockFlag.class)); + assertEquals(2, locatedBlockNS1.getLocations().length); + + // Identify which replica is on the favored (source) datanode. + DatanodeInfoWithStorage firstReplica = locatedBlockNS1.getLocations()[0]; + DatanodeInfoWithStorage secondReplica = locatedBlockNS1.getLocations()[1]; + assertTrue(favoredNodes.get(0).equals(firstReplica.getXferAddr()) + || favoredNodes.get(0).equals(secondReplica.getXferAddr())); + + DatanodeInfoWithStorage localReplica = favoredNodes.get(0).equals(firstReplica.getXferAddr()) + ? firstReplica : secondReplica; + DatanodeInfoWithStorage remoteReplica = favoredNodes.get(0).equals(firstReplica.getXferAddr()) + ? secondReplica : firstReplica; + + // Copy the local replica via hard link (fast copy). + DFSTestUtil.copyBlockCrossNamespace(locatedBlockNS0.getBlock(), + locatedBlockNS0.getLocations()[0], locatedBlockNS1.getBlock(), localReplica); + + // Copy the remote replica via DataTransfer. + DFSTestUtil.copyBlockCrossNamespace(locatedBlockNS0.getBlock(), + locatedBlockNS0.getLocations()[0], locatedBlockNS1.getBlock(), remoteReplica); + + // Wait two heartbeat intervals. + long heartbeatInterval = conf.getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, + DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS, TimeUnit.MILLISECONDS); + Thread.sleep(heartbeatInterval * 2); + + // Complete the file in namespace1. + assertTrue(ns1FS.getClient().getNamenode().complete(ns1Path.toUri().getPath(), + ns1FS.getClient().getClientName(), locatedBlockNS1.getBlock(), + ns1FileStatus.getFileId())); + + // Verify that the file in namespace1 contains one block with two replicas.
+ LocatedBlocks locatedBlocksNS1 = ns1FS.getClient().getNamenode() + .getBlockLocations(ns1Path.toUri().getPath(), 0, Long.MAX_VALUE); + assertEquals(1, locatedBlocksNS1.getLocatedBlocks().size()); + assertEquals(2, locatedBlocksNS1.getLocatedBlocks().get(0).getLocations().length); + } finally { + cluster.shutdown(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 0bb4c2930a493..191f03b2921db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1601,6 +1601,12 @@ public DataNodeLockManager acquireDatasetLockManager() { return null; } + @Override + public void hardLinkOneBlock(ExtendedBlock srcBlock, ExtendedBlock targetBlock) + throws IOException { + throw new UnsupportedOperationException(); + } + @Override public Set deepCopyReplica(String bpid) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 413a2e6b594f2..8ad6efc7be457 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -462,6 +462,12 @@ public DataNodeLockManager acquireDatasetLockManager() { return null; } + @Override + public void hardLinkOneBlock(ExtendedBlock srcBlock, ExtendedBlock targetBlock) + throws IOException { + throw new UnsupportedOperationException(); + } + @Override public Set deepCopyReplica(String bpid) throws IOException {
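Reviewer note: for orientation, the end-to-end fast-copy flow this patch enables (and that testCopyBlockCrossNamespace exercises) is roughly sketched below. srcFs, dstFs, dstFileId and the paths are illustrative placeholders, and a production client would use its own transfer code rather than the DFSTestUtil helper; the sketch only combines calls that appear in this patch.

  // Sketch: copy every block of /src/file (namespace 0) into /dst/file (namespace 1).
  LocatedBlocks srcBlocks = srcFs.getClient()
      .getLocatedBlocks("/src/file", 0, Long.MAX_VALUE);
  for (LocatedBlock src : srcBlocks.getLocatedBlocks()) {
    // Ask the target namenode for a new block, favoring the datanodes that already
    // hold the source replicas so the local hard-link path can be taken.
    String[] favored = Arrays.stream(src.getLocations())
        .map(DatanodeInfo::getXferAddr).toArray(String[]::new);
    LocatedBlock dst = dstFs.getClient().getNamenode().addBlock("/dst/file",
        dstFs.getClient().getClientName(), null, null, dstFileId, favored,
        EnumSet.noneOf(AddBlockFlag.class));
    // One COPY_BLOCK_CROSS_NAMESPACE op per target replica, addressed to a datanode
    // holding the source block; it hard-links locally or streams to a remote datanode.
    for (DatanodeInfo target : dst.getLocations()) {
      DFSTestUtil.copyBlockCrossNamespace(src.getBlock(), src.getLocations()[0],
          dst.getBlock(), target);
    }
  }
  // Finally, complete("/dst/file", ...) on the target namenode, as in the test.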