@@ -22,23 +22,26 @@
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
@@ -48,6 +51,8 @@
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -63,14 +68,13 @@
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
@@ -252,7 +256,7 @@ private synchronized void failed(Channel channel, Supplier<Throwable> errorSuppl
// disable further write, and fail all pending ack.
state = State.BROKEN;
failWaitingAckQueue(channel, errorSupplier);
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
datanodeInfoMap.keySet().forEach(NettyFutureUtils::safeClose);
@apurtell (Contributor) commented on Jul 25, 2022:

I like the approach here, where the new NettyFutureUtils encapsulates the await/cleanup details (edit: and suppresses the warnings in one place, mostly).
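For readers skimming the diff, here is a minimal sketch of what the NettyFutureUtils helpers used throughout this patch (consume, safeWrite, safeWriteAndFlush, safeClose) plausibly look like. The class and method names come from this diff; the bodies, imports, and log message are assumptions, not the actual HBase source.

  import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker;
  import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  // Illustrative sketch only. The idea matches the comment above: the futures returned
  // by netty write/close calls are consumed (and failures logged) in one place, so the
  // FutureReturnValueIgnored warning no longer has to be handled at every call site.
  public final class NettyFutureUtils {

    private static final Logger LOG = LoggerFactory.getLogger(NettyFutureUtils.class);

    private NettyFutureUtils() {
    }

    // Log the failure, if any; the caller deliberately ignores the result.
    public static void consume(Future<?> future) {
      future.addListener(NettyFutureUtils::logIfFailed);
    }

    private static void logIfFailed(Future<?> future) {
      if (!future.isSuccess()) {
        LOG.warn("Async operation failed", future.cause());
      }
    }

    // Fire-and-forget wrappers used by the call sites in this patch.
    public static void safeWrite(ChannelOutboundInvoker channel, Object msg) {
      consume(channel.write(msg));
    }

    public static void safeWriteAndFlush(ChannelOutboundInvoker channel, Object msg) {
      consume(channel.writeAndFlush(msg));
    }

    public static void safeClose(ChannelOutboundInvoker channel) {
      consume(channel.close());
    }
  }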

}

private void failWaitingAckQueue(Channel channel, Supplier<Throwable> errorSupplier) {
@@ -329,7 +333,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
ByteBuf buf = alloc.buffer(len);
heartbeat.putInBuffer(buf.nioBuffer(0, len));
buf.writerIndex(len);
ctx.channel().writeAndFlush(buf);
safeWriteAndFlush(ctx.channel(), buf);
}
return;
}
@@ -440,9 +444,9 @@ private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
// TODO: we should perhaps measure time taken per DN here;
// we could collect statistics per DN, and/or exclude bad nodes in createOutput.
datanodeInfoMap.keySet().forEach(ch -> {
ch.write(headerBuf.retainedDuplicate());
ch.write(checksumBuf.retainedDuplicate());
ch.writeAndFlush(dataBuf.retainedDuplicate());
safeWrite(ch, headerBuf.retainedDuplicate());
safeWrite(ch, checksumBuf.retainedDuplicate());
safeWriteAndFlush(ch, dataBuf.retainedDuplicate());
});
checksumBuf.release();
headerBuf.release();
@@ -562,31 +566,31 @@ private void endBlock() throws IOException {
headerBuf.writerIndex(headerLen);
CompletableFuture<Long> future = new CompletableFuture<>();
waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0));
datanodeInfoMap.keySet().forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
datanodeInfoMap.keySet().forEach(ch -> safeWriteAndFlush(ch, headerBuf.retainedDuplicate()));
headerBuf.release();
try {
future.get();
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException().initCause(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Throwables.propagateIfPossible(cause, IOException.class);
throw new IOException(cause);
FutureUtils.get(future);
}

private void closeDataNodeChannelsAndAwait() {
List<ChannelFuture> futures = new ArrayList<>();
for (Channel ch : datanodeInfoMap.keySet()) {
futures.add(ch.close());
}
for (ChannelFuture future : futures) {
consume(future.awaitUninterruptibly());
}
}

/**
* The close method when error occurred. Now we just call recoverFileLease.
*/
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
if (buf != null) {
buf.release();
buf = null;
}
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
closeDataNodeChannelsAndAwait();
endFileLease(client, fileId);
RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
reporter == null ? new CancelOnClose(client) : reporter);
@@ -597,12 +601,10 @@ public void recoverAndClose(CancelableProgressable reporter) throws IOException
* {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
*/
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void close() throws IOException {
endBlock();
state = State.CLOSED;
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
closeDataNodeChannelsAndAwait();
block.setNumBytes(ackedBlockLength);
completeFile(client, namenode, src, clientName, block, fileId);
}
@@ -19,6 +19,9 @@

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.addListener;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
@@ -351,7 +354,7 @@ private static void requestWriteBlock(Channel channel, StorageType storageType,
buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
buffer.writeByte(Op.WRITE_BLOCK.code);
proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
channel.writeAndFlush(buffer);
safeWriteAndFlush(channel, buffer);
}

private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
@@ -360,7 +363,7 @@ private static void initialize(Configuration conf, Channel channel, DatanodeInfo
throws IOException {
Promise<Void> saslPromise = channel.eventLoop().newPromise();
trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
saslPromise.addListener(new FutureListener<Void>() {
addListener(saslPromise, new FutureListener<Void>() {
Contributor comment:

"Promoting" the listener makes sense.


@Override
public void operationComplete(Future<Void> future) throws Exception {
@@ -404,7 +407,7 @@ private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSC
Promise<Channel> promise = eventLoopGroup.next().newPromise();
futureList.add(promise);
String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
new Bootstrap().group(eventLoopGroup).channel(channelClass)
addListener(new Bootstrap().group(eventLoopGroup).channel(channelClass)
.option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

@Override
@@ -413,7 +416,7 @@ protected void initChannel(Channel ch) throws Exception {
// channel connected. Leave an empty implementation here because netty does not allow
// a null handler.
}
}).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {
}).connect(NetUtils.createSocketAddr(dnAddr)), new ChannelFutureListener() {

@Override
public void operationComplete(ChannelFuture future) throws Exception {
@@ -533,12 +536,12 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
if (!succ) {
if (futureList != null) {
for (Future<Channel> f : futureList) {
f.addListener(new FutureListener<Channel>() {
addListener(f, new FutureListener<Channel>() {

@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
future.getNow().close();
safeClose(future.getNow());
}
}
});
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

@@ -448,12 +449,12 @@ private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
size += CodedOutputStream.computeRawVarint32Size(size);
ByteBuf buf = ctx.alloc().buffer(size);
proto.writeDelimitedTo(new ByteBufOutputStream(buf));
ctx.write(buf);
safeWrite(ctx, buf);
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
safeWrite(ctx, ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
sendSaslMessage(ctx, new byte[0]);
ctx.flush();
step++;
@@ -642,7 +643,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
cBuf.addComponent(buf);
cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
} else {
ctx.write(msg);
safeWrite(ctx, msg);
}
}

@@ -656,7 +657,7 @@ public void flush(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
buf.writeInt(wrapped.length);
buf.writeBytes(wrapped);
ctx.write(buf);
safeWrite(ctx, buf);
}
ctx.flush();
}
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.util.FutureUtils.consume;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -93,9 +94,9 @@ public static void setUp() throws Exception {
}

@AfterClass
public static void tearDown() throws IOException, InterruptedException {
public static void tearDown() throws Exception {
if (EVENT_LOOP_GROUP != null) {
EVENT_LOOP_GROUP.shutdownGracefully().sync();
EVENT_LOOP_GROUP.shutdownGracefully().get();
}
shutdownMiniDFSCluster();
}
@@ -262,7 +263,7 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, Exec
byte[] b = new byte[50 * 1024 * 1024];
Bytes.random(b);
out.write(b);
out.flush(false);
consume(out.flush(false));
assertEquals(b.length, out.flush(false).get().longValue());
out.close();
assertEquals(b.length, FS.getFileStatus(f).getLen());
@@ -20,7 +20,6 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -103,12 +102,12 @@ public static void setUp() throws Exception {
}

@AfterClass
public static void tearDown() throws IOException, InterruptedException {
public static void tearDown() throws Exception {
if (OUT != null) {
OUT.recoverAndClose(null);
}
if (EVENT_LOOP_GROUP != null) {
EVENT_LOOP_GROUP.shutdownGracefully().sync();
EVENT_LOOP_GROUP.shutdownGracefully().get();
}
shutdownMiniDFSCluster();
}
@@ -53,9 +53,9 @@ public class TestLocalAsyncOutput {
private static StreamSlowMonitor MONITOR;

@AfterClass
public static void tearDownAfterClass() throws IOException {
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.cleanupTestDir();
GROUP.shutdownGracefully();
GROUP.shutdownGracefully().get();
MONITOR = StreamSlowMonitor.create(TEST_UTIL.getConfiguration(), "testMonitor");
}

@@ -193,9 +193,9 @@ public static void setUpBeforeClass() throws Exception {
}

@AfterClass
public static void tearDownAfterClass() throws IOException, InterruptedException {
public static void tearDownAfterClass() throws Exception {
if (EVENT_LOOP_GROUP != null) {
EVENT_LOOP_GROUP.shutdownGracefully().sync();
EVENT_LOOP_GROUP.shutdownGracefully().get();
}
if (KDC != null) {
KDC.stop();
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Constructor;
@@ -183,7 +186,6 @@ public MulticastListener() {

@Override
public void connect(Configuration conf) throws IOException {

String mcAddress =
conf.get(HConstants.STATUS_MULTICAST_ADDRESS, HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
@@ -218,16 +220,21 @@ public void connect(Configuration conf) throws IOException {
}

LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
channel.joinGroup(ina, ni, null, channel.newPromise());
try {
consume(channel.joinGroup(ina, ni, null).sync());
} catch (InterruptedException e) {
close();
throw ExceptionUtil.asInterrupt(e);
}
}

@Override
public void close() {
if (channel != null) {
channel.close();
safeClose(channel);
channel = null;
}
group.shutdownGracefully();
consume(group.shutdownGracefully());
}

/**
@@ -121,7 +121,7 @@ public void testWrapConnectionException() throws Exception {
}

@Test
public void testExecute() throws IOException {
public void testExecute() throws Exception {
EventLoop eventLoop = new DefaultEventLoop();
MutableInt executed = new MutableInt(0);
MutableInt numStackTraceElements = new MutableInt(0);
@@ -156,7 +156,7 @@ public void run() {
});
FutureUtils.get(future);
} finally {
eventLoop.shutdownGracefully();
eventLoop.shutdownGracefully().get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ public static <T> void addListener(CompletableFuture<T> future,
}, executor);
}

/**
* Log the error if the future indicates any failure.
*/
public static void consume(CompletableFuture<?> future) {
addListener(future, (r, e) -> {
if (e != null) {
LOG.warn("Async operation fails", e);
}
});
}
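A minimal usage sketch of the new helper; the out variable and the flush(false) call are placeholders borrowed from the test change above, not part of this file:

  // Hypothetical call site: start an async flush whose result the caller does not need,
  // but still have any failure logged instead of silently dropped.
  CompletableFuture<Long> pending = out.flush(false);
  FutureUtils.consume(pending);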

/**
* Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all
* the callbacks in the given {@code executor}.