@@ -20,6 +20,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +40,17 @@ public final class RecoverLeaseFSUtils {

private static final Logger LOG = LoggerFactory.getLogger(RecoverLeaseFSUtils.class);

private static Class<?> leaseRecoverableClazz = null;

  static {
try {
leaseRecoverableClazz = Class.forName("org.apache.hadoop.fs.LeaseRecoverable");
} catch (ClassNotFoundException e) {
LOG.debug(
"LeaseRecoverable interface not in the classpath, this means Hadoop 3.3.5 or below.");
}
}

private RecoverLeaseFSUtils() {
}

@@ -48,18 +60,31 @@ public static void recoverFileLease(FileSystem fs, Path p, Configuration conf)
}

/**
* Recover the lease from HDFS, retrying multiple times.
* Recover the lease from HDFS or LeaseRecoverable fs, retrying multiple times.
*/
public static void recoverFileLease(FileSystem fs, Path p, Configuration conf,
CancelableProgressable reporter) throws IOException {
if (fs instanceof FilterFileSystem) {
fs = ((FilterFileSystem) fs).getRawFileSystem();
}

// lease recovery not needed for local file system case.
if (!(fs instanceof DistributedFileSystem)) {
return;
if (isLeaseRecoverable(fs)) {
recoverDFSFileLease(fs, p, conf, reporter);
}
recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter);
}

public static boolean isLeaseRecoverable(FileSystem fs) {
// return true if HDFS.
if (fs instanceof DistributedFileSystem) {
return true;
}
// return true if the file system implements LeaseRecoverable interface.
if (leaseRecoverableClazz != null) {
return (leaseRecoverableClazz.isAssignableFrom(fs.getClass()));
}
// return false if the file system is not HDFS and does not implement LeaseRecoverable.
return false;
}

/*
@@ -81,7 +106,7 @@ public static void recoverFileLease(FileSystem fs, Path p, Configuration conf,
* false, repeat starting at step 5. above. If HDFS-4525 is available, call it every second, and
* we might be able to exit early.
*/
private static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
private static boolean recoverDFSFileLease(final Object dfs, final Path p,
final Configuration conf, final CancelableProgressable reporter) throws IOException {
LOG.info("Recover lease on dfs file " + p);
long startWaiting = EnvironmentEdgeManager.currentTime();
@@ -167,21 +192,25 @@ private static boolean checkIfTimedout(final Configuration conf, final long reco
* Try to recover the lease.
* @return True if dfs#recoverLease came by true.
*/
private static boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt,
final Path p, final long startWaiting) throws FileNotFoundException {
private static boolean recoverLease(final Object dfs, final int nbAttempt, final Path p,
final long startWaiting) throws FileNotFoundException {
boolean recovered = false;
try {
recovered = dfs.recoverLease(p);
recovered = (Boolean) dfs.getClass().getMethod("recoverLease", new Class[] { Path.class })
.invoke(dfs, p);
LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ")
+ getLogMessageDetail(nbAttempt, p, startWaiting));
} catch (IOException e) {
} catch (InvocationTargetException ite) {
final Throwable e = ite.getCause();
if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
// This exception comes out instead of FNFE, fix it
throw new FileNotFoundException("The given WAL wasn't found at " + p);
} else if (e instanceof FileNotFoundException) {
throw (FileNotFoundException) e;
}
LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
} catch (IllegalAccessException | NoSuchMethodException e) {
throw new RuntimeException(e);
}
return recovered;
}
@@ -197,8 +226,7 @@ private static String getLogMessageDetail(final int nbAttempt, final Path p,
* Call HDFS-4525 isFileClosed if it is available.
* @return True if file is closed.
*/
private static boolean isFileClosed(final DistributedFileSystem dfs, final Method m,
final Path p) {
private static boolean isFileClosed(final Object dfs, final Method m, final Path p) {
try {
return (Boolean) m.invoke(dfs, p);
} catch (SecurityException e) {
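Taken together, the RecoverLeaseFSUtils changes above let a caller recover a WAL lease without knowing whether the store is HDFS or some other LeaseRecoverable file system. A minimal caller sketch under that assumption (the class name and WAL path below are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;

    public class WalLeaseRecoveryExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        Path wal = new Path("/hbase/WALs/example-wal"); // illustrative path
        // Retries internally until the lease is released on HDFS or on any file
        // system implementing org.apache.hadoop.fs.LeaseRecoverable; for file
        // systems without lease semantics the call returns immediately.
        RecoverLeaseFSUtils.recoverFileLease(fs, wal, conf);
      }
    }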
@@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
@@ -92,6 +94,12 @@ public void testIsFileClosed() throws IOException {
Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
}

@Test
public void testIsLeaseRecoverable() {
assertTrue(RecoverLeaseFSUtils.isLeaseRecoverable(new DistributedFileSystem()));
assertFalse(RecoverLeaseFSUtils.isLeaseRecoverable(new LocalFileSystem()));
}

/**
* Version of DFS that has HDFS-4525 in it.
*/
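The new test exercises the two concrete types directly; a wrapped file system could be checked in the same style. A hedged sketch (this test is not in the patch; note that isLeaseRecoverable looks at the concrete type only, while recoverFileLease unwraps FilterFileSystem before calling it):

    @Test
    public void testIsLeaseRecoverableOnWrappedFileSystem() {
      // A FilterFileSystem wrapper is neither HDFS nor LeaseRecoverable itself,
      // so the capability check on the wrapper reports false.
      assertFalse(RecoverLeaseFSUtils
        .isLeaseRecoverable(new org.apache.hadoop.fs.FilterFileSystem(new LocalFileSystem())));
    }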
@@ -113,6 +113,21 @@ public final class FSUtils {
// currently only used in testing. TODO refactor into a test class
public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

private static Class<?> safeModeClazz = null;
private static Class<?> safeModeActionClazz = null;
private static Object safeModeGet = null;
  static {
try {
safeModeClazz = Class.forName("org.apache.hadoop.fs.SafeMode");
safeModeActionClazz = Class.forName("org.apache.hadoop.fs.SafeModeAction");
safeModeGet = safeModeActionClazz.getField("GET").get(null);
} catch (ClassNotFoundException | NoSuchFieldException e) {
LOG.debug("SafeMode interface not in the classpath, this means Hadoop 3.3.5 or below.");
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}

private FSUtils() {
}

@@ -247,8 +262,19 @@ public static void checkFileSystemAvailable(final FileSystem fs) throws IOExcept
* @param dfs A DistributedFileSystem object representing the underlying HDFS.
* @return whether we're in safe mode
*/
private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
return dfs.setSafeMode(SAFEMODE_GET, true);
private static boolean isInSafeMode(FileSystem dfs) throws IOException {
if (isDistributedFileSystem(dfs)) {
return ((DistributedFileSystem) dfs).setSafeMode(SAFEMODE_GET, true);
} else {
try {
Object ret = dfs.getClass()
.getMethod("setSafeMode", new Class[] { safeModeActionClazz, Boolean.class })
.invoke(dfs, safeModeGet, true);
return (Boolean) ret;
} catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
throw new RuntimeException(e);
}
}
}

/**
@@ -257,9 +283,8 @@
public static void checkDfsSafeMode(final Configuration conf) throws IOException {
boolean isInSafeMode = false;
FileSystem fs = FileSystem.get(conf);
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem) fs;
isInSafeMode = isInSafeMode(dfs);
if (supportSafeMode(fs)) {
isInSafeMode = isInSafeMode(fs);
}
if (isInSafeMode) {
throw new IOException("File system is in safemode, it can't be written now");
@@ -644,10 +669,11 @@ public static void setClusterId(final FileSystem fs, final Path rootdir,
*/
public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException {
FileSystem fs = FileSystem.get(conf);
if (!(fs instanceof DistributedFileSystem)) return;
DistributedFileSystem dfs = (DistributedFileSystem) fs;
if (!supportSafeMode(fs)) {
return;
}
// Make sure dfs is not in safe mode
while (isInSafeMode(dfs)) {
while (isInSafeMode(fs)) {
LOG.info("Waiting for dfs to exit safe mode...");
try {
Thread.sleep(wait);
@@ -658,6 +684,19 @@ public static void waitOnSafeMode(final Configuration conf, final long wait) thr
}
}

public static boolean supportSafeMode(FileSystem fs) {
// return true if HDFS.
if (fs instanceof DistributedFileSystem) {
return true;
}
// return true if the file system implements SafeMode interface.
if (safeModeClazz != null) {
return (safeModeClazz.isAssignableFrom(fs.getClass()));
}
// return false if the file system is not HDFS and does not implement SafeMode interface.
return false;
}

/**
* Checks if meta region exists
* @param fs file system
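The safe-mode helpers in FSUtils follow the same pattern as the lease-recovery change: detect the optional interface once at class-load time, then dispatch on capability rather than on the concrete class. A minimal caller sketch (the polling interval and class name are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.FSUtils;

    public class SafeModeWaitExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Poll every 10 seconds until the file system leaves safe mode; returns
        // immediately for file systems that do not support safe mode at all.
        FSUtils.waitOnSafeMode(conf, 10000L);
        // checkDfsSafeMode is the fail-fast variant: it throws IOException if
        // the file system is still in safe mode at the time of the call.
        FSUtils.checkDfsSafeMode(conf);
      }
    }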
@@ -35,6 +35,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -92,13 +93,23 @@ public void testIsHDFS() throws Exception {
try {
cluster = htu.startMiniDFSCluster(1);
assertTrue(CommonFSUtils.isHDFS(conf));
assertTrue(FSUtils.supportSafeMode(cluster.getFileSystem()));
FSUtils.checkDfsSafeMode(conf);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}

@Test
public void testLocalFileSystemSafeMode() throws Exception {
conf.setClass("fs.file.impl", LocalFileSystem.class, FileSystem.class);
assertFalse(CommonFSUtils.isHDFS(conf));
assertFalse(FSUtils.supportSafeMode(FileSystem.get(conf)));
FSUtils.checkDfsSafeMode(conf);
}

private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) throws Exception {
FSDataOutputStream out = fs.create(file);
byte[] data = new byte[dataSize];
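When the build runs against Hadoop 3.3.6 or later, the positive non-HDFS branch of supportSafeMode could be covered as well. A hedged sketch, assuming org.apache.hadoop.fs.SafeMode is on the test classpath and Mockito is usable in this test class (this test is not part of the patch):

    @Test
    public void testSupportSafeModeOnNonHdfsFileSystem() {
      // Mock a LocalFileSystem that additionally implements the SafeMode
      // interface; supportSafeMode should report true based on the interface.
      FileSystem fs = Mockito.mock(LocalFileSystem.class,
        Mockito.withSettings().extraInterfaces(org.apache.hadoop.fs.SafeMode.class));
      assertTrue(FSUtils.supportSafeMode(fs));
    }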
pom.xml (2 changes: 1 addition & 1 deletion)
@@ -822,7 +822,7 @@
<maven.min.version>3.5.0</maven.min.version>
<java.min.version>${compileSource}</java.min.version>
<!-- Dependencies -->
<hadoop-three.version>3.3.5</hadoop-three.version>
<hadoop-three.version>3.3.6</hadoop-three.version>
<!-- These must be defined here for downstream build tools that don't look at profiles.
-->
<hadoop.version>${hadoop-three.version}</hadoop.version>
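Because the Hadoop 3 line is driven by this single property, a build can also be pointed at a different 3.3.x release without editing the pom, for example with a standard Maven property override such as mvn clean install -DskipTests -Dhadoop-three.version=3.3.6 (the exact version here is illustrative).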