Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ private AsyncFSOutputHelper() {
*/
public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
Class<? extends Channel> channelClass, StreamSlowMonitor monitor, boolean noLocalWrite)
throws IOException, CommonFSUtils.StreamLacksCapabilityException {
if (fs instanceof DistributedFileSystem) {
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor);
overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor,
noLocalWrite);
}
final FSDataOutputStream out;
int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ public NameNodeException(Throwable cause) {
}
}

private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite,
boolean noLocalWrite) {
List<CreateFlag> flags = new ArrayList<>();
flags.add(CreateFlag.CREATE);
if (overwrite) {
Expand All @@ -511,13 +512,17 @@ private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
if (SHOULD_REPLICATE_FLAG != null) {
flags.add(SHOULD_REPLICATE_FLAG);
}
if (noLocalWrite) {
flags.add(CreateFlag.NO_LOCAL_WRITE);
}
flags.add(CreateFlag.SHOULD_REPLICATE);
return new EnumSetWritable<>(EnumSet.copyOf(flags));
}

private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
throws IOException {
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor,
boolean noLocalWrite) throws IOException {
Configuration conf = dfs.getConf();
DFSClient client = dfs.getClient();
String clientName = client.getClientName();
Expand All @@ -534,7 +539,7 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
try {
stat = FILE_CREATOR.create(namenode, src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
getCreateFlags(overwrite), createParent, replication, blockSize,
getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
CryptoProtocolVersion.supported());
} catch (Exception e) {
if (e instanceof RemoteException) {
Expand Down Expand Up @@ -620,14 +625,14 @@ public void operationComplete(Future<Channel> future) throws Exception {
public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
final StreamSlowMonitor monitor) throws IOException {
final StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException {
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

@Override
public FanOutOneBlockAsyncDFSOutput doCall(Path p)
throws IOException, UnresolvedLinkException {
return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
blockSize, eventLoopGroup, channelClass, monitor);
blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void test() throws IOException, InterruptedException, ExecutionException
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
writeAndVerify(FS, f, out);
}

Expand All @@ -147,7 +147,7 @@ public void testRecover() throws IOException, InterruptedException, ExecutionExc
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
byte[] b = new byte[10];
Bytes.random(b);
out.write(b, 0, b.length);
Expand Down Expand Up @@ -176,7 +176,7 @@ public void testHeartbeat() throws IOException, InterruptedException, ExecutionE
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
Thread.sleep(READ_TIMEOUT_MS * 2);
// the connection to the datanode should still be alive.
writeAndVerify(FS, f, out);
Expand All @@ -191,7 +191,7 @@ public void testCreateParentFailed() throws IOException {
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try {
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
fail("should fail with parent does not exist");
} catch (RemoteException e) {
LOG.info("expected exception caught", e);
Expand All @@ -213,8 +213,9 @@ public void testConnectToDatanodeFailed()
DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
Path f = new Path("/test");
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR)) {
try (FanOutOneBlockAsyncDFSOutput output =
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true)) {
// should exclude the dead dn when retrying, so here we only have 2 DNs in the pipeline
assertEquals(2, output.getPipeline().length);
} finally {
Expand Down Expand Up @@ -244,7 +245,7 @@ public void testExcludeFailedConnectToDatanode()
assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
try (FanOutOneBlockAsyncDFSOutput output =
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor)) {
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor, true)) {
// should exclude the dead dn when retry so here we only have 2 DNs in pipeline
assertEquals(2, output.getPipeline().length);
assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
Expand All @@ -259,7 +260,7 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, Exec
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR);
false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR, true);
byte[] b = new byte[50 * 1024 * 1024];
Bytes.random(b);
out.write(b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static void setUp() throws Exception {
Path f = new Path("/testHang");
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2,
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void test() throws IOException, InterruptedException, ExecutionException,
Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR);
fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR, true);
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ private Path getEncryptionTestFile() {
private void test(Path file) throws IOException, InterruptedException, ExecutionException {
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.curator.shaded.com.google.common.base.Strings;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.HConstants;
Expand All @@ -55,6 +54,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Strings;

/**
* The context, and return value, for a single submit/submitAll call. Note on how this class (one AP
* submit) works. Initially, all requests are split into groups by server; request is sent to each
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ public static void checkShortCircuitReadBufferSize(final Configuration conf) {
private static final class DfsBuilderUtility {
private static final Class<?> BUILDER;
private static final Method REPLICATE;
private static final Method NO_LOCAL_WRITE;

static {
String builderName =
Expand All @@ -754,8 +755,19 @@ private static final class DfsBuilderUtility {
+ " creating output stream", e);
}
}
// Resolve the optional noLocalWrite() builder method separately from replicate(): a
// missing method must only disable the no-local-write hint, never replication.
Method noLocalWriteMethod = null;
if (builderClass != null) {
  try {
    // Fix: assign the looked-up Method to noLocalWriteMethod (previously this clobbered
    // replicateMethod, leaving NO_LOCAL_WRITE permanently null and making REPLICATE
    // invoke noLocalWrite() instead of replicate()).
    noLocalWriteMethod = builderClass.getMethod("noLocalWrite");
    LOG.debug("Using builder API via reflection for DFS file creation.");
  } catch (NoSuchMethodException e) {
    LOG.debug("Could not find noLocalWrite method on builder; will not set noLocalWrite"
      + " when creating output stream", e);
  }
}
BUILDER = builderClass;
REPLICATE = replicateMethod;
NO_LOCAL_WRITE = noLocalWriteMethod;
}

/**
Expand All @@ -771,6 +783,23 @@ static void replicate(FSDataOutputStreamBuilder<?, ?> builder) {
}
}
}

/**
 * Best-effort call of the builder's {@code noLocalWrite()} method via reflection.
 * A no-op when the DFS builder API or the method is unavailable, or when the given
 * builder is not an instance of the DFS builder class.
 */
static void noLocalWrite(FSDataOutputStreamBuilder<?, ?> builder) {
  boolean applicable =
    BUILDER != null && NO_LOCAL_WRITE != null && BUILDER.isAssignableFrom(builder.getClass());
  if (!applicable) {
    return;
  }
  try {
    NO_LOCAL_WRITE.invoke(builder);
  } catch (IllegalAccessException | InvocationTargetException e) {
    // Should have caught this failure during initialization, so log full trace here
    LOG.warn("Couldn't use reflection with builder API", e);
  }
}
}

/**
Expand All @@ -793,7 +822,8 @@ public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean
* Will not attempt to enable replication when passed an HFileSystem.
*/
public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite,
int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
int bufferSize, short replication, long blockSize, boolean isRecursive, boolean noLocalWrite)
throws IOException {
// temporary for use while we work on upgrading clients to hadoop3
if (fs.getConf().getBoolean("use.legacy.hdfs.create.methods", false)) {
if (isRecursive) {
Expand All @@ -808,6 +838,9 @@ public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean
builder.recursive();
}
DfsBuilderUtility.replicate(builder);
if (noLocalWrite) {
DfsBuilderUtility.noLocalWrite(builder);
}
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
public static final String RING_BUFFER_SLOT_COUNT =
"hbase.regionserver.wal.disruptor.event.count";

public static final String WAL_AVOID_LOCAL_WRITES_KEY =
"hbase.regionserver.wal.avoid-local-writes";
public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false;

/**
* file system instance
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY;
import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE;

Expand Down Expand Up @@ -172,8 +174,10 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita
int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
CommonFSUtils.getDefaultReplication(fs, path));
boolean noLocalWrite =
conf.getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT);

initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);
initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor, noLocalWrite);

boolean doTagCompress =
doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
Expand Down Expand Up @@ -265,7 +269,7 @@ protected void writeWALTrailer() {
}

protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
short replication, long blockSize, StreamSlowMonitor monitor)
short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
throws IOException, StreamLacksCapabilityException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,10 @@ public AsyncFSOutput getOutput() {

@Override
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
short replication, long blockSize, StreamSlowMonitor monitor)
short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
throws IOException, StreamLacksCapabilityException {
this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
blockSize, eventLoopGroup, channelClass, monitor);
blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
this.asyncOutputWrapper = new OutputStreamWrapper(output);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ public FSDataOutputStream getStream() {

@Override
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
short replication, long blockSize, StreamSlowMonitor monitor)
short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
throws IOException, StreamLacksCapabilityException {
this.output =
CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication, blockSize, false);
this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
blockSize, false, noLocalWrite);
if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
if (!output.hasCapability(StreamCapabilities.HFLUSH)) {
throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH);
Expand Down