diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index a530ca4a2a0d..cbb0648f3afb 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -48,11 +48,12 @@ private AsyncFSOutputHelper() { */ public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, - Class channelClass, StreamSlowMonitor monitor) + Class channelClass, StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException, CommonFSUtils.StreamLacksCapabilityException { if (fs instanceof DistributedFileSystem) { return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, - overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor); + overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor, + noLocalWrite); } final FSDataOutputStream out; int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 04b137014515..77f072a2778c 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -502,7 +502,8 @@ public NameNodeException(Throwable cause) { } } - private static EnumSetWritable getCreateFlags(boolean overwrite) { + private static EnumSetWritable getCreateFlags(boolean overwrite, + boolean noLocalWrite) { List 
flags = new ArrayList<>(); flags.add(CreateFlag.CREATE); if (overwrite) { @@ -511,13 +512,16 @@ private static EnumSetWritable getCreateFlags(boolean overwrite) { if (SHOULD_REPLICATE_FLAG != null) { flags.add(SHOULD_REPLICATE_FLAG); } + if (noLocalWrite) { + flags.add(CreateFlag.NO_LOCAL_WRITE); + } return new EnumSetWritable<>(EnumSet.copyOf(flags)); } private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, boolean overwrite, boolean createParent, short replication, long blockSize, - EventLoopGroup eventLoopGroup, Class channelClass, StreamSlowMonitor monitor) - throws IOException { + EventLoopGroup eventLoopGroup, Class channelClass, StreamSlowMonitor monitor, + boolean noLocalWrite) throws IOException { Configuration conf = dfs.getConf(); DFSClient client = dfs.getClient(); String clientName = client.getClientName(); @@ -534,7 +539,7 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d try { stat = FILE_CREATOR.create(namenode, src, FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName, - getCreateFlags(overwrite), createParent, replication, blockSize, + getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize, CryptoProtocolVersion.supported()); } catch (Exception e) { if (e instanceof RemoteException) { @@ -620,14 +625,14 @@ public void operationComplete(Future future) throws Exception { public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, Class channelClass, - final StreamSlowMonitor monitor) throws IOException { + final StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException { return new FileSystemLinkResolver() { @Override public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException, UnresolvedLinkException { return createOutput(dfs, 
p.toUri().getPath(), overwrite, createParent, replication, - blockSize, eventLoopGroup, channelClass, monitor); + blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite); } @Override diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 3c3852831033..0c1bf1ef1aa8 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -138,7 +138,7 @@ public void test() throws IOException, InterruptedException, ExecutionException Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); writeAndVerify(FS, f, out); } @@ -147,7 +147,7 @@ public void testRecover() throws IOException, InterruptedException, ExecutionExc Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); byte[] b = new byte[10]; Bytes.random(b); out.write(b, 0, b.length); @@ -176,7 +176,7 @@ public void testHeartbeat() throws IOException, InterruptedException, ExecutionE Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, 
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); Thread.sleep(READ_TIMEOUT_MS * 2); // the connection to datanode should still alive. writeAndVerify(FS, f, out); @@ -191,7 +191,7 @@ public void testCreateParentFailed() throws IOException { EventLoop eventLoop = EVENT_LOOP_GROUP.next(); try { FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); fail("should fail with parent does not exist"); } catch (RemoteException e) { LOG.info("expected exception caught", e); @@ -213,8 +213,9 @@ public void testConnectToDatanodeFailed() DataNodeProperties dnProp = CLUSTER.stopDataNode(0); Path f = new Path("/test"); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, - f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR)) { + try (FanOutOneBlockAsyncDFSOutput output = + FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true)) { // should exclude the dead dn when retry so here we only have 2 DNs in pipeline assertEquals(2, output.getPipeline().length); } finally { @@ -244,7 +245,7 @@ public void testExcludeFailedConnectToDatanode() assertEquals(0, excludeDatanodeManager.getExcludeDNs().size()); try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor)) { + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor, true)) { // should exclude the dead dn when retry so here we only have 2 DNs in pipeline assertEquals(2, output.getPipeline().length); 
assertEquals(1, excludeDatanodeManager.getExcludeDNs().size()); @@ -259,7 +260,7 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, Exec Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR); + false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR, true); byte[] b = new byte[50 * 1024 * 1024]; Bytes.random(b); out.write(b); diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java index 77752789dbb3..7f6535a93a93 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java @@ -98,7 +98,7 @@ public static void setUp() throws Exception { Path f = new Path("/testHang"); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2, - FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); } @AfterClass diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java index 992dde7e9cba..ec87316e5f1b 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java @@ -65,7 +65,7 @@ public void test() throws IOException, InterruptedException, ExecutionException, Path f = new 
Path(TEST_UTIL.getDataTestDir(), "test"); FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration()); AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true, - fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR); + fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR, true); TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out); } } diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java index 54a49ffebd94..e178d0960eae 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -258,7 +258,7 @@ private Path getEncryptionTestFile() { private void test(Path file) throws IOException, InterruptedException, ExecutionException { EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file, - true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index 3c70fe504718..89c7b616a27f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import 
java.util.stream.Collectors; -import org.apache.curator.shaded.com.google.common.base.Strings; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.HConstants; @@ -55,6 +54,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Strings; + /** * The context, and return value, for a single submit/submitAll call. Note on how this class (one AP * submit) works. Initially, all requests are split into groups by server; request is sent to each diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java index 39112e603b03..9e39a2a0929b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java @@ -733,6 +733,7 @@ public static void checkShortCircuitReadBufferSize(final Configuration conf) { private static final class DfsBuilderUtility { private static final Class BUILDER; private static final Method REPLICATE; + private static final Method NO_LOCAL_WRITE; static { String builderName = @@ -754,8 +755,19 @@ private static final class DfsBuilderUtility { + " creating output stream", e); } } + Method noLocalWriteMethod = null; + if (builderClass != null) { + try { + noLocalWriteMethod = builderClass.getMethod("noLocalWrite"); + LOG.debug("Using builder API via reflection for DFS file creation."); + } catch (NoSuchMethodException e) { + LOG.debug("Could not find noLocalWrite method on builder; will not set noLocalWrite when" + + " creating output stream", e); + } + } BUILDER = builderClass; REPLICATE = replicateMethod; + NO_LOCAL_WRITE = noLocalWriteMethod; } /** @@ -771,6 +783,23 @@ static void replicate(FSDataOutputStreamBuilder builder) { } } } + + /** + * Attempt to use builder API via reflection to call the noLocalWrite 
method on the given + * builder. + */ + static void noLocalWrite(FSDataOutputStreamBuilder builder) { + if ( + BUILDER != null && NO_LOCAL_WRITE != null && BUILDER.isAssignableFrom(builder.getClass()) + ) { + try { + NO_LOCAL_WRITE.invoke(builder); + } catch (IllegalAccessException | InvocationTargetException e) { + // Should have caught this failure during initialization, so log full trace here + LOG.warn("Couldn't use reflection with builder API", e); + } + } + } } /** @@ -793,7 +822,8 @@ public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean * Will not attempt to enable replication when passed an HFileSystem. */ public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite, - int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException { + int bufferSize, short replication, long blockSize, boolean isRecursive, boolean noLocalWrite) + throws IOException { // temporary for use while we work on upgrading clients to hadoop3 if (fs.getConf().getBoolean("use.legacy.hdfs.create.methods", false)) { if (isRecursive) { @@ -808,6 +838,9 @@ public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean builder.recursive(); } DfsBuilderUtility.replicate(builder); + if (noLocalWrite) { + DfsBuilderUtility.noLocalWrite(builder); + } return builder.build(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 3d7678c37c6e..3859e45e17ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -143,6 +143,10 @@ public abstract class AbstractFSWAL implements WAL { public static final String RING_BUFFER_SLOT_COUNT = "hbase.regionserver.wal.disruptor.event.count"; + public static final String 
WAL_AVOID_LOCAL_WRITES_KEY = + "hbase.regionserver.wal.avoid-local-writes"; + public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false; + /** * file system instance */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index 8437fef3bc23..5a59ac66939b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT; +import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY; import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE; import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE; @@ -172,8 +174,10 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita int bufferSize = CommonFSUtils.getDefaultBufferSize(fs); short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", CommonFSUtils.getDefaultReplication(fs, path)); + boolean noLocalWrite = + conf.getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT); - initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor); + initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor, noLocalWrite); boolean doTagCompress = doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); @@ -265,7 +269,7 @@ protected void writeWALTrailer() { } protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize, 
StreamSlowMonitor monitor) + short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException, StreamLacksCapabilityException; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index 44affaf734a0..95b4e2d3a3a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -187,10 +187,10 @@ public AsyncFSOutput getOutput() { @Override protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize, StreamSlowMonitor monitor) + short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException, StreamLacksCapabilityException { this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, - blockSize, eventLoopGroup, channelClass, monitor); + blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite); this.asyncOutputWrapper = new OutputStreamWrapper(output); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index a9f2a0a36365..6c3a61549cbd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -98,10 +98,10 @@ public FSDataOutputStream getStream() { @Override protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize, StreamSlowMonitor monitor) + short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite) throws 
IOException, StreamLacksCapabilityException { - this.output = - CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication, blockSize, false); + this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication, + blockSize, false, noLocalWrite); if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) { if (!output.hasCapability(StreamCapabilities.HFLUSH)) { throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH);