Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -124,10 +125,12 @@ public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
WALEntryBatch batch = null;
try (WALEntryStream entryStream =
WALEntryStream entryStream = null;
try {
entryStream =
new WALEntryStream(logQueue, conf, currentPosition,
source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
source.getSourceMetrics(), walGroupId)) {
source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
source.getSourceMetrics(), walGroupId);
while (isReaderRunning()) { // loop here to keep reusing stream while we can
batch = null;
if (!source.isPeerEnabled()) {
Expand Down Expand Up @@ -156,7 +159,7 @@ public void run() {
sleepMultiplier = 1;
}
} catch (WALEntryFilterRetryableException | IOException e) { // stream related
if (!handleEofException(e, batch)) {
if (!handleEofException(e, batch, entryStream)) {
LOG.warn("Failed to read stream of replication entries", e);
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier++;
Expand All @@ -166,6 +169,9 @@ public void run() {
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue");
Thread.currentThread().interrupt();
} finally {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the advantage here to not use try-with-resources but a try finally?

Copy link
Contributor Author

@shahrs87 shahrs87 Jul 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to use entryStream object in handleEofException method. If I use try-wth-resources, I don't have access to entryStream in catch block.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, so if we could move the main logic in WALEntryStream.getArchivedLog to a util class, then we do not need try finally then? In general, I do not think we should expose WALEntryStream.getArchivedLog directly. Let's add a public static method in AbstractFSWALProvider, just below the getWALArchiveDirectoryName method. I think this is the right place for holding a public method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @Apache9 for the review. Didn't know a utility method AbstractFSWALProvider#getArchivedLogPath already exists.
Removed all the changes to make WALEntryStream class. Please review again.

IOUtils.closeQuietly(entryStream,
e -> LOG.warn("Exception while closing WALEntryStream", e));
}
}
}
Expand Down Expand Up @@ -268,17 +274,20 @@ private WALEntryBatch tryAdvanceStreamAndCreateWALBatch(WALEntryStream entryStre
* logs from replication queue
* @return true only the IOE can be handled
*/
private boolean handleEofException(Exception e, WALEntryBatch batch) {
private boolean handleEofException(Exception e, WALEntryBatch batch, WALEntryStream entryStream) {
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered Source
// since we don't add current log to recovered source queue so it is safe to remove.
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
(source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
Path head = queue.peek();
Path path = queue.peek();
try {
if (fs.getFileStatus(head).getLen() == 0) {
// head of the queue is an empty log file
LOG.warn("Forcing removal of 0 length log in queue: {}", head);
if (!fs.exists(path)) {
// There is a chance that wal has moved to oldWALs directory, so look there also.
path = entryStream.getArchivedLog(path);
}
if (fs.getFileStatus(path).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: {}", path);
logQueue.remove(walGroupId);
currentPosition = 0;
if (batch != null) {
Expand All @@ -289,7 +298,7 @@ private boolean handleEofException(Exception e, WALEntryBatch batch) {
return true;
}
} catch (IOException ioe) {
LOG.warn("Couldn't get file length information about log " + queue.peek(), ioe);
LOG.warn("Couldn't get file length information about log " + path, ioe);
} catch (InterruptedException ie) {
LOG.trace("Interrupted while adding WAL batch to ship queue");
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,8 @@ private boolean openNextLog() throws IOException {
return false;
}

private Path getArchivedLog(Path path) throws IOException {
Path getArchivedLog(Path path) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this method to a util class so we do not need to change the modifier and also do not need to pass the WALEntryStream to the handleEofException?

Path walRootDir = CommonFSUtils.getWALRootDir(conf);

// Try found the log in old dir
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path archivedLogLocation = new Path(oldLogDir, path.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
Expand Down Expand Up @@ -716,4 +717,52 @@ public void testCleanClosedWALs() throws Exception {
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
}
}

/**
* Tests that we handle EOFException properly if the wal has moved to oldWALs directory.
* @throws Exception exception
*/
@Test
public void testEOFExceptionInOldWALsDirectory() throws Exception {
assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
AbstractFSWAL abstractWAL = (AbstractFSWAL)log;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We don't need this down cast, do we?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bharathv It is needed. We are getting the current wal file name via AbstractFSWAL#getCurrentFileName so that we can truncate that wal file to create 0 size wal file. WAL class doesn't have this method.

Path emptyLogFile = abstractWAL.getCurrentFileName();
log.rollWriter(true);

// AsyncFSWAl and FSHLog both moves the log from WALs to oldWALs directory asynchronously.
// Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to
// oldWALs directory.
Waiter.waitFor(CONF, 5000,
(Waiter.Predicate<Exception>) () ->abstractWAL.getInflightWALCloseCount() == 0);
// There will 2 logs in the queue.
assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));

Configuration localConf = new Configuration(CONF);
localConf.setInt("replication.source.maxretriesmultiplier", 1);
localConf.setBoolean("replication.source.eof.autorecovery", true);

try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, localConf, 0, log,
null, logQueue.getMetrics(), fakeWalGroupId)) {
// Get the archived dir path for the first wal.
Path archivePath = entryStream.getArchivedLog(emptyLogFile);
// Make sure that the wal path is not the same as archived Dir path.
assertNotEquals(emptyLogFile.toString(), archivePath.toString());
assertTrue(fs.exists(archivePath));
fs.truncate(archivePath, 0);
// make sure the size of the wal file is 0.
assertEquals(0, fs.getFileStatus(archivePath).getLen());
}

ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));

// Start the reader thread.
createReader(false, localConf);
// Wait for the replication queue size to be 1. This means that we have handled
// 0 length wal from oldWALs directory.
Waiter.waitFor(localConf, 10000,
(Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
}
}