Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hdfs.DistributedFileSystem;
Expand All @@ -47,11 +48,11 @@ private AsyncFSOutputHelper() {
*/
public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass)
Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
throws IOException, CommonFSUtils.StreamLacksCapabilityException {
if (fs instanceof DistributedFileSystem) {
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass);
overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor);
}
final FSDataOutputStream out;
int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -44,7 +44,9 @@
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
Expand All @@ -67,6 +69,7 @@
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
Expand Down Expand Up @@ -120,7 +123,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

private final Encryptor encryptor;

private final List<Channel> datanodeList;
private final Map<Channel, DatanodeInfo> datanodeInfoMap;

private final DataChecksum summer;

Expand All @@ -136,17 +139,22 @@ private static final class Callback {

// should be backed by a thread safe collection
private final Set<ChannelId> unfinishedReplicas;
private final long packetDataLen;
private final long flushTimestamp;
private long lastAckTimestamp = -1;

/**
 * Creates the bookkeeping entry for one flushed packet.
 * <p>
 * The creation time is captured as {@code flushTimestamp}, and the ids of all given channels are
 * recorded as replicas that have not acked yet.
 * @param future        future associated with this packet; stored as-is
 * @param ackedLength   length value associated with this packet; stored as-is
 * @param replicas      channels whose acks are awaited; may be empty, in which case an immutable
 *                      empty set is used and nothing is awaited
 * @param packetDataLen number of data bytes carried by the packet; stored as-is
 */
public Callback(CompletableFuture<Long> future, long ackedLength,
    final Collection<Channel> replicas, long packetDataLen) {
  this.future = future;
  this.ackedLength = ackedLength;
  this.packetDataLen = packetDataLen;
  this.flushTimestamp = EnvironmentEdgeManager.currentTime();
  if (replicas.isEmpty()) {
    this.unfinishedReplicas = Collections.emptySet();
  } else {
    // Backed by a ConcurrentHashMap so the set is a thread safe collection.
    this.unfinishedReplicas =
      Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
    replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add);
  }
}
}
Expand Down Expand Up @@ -176,13 +184,19 @@ private enum State {

private volatile State state;

private final StreamSlowMonitor streamSlowMonitor;

// all lock-free to make it run faster
private void completed(Channel channel) {
for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
Callback c = iter.next();
// if the current unfinished replicas does not contain us then it means that we have already
// acked this one, let's iterate to find the one we have not acked yet.
if (c.unfinishedReplicas.remove(channel.id())) {
long current = EnvironmentEdgeManager.currentTime();
streamSlowMonitor.checkProcessTimeAndSpeed(datanodeInfoMap.get(channel), c.packetDataLen,
current - c.flushTimestamp, c.lastAckTimestamp, c.unfinishedReplicas.size());
c.lastAckTimestamp = current;
if (c.unfinishedReplicas.isEmpty()) {
// we need to remove first before complete the future. It is possible that after we
// complete the future the upper layer will call close immediately before we remove the
Expand Down Expand Up @@ -245,7 +259,7 @@ private synchronized void failed(Channel channel, Supplier<Throwable> errorSuppl
}
break;
}
datanodeList.forEach(ch -> ch.close());
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
}

@Sharable
Expand Down Expand Up @@ -313,7 +327,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc

private void setupReceiver(int timeoutMs) {
AckHandler ackHandler = new AckHandler(timeoutMs);
for (Channel ch : datanodeList) {
for (Channel ch : datanodeInfoMap.keySet()) {
ch.pipeline().addLast(
new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
new ProtobufVarint32FrameDecoder(),
Expand All @@ -324,8 +338,8 @@ private void setupReceiver(int timeoutMs) {

FanOutOneBlockAsyncDFSOutput(Configuration conf,DistributedFileSystem dfs,
DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
LocatedBlock locatedBlock, Encryptor encryptor, List<Channel> datanodeList,
DataChecksum summer, ByteBufAllocator alloc) {
LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap,
DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
this.conf = conf;
this.dfs = dfs;
this.client = client;
Expand All @@ -336,13 +350,14 @@ private void setupReceiver(int timeoutMs) {
this.block = locatedBlock.getBlock();
this.locations = locatedBlock.getLocations();
this.encryptor = encryptor;
this.datanodeList = datanodeList;
this.datanodeInfoMap = datanodeInfoMap;
this.summer = summer;
this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
this.alloc = alloc;
this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize());
this.state = State.STREAMING;
setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
this.streamSlowMonitor = streamSlowMonitor;
}

@Override
Expand Down Expand Up @@ -394,7 +409,8 @@ private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
ByteBuf headerBuf = alloc.buffer(headerLen);
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
headerBuf.writerIndex(headerLen);
Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeList);
Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen,
datanodeInfoMap.keySet(), dataLen);
waitingAckQueue.addLast(c);
// recheck again after we pushed the callback to queue
if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
Expand All @@ -405,7 +421,7 @@ private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
}
// TODO: we should perhaps measure time taken per DN here;
// we could collect statistics per DN, and/or exclude bad nodes in createOutput.
datanodeList.forEach(ch -> {
datanodeInfoMap.keySet().forEach(ch -> {
ch.write(headerBuf.retainedDuplicate());
ch.write(checksumBuf.retainedDuplicate());
ch.writeAndFlush(dataBuf.retainedDuplicate());
Expand All @@ -427,7 +443,7 @@ private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
Callback lastFlush = waitingAckQueue.peekLast();
if (lastFlush != null) {
Callback c = new Callback(future, lengthAfterFlush, Collections.emptyList());
Callback c = new Callback(future, lengthAfterFlush, Collections.emptySet(), dataLen);
waitingAckQueue.addLast(c);
// recheck here if we have already removed the previous callback from the queue
if (waitingAckQueue.peekFirst() == c) {
Expand Down Expand Up @@ -527,8 +543,8 @@ private void endBlock() throws IOException {
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
headerBuf.writerIndex(headerLen);
CompletableFuture<Long> future = new CompletableFuture<>();
waitingAckQueue.add(new Callback(future, finalizedLength, datanodeList));
datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0));
datanodeInfoMap.keySet().forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
headerBuf.release();
try {
future.get();
Expand All @@ -545,13 +561,14 @@ private void endBlock() throws IOException {
* The close method when error occurred. Now we just call recoverFileLease.
*/
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
if (buf != null) {
buf.release();
buf = null;
}
datanodeList.forEach(ch -> ch.close());
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
endFileLease(client, fileId);
RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
reporter == null ? new CancelOnClose(client) : reporter);
Expand All @@ -562,11 +579,12 @@ public void recoverAndClose(CancelableProgressable reporter) throws IOException
* {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
*/
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void close() throws IOException {
  endBlock();
  state = State.CLOSED;
  // Ask every datanode channel to close first, then wait for each close to finish, so the
  // waits do not serialize the close requests.
  datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
  datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
  // Record the acked length on the block before asking the namenode to complete the file.
  block.setNumBytes(ackedBlockLength);
  completeFile(client, namenode, src, clientName, block, fileId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
Expand All @@ -47,6 +50,8 @@
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
Expand Down Expand Up @@ -128,8 +133,6 @@ private FanOutOneBlockAsyncDFSOutputHelper() {
// Timeouts for communicating with DataNode for streaming writes/reads
public static final int READ_TIMEOUT = 60 * 1000;

private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];

private interface LeaseManager {

void begin(DFSClient client, long inodeId);
Expand Down Expand Up @@ -451,15 +454,20 @@ private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {

private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
StreamSlowMonitor monitor) throws IOException {
Configuration conf = dfs.getConf();
DFSClient client = dfs.getClient();
String clientName = client.getClientName();
ClientProtocol namenode = client.getNamenode();
int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager();
Set<DatanodeInfo> toExcludeNodes =
new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet());
for (int retry = 0;; retry++) {
LOG.debug("When create output stream for {}, exclude list is {}, retry={}", src,
toExcludeNodes, retry);
HdfsFileStatus stat;
try {
stat = FILE_CREATOR.create(namenode, src,
Expand All @@ -479,24 +487,26 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
List<Future<Channel>> futureList = null;
try {
DataChecksum summer = createChecksum(client);
locatedBlock = namenode.addBlock(src, client.getClientName(), null, excludesNodes,
stat.getFileId(), null, null);
List<Channel> datanodeList = new ArrayList<>();
locatedBlock = namenode.addBlock(src, client.getClientName(), null,
toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
for (int i = 0, n = futureList.size(); i < n; i++) {
DatanodeInfo datanodeInfo = locatedBlock.getLocations()[i];
try {
datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo);
} catch (Exception e) {
// exclude the broken DN next time
excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
toExcludeNodes.add(datanodeInfo);
excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "connect error");
throw e;
}
}
Encryptor encryptor = createEncryptor(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output =
new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
succ = true;
return output;
} catch (RemoteException e) {
Expand Down Expand Up @@ -547,14 +557,15 @@ public void operationComplete(Future<Channel> future) throws Exception {
*/
public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
final StreamSlowMonitor monitor) throws IOException {
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

@Override
public FanOutOneBlockAsyncDFSOutput doCall(Path p)
throws IOException, UnresolvedLinkException {
return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
blockSize, eventLoopGroup, channelClass);
blockSize, eventLoopGroup, channelClass, monitor);
}

@Override
Expand Down
Loading