From 8272568b895a8a660280174d7235f254f5d8988f Mon Sep 17 00:00:00 2001 From: comnetwork Date: Tue, 18 Jan 2022 18:11:04 +0800 Subject: [PATCH 01/15] HBASE-26679 Wait on the future returned by FanOutOneBlockAsyncDFSOutput.flush would stuck --- .../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 14 +- .../FanOutOneBlockAsyncDFSOutputHelper.java | 21 ++- .../TestFanOutOneBlockAsyncDFSOutput.java | 128 +++++++++++++++++- 3 files changed, 158 insertions(+), 5 deletions(-) diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 41e345f46b82..08d144e414da 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -187,7 +187,7 @@ private enum State { private final StreamSlowMonitor streamSlowMonitor; // all lock-free to make it run faster - private void completed(Channel channel) { + protected void completed(Channel channel) { for (Iterator iter = waitingAckQueue.iterator(); iter.hasNext();) { Callback c = iter.next(); // if the current unfinished replicas does not contain us then it means that we have already @@ -231,7 +231,11 @@ private void completed(Channel channel) { // so that the implementation will not burn up our brain as there are multiple state changes and // checks. private synchronized void failed(Channel channel, Supplier errorSupplier) { - if (state == State.BROKEN || state == State.CLOSED) { + if (state == State.CLOSED) { + return; + } + if (state == State.BROKEN) { + failWaitingAckQueue(channel, errorSupplier); return; } if (state == State.CLOSING) { @@ -243,6 +247,11 @@ private synchronized void failed(Channel channel, Supplier errorSuppl } // disable further write, and fail all pending ack. 
state = State.BROKEN; + failWaitingAckQueue(channel, errorSupplier); + datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close); + } + + private void failWaitingAckQueue(Channel channel, Supplier errorSupplier) { Throwable error = errorSupplier.get(); for (Iterator iter = waitingAckQueue.iterator(); iter.hasNext();) { Callback c = iter.next(); @@ -259,7 +268,6 @@ private synchronized void failed(Channel channel, Supplier errorSuppl } break; } - datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close); } @Sharable diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 7c62d67c6cee..68e52440c9d6 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager; import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -124,6 +125,8 @@ private FanOutOneBlockAsyncDFSOutputHelper() { public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries"; public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10; + + public static final String ASYNC_DFS_OUTPUT_CLASS_NAME = "hbase.fs.async.output.class"; // use pooled allocator for performance. 
private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT; @@ -505,7 +508,7 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d } Encryptor encryptor = createEncryptor(conf, stat, client); FanOutOneBlockAsyncDFSOutput output = - new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, + doCreateFanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor); succ = true; return output; @@ -551,6 +554,22 @@ public void operationComplete(Future future) throws Exception { } } + private static FanOutOneBlockAsyncDFSOutput doCreateFanOutOneBlockAsyncDFSOutput( + Configuration conf, DistributedFileSystem dfs, DFSClient client, ClientProtocol namenode, + String clientName, + String src, long fileId, LocatedBlock locatedBlock, Encryptor encryptor, + Map datanodeInfoMap, DataChecksum summer, ByteBufAllocator alloc, + StreamSlowMonitor streamSlowMonitor) { + + Class dfsOutputClass = + conf.getClass(ASYNC_DFS_OUTPUT_CLASS_NAME, FanOutOneBlockAsyncDFSOutput.class, + FanOutOneBlockAsyncDFSOutput.class); + return ReflectionUtils.newInstance(dfsOutputClass, new Object[] { conf, dfs, client, namenode, + clientName, src, fileId, locatedBlock, encryptor, datanodeInfoMap, summer, alloc, + streamSlowMonitor }); + + } + /** * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method maybe blocked so do not call it * inside an {@link EventLoop}. 
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 8533d38bae09..84d093ec42c4 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -22,6 +22,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.FileNotFoundException; @@ -31,10 +32,16 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.Encryptor; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -44,10 +51,15 @@ import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.ipc.RemoteException; 
+import org.apache.hadoop.util.DataChecksum; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -57,7 +69,7 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; @@ -272,4 +284,118 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, Exec } assertArrayEquals(b, actual); } + + /** + * This test is for HBASE-26679. + */ + @Test + public void testFlushStuckWhenOneDataNodeShutdown() throws Exception { + Path f = new Path("/" + name.getMethodName()); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + + Configuration conf = FS.getConf(); + conf.set(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CLASS_NAME, + MyFanOutOneBlockAsyncDFSOutput.class.getName()); + + DataNodeProperties firstDataNodeProperties = null; + try { + MyFanOutOneBlockAsyncDFSOutput out = + (MyFanOutOneBlockAsyncDFSOutput) FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + + byte[] b = new byte[10]; + ThreadLocalRandom.current().nextBytes(b); + out.write(b, 0, b.length); + CompletableFuture future = out.flush(false); + /** + * First ack is received from dataNode1,we could stop dataNode1 now. 
+ */ + out.stopCyclicBarrier.await(); + Channel firstDataNodeChannel = out.alreadyNotifiedChannelRef.get(); + assertTrue(firstDataNodeChannel != null); + firstDataNodeProperties = findAndKillFirstDataNode(out.datanodeInfoMap, firstDataNodeChannel); + assertTrue(firstDataNodeProperties != null); + try { + future.get(); + fail(); + } catch (Throwable e) { + LOG.info("expected exception caught when get future", e); + } + /** + * Make sure all the data node channel are closed. + */ + out.datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly()); + } finally { + conf.unset(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CLASS_NAME); + if (firstDataNodeProperties != null) { + CLUSTER.restartDataNode(firstDataNodeProperties); + } + } + + } + + private static DataNodeProperties findAndKillFirstDataNode( + Map datanodeInfoMap, + Channel firstChannel) { + DatanodeInfo firstDatanodeInfo = datanodeInfoMap.get(firstChannel); + assertTrue(firstDatanodeInfo != null); + ArrayList dataNodes = CLUSTER.getDataNodes(); + ArrayList foundIndexes = new ArrayList(); + int index = 0; + for (DataNode dataNode : dataNodes) { + if (firstDatanodeInfo.getXferAddr().equals(dataNode.getDatanodeId().getXferAddr())) { + foundIndexes.add(index); + } + index++; + } + assertTrue(foundIndexes.size() == 1); + + return CLUSTER.stopDataNode(foundIndexes.get(0)); + } + + static class MyFanOutOneBlockAsyncDFSOutput extends FanOutOneBlockAsyncDFSOutput { + + private final AtomicReference alreadyNotifiedChannelRef = + new AtomicReference(null); + private final CyclicBarrier stopCyclicBarrier = new CyclicBarrier(2); + private final Map datanodeInfoMap; + + @Override + protected void completed(Channel channel) { + /** + * Here it is hard to simulate slow response from datanode because I can not modify the code + * of HDFS, so here because this method is only invoked by + * {@link FanOutOneBlockAsyncDFSOutput.AckHandler#channelRead0}, we simulate slow response + * from datanode by just 
permit the acks from the first responding data node to forward,and + * discard the acks from other slow data nodes. + */ + boolean success = this.alreadyNotifiedChannelRef.compareAndSet(null, channel); + if (channel.equals(this.alreadyNotifiedChannelRef.get())) { + super.completed(channel); + } + + if (success) { + /** + * Here we tell the test method we could stop the data node now which send the first ack. + */ + try { + stopCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + } + + MyFanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, + DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId, + LocatedBlock locatedBlock, Encryptor encryptor, Map datanodeInfoMap, + DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) { + super(conf, dfs, client, namenode, clientName, src, fileId, locatedBlock, encryptor, + datanodeInfoMap, summer, alloc, streamSlowMonitor); + this.datanodeInfoMap = datanodeInfoMap; + + } + + } + } From 27a27a3526783d2286c542e50ece8404dc130a11 Mon Sep 17 00:00:00 2001 From: comnetwork Date: Tue, 18 Jan 2022 19:47:54 +0800 Subject: [PATCH 02/15] fix checkstyle error --- .../asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java | 6 +++--- .../io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java | 11 +++++++++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 68e52440c9d6..007fa74c6564 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -564,9 +564,9 @@ private static FanOutOneBlockAsyncDFSOutput doCreateFanOutOneBlockAsyncDFSOutput Class 
dfsOutputClass = conf.getClass(ASYNC_DFS_OUTPUT_CLASS_NAME, FanOutOneBlockAsyncDFSOutput.class, FanOutOneBlockAsyncDFSOutput.class); - return ReflectionUtils.newInstance(dfsOutputClass, new Object[] { conf, dfs, client, namenode, - clientName, src, fileId, locatedBlock, encryptor, datanodeInfoMap, summer, alloc, - streamSlowMonitor }); + return ReflectionUtils.newInstance(dfsOutputClass, + new Object[] { conf, dfs, client, namenode, clientName, src, fileId, locatedBlock, encryptor, + datanodeInfoMap, summer, alloc, streamSlowMonitor }); } diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 84d093ec42c4..fd334cf30bb0 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -318,13 +318,20 @@ public void testFlushStuckWhenOneDataNodeShutdown() throws Exception { try { future.get(); fail(); - } catch (Throwable e) { + } catch (ExecutionException e) { + assertTrue(e != null); LOG.info("expected exception caught when get future", e); } /** * Make sure all the data node channel are closed. 
*/ - out.datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly()); + out.datanodeInfoMap.keySet().forEach(ch -> { + try { + ch.closeFuture().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); } finally { conf.unset(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CLASS_NAME); if (firstDataNodeProperties != null) { From 4824bbda7e929c902d636d4af1f08f09eed59241 Mon Sep 17 00:00:00 2001 From: comnetwork Date: Mon, 24 Jan 2022 12:39:05 +0800 Subject: [PATCH 03/15] fix the tests --- .../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 24 +++- .../FanOutOneBlockAsyncDFSOutputHelper.java | 23 +--- .../TestFanOutOneBlockAsyncDFSOutput.java | 129 ++++++++---------- 3 files changed, 83 insertions(+), 93 deletions(-) diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 08d144e414da..0fc12495ac56 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -31,6 +31,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -61,6 +62,8 @@ import org.apache.hadoop.util.DataChecksum; import org.apache.yetus.audience.InterfaceAudience; +import com.google.errorprone.annotations.RestrictedApi; + import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; @@ -131,7 +134,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private final ByteBufAllocator alloc; - private static final class Callback { + 
protected static final class Callback { private final CompletableFuture future; @@ -157,6 +160,13 @@ public Callback(CompletableFuture future, long ackedLength, replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add); } } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + Set getUnfinishedReplicas() { + return this.unfinishedReplicas; + } + } private final ConcurrentLinkedDeque waitingAckQueue = new ConcurrentLinkedDeque<>(); @@ -606,4 +616,16 @@ public boolean isBroken() { public long getSyncedLength() { return this.ackedBlockLength; } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + Map getDatanodeInfoMap() { + return this.datanodeInfoMap; + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + Deque getWaitingAckQueue() { + return this.waitingAckQueue; + } } diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 007fa74c6564..80f5031a974d 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager; import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -125,8 +124,6 @@ private FanOutOneBlockAsyncDFSOutputHelper() { public static final String 
ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries"; public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10; - - public static final String ASYNC_DFS_OUTPUT_CLASS_NAME = "hbase.fs.async.output.class"; // use pooled allocator for performance. private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT; @@ -508,8 +505,8 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d } Encryptor encryptor = createEncryptor(conf, stat, client); FanOutOneBlockAsyncDFSOutput output = - doCreateFanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, - stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor); + new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, + stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor); succ = true; return output; } catch (RemoteException e) { @@ -554,22 +551,6 @@ public void operationComplete(Future future) throws Exception { } } - private static FanOutOneBlockAsyncDFSOutput doCreateFanOutOneBlockAsyncDFSOutput( - Configuration conf, DistributedFileSystem dfs, DFSClient client, ClientProtocol namenode, - String clientName, - String src, long fileId, LocatedBlock locatedBlock, Encryptor encryptor, - Map datanodeInfoMap, DataChecksum summer, ByteBufAllocator alloc, - StreamSlowMonitor streamSlowMonitor) { - - Class dfsOutputClass = - conf.getClass(ASYNC_DFS_OUTPUT_CLASS_NAME, FanOutOneBlockAsyncDFSOutput.class, - FanOutOneBlockAsyncDFSOutput.class); - return ReflectionUtils.newInstance(dfsOutputClass, - new Object[] { conf, dfs, client, namenode, clientName, src, fileId, locatedBlock, encryptor, - datanodeInfoMap, summer, alloc, streamSlowMonitor }); - - } - /** * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method maybe blocked so do not call it * inside an {@link EventLoop}. 
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index fd334cf30bb0..764c92cda006 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -31,35 +31,29 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Deque; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.crypto.Encryptor; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput.Callback; import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager; import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; -import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import 
org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.util.DataChecksum; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -67,15 +61,17 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; + import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; + @Category({ MiscTests.class, MediumTests.class }) public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { @@ -293,27 +289,66 @@ public void testFlushStuckWhenOneDataNodeShutdown() throws Exception { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - Configuration conf = FS.getConf(); - conf.set(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CLASS_NAME, - MyFanOutOneBlockAsyncDFSOutput.class.getName()); - DataNodeProperties firstDataNodeProperties = null; try { - MyFanOutOneBlockAsyncDFSOutput out = - (MyFanOutOneBlockAsyncDFSOutput) FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + FanOutOneBlockAsyncDFSOutput out = + FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + Map datanodeInfoMap = out.getDatanodeInfoMap(); + Iterator> iterator = datanodeInfoMap.entrySet().iterator(); + assertTrue(iterator.hasNext()); + Map.Entry dn1Entry= iterator.next(); + DatanodeInfo dn1DatanodeInfo = dn1Entry.getValue(); + + assertTrue(iterator.hasNext()); + Map.Entry dn2Entry= 
iterator.next(); + Channel dn2Channel= dn2Entry.getKey(); + DatanodeInfo dn2DatanodeInfo = dn2Entry.getValue(); + /** + * Here we simulate slow response from dn2 and dn3 by just discard the message when flushing + * to dn2 and dn3. + */ + final Channel spiedDN2Channel = Mockito.spy(dn2Channel); + Mockito.doAnswer((invocation) -> { + return null; + }).when(spiedDN2Channel).write(Mockito.any()); + Mockito.doAnswer((invocation) -> { + return null; + }).when(spiedDN2Channel).writeAndFlush(Mockito.any()); + + assertTrue(iterator.hasNext()); + Map.Entry dn3Entry = iterator.next(); + Channel dn3Channel= dn3Entry.getKey(); + DatanodeInfo dn3DatanodeInfo = dn3Entry.getValue(); + final Channel spiedDN3Channel = Mockito.spy(dn3Channel); + Mockito.doAnswer((invocation) -> { + return null; + }).when(spiedDN3Channel).write(Mockito.any()); + Mockito.doAnswer((invocation) -> { + return null; + }).when(spiedDN3Channel).writeAndFlush(Mockito.any()); + + datanodeInfoMap.remove(dn2Channel); + datanodeInfoMap.remove(dn3Channel); + + datanodeInfoMap.put(spiedDN2Channel, dn2DatanodeInfo); + datanodeInfoMap.put(spiedDN3Channel, dn3DatanodeInfo); byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); out.write(b, 0, b.length); CompletableFuture future = out.flush(false); + Deque ackQueue = out.getWaitingAckQueue(); + assertTrue(ackQueue.size() == 1); + Callback callback = ackQueue.getFirst(); + while (callback.getUnfinishedReplicas().size() != 2) { + Thread.sleep(1000); + } + /** - * First ack is received from dataNode1,we could stop dataNode1 now. + * First ack is received from dn1,we could stop dn1 now. 
*/ - out.stopCyclicBarrier.await(); - Channel firstDataNodeChannel = out.alreadyNotifiedChannelRef.get(); - assertTrue(firstDataNodeChannel != null); - firstDataNodeProperties = findAndKillFirstDataNode(out.datanodeInfoMap, firstDataNodeChannel); + firstDataNodeProperties = findAndKillFirstDataNode(dn1DatanodeInfo); assertTrue(firstDataNodeProperties != null); try { future.get(); @@ -325,7 +360,7 @@ public void testFlushStuckWhenOneDataNodeShutdown() throws Exception { /** * Make sure all the data node channel are closed. */ - out.datanodeInfoMap.keySet().forEach(ch -> { + datanodeInfoMap.keySet().forEach(ch -> { try { ch.closeFuture().get(); } catch (InterruptedException | ExecutionException e) { @@ -333,7 +368,6 @@ public void testFlushStuckWhenOneDataNodeShutdown() throws Exception { } }); } finally { - conf.unset(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CLASS_NAME); if (firstDataNodeProperties != null) { CLUSTER.restartDataNode(firstDataNodeProperties); } @@ -342,9 +376,7 @@ public void testFlushStuckWhenOneDataNodeShutdown() throws Exception { } private static DataNodeProperties findAndKillFirstDataNode( - Map datanodeInfoMap, - Channel firstChannel) { - DatanodeInfo firstDatanodeInfo = datanodeInfoMap.get(firstChannel); + DatanodeInfo firstDatanodeInfo) { assertTrue(firstDatanodeInfo != null); ArrayList dataNodes = CLUSTER.getDataNodes(); ArrayList foundIndexes = new ArrayList(); @@ -360,49 +392,4 @@ private static DataNodeProperties findAndKillFirstDataNode( return CLUSTER.stopDataNode(foundIndexes.get(0)); } - static class MyFanOutOneBlockAsyncDFSOutput extends FanOutOneBlockAsyncDFSOutput { - - private final AtomicReference alreadyNotifiedChannelRef = - new AtomicReference(null); - private final CyclicBarrier stopCyclicBarrier = new CyclicBarrier(2); - private final Map datanodeInfoMap; - - @Override - protected void completed(Channel channel) { - /** - * Here it is hard to simulate slow response from datanode because I can not modify the code - 
* of HDFS, so here because this method is only invoked by - * {@link FanOutOneBlockAsyncDFSOutput.AckHandler#channelRead0}, we simulate slow response - * from datanode by just permit the acks from the first responding data node to forward,and - * discard the acks from other slow data nodes. - */ - boolean success = this.alreadyNotifiedChannelRef.compareAndSet(null, channel); - if (channel.equals(this.alreadyNotifiedChannelRef.get())) { - super.completed(channel); - } - - if (success) { - /** - * Here we tell the test method we could stop the data node now which send the first ack. - */ - try { - stopCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - } - - MyFanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, - DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId, - LocatedBlock locatedBlock, Encryptor encryptor, Map datanodeInfoMap, - DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) { - super(conf, dfs, client, namenode, clientName, src, fileId, locatedBlock, encryptor, - datanodeInfoMap, summer, alloc, streamSlowMonitor); - this.datanodeInfoMap = datanodeInfoMap; - - } - - } - } From 7ed13c4e7fdbb44fbd63f29e8ce73bec49f4a85c Mon Sep 17 00:00:00 2001 From: comnetwork Date: Mon, 24 Jan 2022 12:47:45 +0800 Subject: [PATCH 04/15] fix the tests --- .../hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 764c92cda006..acbc18624d90 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -71,7 +71,6 @@ import 
org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; - @Category({ MiscTests.class, MediumTests.class }) public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { From 2cc37b3aacdcd771481b8608debcdd85d26ea5ad Mon Sep 17 00:00:00 2001 From: comnetwork Date: Mon, 24 Jan 2022 14:48:31 +0800 Subject: [PATCH 05/15] fix checkstyle --- .../hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java | 3 +-- .../hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java | 5 ++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 0fc12495ac56..c5ca3adb2e40 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -26,6 +26,7 @@ import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE; +import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; import java.io.InterruptedIOException; import java.nio.ByteBuffer; @@ -62,8 +63,6 @@ import org.apache.hadoop.util.DataChecksum; import org.apache.yetus.audience.InterfaceAudience; -import com.google.errorprone.annotations.RestrictedApi; - import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java 
b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index acbc18624d90..5cba605c651a 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -44,7 +44,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput.Callback; import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager; import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -337,9 +336,9 @@ public void testFlushStuckWhenOneDataNodeShutdown() throws Exception { ThreadLocalRandom.current().nextBytes(b); out.write(b, 0, b.length); CompletableFuture future = out.flush(false); - Deque ackQueue = out.getWaitingAckQueue(); + Deque ackQueue = out.getWaitingAckQueue(); assertTrue(ackQueue.size() == 1); - Callback callback = ackQueue.getFirst(); + FanOutOneBlockAsyncDFSOutput.Callback callback = ackQueue.getFirst(); while (callback.getUnfinishedReplicas().size() != 2) { Thread.sleep(1000); } From 220b87ee90e9620933ad83d5a74b631a3d5183d7 Mon Sep 17 00:00:00 2001 From: comnetwork Date: Mon, 24 Jan 2022 18:17:47 +0800 Subject: [PATCH 06/15] fix test --- .../TestFanOutOneBlockAsyncDFSOutput.java | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 5cba605c651a..700c8bf9dff0 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ 
b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -307,28 +307,17 @@ public void testFlushStuckWhenOneDataNodeShutdown() throws Exception { * to dn2 and dn3. */ final Channel spiedDN2Channel = Mockito.spy(dn2Channel); - Mockito.doAnswer((invocation) -> { - return null; - }).when(spiedDN2Channel).write(Mockito.any()); - Mockito.doAnswer((invocation) -> { - return null; - }).when(spiedDN2Channel).writeAndFlush(Mockito.any()); + ignoreWriteMessage(spiedDN2Channel); assertTrue(iterator.hasNext()); Map.Entry dn3Entry = iterator.next(); Channel dn3Channel= dn3Entry.getKey(); DatanodeInfo dn3DatanodeInfo = dn3Entry.getValue(); final Channel spiedDN3Channel = Mockito.spy(dn3Channel); - Mockito.doAnswer((invocation) -> { - return null; - }).when(spiedDN3Channel).write(Mockito.any()); - Mockito.doAnswer((invocation) -> { - return null; - }).when(spiedDN3Channel).writeAndFlush(Mockito.any()); + ignoreWriteMessage(spiedDN3Channel); datanodeInfoMap.remove(dn2Channel); datanodeInfoMap.remove(dn3Channel); - datanodeInfoMap.put(spiedDN2Channel, dn2DatanodeInfo); datanodeInfoMap.put(spiedDN3Channel, dn3DatanodeInfo); @@ -373,6 +362,15 @@ public void testFlushStuckWhenOneDataNodeShutdown() throws Exception { } + private static void ignoreWriteMessage(Channel spiedChannel) { + Mockito.doAnswer((invocation) -> { + return null; + }).when(spiedChannel).write(Mockito.any()); + Mockito.doAnswer((invocation) -> { + return null; + }).when(spiedChannel).writeAndFlush(Mockito.any()); + } + private static DataNodeProperties findAndKillFirstDataNode( DatanodeInfo firstDatanodeInfo) { assertTrue(firstDatanodeInfo != null); From 51fba8372ed113cc96348d4319bf71f539be6011 Mon Sep 17 00:00:00 2001 From: comnetwork Date: Thu, 27 Jan 2022 12:17:21 +0800 Subject: [PATCH 07/15] update tests --- .../TestFanOutOneBlockAsyncDFSOutput.java | 81 ++++++++++++------- 1 file changed, 50 insertions(+), 31 deletions(-) diff --git 
a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 700c8bf9dff0..e28b54a25b5d 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -60,15 +60,16 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter; @Category({ MiscTests.class, MediumTests.class }) public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { @@ -280,7 +281,33 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, Exec } /** - * This test is for HBASE-26679. + *
+   * This test is for HBASE-26679. Consider there are two dataNodes: dn1 and dn2; dn2 is a slow DN.
+   * The sequence of events before HBASE-26679 is:
+   * 1.We write some data to {@link FanOutOneBlockAsyncDFSOutput} and then flush it, there is one
+   *   {@link FanOutOneBlockAsyncDFSOutput.Callback} in
+   *   {@link FanOutOneBlockAsyncDFSOutput#waitingAckQueue}.
+   * 2.The ack from dn1 arrives first and triggers Netty to invoke
+   *   {@link FanOutOneBlockAsyncDFSOutput#completed} with dn1's channel, then in
+   *   {@link FanOutOneBlockAsyncDFSOutput#completed}, dn1's channel is removed from
+   *   {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas}.
+   * 3.But dn2 responds slowly. Before dn2 sends its ack, dn1 is shut down or hits an exception,
+   *   so {@link FanOutOneBlockAsyncDFSOutput#failed} is triggered by Netty with dn1's channel,
+   *   and because the {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas} does not
+   *   contain dn1's channel,the {@link FanOutOneBlockAsyncDFSOutput.Callback} is skipped in
+   *   {@link FanOutOneBlockAsyncDFSOutput#failed} method,and
+   *   {@link FanOutOneBlockAsyncDFSOutput#state} is set to
+   *   {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN},and dn1,dn2 are all closed at the end of
+   *   {@link FanOutOneBlockAsyncDFSOutput#failed}.
+   * 4.{@link FanOutOneBlockAsyncDFSOutput#failed} is triggered again by dn2 because it is closed,
+   *   but because {@link FanOutOneBlockAsyncDFSOutput#state} is already
+   *   {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN},the whole
+   *   {@link FanOutOneBlockAsyncDFSOutput#failed} is skipped. So wait on the future
+   *   returned by {@link FanOutOneBlockAsyncDFSOutput#flush} would be stuck forever.
+   * After HBASE-26679, for the above step 4, even if the {@link FanOutOneBlockAsyncDFSOutput#state}
+   * is already {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, we would still try to trigger
+   * {@link FanOutOneBlockAsyncDFSOutput.Callback#future}.
+   * 
*/ @Test public void testFlushStuckWhenOneDataNodeShutdown() throws Exception { @@ -291,7 +318,7 @@ public void testFlushStuckWhenOneDataNodeShutdown() throws Exception { try { FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, - true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + true, false, (short) 2, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); Map datanodeInfoMap = out.getDatanodeInfoMap(); Iterator> iterator = datanodeInfoMap.entrySet().iterator(); assertTrue(iterator.hasNext()); @@ -301,34 +328,32 @@ public void testFlushStuckWhenOneDataNodeShutdown() throws Exception { assertTrue(iterator.hasNext()); Map.Entry dn2Entry= iterator.next(); Channel dn2Channel= dn2Entry.getKey(); - DatanodeInfo dn2DatanodeInfo = dn2Entry.getValue(); + /** - * Here we simulate slow response from dn2 and dn3 by just discard the message when flushing - * to dn2 and dn3. + * Here we add a {@link ChannelInboundHandlerAdapter} to eat all the responses to simulate a + * slow dn2. 
*/ - final Channel spiedDN2Channel = Mockito.spy(dn2Channel); - ignoreWriteMessage(spiedDN2Channel); - - assertTrue(iterator.hasNext()); - Map.Entry dn3Entry = iterator.next(); - Channel dn3Channel= dn3Entry.getKey(); - DatanodeInfo dn3DatanodeInfo = dn3Entry.getValue(); - final Channel spiedDN3Channel = Mockito.spy(dn3Channel); - ignoreWriteMessage(spiedDN3Channel); + dn2Channel.pipeline().addFirst(new ChannelInboundHandlerAdapter() { - datanodeInfoMap.remove(dn2Channel); - datanodeInfoMap.remove(dn3Channel); - datanodeInfoMap.put(spiedDN2Channel, dn2DatanodeInfo); - datanodeInfoMap.put(spiedDN3Channel, dn3DatanodeInfo); + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (!(msg instanceof ByteBuf)) { + ctx.fireChannelRead(msg); + } + } + }); byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); out.write(b, 0, b.length); CompletableFuture future = out.flush(false); + /** + * Wait for ack from dn1. + */ Deque ackQueue = out.getWaitingAckQueue(); assertTrue(ackQueue.size() == 1); FanOutOneBlockAsyncDFSOutput.Callback callback = ackQueue.getFirst(); - while (callback.getUnfinishedReplicas().size() != 2) { + while (callback.getUnfinishedReplicas().size() != 1) { Thread.sleep(1000); } @@ -338,6 +363,10 @@ public void testFlushStuckWhenOneDataNodeShutdown() throws Exception { firstDataNodeProperties = findAndKillFirstDataNode(dn1DatanodeInfo); assertTrue(firstDataNodeProperties != null); try { + /** + * Before HBASE-26679,here we should be stuck, after HBASE-26679,we would fail soon with + * {@link ExecutionException}. 
+ */ future.get(); fail(); } catch (ExecutionException e) { @@ -362,15 +391,6 @@ public void testFlushStuckWhenOneDataNodeShutdown() throws Exception { } - private static void ignoreWriteMessage(Channel spiedChannel) { - Mockito.doAnswer((invocation) -> { - return null; - }).when(spiedChannel).write(Mockito.any()); - Mockito.doAnswer((invocation) -> { - return null; - }).when(spiedChannel).writeAndFlush(Mockito.any()); - } - private static DataNodeProperties findAndKillFirstDataNode( DatanodeInfo firstDatanodeInfo) { assertTrue(firstDatanodeInfo != null); @@ -384,7 +404,6 @@ private static DataNodeProperties findAndKillFirstDataNode( index++; } assertTrue(foundIndexes.size() == 1); - return CLUSTER.stopDataNode(foundIndexes.get(0)); } From 3c252739041e20085182e8faa72c06f5dd56903c Mon Sep 17 00:00:00 2001 From: comnetwork Date: Thu, 27 Jan 2022 12:19:25 +0800 Subject: [PATCH 08/15] update tests --- .../hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index e28b54a25b5d..74a2d48a3235 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -310,7 +310,7 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, Exec * */ @Test - public void testFlushStuckWhenOneDataNodeShutdown() throws Exception { + public void testFlushStuckWhenOneDataNodeFailedBeforeOtherDataNodeAck() throws Exception { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); From fd66b2483d9ddf21f5d3a3773e69b03c35d18b62 Mon Sep 17 00:00:00 2001 From: comnetwork Date: Thu, 27 Jan 2022 15:09:28 +0800 
Subject: [PATCH 09/15] update tests --- .../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 11 ++------ .../TestFanOutOneBlockAsyncDFSOutput.java | 27 +++++++++++++------ 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index c5ca3adb2e40..ed21614173ea 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -133,7 +133,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private final ByteBufAllocator alloc; - protected static final class Callback { + private static final class Callback { private final CompletableFuture future; @@ -159,13 +159,6 @@ public Callback(CompletableFuture future, long ackedLength, replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add); } } - - @RestrictedApi(explanation = "Should only be called in tests", link = "", - allowedOnPath = ".*/src/test/.*") - Set getUnfinishedReplicas() { - return this.unfinishedReplicas; - } - } private final ConcurrentLinkedDeque waitingAckQueue = new ConcurrentLinkedDeque<>(); @@ -196,7 +189,7 @@ private enum State { private final StreamSlowMonitor streamSlowMonitor; // all lock-free to make it run faster - protected void completed(Channel channel) { + private void completed(Channel channel) { for (Iterator iter = waitingAckQueue.iterator(); iter.hasNext();) { Callback c = iter.next(); // if the current unfinished replicas does not contain us then it means that we have already diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 
74a2d48a3235..b68918b2ee4d 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -31,12 +31,12 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; -import java.util.Deque; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.fs.FSDataInputStream; @@ -319,11 +319,28 @@ public void testFlushStuckWhenOneDataNodeFailedBeforeOtherDataNodeAck() throws E FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + final CyclicBarrier dn1AckReceivedCyclicBarrier = new CyclicBarrier(2); Map datanodeInfoMap = out.getDatanodeInfoMap(); Iterator> iterator = datanodeInfoMap.entrySet().iterator(); assertTrue(iterator.hasNext()); Map.Entry dn1Entry= iterator.next(); + Channel dn1Channel = dn1Entry.getKey(); DatanodeInfo dn1DatanodeInfo = dn1Entry.getValue(); + List protobufDecoderNames = new ArrayList(); + dn1Channel.pipeline().forEach((entry) -> { + if (ProtobufDecoder.class.isInstance(entry.getValue())) { + protobufDecoderNames.add(entry.getKey()); + } + }); + assertTrue(protobufDecoderNames.size() == 1); + dn1Channel.pipeline().addAfter(protobufDecoderNames.get(0), "dn1AckReceivedHandler", + new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + super.channelRead(ctx, msg); + dn1AckReceivedCyclicBarrier.await(); + } + }); assertTrue(iterator.hasNext()); Map.Entry dn2Entry= iterator.next(); @@ -350,13 
+367,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception /** * Wait for ack from dn1. */ - Deque ackQueue = out.getWaitingAckQueue(); - assertTrue(ackQueue.size() == 1); - FanOutOneBlockAsyncDFSOutput.Callback callback = ackQueue.getFirst(); - while (callback.getUnfinishedReplicas().size() != 1) { - Thread.sleep(1000); - } - + dn1AckReceivedCyclicBarrier.await(); /** * First ack is received from dn1,we could stop dn1 now. */ From e6b0ac8377227f9431221d7919cf23de661ac31f Mon Sep 17 00:00:00 2001 From: comnetwork Date: Thu, 27 Jan 2022 15:14:39 +0800 Subject: [PATCH 10/15] fix checkstyle --- .../hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index b68918b2ee4d..b08d84e6f6b8 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -65,11 +65,12 @@ import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; -import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter; + @Category({ MiscTests.class, MediumTests.class }) public class TestFanOutOneBlockAsyncDFSOutput extends 
AsyncFSTestBase { From 331368936163a148c38c639bd560b852b78f4651 Mon Sep 17 00:00:00 2001 From: comnetwork Date: Thu, 27 Jan 2022 15:18:25 +0800 Subject: [PATCH 11/15] update tests --- .../hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java | 7 ------- .../hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java | 3 +-- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index ed21614173ea..5885ea685b32 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -32,7 +32,6 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; -import java.util.Deque; import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -614,10 +613,4 @@ public long getSyncedLength() { Map getDatanodeInfoMap() { return this.datanodeInfoMap; } - - @RestrictedApi(explanation = "Should only be called in tests", link = "", - allowedOnPath = ".*/src/test/.*") - Deque getWaitingAckQueue() { - return this.waitingAckQueue; - } } diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index b08d84e6f6b8..d1753695e72c 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -327,7 +327,7 @@ public void testFlushStuckWhenOneDataNodeFailedBeforeOtherDataNodeAck() throws E Map.Entry dn1Entry= iterator.next(); Channel dn1Channel = dn1Entry.getKey(); DatanodeInfo dn1DatanodeInfo = 
dn1Entry.getValue(); - List protobufDecoderNames = new ArrayList(); + final List protobufDecoderNames = new ArrayList(); dn1Channel.pipeline().forEach((entry) -> { if (ProtobufDecoder.class.isInstance(entry.getValue())) { protobufDecoderNames.add(entry.getKey()); @@ -400,7 +400,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception CLUSTER.restartDataNode(firstDataNodeProperties); } } - } private static DataNodeProperties findAndKillFirstDataNode( From 4f3d56c8dee14d73d0ad75e22dcb072143d2a265 Mon Sep 17 00:00:00 2001 From: comnetwork Date: Thu, 27 Jan 2022 17:37:44 +0800 Subject: [PATCH 12/15] fix checkstyle --- .../hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index d1753695e72c..dec01ef223d5 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -336,12 +336,12 @@ public void testFlushStuckWhenOneDataNodeFailedBeforeOtherDataNodeAck() throws E assertTrue(protobufDecoderNames.size() == 1); dn1Channel.pipeline().addAfter(protobufDecoderNames.get(0), "dn1AckReceivedHandler", new ChannelInboundHandlerAdapter() { - @Override + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); dn1AckReceivedCyclicBarrier.await(); - } - }); + } + }); assertTrue(iterator.hasNext()); Map.Entry dn2Entry= iterator.next(); From d9b6a0e9f0b91c04836797a1d1d48efe2abc73d9 Mon Sep 17 00:00:00 2001 From: comnetwork Date: Fri, 28 Jan 2022 11:10:41 +0800 Subject: [PATCH 13/15] separate test class --- .../TestFanOutOneBlockAsyncDFSOutput.java | 146 
---------- .../TestFanOutOneBlockAsyncDFSOutputHang.java | 249 ++++++++++++++++++ 2 files changed, 249 insertions(+), 146 deletions(-) create mode 100644 hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index dec01ef223d5..341b4ee02a49 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -22,7 +22,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.FileNotFoundException; @@ -31,12 +30,9 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Random; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.fs.FSDataInputStream; @@ -50,7 +46,6 @@ import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.ipc.RemoteException; import org.junit.AfterClass; @@ -62,10 +57,7 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import 
org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.channel.Channel; -import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; -import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; @@ -280,142 +272,4 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, Exec } assertArrayEquals(b, actual); } - - /** - *
-   * This test is for HBASE-26679. Consider there are two dataNodes: dn1 and dn2,dn2 is a slow DN.
-   * The threads sequence before HBASE-26679 is:
-   * 1.We write some data to {@link FanOutOneBlockAsyncDFSOutput} and then flush it, there are one
-   *   {@link FanOutOneBlockAsyncDFSOutput.Callback} in
-   *   {@link FanOutOneBlockAsyncDFSOutput#waitingAckQueue}.
-   * 2.The ack from dn1 arrives firstly and triggers Netty to invoke
-   *   {@link FanOutOneBlockAsyncDFSOutput#completed} with dn1's channel, then in
-   *   {@link FanOutOneBlockAsyncDFSOutput#completed}, dn1's channel is removed from
-   *   {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas}.
-   * 3.But dn2 responds slowly, before dn2 sending ack,dn1 is shut down or have a exception,
-   *   so {@link FanOutOneBlockAsyncDFSOutput#failed} is triggered by Netty with dn1's channel,
-   *   and because the {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas} does not
-   *   contain dn1's channel,the {@link FanOutOneBlockAsyncDFSOutput.Callback} is skipped in
-   *   {@link FanOutOneBlockAsyncDFSOutput#failed} method,and
-   *   {@link FanOutOneBlockAsyncDFSOutput#state} is set to
-   *   {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN},and dn1,dn2 are all closed at the end of
-   *   {@link FanOutOneBlockAsyncDFSOutput#failed}.
-   * 4.{@link FanOutOneBlockAsyncDFSOutput#failed} is triggered again by dn2 because it is closed,
-   *   but because {@link FanOutOneBlockAsyncDFSOutput#state} is already
-   *   {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN},the whole
-   *   {@link FanOutOneBlockAsyncDFSOutput#failed} is skipped. So wait on the future
-   *   returned by {@link FanOutOneBlockAsyncDFSOutput#flush} would be stuck for ever.
-   * After HBASE-26679, for above step 4,even if the {@link FanOutOneBlockAsyncDFSOutput#state}
-   * is already {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, we would still try to trigger
-   * {@link FanOutOneBlockAsyncDFSOutput.Callback#future}.
-   * 
- */ - @Test - public void testFlushStuckWhenOneDataNodeFailedBeforeOtherDataNodeAck() throws Exception { - Path f = new Path("/" + name.getMethodName()); - EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - - DataNodeProperties firstDataNodeProperties = null; - try { - FanOutOneBlockAsyncDFSOutput out = - FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, - true, false, (short) 2, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); - final CyclicBarrier dn1AckReceivedCyclicBarrier = new CyclicBarrier(2); - Map datanodeInfoMap = out.getDatanodeInfoMap(); - Iterator> iterator = datanodeInfoMap.entrySet().iterator(); - assertTrue(iterator.hasNext()); - Map.Entry dn1Entry= iterator.next(); - Channel dn1Channel = dn1Entry.getKey(); - DatanodeInfo dn1DatanodeInfo = dn1Entry.getValue(); - final List protobufDecoderNames = new ArrayList(); - dn1Channel.pipeline().forEach((entry) -> { - if (ProtobufDecoder.class.isInstance(entry.getValue())) { - protobufDecoderNames.add(entry.getKey()); - } - }); - assertTrue(protobufDecoderNames.size() == 1); - dn1Channel.pipeline().addAfter(protobufDecoderNames.get(0), "dn1AckReceivedHandler", - new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - super.channelRead(ctx, msg); - dn1AckReceivedCyclicBarrier.await(); - } - }); - - assertTrue(iterator.hasNext()); - Map.Entry dn2Entry= iterator.next(); - Channel dn2Channel= dn2Entry.getKey(); - - /** - * Here we add a {@link ChannelInboundHandlerAdapter} to eat all the responses to simulate a - * slow dn2. 
- */ - dn2Channel.pipeline().addFirst(new ChannelInboundHandlerAdapter() { - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (!(msg instanceof ByteBuf)) { - ctx.fireChannelRead(msg); - } - } - }); - - byte[] b = new byte[10]; - ThreadLocalRandom.current().nextBytes(b); - out.write(b, 0, b.length); - CompletableFuture future = out.flush(false); - /** - * Wait for ack from dn1. - */ - dn1AckReceivedCyclicBarrier.await(); - /** - * First ack is received from dn1,we could stop dn1 now. - */ - firstDataNodeProperties = findAndKillFirstDataNode(dn1DatanodeInfo); - assertTrue(firstDataNodeProperties != null); - try { - /** - * Before HBASE-26679,here we should be stuck, after HBASE-26679,we would fail soon with - * {@link ExecutionException}. - */ - future.get(); - fail(); - } catch (ExecutionException e) { - assertTrue(e != null); - LOG.info("expected exception caught when get future", e); - } - /** - * Make sure all the data node channel are closed. 
- */ - datanodeInfoMap.keySet().forEach(ch -> { - try { - ch.closeFuture().get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - }); - } finally { - if (firstDataNodeProperties != null) { - CLUSTER.restartDataNode(firstDataNodeProperties); - } - } - } - - private static DataNodeProperties findAndKillFirstDataNode( - DatanodeInfo firstDatanodeInfo) { - assertTrue(firstDatanodeInfo != null); - ArrayList dataNodes = CLUSTER.getDataNodes(); - ArrayList foundIndexes = new ArrayList(); - int index = 0; - for (DataNode dataNode : dataNodes) { - if (firstDatanodeInfo.getXferAddr().equals(dataNode.getDatanodeId().getXferAddr())) { - foundIndexes.add(index); - } - index++; - } - assertTrue(foundIndexes.size() == 1); - return CLUSTER.stopDataNode(foundIndexes.get(0)); - } - } diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java new file mode 100644 index 000000000000..496dc127fded --- /dev/null +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.asyncfs; + + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; +import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; +import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; + + +/** + * Testcase for HBASE-26679, here we introduce a 
separate test class and not put the testcase in + * {@link TestFanOutOneBlockAsyncDFSOutput} because we will send heartbeat to DN when there is no + * out going packet, the timeout is controlled by + * {@link TestFanOutOneBlockAsyncDFSOutput#READ_TIMEOUT_MS},which is 2 seconds, it will keep sending + * package out and DN will respond immedately and then mess up the testing handler added by us. So + * in this test class we use the default value for timeout which is 60 seconds and it is enough for + * this test. + */ +@Category({ MiscTests.class, MediumTests.class }) +public class TestFanOutOneBlockAsyncDFSOutputHang extends AsyncFSTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutputHang.class); + + private static final Logger LOG = + LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutputHang.class); + + private static DistributedFileSystem FS; + + private static EventLoopGroup EVENT_LOOP_GROUP; + + private static Class CHANNEL_CLASS; + + private static StreamSlowMonitor MONITOR; + + private static FanOutOneBlockAsyncDFSOutput OUT; + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void setUp() throws Exception { + startMiniDFSCluster(2); + FS = CLUSTER.getFileSystem(); + EVENT_LOOP_GROUP = new NioEventLoopGroup(); + CHANNEL_CLASS = NioSocketChannel.class; + MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor"); + Path f = new Path("/testHang"); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2, + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + } + + @AfterClass + public static void tearDown() throws IOException, InterruptedException { + if (OUT != null) { + OUT.recoverAndClose(null); + } + if (EVENT_LOOP_GROUP != null) { + EVENT_LOOP_GROUP.shutdownGracefully().sync(); + } + shutdownMiniDFSCluster(); + } + + /** + *
+   * This test is for HBASE-26679. Consider there are two dataNodes: dn1 and dn2; dn2 is a slow DN.
+   * The threads sequence before HBASE-26679 is:
+   * 1.We write some data to {@link FanOutOneBlockAsyncDFSOutput} and then flush it, there is one
+   *   {@link FanOutOneBlockAsyncDFSOutput.Callback} in
+   *   {@link FanOutOneBlockAsyncDFSOutput#waitingAckQueue}.
+   * 2.The ack from dn1 arrives firstly and triggers Netty to invoke
+   *   {@link FanOutOneBlockAsyncDFSOutput#completed} with dn1's channel, then in
+   *   {@link FanOutOneBlockAsyncDFSOutput#completed}, dn1's channel is removed from
+   *   {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas}.
+   * 3.But dn2 responds slowly; before dn2 sends its ack, dn1 is shut down or hits an exception,
+   *   so {@link FanOutOneBlockAsyncDFSOutput#failed} is triggered by Netty with dn1's channel,
+   *   and because the {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas} does not
+   *   contain dn1's channel,the {@link FanOutOneBlockAsyncDFSOutput.Callback} is skipped in
+   *   {@link FanOutOneBlockAsyncDFSOutput#failed} method,and
+   *   {@link FanOutOneBlockAsyncDFSOutput#state} is set to
+   *   {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN},and dn1,dn2 are all closed at the end of
+   *   {@link FanOutOneBlockAsyncDFSOutput#failed}.
+   * 4.{@link FanOutOneBlockAsyncDFSOutput#failed} is triggered again by dn2 because it is closed,
+   *   but because {@link FanOutOneBlockAsyncDFSOutput#state} is already
+   *   {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN},the whole
+   *   {@link FanOutOneBlockAsyncDFSOutput#failed} is skipped. So wait on the future
+   *   returned by {@link FanOutOneBlockAsyncDFSOutput#flush} would be stuck forever.
+   * After HBASE-26679, for the above step 4, even if the {@link FanOutOneBlockAsyncDFSOutput#state}
+   * is already {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, we would still try to trigger
+   * {@link FanOutOneBlockAsyncDFSOutput.Callback#future}.
+   * 
+ */ + @Test + public void testFlushHangWhenOneDataNodeFailedBeforeOtherDataNodeAck() throws Exception { + + DataNodeProperties firstDataNodeProperties = null; + try { + + final CyclicBarrier dn1AckReceivedCyclicBarrier = new CyclicBarrier(2); + Map datanodeInfoMap = OUT.getDatanodeInfoMap(); + Iterator> iterator = datanodeInfoMap.entrySet().iterator(); + assertTrue(iterator.hasNext()); + Map.Entry dn1Entry = iterator.next(); + Channel dn1Channel = dn1Entry.getKey(); + DatanodeInfo dn1DatanodeInfo = dn1Entry.getValue(); + final List protobufDecoderNames = new ArrayList(); + dn1Channel.pipeline().forEach((entry) -> { + if (ProtobufDecoder.class.isInstance(entry.getValue())) { + protobufDecoderNames.add(entry.getKey()); + } + }); + assertTrue(protobufDecoderNames.size() == 1); + dn1Channel.pipeline().addAfter(protobufDecoderNames.get(0), "dn1AckReceivedHandler", + new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + super.channelRead(ctx, msg); + dn1AckReceivedCyclicBarrier.await(); + } + }); + + assertTrue(iterator.hasNext()); + Map.Entry dn2Entry = iterator.next(); + Channel dn2Channel = dn2Entry.getKey(); + + /** + * Here we add a {@link ChannelInboundHandlerAdapter} to eat all the responses to simulate a + * slow dn2. + */ + dn2Channel.pipeline().addFirst(new ChannelInboundHandlerAdapter() { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (!(msg instanceof ByteBuf)) { + ctx.fireChannelRead(msg); + } + } + }); + + byte[] b = new byte[10]; + ThreadLocalRandom.current().nextBytes(b); + OUT.write(b, 0, b.length); + CompletableFuture future = OUT.flush(false); + /** + * Wait for ack from dn1. + */ + dn1AckReceivedCyclicBarrier.await(); + /** + * First ack is received from dn1,we could stop dn1 now. 
+ */ + firstDataNodeProperties = findAndKillFirstDataNode(dn1DatanodeInfo); + assertTrue(firstDataNodeProperties != null); + try { + /** + * Before HBASE-26679,here we should be stuck, after HBASE-26679,we would fail soon with + * {@link ExecutionException}. + */ + future.get(); + fail(); + } catch (ExecutionException e) { + assertTrue(e != null); + LOG.info("expected exception caught when get future", e); + } + /** + * Make sure all the data node channel are closed. + */ + datanodeInfoMap.keySet().forEach(ch -> { + try { + ch.closeFuture().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } finally { + if (firstDataNodeProperties != null) { + CLUSTER.restartDataNode(firstDataNodeProperties); + } + } + } + + private static DataNodeProperties findAndKillFirstDataNode(DatanodeInfo firstDatanodeInfo) { + assertTrue(firstDatanodeInfo != null); + ArrayList dataNodes = CLUSTER.getDataNodes(); + ArrayList foundIndexes = new ArrayList(); + int index = 0; + for (DataNode dataNode : dataNodes) { + if (firstDatanodeInfo.getXferAddr().equals(dataNode.getDatanodeId().getXferAddr())) { + foundIndexes.add(index); + } + index++; + } + assertTrue(foundIndexes.size() == 1); + return CLUSTER.stopDataNode(foundIndexes.get(0)); + } + +} From 78d56c10b1a7054db5934e1abad25cebc3180f8e Mon Sep 17 00:00:00 2001 From: comnetwork Date: Fri, 28 Jan 2022 11:14:05 +0800 Subject: [PATCH 14/15] update test --- .../hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 341b4ee02a49..8533d38bae09 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ 
b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -57,13 +57,13 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; - @Category({ MiscTests.class, MediumTests.class }) public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { From 2b8f92b2ddec2d4632a41fa0e3406288836848db Mon Sep 17 00:00:00 2001 From: comnetwork Date: Fri, 28 Jan 2022 11:22:17 +0800 Subject: [PATCH 15/15] update test --- .../hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 80f5031a974d..7c62d67c6cee 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -505,8 +505,8 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d } Encryptor encryptor = createEncryptor(conf, stat, client); FanOutOneBlockAsyncDFSOutput output = - new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, - stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor); + new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, + stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor); succ = true; return 
output; } catch (RemoteException e) {