diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index a43bed317345..149dec431e0f 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -22,23 +22,26 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus; +import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume; +import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite; +import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE; import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; -import java.io.InterruptedIOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; @@ -48,6 +51,8 @@ import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.NettyFutureUtils; import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -63,14 +68,13 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture; import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable; import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; import org.apache.hbase.thirdparty.io.netty.channel.ChannelId; -import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker; import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; @@ -252,7 +256,7 @@ private synchronized void failed(Channel channel, Supplier errorSuppl // disable further write, and fail all pending ack. 
state = State.BROKEN; failWaitingAckQueue(channel, errorSupplier); - datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close); + datanodeInfoMap.keySet().forEach(NettyFutureUtils::safeClose); } private void failWaitingAckQueue(Channel channel, Supplier errorSupplier) { @@ -329,7 +333,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc ByteBuf buf = alloc.buffer(len); heartbeat.putInBuffer(buf.nioBuffer(0, len)); buf.writerIndex(len); - ctx.channel().writeAndFlush(buf); + safeWriteAndFlush(ctx.channel(), buf); } return; } @@ -440,9 +444,9 @@ private void flushBuffer(CompletableFuture future, ByteBuf dataBuf, // TODO: we should perhaps measure time taken per DN here; // we could collect statistics per DN, and/or exclude bad nodes in createOutput. datanodeInfoMap.keySet().forEach(ch -> { - ch.write(headerBuf.retainedDuplicate()); - ch.write(checksumBuf.retainedDuplicate()); - ch.writeAndFlush(dataBuf.retainedDuplicate()); + safeWrite(ch, headerBuf.retainedDuplicate()); + safeWrite(ch, checksumBuf.retainedDuplicate()); + safeWriteAndFlush(ch, dataBuf.retainedDuplicate()); }); checksumBuf.release(); headerBuf.release(); @@ -562,16 +566,18 @@ private void endBlock() throws IOException { headerBuf.writerIndex(headerLen); CompletableFuture future = new CompletableFuture<>(); waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0)); - datanodeInfoMap.keySet().forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate())); + datanodeInfoMap.keySet().forEach(ch -> safeWriteAndFlush(ch, headerBuf.retainedDuplicate())); headerBuf.release(); - try { - future.get(); - } catch (InterruptedException e) { - throw (IOException) new InterruptedIOException().initCause(e); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - Throwables.propagateIfPossible(cause, IOException.class); - throw new IOException(cause); + FutureUtils.get(future); + } + + private void closeDataNodeChannelsAndAwait() { + List futures = new ArrayList<>(); + for (Channel ch : datanodeInfoMap.keySet()) { + futures.add(ch.close()); + } + for (ChannelFuture future : futures) { + consume(future.awaitUninterruptibly()); } } @@ -579,14 +585,12 @@ private void endBlock() throws IOException { * The close method when error occurred. Now we just call recoverFileLease. */ @Override - @SuppressWarnings("FutureReturnValueIgnored") public void recoverAndClose(CancelableProgressable reporter) throws IOException { if (buf != null) { buf.release(); buf = null; } - datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close); - datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly()); + closeDataNodeChannelsAndAwait(); endFileLease(client, fileId); RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf, reporter == null ? new CancelOnClose(client) : reporter); @@ -597,12 +601,10 @@ public void recoverAndClose(CancelableProgressable reporter) throws IOException * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception. 
*/ @Override - @SuppressWarnings("FutureReturnValueIgnored") public void close() throws IOException { endBlock(); state = State.CLOSED; - datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close); - datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly()); + closeDataNodeChannelsAndAwait(); block.setNumBytes(ackedBlockLength); completeFile(client, namenode, src, clientName, block, fileId); } diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 2517f2d2c01a..9c66c53b8bfe 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -19,6 +19,9 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate; +import static org.apache.hadoop.hbase.util.NettyFutureUtils.addListener; +import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose; +import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; @@ -351,7 +354,7 @@ private static void requestWriteBlock(Channel channel, StorageType storageType, buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); buffer.writeByte(Op.WRITE_BLOCK.code); proto.writeDelimitedTo(new ByteBufOutputStream(buffer)); - channel.writeAndFlush(buffer); + safeWriteAndFlush(channel, buffer); } private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo, @@ -360,7 +363,7 @@ private static void initialize(Configuration conf, Channel channel, DatanodeInfo throws IOException { Promise saslPromise = channel.eventLoop().newPromise(); trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise); - saslPromise.addListener(new FutureListener() { + addListener(saslPromise, new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -404,7 +407,7 @@ private static List> connectToDataNodes(Configuration conf, DFSC Promise promise = eventLoopGroup.next().newPromise(); futureList.add(promise); String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname); - new Bootstrap().group(eventLoopGroup).channel(channelClass) + addListener(new Bootstrap().group(eventLoopGroup).channel(channelClass) .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer() { @Override @@ -413,7 +416,7 @@ protected void initChannel(Channel ch) throws Exception { // channel connected. Leave an empty implementation here because netty does not allow // a null handler. 
} - }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() { + }).connect(NetUtils.createSocketAddr(dnAddr)), new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -533,12 +536,12 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d if (!succ) { if (futureList != null) { for (Future f : futureList) { - f.addListener(new FutureListener() { + addListener(f, new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { - future.getNow().close(); + safeClose(future.getNow()); } } }); diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java index ee02d42d2d3b..4ac46e8cc5dc 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io.asyncfs; +import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; @@ -448,12 +449,12 @@ private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, size += CodedOutputStream.computeRawVarint32Size(size); ByteBuf buf = ctx.alloc().buffer(size); proto.writeDelimitedTo(new ByteBufOutputStream(buf)); - ctx.write(buf); + safeWrite(ctx, buf); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER)); + safeWrite(ctx, ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER)); sendSaslMessage(ctx, new byte[0]); ctx.flush(); step++; @@ -642,7 +643,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) cBuf.addComponent(buf); cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes()); } else { - ctx.write(msg); + safeWrite(ctx, msg); } } @@ -656,7 +657,7 @@ public void flush(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length); buf.writeInt(wrapped.length); buf.writeBytes(wrapped); - ctx.write(buf); + safeWrite(ctx, buf); } ctx.flush(); } diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 26cbbe034a58..3c3852831033 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io.asyncfs; +import static org.apache.hadoop.hbase.util.FutureUtils.consume; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; @@ -93,9 +94,9 @@ public static void setUp() throws Exception { } @AfterClass - public static void tearDown() throws IOException, InterruptedException { + public static void tearDown() throws 
Exception { if (EVENT_LOOP_GROUP != null) { - EVENT_LOOP_GROUP.shutdownGracefully().sync(); + EVENT_LOOP_GROUP.shutdownGracefully().get(); } shutdownMiniDFSCluster(); } @@ -262,7 +263,7 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, Exec byte[] b = new byte[50 * 1024 * 1024]; Bytes.random(b); out.write(b); - out.flush(false); + consume(out.flush(false)); assertEquals(b.length, out.flush(false).get().longValue()); out.close(); assertEquals(b.length, FS.getFileStatus(f).getLen()); diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java index 53fb37a8e0bc..77752789dbb3 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -103,12 +102,12 @@ public static void setUp() throws Exception { } @AfterClass - public static void tearDown() throws IOException, InterruptedException { + public static void tearDown() throws Exception { if (OUT != null) { OUT.recoverAndClose(null); } if (EVENT_LOOP_GROUP != null) { - EVENT_LOOP_GROUP.shutdownGracefully().sync(); + EVENT_LOOP_GROUP.shutdownGracefully().get(); } shutdownMiniDFSCluster(); } diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java index cb936a4e7c65..d1ce128b118d 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java @@ -53,9 +53,9 @@ public class TestLocalAsyncOutput { private static StreamSlowMonitor MONITOR; @AfterClass - public static void tearDownAfterClass() throws IOException { + public static void tearDownAfterClass() throws Exception { TEST_UTIL.cleanupTestDir(); - GROUP.shutdownGracefully(); + GROUP.shutdownGracefully().get(); MONITOR = StreamSlowMonitor.create(TEST_UTIL.getConfiguration(), "testMonitor"); } diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java index cb5fb4006d3e..479b8f4e6034 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -193,9 +193,9 @@ public static void setUpBeforeClass() throws Exception { } @AfterClass - public static void tearDownAfterClass() throws IOException, InterruptedException { + public static void tearDownAfterClass() throws Exception { if (EVENT_LOOP_GROUP != null) { - EVENT_LOOP_GROUP.shutdownGracefully().sync(); + EVENT_LOOP_GROUP.shutdownGracefully().get(); } if (KDC != null) { KDC.stop(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java index cc34d59c7321..26c5d98b46de 
100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume; +import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose; + import java.io.Closeable; import java.io.IOException; import java.lang.reflect.Constructor; @@ -183,7 +186,6 @@ public MulticastListener() { @Override public void connect(Configuration conf) throws IOException { - String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS, HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS); String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS, @@ -218,16 +220,21 @@ public void connect(Configuration conf) throws IOException { } LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina); - channel.joinGroup(ina, ni, null, channel.newPromise()); + try { + consume(channel.joinGroup(ina, ni, null).sync()); + } catch (InterruptedException e) { + close(); + throw ExceptionUtil.asInterrupt(e); + } } @Override public void close() { if (channel != null) { - channel.close(); + safeClose(channel); channel = null; } - group.shutdownGracefully(); + consume(group.shutdownGracefully()); } /** diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java index c327896f72ab..67a8d15c1d02 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java @@ -121,7 +121,7 @@ public void testWrapConnectionException() throws Exception { } @Test - public void testExecute() throws IOException { + public void testExecute() throws Exception { EventLoop eventLoop = new DefaultEventLoop(); MutableInt executed = new MutableInt(0); MutableInt numStackTraceElements = new MutableInt(0); @@ -156,7 +156,7 @@ public void run() { }); FutureUtils.get(future); } finally { - eventLoop.shutdownGracefully(); + eventLoop.shutdownGracefully().get(); } } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java index dc70edd0905e..4f8a7320fb40 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java @@ -93,6 +93,17 @@ public static void addListener(CompletableFuture future, }, executor); } + /** + * Log the error if the future indicates any failure. + */ + public static void consume(CompletableFuture future) { + addListener(future, (r, e) -> { + if (e != null) { + LOG.warn("Async operation fails", e); + } + }); + } + /** * Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all * the callbacks in the given {@code executor}. diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyFutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyFutureUtils.java new file mode 100644 index 000000000000..1613569fe94f --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyFutureUtils.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker; +import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future; +import org.apache.hbase.thirdparty.io.netty.util.concurrent.GenericFutureListener; + +/** + * Helper class for processing netty futures. + */ +@InterfaceAudience.Private +public final class NettyFutureUtils { + + private static final Logger LOG = LoggerFactory.getLogger(NettyFutureUtils.class); + + private NettyFutureUtils() { + } + + /** + * This method is used when you just want to add a listener to the given netty future. Ignoring + * the return value of a Future is considered a bad practice as it may suppress exceptions + * thrown from the code that completes the future, and this method will catch and log all + * exceptions thrown from the {@code listener} to surface possible code bugs. + * <p/>
+ * And the error-prone check will always report FutureReturnValueIgnored because every method in + * the {@link Future} class will return a new {@link Future}, so you always have one future that + * has not been checked. So we introduce this method and add a suppress warnings annotation here. + */ + @SuppressWarnings({ "FutureReturnValueIgnored", "rawtypes", "unchecked" }) + public static <T> void addListener(Future<T> future, + GenericFutureListener<? extends Future<? super T>> listener) { + future.addListener(f -> { + try { + // the ? wildcard in the generic type makes it really hard to pass compile, so here we just + // cast the listener to the raw type. + ((GenericFutureListener) listener).operationComplete(f); + } catch (Throwable t) { + LOG.error("Unexpected error caught when processing netty", t); + } + }); + } + + private static void loggingWhenError(Future<?> future) { + if (!future.isSuccess()) { + LOG.warn("IO operation failed", future.cause()); + } + } + + /** + * Log the error if the future indicates any failure. + */ + @SuppressWarnings("FutureReturnValueIgnored") + public static void consume(Future<?> future) { + future.addListener(NettyFutureUtils::loggingWhenError); + } + + /** + * Close the channel and eat the returned future by logging the error when the future is completed + * with error. + */ + public static void safeClose(ChannelOutboundInvoker channel) { + consume(channel.close()); + } + + /** + * Call write on the channel and eat the returned future by logging the error when the future is + * completed with error. + */ + public static void safeWrite(ChannelOutboundInvoker channel, Object msg) { + consume(channel.write(msg)); + } + + /** + * Call writeAndFlush on the channel and eat the returned future by logging the error when the + * future is completed with error. + */ + public static void safeWriteAndFlush(ChannelOutboundInvoker channel, Object msg) { + consume(channel.writeAndFlush(msg)); + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java index d731ac779bff..9ecd266b5dde 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client.example; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; +import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush; import java.io.IOException; import java.net.InetSocketAddress; @@ -71,6 +72,11 @@ * * Use HTTP GET to fetch data, and use HTTP PUT to put data. Encode the value as the request content * when doing PUT. + * <p/>
+ * Notice that the methods of the Future class all return a new Future, so you always have one + * future that has not been checked, so we need to suppress the error-prone + * "FutureReturnValueIgnored" warning on methods such as join and stop. In your real production + * code, you should address the warning in whatever way is convenient for you. */ @InterfaceAudience.Private public class HttpProxyExample { @@ -148,7 +154,7 @@ private void write(ChannelHandlerContext ctx, HttpResponseStatus status, String resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status); } resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "text-plain; charset=UTF-8"); - ctx.writeAndFlush(resp); + safeWriteAndFlush(ctx, resp); } private Params parse(FullHttpRequest req) { @@ -239,6 +245,7 @@ protected void initChannel(Channel ch) throws Exception { }).bind(port).syncUninterruptibly().channel(); } + @SuppressWarnings("FutureReturnValueIgnored") public void join() { serverChannel.closeFuture().awaitUninterruptibly(); } @@ -251,6 +258,7 @@ public int port() { } } + @SuppressWarnings("FutureReturnValueIgnored") public void stop() throws IOException { serverChannel.close().syncUninterruptibly(); serverChannel = null; diff --git a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java index 1129921ef350..d5c6bff8cd01 100644 --- a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java +++ b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java @@ -80,23 +80,10 @@ public MemcachedBlockCache(Configuration c) throws IOException { boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT); ConnectionFactoryBuilder builder = - new ConnectionFactoryBuilder().setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout) // Cap - // the - // max - // time - // before - // anything - // times - // out .setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize).setDaemon(true) // Don't - // keep - // threads - // around - // past - // the - // end - // of - // days. + // Cap the max time before anything times out + new ConnectionFactoryBuilder().setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout) + // Don't keep threads around past the end of days.
+ .setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize).setDaemon(true) .setUseNagleAlgorithm(false) // Ain't nobody got time for that .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024); // Much larger just in case @@ -124,10 +111,17 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) cacheBlock(cacheKey, buf); } + @SuppressWarnings("FutureReturnValueIgnored") @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { if (buf instanceof HFileBlock) { - client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc); + client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc).addListener(f -> { + try { + f.get(); + } catch (ExecutionException e) { + LOG.warn("Failed to cache block", e); + } + }); } else { if (LOG.isDebugEnabled()) { LOG.debug( diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ShellExecEndpointCoprocessor.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ShellExecEndpointCoprocessor.java index edaa2a7e40f3..e80989174c4e 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ShellExecEndpointCoprocessor.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ShellExecEndpointCoprocessor.java @@ -108,7 +108,7 @@ private void runForegroundTask(final Shell.ShellCommandExecutor shell, private void runBackgroundTask(final Shell.ShellCommandExecutor shell, final RpcCallback done) { final long sleepDuration = conf.getLong(BACKGROUND_DELAY_MS_KEY, DEFAULT_BACKGROUND_DELAY_MS); - backgroundExecutor.submit(() -> { + backgroundExecutor.execute(() -> { try { // sleep first so that the RPC can ACK. race condition here as we have no means of blocking // until the IPC response has been acknowledged by the client. diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MergeRandomAdjacentRegionsOfTableAction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MergeRandomAdjacentRegionsOfTableAction.java index 4dfb1af0196f..b019197b7d0a 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MergeRandomAdjacentRegionsOfTableAction.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MergeRandomAdjacentRegionsOfTableAction.java @@ -72,7 +72,7 @@ public void perform() throws Exception { } try { - admin.mergeRegionsAsync(a.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), false); + admin.mergeRegionsAsync(a.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), false).get(); } catch (Exception ex) { getLogger().warn("Merge failed, might be caused by other chaos: " + ex.getMessage()); } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java index bdb1c719af28..52a88743f557 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.test; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.DataInput; @@ -684,9 +686,9 @@ protected void map(final LongWritable key, final WARCWritable value, final Conte } final long putStartTime = System.currentTimeMillis(); final CompletableFuture putFuture = table.put(put); - putFuture.thenRun(() -> { + addListener(putFuture, (r, e) -> { inflight.decrementAndGet(); - if 
(!putFuture.isCompletedExceptionally()) { + if (e == null) { output.getCounter(Counts.RPC_TIME_MS) .increment(System.currentTimeMillis() - putStartTime); output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(put.heapSize()); @@ -732,9 +734,9 @@ protected void map(final LongWritable key, final WARCWritable value, final Conte } final long incrStartTime = System.currentTimeMillis(); final CompletableFuture incrFuture = table.increment(increment); - incrFuture.thenRun(() -> { + addListener(incrFuture, (r, e) -> { inflight.decrementAndGet(); - if (!incrFuture.isCompletedExceptionally()) { + if (e == null) { output.getCounter(Counts.RPC_TIME_MS) .increment(System.currentTimeMillis() - incrStartTime); output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(increment.heapSize()); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java index 35b70120993d..49ac9f1a2397 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java @@ -150,7 +150,7 @@ public void run() { } } }; - service.submit(runnable); + service.execute(runnable); } } @@ -194,7 +194,7 @@ public void run() { } }; - service.submit(runnable); + service.execute(runnable); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java index 1807fcd9e88b..49abb3035a3a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java @@ -722,6 +722,7 @@ public boolean evaluate() throws Exception { assertEquals(11111111, ADMIN.getDescriptor(tableName).getMaxFileSize()); } + @SuppressWarnings("FutureReturnValueIgnored") @Test public void testTableMergeFollowedByModify() throws Exception { final TableName tableName = TableName.valueOf(name.getMethodName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java index 824ccb49eab4..f0c8cf58abeb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java @@ -236,6 +236,7 @@ public static void tearDown() throws Exception { UTIL.shutdownMiniCluster(); } + @SuppressWarnings("FutureReturnValueIgnored") @Test public void test() throws Exception { RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 72d073ea3d58..3e25f4aa61d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -1284,7 +1284,7 @@ public void testCreateScannerAndSnapshotConcurrently() throws IOException, Inter storeFlushCtx.prepare(); }; ExecutorService service = Executors.newSingleThreadExecutor(); - service.submit(flush); + service.execute(flush); // we get scanner from pipeline and snapshot 
but they are empty. -- phase (2/5) // this is blocked until we recreate the active memstore -- phase (3/5) // we get scanner from active memstore but it is empty -- phase (5/5) @@ -1321,7 +1321,7 @@ public void testScanWithDoubleFlush() throws IOException { public void getScanners(MyStore store) throws IOException { final long tmpId = id++; ExecutorService s = Executors.newSingleThreadExecutor(); - s.submit(() -> { + s.execute(() -> { try { // flush the store before storescanner updates the scanners from store. // The current data will be flushed into files, and the memstore will diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java index 58ec1a79cf23..8402617c44b5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java @@ -52,8 +52,8 @@ public static void setUpBeforeClass() { } @AfterClass - public static void tearDownAfterClass() { - GROUP.shutdownGracefully(); + public static void tearDownAfterClass() throws Exception { + GROUP.shutdownGracefully().get(); } @Override diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java index 786c78d56142..9608ffc17c69 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java @@ -604,7 +604,7 @@ public void process(WatchedEvent event) { LOG.debug(prefix("Received ZooKeeper Event, " + "type=" + event.getType() + ", " + "state=" + event.getState() + ", " + "path=" + event.getPath())); final String spanName = ZKWatcher.class.getSimpleName() + "-" + identifier; - zkEventProcessor.submit(TraceUtil.tracedRunnable(() -> processEvent(event), spanName)); + zkEventProcessor.execute(TraceUtil.tracedRunnable(() -> processEvent(event), spanName)); } // Connection management
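
Reviewer note: the sketch below is illustrative only and not part of the patch. It shows the call pattern the new helpers are meant to replace the fire-and-forget calls with; the DemoUsage class and its channel, msg and flushFuture parameters are made up for illustration, while FutureUtils.consume and the NettyFutureUtils methods are the ones introduced in this diff.

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;

// Hypothetical caller, for illustration only.
class DemoUsage {
  void send(Channel channel, Object msg, CompletableFuture<Long> flushFuture) {
    // Before this patch: channel.writeAndFlush(msg); -- error-prone reports
    // FutureReturnValueIgnored because the returned ChannelFuture is silently dropped.
    // After: the helper consumes the returned future and logs any failure.
    NettyFutureUtils.safeWriteAndFlush(channel, msg);

    // Closing without waiting: the close future is consumed and failures are logged.
    NettyFutureUtils.safeClose(channel);

    // For a CompletableFuture whose result is intentionally ignored, consume() logs
    // failures instead of swallowing them.
    FutureUtils.consume(flushFuture);
  }
}

The design choice is the same in both utility classes: instead of sprinkling @SuppressWarnings("FutureReturnValueIgnored") across call sites, ignored futures are funnelled through one place that logs their failures.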