@@ -34,6 +34,7 @@
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -274,11 +275,15 @@ private boolean handleEofException(Exception e, WALEntryBatch batch) {
// since we don't add current log to recovered source queue so it is safe to remove.
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
(source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
Path head = queue.peek();
Path path = queue.peek();
try {
if (fs.getFileStatus(head).getLen() == 0) {
// head of the queue is an empty log file
LOG.warn("Forcing removal of 0 length log in queue: {}", head);
if (!fs.exists(path)) {
// There is a chance that wal has moved to oldWALs directory, so look there also.
path = AbstractFSWALProvider.findArchivedLog(path, conf);
// path is null if it couldn't find archive path.
}
if (path != null && fs.getFileStatus(path).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: {}", path);
logQueue.remove(walGroupId);
currentPosition = 0;
if (batch != null) {
@@ -289,7 +294,7 @@ private boolean handleEofException(Exception e, WALEntryBatch batch) {
return true;
}
} catch (IOException ioe) {
LOG.warn("Couldn't get file length information about log " + queue.peek(), ioe);
LOG.warn("Couldn't get file length information about log " + path, ioe);
} catch (InterruptedException ie) {
LOG.trace("Interrupted while adding WAL batch to ship queue");
Thread.currentThread().interrupt();
@@ -27,13 +27,13 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -316,35 +316,10 @@ private boolean openNextLog() throws IOException {
return false;
}

private Path getArchivedLog(Path path) throws IOException {
Path walRootDir = CommonFSUtils.getWALRootDir(conf);

// Try found the log in old dir
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path archivedLogLocation = new Path(oldLogDir, path.getName());
if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
return archivedLogLocation;
}

// Try found the log in the seperate old log dir
oldLogDir =
new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
.append(Path.SEPARATOR).append(serverName.getServerName()).toString());
archivedLogLocation = new Path(oldLogDir, path.getName());
if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
return archivedLogLocation;
}

LOG.error("Couldn't locate log: " + path);
return path;
}

private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
// If the log was archived, continue reading from there
Path archivedLog = getArchivedLog(path);
if (!path.equals(archivedLog)) {
Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
if (archivedLog != null) {
openReader(archivedLog);
} else {
throw fnfe;
@@ -408,8 +383,8 @@ private void resetReader() throws IOException {
seek();
} catch (FileNotFoundException fnfe) {
// If the log was archived, continue reading from there
Path archivedLog = getArchivedLog(currentPath);
if (!currentPath.equals(archivedLog)) {
Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
if (archivedLog != null) {
openReader(archivedLog);
} else {
throw fnfe;
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.wal;


import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
@@ -500,6 +499,43 @@ public static Path getArchivedLogPath(Path path, Configuration conf) throws IOEx
}
}

/**
* Find the archived WAL file path if it cannot be located in the WALs dir.
* @param path - active WAL file path
* @param conf - configuration
* @return archived path if it exists, null otherwise
* @throws IOException exception
*/
public static Path findArchivedLog(Path path, Configuration conf) throws IOException {
// If the path contains oldWALs keyword then exit early.
if (path.toString().contains(HConstants.HREGION_OLDLOGDIR_NAME)) {
return null;
}
Path walRootDir = CommonFSUtils.getWALRootDir(conf);
FileSystem fs = path.getFileSystem(conf);
// Try finding the log in old dir
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path archivedLogLocation = new Path(oldLogDir, path.getName());
if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
return archivedLogLocation;
}

ServerName serverName = getServerNameFromWALDirectoryName(path);
// Try finding the log in separate old log dir
oldLogDir =
new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
.append(Path.SEPARATOR).append(serverName.getServerName()).toString());
archivedLogLocation = new Path(oldLogDir, path.getName());
if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
return archivedLogLocation;
}

LOG.error("Couldn't locate log: " + path);
return null;
}
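
For context, here is a minimal sketch (not part of this patch) of the lookup-with-fallback pattern that the replication code above now follows. The class and method names in the sketch are invented for illustration; only AbstractFSWALProvider.findArchivedLog(Path, Configuration) and its null-on-miss contract come from the change itself.

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

// Illustrative only: resolve a WAL path, falling back to its archived (oldWALs) location.
public class ArchivedWalLookupSketch {
  static Path resolveWal(Path walPath, Configuration conf) throws IOException {
    FileSystem fs = walPath.getFileSystem(conf);
    if (fs.exists(walPath)) {
      // Still present in the active WALs directory.
      return walPath;
    }
    // The WAL may have been archived; findArchivedLog returns null when it cannot be found.
    Path archived = AbstractFSWALProvider.findArchivedLog(walPath, conf);
    if (archived != null) {
      return archived;
    }
    throw new FileNotFoundException("WAL not found in WALs or oldWALs: " + walPath);
  }
}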

/**
* Opens WAL reader with retries and additional exception handling
* @param path path to WAL file
@@ -52,12 +52,14 @@
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -716,4 +718,48 @@ public void testCleanClosedWALs() throws Exception {
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
}
}

/**
* Tests that we handle EOFException properly if the wal has moved to the oldWALs directory.
* @throws Exception exception
*/
@Test
public void testEOFExceptionInOldWALsDirectory() throws Exception {
assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
AbstractFSWAL abstractWAL = (AbstractFSWAL)log;
Contributor:
nit: We don't need this down cast, do we?

Contributor Author:
@bharathv It is needed. We get the current wal file name via AbstractFSWAL#getCurrentFileName so that we can truncate that wal file to create a 0-size wal file; the WAL class doesn't have this method.

Path emptyLogFile = abstractWAL.getCurrentFileName();
log.rollWriter(true);

// AsyncFSWAL and FSHLog both move the log from WALs to the oldWALs directory asynchronously.
// Wait for the in-flight wal close count to become 0. This makes sure that the empty wal has been
// moved to the oldWALs directory.
Waiter.waitFor(CONF, 5000,
(Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0);
// There will be 2 logs in the queue.
assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));

// Get the archived dir path for the first wal.
Path archivePath = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF);
// Make sure that the wal was actually archived and exists under the oldWALs directory.
assertNotNull(archivePath);
assertTrue(fs.exists(archivePath));
fs.truncate(archivePath, 0);
// make sure the size of the wal file is 0.
assertEquals(0, fs.getFileStatus(archivePath).getLen());

ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));

Configuration localConf = new Configuration(CONF);
localConf.setInt("replication.source.maxretriesmultiplier", 1);
localConf.setBoolean("replication.source.eof.autorecovery", true);
// Start the reader thread.
createReader(false, localConf);
// Wait for the replication queue size to become 1. This means that we have handled
// the 0-length wal from the oldWALs directory.
Waiter.waitFor(localConf, 10000,
(Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
}
}