diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
index fef8e291b31e..b3cadedbfaa1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
@@ -20,8 +20,6 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
@@ -30,7 +28,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -766,77 +763,6 @@ public static void checkShortCircuitReadBufferSize(final Configuration conf) {
     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
   }
 
-  private static final class DfsBuilderUtility {
-    private static final Class<?> BUILDER;
-    private static final Method REPLICATE;
-
-    static {
-      String builderName = "org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder";
-      Class<?> builderClass = null;
-      try {
-        builderClass = Class.forName(builderName);
-      } catch (ClassNotFoundException e) {
-        LOG.debug("{} not available, will not set replicate when creating output stream", builderName);
-      }
-      Method replicateMethod = null;
-      if (builderClass != null) {
-        try {
-          replicateMethod = builderClass.getMethod("replicate");
-          LOG.debug("Using builder API via reflection for DFS file creation.");
-        } catch (NoSuchMethodException e) {
-          LOG.debug("Could not find replicate method on builder; will not set replicate when" +
-            " creating output stream", e);
-        }
-      }
-      BUILDER = builderClass;
-      REPLICATE = replicateMethod;
-    }
-
-    /**
-     * Attempt to use builder API via reflection to call the replicate method on the given builder.
-     */
-    static void replicate(FSDataOutputStreamBuilder<?, ?> builder) {
-      if (BUILDER != null && REPLICATE != null && BUILDER.isAssignableFrom(builder.getClass())) {
-        try {
-          REPLICATE.invoke(builder);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          // Should have caught this failure during initialization, so log full trace here
-          LOG.warn("Couldn't use reflection with builder API", e);
-        }
-      }
-    }
-  }
-
-  /**
-   * Attempt to use builder API via reflection to create a file with the given parameters and
-   * replication enabled.
-   * <p/>
-   * Will not attempt to enable replication when passed an HFileSystem.
-   */
-  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite)
-    throws IOException {
-    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite);
-    DfsBuilderUtility.replicate(builder);
-    return builder.build();
-  }
-
-  /**
-   * Attempt to use builder API via reflection to create a file with the given parameters and
-   * replication enabled.
-   * <p/>
-   * Will not attempt to enable replication when passed an HFileSystem.
-   */
-  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite,
-    int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
-    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite)
-      .bufferSize(bufferSize).replication(replication).blockSize(blockSize);
-    if (isRecursive) {
-      builder.recursive();
-    }
-    DfsBuilderUtility.replicate(builder);
-    return builder.build();
-  }
-
   /**
    * Helper exception for those cases where the place where we need to check a stream capability
    * is not where we have the needed context to explain the impact and mitigation for a lack.
diff --git a/hbase-procedure/pom.xml b/hbase-procedure/pom.xml
index 5cf027096bd5..208682c10689 100644
--- a/hbase-procedure/pom.xml
+++ b/hbase-procedure/pom.xml
@@ -144,6 +144,10 @@
       <artifactId>log4j-slf4j-impl</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <artifactId>hadoop-hdfs-client</artifactId>
+      <groupId>org.apache.hadoop</groupId>
+    </dependency>
   </dependencies>
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index c5bd0001de5b..29bda4732d0f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -35,6 +35,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
@@ -55,6 +56,7 @@
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -1070,7 +1072,13 @@ boolean rollWriter(long logId) throws IOException {
     long startPos = -1;
     newLogFile = getLogFilePath(logId);
     try {
-      newStream = CommonFSUtils.createForWal(fs, newLogFile, false);
+      FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(newLogFile).overwrite(false);
+      if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
+        newStream = ((DistributedFileSystem.HdfsDataOutputStreamBuilder) builder)
+          .replicate().build();
+      } else {
+        newStream = builder.build();
+      }
     } catch (FileAlreadyExistsException e) {
       LOG.error("Log file with id={} already exists", logId, e);
       return false;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 4bbc13d3ab88..fbcfc4c0ee8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StreamCapabilities;
@@ -31,6 +32,7 @@
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,8 +107,19 @@ public FSDataOutputStream getStream() {
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
     short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
-    this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
-      blockSize, false);
+    FSDataOutputStreamBuilder<?, ?> builder = fs
+      .createFile(path)
+      .overwrite(overwritable)
+      .bufferSize(bufferSize)
+      .replication(replication)
+      .blockSize(blockSize);
+    if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
+      this.output = ((DistributedFileSystem.HdfsDataOutputStreamBuilder) builder)
+        .replicate().build();
+    } else {
+      this.output = builder.build();
+    }
+
     if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
       if (!output.hasCapability(StreamCapabilities.HFLUSH)) {
         throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH);
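
For reference, the pattern this patch settles on can be shown as a small standalone sketch. This is illustrative only, not code from the patch: the class name ReplicatedFileCreationSketch, the createReplicated helper, and the /tmp path are made up here. The instanceof check works because fs.createFile() on a DistributedFileSystem returns a DistributedFileSystem.HdfsDataOutputStreamBuilder, whose replicate() call forces the file to use plain block replication even if its directory carries an erasure-coding policy; WAL-style files need this because erasure-coded files do not support hflush/hsync. On any other FileSystem the builder is used unchanged.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public final class ReplicatedFileCreationSketch {

  /**
   * Create an output stream via the builder API, forcing plain replication
   * (rather than erasure coding) when the underlying filesystem is HDFS.
   * Other filesystems (local, S3, ...) just build the stream as-is.
   */
  static FSDataOutputStream createReplicated(FileSystem fs, Path path, boolean overwrite)
      throws IOException {
    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite);
    if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
      // replicate() pins the new file to replication regardless of the parent
      // directory's erasure coding policy, keeping hflush/hsync available.
      return ((DistributedFileSystem.HdfsDataOutputStreamBuilder) builder).replicate().build();
    }
    return builder.build();
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataOutputStream out = createReplicated(fs, new Path("/tmp/sketch.wal"), false)) {
      out.writeBytes("hello");
      out.hflush(); // guaranteed to work: the file cannot be erasure-coded on HDFS
    }
  }
}

Compared with the removed DfsBuilderUtility, this trades the reflection lookup for a compile-time dependency on hadoop-hdfs-client (hence the pom.xml change), which is only possible once Hadoop versions lacking HdfsDataOutputStreamBuilder no longer need to be supported.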