diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java
index ff457cb5074e..ca649b92728a 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java
@@ -20,6 +20,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +40,17 @@ public final class RecoverLeaseFSUtils {
private static final Logger LOG = LoggerFactory.getLogger(RecoverLeaseFSUtils.class);
+ private static Class<?> leaseRecoverableClazz = null;
+
+ static {
+ try {
+ leaseRecoverableClazz = Class.forName("org.apache.hadoop.fs.LeaseRecoverable");
+ } catch (ClassNotFoundException e) {
+ LOG.debug(
+ "LeaseRecoverable interface not in the classpath, this means Hadoop 3.3.5 or below.");
+ }
+ }
+
private RecoverLeaseFSUtils() {
}
@@ -48,18 +60,31 @@ public static void recoverFileLease(FileSystem fs, Path p, Configuration conf)
}
/**
- * Recover the lease from HDFS, retrying multiple times.
+ * Recover the lease from HDFS or LeaseRecoverable fs, retrying multiple times.
*/
public static void recoverFileLease(FileSystem fs, Path p, Configuration conf,
CancelableProgressable reporter) throws IOException {
if (fs instanceof FilterFileSystem) {
fs = ((FilterFileSystem) fs).getRawFileSystem();
}
+
// lease recovery not needed for local file system case.
- if (!(fs instanceof DistributedFileSystem)) {
- return;
+ if (isLeaseRecoverable(fs)) {
+ recoverDFSFileLease(fs, p, conf, reporter);
}
- recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter);
+ }
+
+ public static boolean isLeaseRecoverable(FileSystem fs) {
+ // return true if HDFS.
+ if (fs instanceof DistributedFileSystem) {
+ return true;
+ }
+ // return true if the file system implements LeaseRecoverable interface.
+ if (leaseRecoverableClazz != null) {
+ return (leaseRecoverableClazz.isAssignableFrom(fs.getClass()));
+ }
+ // return false if the file system is not HDFS and does not implement LeaseRecoverable.
+ return false;
}
/*
@@ -81,7 +106,7 @@ public static void recoverFileLease(FileSystem fs, Path p, Configuration conf,
* false, repeat starting at step 5. above. If HDFS-4525 is available, call it every second, and
* we might be able to exit early.
*/
- private static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
+ private static boolean recoverDFSFileLease(final Object dfs, final Path p,
final Configuration conf, final CancelableProgressable reporter) throws IOException {
LOG.info("Recover lease on dfs file " + p);
long startWaiting = EnvironmentEdgeManager.currentTime();
@@ -167,14 +192,16 @@ private static boolean checkIfTimedout(final Configuration conf, final long reco
* Try to recover the lease.
* @return True if dfs#recoverLease came by true.
*/
- private static boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt,
- final Path p, final long startWaiting) throws FileNotFoundException {
+ private static boolean recoverLease(final Object dfs, final int nbAttempt, final Path p,
+ final long startWaiting) throws FileNotFoundException {
boolean recovered = false;
try {
- recovered = dfs.recoverLease(p);
+ recovered = (Boolean) dfs.getClass().getMethod("recoverLease", new Class[] { Path.class })
+ .invoke(dfs, p);
LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ")
+ getLogMessageDetail(nbAttempt, p, startWaiting));
- } catch (IOException e) {
+ } catch (InvocationTargetException ite) {
+ final Throwable e = ite.getCause();
if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
// This exception comes out instead of FNFE, fix it
throw new FileNotFoundException("The given WAL wasn't found at " + p);
@@ -182,6 +209,8 @@ private static boolean recoverLease(final DistributedFileSystem dfs, final int n
throw (FileNotFoundException) e;
}
LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
+ } catch (IllegalAccessException | NoSuchMethodException e) {
+ throw new RuntimeException(e);
}
return recovered;
}
@@ -197,8 +226,7 @@ private static String getLogMessageDetail(final int nbAttempt, final Path p,
* Call HDFS-4525 isFileClosed if it is available.
* @return True if file is closed.
*/
- private static boolean isFileClosed(final DistributedFileSystem dfs, final Method m,
- final Path p) {
+ private static boolean isFileClosed(final Object dfs, final Method m, final Path p) {
try {
return (Boolean) m.invoke(dfs, p);
} catch (SecurityException e) {
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/util/TestRecoverLeaseFSUtils.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/util/TestRecoverLeaseFSUtils.java
index 3740cab6937a..b2c9cc97b0d1 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/util/TestRecoverLeaseFSUtils.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/util/TestRecoverLeaseFSUtils.java
@@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hbase.util;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
@@ -92,6 +94,12 @@ public void testIsFileClosed() throws IOException {
Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
}
+ @Test
+ public void testIsLeaseRecoverable() {
+ assertTrue(RecoverLeaseFSUtils.isLeaseRecoverable(new DistributedFileSystem()));
+ assertFalse(RecoverLeaseFSUtils.isLeaseRecoverable(new LocalFileSystem()));
+ }
+
/**
* Version of DFS that has HDFS-4525 in it.
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 0c61a1b27030..10c9003a1d4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -113,6 +113,21 @@ public final class FSUtils {
// currently only used in testing. TODO refactor into a test class
public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
+ private static Class<?> safeModeClazz = null;
+ private static Class<?> safeModeActionClazz = null;
+ private static Object safeModeGet = null;
+ static {
+ try {
+ safeModeClazz = Class.forName("org.apache.hadoop.fs.SafeMode");
+ safeModeActionClazz = Class.forName("org.apache.hadoop.fs.SafeModeAction");
+ safeModeGet = safeModeClazz.getField("SAFEMODE_GET").get(null);
+ } catch (ClassNotFoundException | NoSuchFieldException e) {
+ LOG.debug("SafeMode interface not in the classpath, this means Hadoop 3.3.5 or below.");
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private FSUtils() {
}
@@ -247,8 +262,19 @@ public static void checkFileSystemAvailable(final FileSystem fs) throws IOExcept
* @param dfs A DistributedFileSystem object representing the underlying HDFS.
* @return whether we're in safe mode
*/
- private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
- return dfs.setSafeMode(SAFEMODE_GET, true);
+ private static boolean isInSafeMode(FileSystem dfs) throws IOException {
+ if (isDistributedFileSystem(dfs)) {
+ return ((DistributedFileSystem) dfs).setSafeMode(SAFEMODE_GET, true);
+ } else {
+ try {
+ Object ret = dfs.getClass()
.getMethod("setSafeMode", new Class[] { safeModeActionClazz, boolean.class })
+ .invoke(dfs, safeModeGet, true);
+ return (Boolean) ret;
+ } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
/**
@@ -257,9 +283,8 @@ private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOExceptio
public static void checkDfsSafeMode(final Configuration conf) throws IOException {
boolean isInSafeMode = false;
FileSystem fs = FileSystem.get(conf);
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
- isInSafeMode = isInSafeMode(dfs);
+ if (supportSafeMode(fs)) {
+ isInSafeMode = isInSafeMode(fs);
}
if (isInSafeMode) {
throw new IOException("File system is in safemode, it can't be written now");
@@ -644,10 +669,11 @@ public static void setClusterId(final FileSystem fs, final Path rootdir,
*/
public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException {
FileSystem fs = FileSystem.get(conf);
- if (!(fs instanceof DistributedFileSystem)) return;
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ if (!supportSafeMode(fs)) {
+ return;
+ }
// Make sure dfs is not in safe mode
- while (isInSafeMode(dfs)) {
+ while (isInSafeMode(fs)) {
LOG.info("Waiting for dfs to exit safe mode...");
try {
Thread.sleep(wait);
@@ -658,6 +684,19 @@ public static void waitOnSafeMode(final Configuration conf, final long wait) thr
}
}
+ public static boolean supportSafeMode(FileSystem fs) {
+ // return true if HDFS.
+ if (fs instanceof DistributedFileSystem) {
+ return true;
+ }
+ // return true if the file system implements SafeMode interface.
+ if (safeModeClazz != null) {
+ return (safeModeClazz.isAssignableFrom(fs.getClass()));
+ }
+ // return false if the file system is not HDFS and does not implement SafeMode interface.
+ return false;
+ }
+
/**
* Checks if meta region exists
* @param fs file system
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 8e2ce8dd8c11..7d18b584d7df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -92,6 +93,8 @@ public void testIsHDFS() throws Exception {
try {
cluster = htu.startMiniDFSCluster(1);
assertTrue(CommonFSUtils.isHDFS(conf));
+ assertTrue(FSUtils.supportSafeMode(cluster.getFileSystem()));
+ FSUtils.checkDfsSafeMode(conf);
} finally {
if (cluster != null) {
cluster.shutdown();
@@ -99,6 +102,14 @@ public void testIsHDFS() throws Exception {
}
}
+ @Test
+ public void testLocalFileSystemSafeMode() throws Exception {
+ conf.setClass("fs.file.impl", LocalFileSystem.class, FileSystem.class);
+ assertFalse(CommonFSUtils.isHDFS(conf));
+ assertFalse(FSUtils.supportSafeMode(FileSystem.get(conf)));
+ FSUtils.checkDfsSafeMode(conf);
+ }
+
private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) throws Exception {
FSDataOutputStream out = fs.create(file);
byte[] data = new byte[dataSize];
diff --git a/pom.xml b/pom.xml
index f29e57027e32..b7a9103a77ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -822,7 +822,7 @@
     <maven.min.version>3.5.0</maven.min.version>
     <java.min.version>${compileSource}</java.min.version>
-    <hadoop-three.version>3.3.5</hadoop-three.version>
+    <hadoop-three.version>3.3.6</hadoop-three.version>
     <hadoop.version>${hadoop-three.version}</hadoop.version>