Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hdfs.DistributedFileSystem;
Expand All @@ -47,11 +48,11 @@ private AsyncFSOutputHelper() {
*/
public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass)
Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
throws IOException, CommonFSUtils.StreamLacksCapabilityException {
if (fs instanceof DistributedFileSystem) {
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass);
overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor);
}
final FSDataOutputStream out;
int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -45,7 +45,9 @@
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
Expand All @@ -68,6 +70,7 @@
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
Expand Down Expand Up @@ -121,7 +124,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

private final Encryptor encryptor;

private final List<Channel> datanodeList;
private final Map<Channel, DatanodeInfo> datanodeInfoMap;

private final DataChecksum summer;

Expand All @@ -137,17 +140,22 @@ private static final class Callback {

// should be backed by a thread safe collection
private final Set<ChannelId> unfinishedReplicas;
private final long packetDataLen;
private final long flushTimestamp;
private long lastAckTimestamp = -1;

public Callback(CompletableFuture<Long> future, long ackedLength,
Collection<Channel> replicas) {
final Collection<Channel> replicas, long packetDataLen) {
this.future = future;
this.ackedLength = ackedLength;
this.packetDataLen = packetDataLen;
this.flushTimestamp = EnvironmentEdgeManager.currentTime();
if (replicas.isEmpty()) {
this.unfinishedReplicas = Collections.emptySet();
} else {
this.unfinishedReplicas =
Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
replicas.stream().map(c -> c.id()).forEachOrdered(unfinishedReplicas::add);
replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add);
}
}
}
Expand Down Expand Up @@ -177,13 +185,19 @@ private enum State {

private volatile State state;

private final StreamSlowMonitor streamSlowMonitor;

// all lock-free to make it run faster
private void completed(Channel channel) {
for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
Callback c = iter.next();
// if the current unfinished replicas does not contain us then it means that we have already
// acked this one, let's iterate to find the one we have not acked yet.
if (c.unfinishedReplicas.remove(channel.id())) {
long current = EnvironmentEdgeManager.currentTime();
streamSlowMonitor.checkProcessTimeAndSpeed(datanodeInfoMap.get(channel), c.packetDataLen,
current - c.flushTimestamp, c.lastAckTimestamp, c.unfinishedReplicas.size());
c.lastAckTimestamp = current;
if (c.unfinishedReplicas.isEmpty()) {
// we need to remove first before complete the future. It is possible that after we
// complete the future the upper layer will call close immediately before we remove the
Expand Down Expand Up @@ -246,7 +260,7 @@ private synchronized void failed(Channel channel, Supplier<Throwable> errorSuppl
}
break;
}
datanodeList.forEach(ch -> ch.close());
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
}

@Sharable
Expand Down Expand Up @@ -314,7 +328,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc

private void setupReceiver(int timeoutMs) {
AckHandler ackHandler = new AckHandler(timeoutMs);
for (Channel ch : datanodeList) {
for (Channel ch : datanodeInfoMap.keySet()) {
ch.pipeline().addLast(
new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
new ProtobufVarint32FrameDecoder(),
Expand All @@ -325,8 +339,8 @@ private void setupReceiver(int timeoutMs) {

FanOutOneBlockAsyncDFSOutput(Configuration conf,DistributedFileSystem dfs,
DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
LocatedBlock locatedBlock, Encryptor encryptor, List<Channel> datanodeList,
DataChecksum summer, ByteBufAllocator alloc) {
LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap,
DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
this.conf = conf;
this.dfs = dfs;
this.client = client;
Expand All @@ -337,13 +351,14 @@ private void setupReceiver(int timeoutMs) {
this.block = locatedBlock.getBlock();
this.locations = locatedBlock.getLocations();
this.encryptor = encryptor;
this.datanodeList = datanodeList;
this.datanodeInfoMap = datanodeInfoMap;
this.summer = summer;
this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
this.alloc = alloc;
this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize());
this.state = State.STREAMING;
setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
this.streamSlowMonitor = streamSlowMonitor;
}

@Override
Expand Down Expand Up @@ -395,7 +410,8 @@ private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
ByteBuf headerBuf = alloc.buffer(headerLen);
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
headerBuf.writerIndex(headerLen);
Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeList);
Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen,
datanodeInfoMap.keySet(), dataLen);
waitingAckQueue.addLast(c);
// recheck again after we pushed the callback to queue
if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
Expand All @@ -404,7 +420,9 @@ private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
waitingAckQueue.removeFirst();
return;
}
datanodeList.forEach(ch -> {
// TODO: we should perhaps measure time taken per DN here;
// we could collect statistics per DN, and/or exclude bad nodes in createOutput.
datanodeInfoMap.keySet().forEach(ch -> {
ch.write(headerBuf.retainedDuplicate());
ch.write(checksumBuf.retainedDuplicate());
ch.writeAndFlush(dataBuf.retainedDuplicate());
Expand All @@ -426,7 +444,7 @@ private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
Callback lastFlush = waitingAckQueue.peekLast();
if (lastFlush != null) {
Callback c = new Callback(future, lengthAfterFlush, Collections.emptyList());
Callback c = new Callback(future, lengthAfterFlush, Collections.emptySet(), dataLen);
waitingAckQueue.addLast(c);
// recheck here if we have already removed the previous callback from the queue
if (waitingAckQueue.peekFirst() == c) {
Expand Down Expand Up @@ -526,8 +544,8 @@ private void endBlock() throws IOException {
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
headerBuf.writerIndex(headerLen);
CompletableFuture<Long> future = new CompletableFuture<>();
waitingAckQueue.add(new Callback(future, finalizedLength, datanodeList));
datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0));
datanodeInfoMap.keySet().forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
headerBuf.release();
try {
future.get();
Expand All @@ -544,13 +562,14 @@ private void endBlock() throws IOException {
* The close method when error occurred. Now we just call recoverFileLease.
*/
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
if (buf != null) {
buf.release();
buf = null;
}
datanodeList.forEach(ch -> ch.close());
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
endFileLease(client, fileId);
RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
reporter == null ? new CancelOnClose(client) : reporter);
Expand All @@ -561,11 +580,12 @@ public void recoverAndClose(CancelableProgressable reporter) throws IOException
* {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
*/
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void close() throws IOException {
endBlock();
state = State.CLOSED;
datanodeList.forEach(ch -> ch.close());
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
block.setNumBytes(ackedBlockLength);
completeFile(client, namenode, src, clientName, block, fileId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
Expand All @@ -47,6 +50,8 @@
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
Expand Down Expand Up @@ -128,8 +133,6 @@ private FanOutOneBlockAsyncDFSOutputHelper() {
// Timeouts for communicating with DataNode for streaming writes/reads
public static final int READ_TIMEOUT = 60 * 1000;

private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];

private interface LeaseManager {

void begin(DFSClient client, long inodeId);
Expand Down Expand Up @@ -511,15 +514,20 @@ private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {

private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
StreamSlowMonitor monitor) throws IOException {
Configuration conf = dfs.getConf();
DFSClient client = dfs.getClient();
String clientName = client.getClientName();
ClientProtocol namenode = client.getNamenode();
int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager();
Set<DatanodeInfo> toExcludeNodes =
new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet());
for (int retry = 0;; retry++) {
LOG.debug("When create output stream for {}, exclude list is {}, retry={}", src,
toExcludeNodes, retry);
HdfsFileStatus stat;
try {
stat = FILE_CREATOR.create(namenode, src,
Expand All @@ -539,24 +547,26 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
List<Future<Channel>> futureList = null;
try {
DataChecksum summer = createChecksum(client);
locatedBlock = namenode.addBlock(src, client.getClientName(), null, excludesNodes,
stat.getFileId(), null, null);
List<Channel> datanodeList = new ArrayList<>();
locatedBlock = namenode.addBlock(src, client.getClientName(), null,
toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
for (int i = 0, n = futureList.size(); i < n; i++) {
DatanodeInfo datanodeInfo = locatedBlock.getLocations()[i];
try {
datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo);
} catch (Exception e) {
// exclude the broken DN next time
excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
toExcludeNodes.add(datanodeInfo);
excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "connect error");
throw e;
}
}
Encryptor encryptor = createEncryptor(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output =
new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
succ = true;
return output;
} catch (RemoteException e) {
Expand Down Expand Up @@ -607,14 +617,15 @@ public void operationComplete(Future<Channel> future) throws Exception {
*/
public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
final StreamSlowMonitor monitor) throws IOException {
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

@Override
public FanOutOneBlockAsyncDFSOutput doCall(Path p)
throws IOException, UnresolvedLinkException {
return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
blockSize, eventLoopGroup, channelClass);
blockSize, eventLoopGroup, channelClass, monitor);
}

@Override
Expand Down
Loading