diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index a530ca4a2a0d..cbb0648f3afb 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -48,11 +48,12 @@ private AsyncFSOutputHelper() { */ public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, - Class channelClass, StreamSlowMonitor monitor) + Class channelClass, StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException, CommonFSUtils.StreamLacksCapabilityException { if (fs instanceof DistributedFileSystem) { return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, - overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor); + overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor, + noLocalWrite); } final FSDataOutputStream out; int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 98590173ed2a..d4a71a77a79d 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -445,20 +445,24 @@ public NameNodeException(Throwable cause) { } } - private static EnumSetWritable getCreateFlags(boolean overwrite) { + private static EnumSetWritable getCreateFlags(boolean overwrite, + boolean noLocalWrite) { List flags = new ArrayList<>(); flags.add(CreateFlag.CREATE); if (overwrite) { flags.add(CreateFlag.OVERWRITE); } + if (noLocalWrite) { + flags.add(CreateFlag.NO_LOCAL_WRITE); + } flags.add(CreateFlag.SHOULD_REPLICATE); return new EnumSetWritable<>(EnumSet.copyOf(flags)); } private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, boolean overwrite, boolean createParent, short replication, long blockSize, - EventLoopGroup eventLoopGroup, Class channelClass, StreamSlowMonitor monitor) - throws IOException { + EventLoopGroup eventLoopGroup, Class channelClass, StreamSlowMonitor monitor, + boolean noLocalWrite) throws IOException { Configuration conf = dfs.getConf(); DFSClient client = dfs.getClient(); String clientName = client.getClientName(); @@ -475,7 +479,7 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d try { stat = FILE_CREATOR.create(namenode, src, FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName, - getCreateFlags(overwrite), createParent, replication, blockSize, + getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize, CryptoProtocolVersion.supported()); } catch (Exception e) { if (e instanceof RemoteException) { @@ -561,14 +565,14 @@ public void operationComplete(Future future) throws Exception { public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, Class channelClass, - final StreamSlowMonitor monitor) throws IOException { + final StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException { return new FileSystemLinkResolver() { @Override public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException, UnresolvedLinkException { return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication, - blockSize, eventLoopGroup, channelClass, monitor); + blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite); } @Override diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 68b8bfa3d9f3..f0910684eddf 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -141,7 +141,7 @@ public void test() throws IOException, InterruptedException, ExecutionException Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); writeAndVerify(FS, f, out); } @@ -154,7 +154,7 @@ public void test0Recover() throws IOException, InterruptedException, ExecutionEx Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); byte[] b = new byte[10]; Bytes.random(b); out.write(b, 0, b.length); @@ -183,7 +183,7 @@ public void testHeartbeat() throws IOException, InterruptedException, ExecutionE Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); Thread.sleep(READ_TIMEOUT_MS * 2); // the connection to datanode should still alive. writeAndVerify(FS, f, out); @@ -198,7 +198,7 @@ public void testCreateParentFailed() throws IOException { EventLoop eventLoop = EVENT_LOOP_GROUP.next(); try { FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); fail("should fail with parent does not exist"); } catch (RemoteException e) { LOG.info("expected exception caught", e); @@ -220,8 +220,9 @@ public void testConnectToDatanodeFailed() DataNodeProperties dnProp = CLUSTER.stopDataNode(0); Path f = new Path("/test"); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, - f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR)) { + try (FanOutOneBlockAsyncDFSOutput output = + FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true)) { // should exclude the dead dn when retry so here we only have 2 DNs in pipeline assertEquals(2, output.getPipeline().length); } finally { @@ -251,7 +252,7 @@ public void testExcludeFailedConnectToDatanode() assertEquals(0, excludeDatanodeManager.getExcludeDNs().size()); try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor)) { + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor, true)) { // should exclude the dead dn when retry so here we only have 2 DNs in pipeline assertEquals(2, output.getPipeline().length); assertEquals(1, excludeDatanodeManager.getExcludeDNs().size()); @@ -266,7 +267,7 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, Exec Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR); + false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR, true); byte[] b = new byte[50 * 1024 * 1024]; Bytes.random(b); out.write(b); diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java index 77752789dbb3..7f6535a93a93 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java @@ -98,7 +98,7 @@ public static void setUp() throws Exception { Path f = new Path("/testHang"); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2, - FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); } @AfterClass diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java index d1ce128b118d..4171b60c5b82 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java @@ -65,7 +65,7 @@ public void test() throws IOException, InterruptedException, ExecutionException, Path f = new Path(TEST_UTIL.getDataTestDir(), "test"); FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration()); AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true, - fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR); + fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR, true); TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out); } } diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java index 479b8f4e6034..99048ff2bed1 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -255,7 +255,7 @@ private Path getEncryptionTestFile() { private void test(Path file) throws IOException, InterruptedException, ExecutionException { EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file, - true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index ef25068512f0..a94d827e8e2d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -179,6 +179,10 @@ public abstract class AbstractFSWAL implements WAL { public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size"; public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024; + public static final String WAL_AVOID_LOCAL_WRITES_KEY = + "hbase.regionserver.wal.avoid-local-writes"; + public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false; + /** * file system instance */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index 890fb4e444c7..e6463c563a05 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT; +import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY; import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.DEFAULT_WAL_TRAILER_WARN_SIZE; import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC; import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_MAGIC; @@ -163,8 +165,10 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita int bufferSize = CommonFSUtils.getDefaultBufferSize(fs); short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", CommonFSUtils.getDefaultReplication(fs, path)); + boolean noLocalWrite = + conf.getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT); - initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor); + initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor, noLocalWrite); boolean doTagCompress = doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); @@ -253,7 +257,7 @@ protected final void writeWALTrailer() { } protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize, StreamSlowMonitor monitor) + short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException, StreamLacksCapabilityException; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index e50a02f6f80d..f10f39222722 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -178,10 +178,10 @@ public AsyncFSOutput getOutput() { @Override protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize, StreamSlowMonitor monitor) + short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException, StreamLacksCapabilityException { this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, - blockSize, eventLoopGroup, channelClass, monitor); + blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite); this.asyncOutputWrapper = new OutputStreamWrapper(output); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index 212788c940ed..52317949cc83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -100,13 +100,18 @@ public FSDataOutputStream getStream() { @Override protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize, StreamSlowMonitor monitor) + short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException, StreamLacksCapabilityException { FSDataOutputStreamBuilder builder = fs.createFile(path).overwrite(overwritable) .bufferSize(bufferSize).replication(replication).blockSize(blockSize); if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) { - this.output = - ((DistributedFileSystem.HdfsDataOutputStreamBuilder) builder).replicate().build(); + DistributedFileSystem.HdfsDataOutputStreamBuilder dfsBuilder = + (DistributedFileSystem.HdfsDataOutputStreamBuilder) builder; + dfsBuilder.replicate(); + if (noLocalWrite) { + dfsBuilder.noLocalWrite(); + } + this.output = dfsBuilder.build(); } else { this.output = builder.build(); }