@@ -20,8 +20,6 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
@@ -30,7 +28,6 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -766,77 +763,6 @@ public static void checkShortCircuitReadBufferSize(final Configuration conf) {
conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
}

private static final class DfsBuilderUtility {
private static final Class<?> BUILDER;
private static final Method REPLICATE;

static {
String builderName = "org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder";
Class<?> builderClass = null;
try {
builderClass = Class.forName(builderName);
} catch (ClassNotFoundException e) {
LOG.debug("{} not available, will not set replicate when creating output stream", builderName);
}
Method replicateMethod = null;
if (builderClass != null) {
try {
replicateMethod = builderClass.getMethod("replicate");
LOG.debug("Using builder API via reflection for DFS file creation.");
} catch (NoSuchMethodException e) {
LOG.debug("Could not find replicate method on builder; will not set replicate when" +
" creating output stream", e);
}
}
BUILDER = builderClass;
REPLICATE = replicateMethod;
}

/**
* Attempt to use builder API via reflection to call the replicate method on the given builder.
*/
static void replicate(FSDataOutputStreamBuilder<?, ?> builder) {
if (BUILDER != null && REPLICATE != null && BUILDER.isAssignableFrom(builder.getClass())) {
try {
REPLICATE.invoke(builder);
} catch (IllegalAccessException | InvocationTargetException e) {
// Should have caught this failure during initialization, so log full trace here
LOG.warn("Couldn't use reflection with builder API", e);
}
}
}
}

/**
* Attempt to use builder API via reflection to create a file with the given parameters and
* replication enabled.
* <p/>
* Will not attempt to enable replication when passed an HFileSystem.
*/
public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite)
throws IOException {
FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite);
DfsBuilderUtility.replicate(builder);
return builder.build();
}

/**
* Attempt to use builder API via reflection to create a file with the given parameters and
* replication enabled.
* <p/>
* Will not attempt to enable replication when passed an HFileSystem.
*/
public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite,
int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite)
.bufferSize(bufferSize).replication(replication).blockSize(blockSize);
if (isRecursive) {
builder.recursive();
}
DfsBuilderUtility.replicate(builder);
return builder.build();
}

/**
* Helper exception for those cases where the place where we need to check a stream capability
* is not where we have the needed context to explain the impact and mitigation for a lack.
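
For context, here is a minimal sketch of the pattern this change adopts in place of the reflection-based DfsBuilderUtility removed above: the generic builder returned by FileSystem.createFile is downcast to the HDFS-specific builder when possible, so replicate() can be called directly. The helper class and method names below are hypothetical, not part of the patch.

```java
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public final class ReplicatedCreateSketch {
  /**
   * Hypothetical helper mirroring the new in-line pattern: request a
   * replicated (non-erasure-coded) file when the filesystem is HDFS,
   * and fall back to a plain build() on any other filesystem.
   */
  static FSDataOutputStream create(FileSystem fs, Path path, boolean overwrite)
      throws IOException {
    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite);
    if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
      // replicate() exists only on the HDFS builder; no reflection is needed
      // once hadoop-hdfs-client is available at compile time.
      return ((DistributedFileSystem.HdfsDataOutputStreamBuilder) builder).replicate().build();
    }
    return builder.build();
  }
}
```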
4 changes: 4 additions & 0 deletions hbase-procedure/pom.xml
@@ -144,6 +144,10 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<artifactId>hadoop-hdfs-client</artifactId>
<groupId>org.apache.hadoop</groupId>
</dependency>
</dependencies>

<profiles>
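
As I read it, this dependency is added because WALProcedureStore (below) now references org.apache.hadoop.hdfs.DistributedFileSystem directly rather than through reflection, so hbase-procedure must see hadoop-hdfs-client on its compile classpath; the absence of a version element suggests the version is managed by the parent POM's dependencyManagement.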
@@ -35,6 +35,7 @@
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
@@ -55,6 +56,7 @@
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -1070,7 +1072,13 @@ boolean rollWriter(long logId) throws IOException {
long startPos = -1;
newLogFile = getLogFilePath(logId);
try {
newStream = CommonFSUtils.createForWal(fs, newLogFile, false);
FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(newLogFile).overwrite(false);
if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
newStream = ((DistributedFileSystem.HdfsDataOutputStreamBuilder) builder)
.replicate().build();
} else {
newStream = builder.build();
}
} catch (FileAlreadyExistsException e) {
LOG.error("Log file with id={} already exists", logId, e);
return false;
@@ -22,6 +22,7 @@
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
@@ -31,6 +32,7 @@
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,8 +107,19 @@ public FSDataOutputStream getStream() {
@Override
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
blockSize, false);
FSDataOutputStreamBuilder<?, ?> builder = fs
.createFile(path)
.overwrite(overwritable)
.bufferSize(bufferSize)
.replication(replication)
.blockSize(blockSize);
if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
this.output = ((DistributedFileSystem.HdfsDataOutputStreamBuilder) builder)
.replicate().build();
} else {
this.output = builder.build();
}

if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
if (!output.hasCapability(StreamCapabilities.HFLUSH)) {
throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH);
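
The hunk above retains the hflush capability check guarded by CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE (default true), which can be disabled for filesystems such as the local one that do not support a durable hflush. Below is a minimal, self-contained sketch of that check against an arbitrary FileSystem, assuming default configuration; the probe path is hypothetical.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

public final class HflushCheckSketch {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataOutputStream out = fs.create(new Path("/tmp/wal-probe"))) {
      // hasCapability reports whether hflush() actually pushes data to
      // readers/datanodes; local and checksum filesystems typically
      // return false, which makes the stream unsafe for WAL use.
      if (!out.hasCapability(StreamCapabilities.HFLUSH)) {
        throw new IOException("stream lacks hflush; unsafe for WAL use");
      }
      out.hflush();
    }
  }
}
```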