diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java
index 62091acdd1db..6cf141d7053e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java
@@ -46,7 +46,7 @@
 public class ProtobufWALTailingReader extends AbstractProtobufWALReader
   implements WALTailingReader {
 
-  private static final Logger LOG = LoggerFactory.getLogger(ProtobufWALStreamReader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ProtobufWALTailingReader.class);
 
   private DelegatingInputStream delegatingInput;
 
@@ -117,8 +117,7 @@ private ReadWALKeyResult readWALKey(long originalPosition) {
       return KEY_ERROR_AND_RESET;
     }
     if (available > 0 && available < size) {
-      LOG.info(
-        "Available stream not enough for edit, available={}, " + "entry size={} at offset={}",
+      LOG.info("Available stream not enough for edit, available={}, entry size={} at offset={}",
         available, size, getPositionQuietly());
       return KEY_EOF_AND_RESET;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 1b8562bab330..60212f5ea1a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -205,6 +205,15 @@ private void setCurrentPath(Path path) {
     this.currentPath = path;
   }
 
+  private void resetReader() throws IOException {
+    if (currentPositionOfEntry > 0) {
+      reader.resetTo(currentPositionOfEntry, state.resetCompression());
+    } else {
+      // we will read from the beginning so we should always clear the compression context
+      reader.resetTo(-1, true);
+    }
+  }
+
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
       justification = "HDFS-4380")
   private HasNext prepareReader() {
@@ -214,12 +223,7 @@ private HasNext prepareReader() {
         LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath,
           currentPositionOfEntry, state.resetCompression());
         try {
-          if (currentPositionOfEntry > 0) {
-            reader.resetTo(currentPositionOfEntry, state.resetCompression());
-          } else {
-            // we will read from the beginning so we should always clear the compression context
-            reader.resetTo(-1, true);
-          }
+          resetReader();
           return HasNext.YES;
         } catch (FileNotFoundException e) {
           // For now, this could happen only when reading meta wal for meta replicas.
@@ -295,7 +299,7 @@ private HasNext lastAttempt() {
     LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath,
       currentPositionOfEntry, state.resetCompression());
     try {
-      reader.resetTo(currentPositionOfEntry, state.resetCompression());
+      resetReader();
     } catch (IOException e) {
       LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
         currentPositionOfEntry, state.resetCompression(), e);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
index 2906c922abc1..b1e2f2c16341 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
@@ -29,6 +29,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,7 +46,9 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -56,6 +59,7 @@
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.HasNext;
@@ -63,7 +67,6 @@
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -76,6 +79,7 @@
 import org.mockito.Mockito;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
 
 public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
 
@@ -93,7 +97,7 @@ public void setUp() throws Exception {
     initWAL();
   }
 
-  private Entry next(WALEntryStream entryStream) {
+  private WAL.Entry next(WALEntryStream entryStream) {
     assertEquals(HasNext.YES, entryStream.hasNext());
     return entryStream.next();
   }
@@ -562,7 +566,7 @@ private WALEntryFilter getDummyFilter() {
     return new WALEntryFilter() {
 
       @Override
-      public Entry filter(Entry entry) {
+      public WAL.Entry filter(WAL.Entry entry) {
         return entry;
       }
     };
@@ -581,7 +585,7 @@ public FailingWALEntryFilter(int numFailuresInFilter) {
     }
 
     @Override
-    public Entry filter(Entry entry) {
+    public WAL.Entry filter(WAL.Entry entry) {
      if (countFailures == numFailures) {
        return entry;
      }
@@ -839,6 +843,44 @@ public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() thr
     assertNull(reader.poll(10));
   }
 
+  // testcase for HBASE-28748
+  @Test
+  public void testWALEntryStreamEOFRightAfterHeader() throws Exception {
+    assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
+    AbstractFSWAL abstractWAL = (AbstractFSWAL) log;
+    Path emptyLogFile = abstractWAL.getCurrentFileName();
+    log.rollWriter(true);
+
+    // AsyncFSWAL and FSHLog both move the log from WALs to oldWALs directory asynchronously.
+    // Wait for the in flight wal close count to become 0. This makes sure that the empty wal is
+    // moved to the oldWALs directory.
+    Waiter.waitFor(CONF, 5000,
+      (Waiter.Predicate) () -> abstractWAL.getInflightWALCloseCount() == 0);
+    // There will be 2 logs in the queue.
+    assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));
+    appendToLogAndSync();
+
+    Path archivedEmptyLogFile = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF);
+
+    // read the wal header
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    bos.write(AbstractProtobufWALReader.PB_WAL_MAGIC);
+    try (FSDataInputStream in = fs.open(archivedEmptyLogFile)) {
+      IOUtils.skipFully(in, AbstractProtobufWALReader.PB_WAL_MAGIC.length);
+      WALHeader header = WALHeader.parseDelimitedFrom(in);
+      header.writeDelimitedTo(bos);
+    }
+    // truncate the first empty log so we have an incomplete header
+    try (FSDataOutputStream out = fs.create(archivedEmptyLogFile, true)) {
+      bos.writeTo(out);
+    }
+    try (WALEntryStream entryStream =
+      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
+      assertEquals(HasNext.RETRY_IMMEDIATELY, entryStream.hasNext());
+      assertNotNull(next(entryStream));
+    }
+  }
+
   private static class PartialWALEntryFailingWALEntryFilter implements WALEntryFilter {
     private int filteredWALEntryCount = -1;
     private int walEntryCount = 0;
@@ -851,7 +893,7 @@ public PartialWALEntryFailingWALEntryFilter(int throwExceptionLimit, int walEntr
     }
 
     @Override
-    public Entry filter(Entry entry) {
+    public WAL.Entry filter(WAL.Entry entry) {
       filteredWALEntryCount++;
       if (filteredWALEntryCount < walEntryCount - 1) {
         return entry;