diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index e3c4f87f1f67..b67ff53cf3bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -81,6 +81,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
 
   private long totalBufferQuota;
   private ReplicationSource source;
+  private ReplicationSourceManager manager;
 
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
@@ -121,6 +122,7 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
     this.metrics = metrics;
     this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
     this.source = source;
+    this.manager = manager;
     LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode()
         + ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId()
         + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
@@ -229,7 +231,10 @@ private void resetStream(WALEntryStream stream) throws IOException {
   // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
   // enabled, then dump the log
   private void handleEofException(Exception e) {
-    if (e.getCause() instanceof EOFException && logQueue.size() > 1
+    boolean isRecoveredSource = manager.getOldSources().contains(source);
+    // Dump the log even if logQueue size is 1 if the source is a recovered source, since we don't
+    // add the current log to a recovered source's queue, so it is safe to remove.
+    if (e.getCause() instanceof EOFException && (isRecoveredSource || logQueue.size() > 1)
         && conf.getBoolean("replication.source.eof.autorecovery", false)) {
       try {
         if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 1828ad8f9cf7..eaf7e0a316a5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -33,6 +33,7 @@
 import java.io.IOException;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -49,6 +50,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -692,4 +694,40 @@ public WALEntryBatch call() throws Exception {
     assertEquals(walPath, entryBatch.getLastWalPath());
     assertEquals(3, entryBatch.getNbRowKeys());
   }
+
+  /*
+  Test removal of 0 length log from logQueue if the source is a recovered source and
+  size of logQueue is only 1.
+  */
+  @Test
+  public void testEOFExceptionForRecoveredQueue() throws Exception {
+    PriorityBlockingQueue queue = new PriorityBlockingQueue<>();
+    // Create a 0 length log.
+    Path emptyLog = new Path("emptyLog");
+    FSDataOutputStream fsdos = fs.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
+    queue.add(emptyLog);
+
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+
+    ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+    // Make it look like the source is a recovered source.
+    when(mockSourceManager.getOldSources())
+        .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source)));
+    when(source.isPeerEnabled()).thenReturn(true);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    // Override the max retries multiplier to fail fast.
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+    // Create a reader thread.
+    ReplicationSourceWALReaderThread reader =
+        new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
+            queue, 0, fs, conf, getDummyFilter(),
+            new MetricsSource("1"), (ReplicationSource) source);
+    reader.run();
+    // ReplicationSourceWALReaderThread#handleEofException method will
+    // remove empty log from logQueue.
+    assertEquals(0, queue.size());
+  }
 }