FanOutOneBlockAsyncDFSOutput.java
@@ -187,7 +187,7 @@ private enum State {
private final StreamSlowMonitor streamSlowMonitor;

// all lock-free to make it run faster
private void completed(Channel channel) {
protected void completed(Channel channel) {
Contributor:

Do we still need to make it protected?

@comnetwork (Contributor, Author), Jan 27, 2022:

I have fixed it.
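
(For context: the test subclass MyFanOutOneBlockAsyncDFSOutput added below overrides completed, so the method needs at least protected visibility.)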

for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
Callback c = iter.next();
// if the current unfinished replicas do not contain us then it means that we have already
@@ -231,7 +231,11 @@ private void completed(Channel channel) {
// so that the implementation will not burn up our brain as there are multiple state changes and
// checks.
private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
if (state == State.BROKEN || state == State.CLOSED) {
if (state == State.CLOSED) {
return;
}
if (state == State.BROKEN) {
failWaitingAckQueue(channel, errorSupplier);
return;
}
if (state == State.CLOSING) {
@@ -243,6 +247,11 @@ private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
}
// disable further write, and fail all pending ack.
state = State.BROKEN;
failWaitingAckQueue(channel, errorSupplier);
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
}

private void failWaitingAckQueue(Channel channel, Supplier<Throwable> errorSupplier) {
Throwable error = errorSupplier.get();
for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
Callback c = iter.next();
@@ -259,7 +268,6 @@ private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
}
break;
}
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
}
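
Note: this BROKEN-state handling is the heart of the HBASE-26679 fix. When a datanode dies after its ack has already been processed, completed() has already removed its channel id from the callback's unfinishedReplicas, so the first failed() call fails nothing before marking the stream BROKEN and closing the other channels. Those closes trigger failed() again, and the old code returned immediately once the state was BROKEN, leaving the pending flush future incomplete forever. Calling failWaitingAckQueue on the BROKEN path lets these later failures complete the callbacks that still reference their channels.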

@Sharable
FanOutOneBlockAsyncDFSOutputHelper.java
@@ -53,6 +53,7 @@
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -124,6 +125,8 @@ private FanOutOneBlockAsyncDFSOutputHelper() {
public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;

public static final String ASYNC_DFS_OUTPUT_CLASS_NAME = "hbase.fs.async.output.class";
// use pooled allocator for performance.
private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

@@ -505,7 +508,7 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs,
}
Encryptor encryptor = createEncryptor(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output =
new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
doCreateFanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
succ = true;
return output;
@@ -551,6 +554,22 @@ public void operationComplete(Future<Channel> future) throws Exception {
}
}

  private static FanOutOneBlockAsyncDFSOutput doCreateFanOutOneBlockAsyncDFSOutput(
      Configuration conf, DistributedFileSystem dfs, DFSClient client, ClientProtocol namenode,
      String clientName, String src, long fileId, LocatedBlock locatedBlock, Encryptor encryptor,
      Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer, ByteBufAllocator alloc,
      StreamSlowMonitor streamSlowMonitor) {
    Class<? extends FanOutOneBlockAsyncDFSOutput> dfsOutputClass =
      conf.getClass(ASYNC_DFS_OUTPUT_CLASS_NAME, FanOutOneBlockAsyncDFSOutput.class,
        FanOutOneBlockAsyncDFSOutput.class);
    return ReflectionUtils.newInstance(dfsOutputClass,
      new Object[] { conf, dfs, client, namenode, clientName, src, fileId, locatedBlock,
        encryptor, datanodeInfoMap, summer, alloc, streamSlowMonitor });
  }
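
For readers wondering how the new hook is wired up: hbase.fs.async.output.class selects the concrete output class, and ReflectionUtils.newInstance matches the given arguments against a constructor. A minimal sketch of a pluggable subclass, assuming same-package access to the package-private superclass constructor (MyOutput is hypothetical; the test below uses the same mechanism):

```java
// Hypothetical subclass for illustration only. It must declare the same
// 13-argument constructor that doCreateFanOutOneBlockAsyncDFSOutput passes
// to ReflectionUtils.newInstance, and it must be able to see the
// package-private FanOutOneBlockAsyncDFSOutput constructor.
class MyOutput extends FanOutOneBlockAsyncDFSOutput {
  MyOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
      ClientProtocol namenode, String clientName, String src, long fileId,
      LocatedBlock locatedBlock, Encryptor encryptor,
      Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer,
      ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
    super(conf, dfs, client, namenode, clientName, src, fileId, locatedBlock, encryptor,
      datanodeInfoMap, summer, alloc, streamSlowMonitor);
  }
}

// Opt in through the configuration; createOutput(...) then instantiates
// MyOutput instead of the default FanOutOneBlockAsyncDFSOutput.
conf.set(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CLASS_NAME,
  MyOutput.class.getName());
```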

/**
* Create a {@link FanOutOneBlockAsyncDFSOutput}. The method maybe blocked so do not call it
* inside an {@link EventLoop}.
TestFanOutOneBlockAsyncDFSOutput.java
@@ -22,6 +22,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
@@ -31,10 +32,16 @@
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,10 +51,15 @@
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.DataChecksum;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -57,7 +69,7 @@
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -272,4 +284,125 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
}
assertArrayEquals(b, actual);
}

/**
* This test is for HBASE-26679.
*/
@Test
public void testFlushStuckWhenOneDataNodeShutdown() throws Exception {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();

Configuration conf = FS.getConf();
conf.set(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CLASS_NAME,
MyFanOutOneBlockAsyncDFSOutput.class.getName());

DataNodeProperties firstDataNodeProperties = null;
try {
MyFanOutOneBlockAsyncDFSOutput out =
(MyFanOutOneBlockAsyncDFSOutput) FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);

byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
out.write(b, 0, b.length);
CompletableFuture<Long> future = out.flush(false);
/**
* The first ack has been received from dataNode1, so we can stop dataNode1 now.
*/
out.stopCyclicBarrier.await();
Channel firstDataNodeChannel = out.alreadyNotifiedChannelRef.get();
assertTrue(firstDataNodeChannel != null);
firstDataNodeProperties = findAndKillFirstDataNode(out.datanodeInfoMap, firstDataNodeChannel);
assertTrue(firstDataNodeProperties != null);
      try {
        future.get();
        fail("the flush future should have completed exceptionally");
      } catch (ExecutionException e) {
        LOG.info("expected exception caught when getting future", e);
      }
/**
* Make sure all the data node channels are closed.
*/
out.datanodeInfoMap.keySet().forEach(ch -> {
try {
ch.closeFuture().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
} finally {
conf.unset(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CLASS_NAME);
if (firstDataNodeProperties != null) {
CLUSTER.restartDataNode(firstDataNodeProperties);
}
    }
  }

  private static DataNodeProperties findAndKillFirstDataNode(
      Map<Channel, DatanodeInfo> datanodeInfoMap, Channel firstChannel) {
DatanodeInfo firstDatanodeInfo = datanodeInfoMap.get(firstChannel);
assertTrue(firstDatanodeInfo != null);
    ArrayList<DataNode> dataNodes = CLUSTER.getDataNodes();
    ArrayList<Integer> foundIndexes = new ArrayList<>();
    int index = 0;
    for (DataNode dataNode : dataNodes) {
      if (firstDatanodeInfo.getXferAddr().equals(dataNode.getDatanodeId().getXferAddr())) {
        foundIndexes.add(index);
      }
      index++;
    }
    assertEquals(1, foundIndexes.size());

return CLUSTER.stopDataNode(foundIndexes.get(0));
}

static class MyFanOutOneBlockAsyncDFSOutput extends FanOutOneBlockAsyncDFSOutput {

private final AtomicReference<Channel> alreadyNotifiedChannelRef =
new AtomicReference<Channel>(null);
private final CyclicBarrier stopCyclicBarrier = new CyclicBarrier(2);
private final Map<Channel, DatanodeInfo> datanodeInfoMap;

@Override
protected void completed(Channel channel) {
      /**
       * It is hard to simulate a slow response from a datanode because we can not modify the HDFS
       * code. Since this method is only invoked by
       * {@link FanOutOneBlockAsyncDFSOutput.AckHandler#channelRead0}, we simulate slow datanodes
       * by letting only the acks from the first responding datanode through and discarding the
       * acks from the other, slower datanodes.
       */
boolean success = this.alreadyNotifiedChannelRef.compareAndSet(null, channel);
if (channel.equals(this.alreadyNotifiedChannelRef.get())) {
super.completed(channel);
}

if (success) {
/**
* Tell the test method that it can now stop the data node which sent the first ack.
*/
try {
stopCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}

MyFanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs,
DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap,
DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
super(conf, dfs, client, namenode, clientName, src, fileId, locatedBlock, encryptor,
datanodeInfoMap, summer, alloc, streamSlowMonitor);
      this.datanodeInfoMap = datanodeInfoMap;
    }

}

}