@@ -137,6 +137,9 @@ public interface HdfsClientConfigKeys {
String DFS_CHECKSUM_COMBINE_MODE_DEFAULT = "MD5MD5CRC";
String DFS_CHECKSUM_EC_SOCKET_TIMEOUT_KEY = "dfs.checksum.ec.socket-timeout";
int DFS_CHECKSUM_EC_SOCKET_TIMEOUT_DEFAULT = 3000;
String DFS_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_KEY =
"dfs.copy.block.cross.namespace.socket-timeout";
int DFS_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_DEFAULT = 5 * 60 * 1000;
String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY =
"dfs.datanode.socket.write.timeout";
String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC =
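The new key gives cross-namespace block copies their own socket timeout, defaulting to 5 * 60 * 1000 ms (five minutes). A minimal sketch of overriding it programmatically with a standard Hadoop Configuration object (the value shown is purely illustrative; the same property can also be set in hdfs-site.xml):

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // Illustrative override: raise the cross-namespace copy socket timeout to 10 minutes.
    conf.setInt("dfs.copy.block.cross.namespace.socket-timeout", 10 * 60 * 1000);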
@@ -238,4 +238,17 @@ void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
Token<BlockTokenIdentifier> blockToken,
long requestedNumBytes,
BlockChecksumOptions blockChecksumOptions) throws IOException;

/**
* Copy a block across namespaces. This is used for fast block copy.
*
* @param srcBlk the block to copy.
* @param srcBlockToken security token for accessing the source block.
* @param targetBlk the block to copy to.
* @param targetBlockToken security token for accessing the target block.
* @param targetDN the datanode that the target block belongs to.
*/
void copyBlockCrossNamespace(ExtendedBlock srcBlk, Token<BlockTokenIdentifier> srcBlockToken,
ExtendedBlock targetBlk, Token<BlockTokenIdentifier> targetBlockToken, DatanodeInfo targetDN)
throws IOException;
}
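A hedged, caller-side sketch of how this new op might be issued through the Sender implementation further down in this diff. The stream and the block/token/datanode variables are assumed to already exist in the caller; none of this caller code is part of the patch:

    import java.io.DataOutputStream;
    import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;

    // out: a DataOutputStream on an established data-transfer connection to the
    // datanode holding srcBlk; the remaining arguments match the interface above.
    new Sender(out).copyBlockCrossNamespace(
        srcBlk, srcBlockToken, targetBlk, targetBlockToken, targetDN);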
@@ -39,6 +39,7 @@ public enum Op {
RELEASE_SHORT_CIRCUIT_FDS((byte)88),
REQUEST_SHORT_CIRCUIT_SHM((byte)89),
BLOCK_GROUP_CHECKSUM((byte)90),
COPY_BLOCK_CROSS_NAMESPACE((byte)91),
CUSTOM((byte)127);

/** The code for this operation. */
@@ -38,6 +38,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockCrossNamespaceProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
@@ -303,4 +304,20 @@ public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,

send(out, Op.BLOCK_GROUP_CHECKSUM, proto);
}

@Override
public void copyBlockCrossNamespace(
final ExtendedBlock srcBlk,
final Token<BlockTokenIdentifier> srcBlockToken,
final ExtendedBlock targetBlk,
final Token<BlockTokenIdentifier> targetBlockToken,
final DatanodeInfo targetDN) throws IOException {
OpCopyBlockCrossNamespaceProto proto = OpCopyBlockCrossNamespaceProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(srcBlk, srcBlockToken))
.setTargetBlock(PBHelperClient.convert(targetBlk))
.setTargetBlockToken(PBHelperClient.convert(targetBlockToken))
.setTargetDatanode(PBHelperClient.convert(targetDN))
.build();
send(out, Op.COPY_BLOCK_CROSS_NAMESPACE, proto);
}
}
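For context, send(out, Op.COPY_BLOCK_CROSS_NAMESPACE, proto) is expected to frame the message the same way as the other ops in this class. A hedged sketch of the resulting wire format, inferred from the existing data-transfer framing rather than from code shown in this diff:

    // Assumed framing, matching the other DataTransferProtocol ops:
    out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // 2-byte protocol version
    Op.COPY_BLOCK_CROSS_NAMESPACE.write(out);                   // 1-byte op code (91)
    proto.writeDelimitedTo(out);                                // varint-length-prefixed protobuf body
    out.flush();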
@@ -331,3 +331,10 @@ message OpBlockChecksumResponseProto {
message OpCustomProto {
required string customId = 1;
}

message OpCopyBlockCrossNamespaceProto {
required BaseHeaderProto header = 1;
required ExtendedBlockProto targetBlock = 2;
optional hadoop.common.TokenProto targetBlockToken = 3;
required DatanodeInfoProto targetDatanode = 4;
}
@@ -35,6 +35,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockCrossNamespaceProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
@@ -133,6 +134,9 @@ protected final void processOp(Op op) throws IOException {
case REQUEST_SHORT_CIRCUIT_SHM:
opRequestShortCircuitShm(in);
break;
case COPY_BLOCK_CROSS_NAMESPACE:
opCopyBlockCrossNamespace(in);
break;
default:
throw new IOException("Unknown op " + op + " in data stream");
}
@@ -339,4 +343,19 @@ private void opStripedBlockChecksum(DataInputStream dis) throws IOException {
}
}
}

/** Receive {@link Op#COPY_BLOCK_CROSS_NAMESPACE}. */
private void opCopyBlockCrossNamespace(DataInputStream inputStream) throws IOException {
OpCopyBlockCrossNamespaceProto proto =
OpCopyBlockCrossNamespaceProto.parseFrom(vintPrefixed(inputStream));
try (TraceScope ignored = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName())) {
copyBlockCrossNamespace(
PBHelperClient.convert(proto.getHeader().getBlock()),
PBHelperClient.convert(proto.getHeader().getToken()),
PBHelperClient.convert(proto.getTargetBlock()),
PBHelperClient.convert(proto.getTargetBlockToken()),
PBHelperClient.convert(proto.getTargetDatanode()));
}
}
}
@@ -64,6 +64,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_EC_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_EC_SOCKET_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_DEFAULT;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@@ -88,6 +90,7 @@ public class DNConf {
final int socketWriteTimeout;
final int socketKeepaliveTimeout;
final int ecChecksumSocketTimeout;
private final int copyBlockCrossNamespaceSocketTimeout;
private final int transferSocketSendBufferSize;
private final int transferSocketRecvBufferSize;
private final boolean tcpNoDelay;
@@ -152,6 +155,9 @@ public DNConf(final Configurable dn) {
ecChecksumSocketTimeout = getConf().getInt(
DFS_CHECKSUM_EC_SOCKET_TIMEOUT_KEY,
DFS_CHECKSUM_EC_SOCKET_TIMEOUT_DEFAULT);
this.copyBlockCrossNamespaceSocketTimeout = getConf().getInt(
DFS_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_KEY,
DFS_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_DEFAULT);
this.transferSocketSendBufferSize = getConf().getInt(
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT);
@@ -389,6 +395,15 @@ public int getEcChecksumSocketTimeout() {
return ecChecksumSocketTimeout;
}

/**
* Returns socket timeout for cross-namespace block copying.
*
* @return socket timeout in milliseconds
*/
public int getCopyBlockCrossNamespaceSocketTimeout() {
return copyBlockCrossNamespaceSocketTimeout;
}

/**
* Returns the SaslPropertiesResolver configured for use with
* DataTransferProtocol, or null if not configured.
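Putting the pieces together: the concrete datanode-side handler behind the Receiver dispatch is not shown in this excerpt. A purely hypothetical outline of how such a handler might use the new timeout from DNConf is sketched below; every name and step in it is an illustrative assumption, not part of this patch.

    // Hypothetical handler outline -- not from this patch.
    // Assumed imports: java.net.Socket, java.net.InetSocketAddress, plus the HDFS types above.
    void copyBlockCrossNamespace(ExtendedBlock srcBlk, Token<BlockTokenIdentifier> srcBlockToken,
        ExtendedBlock targetBlk, Token<BlockTokenIdentifier> targetBlockToken, DatanodeInfo targetDN)
        throws IOException {
      int timeout = dnConf.getCopyBlockCrossNamespaceSocketTimeout(); // new DNConf getter
      try (Socket sock = new Socket()) {
        // Connect to the datanode that owns the target block, bounding both the
        // connect and the subsequent reads/writes with the configured timeout.
        sock.connect(new InetSocketAddress(targetDN.getIpAddr(), targetDN.getXferPort()), timeout);
        sock.setSoTimeout(timeout);
        // ... read srcBlk from local storage and stream it into targetBlk on targetDN ...
      }
    }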