ProtobufWALTailingReader.java
@@ -46,7 +46,7 @@
 public class ProtobufWALTailingReader extends AbstractProtobufWALReader
   implements WALTailingReader {

-  private static final Logger LOG = LoggerFactory.getLogger(ProtobufWALStreamReader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ProtobufWALTailingReader.class);

   private DelegatingInputStream delegatingInput;
@@ -117,8 +117,7 @@ private ReadWALKeyResult readWALKey(long originalPosition) {
       return KEY_ERROR_AND_RESET;
     }
     if (available > 0 && available < size) {
-      LOG.info(
-        "Available stream not enough for edit, available={}, " + "entry size={} at offset={}",
+      LOG.info("Available stream not enough for edit, available={}, entry size={} at offset={}",
         available, size, getPositionQuietly());
       return KEY_EOF_AND_RESET;
     }
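
Reviewer note on the two fixes above: the logger constant was evidently copy-pasted from ProtobufWALStreamReader, so lines logged by the tailing reader were attributed to the wrong class, and the log message was split by a needless string concatenation inside an already-parameterized SLF4J call. A minimal sketch of the corrected pattern, with an illustrative class name rather than anything from this patch:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggingSketch {
  // getLogger() should name the declaring class; a pasted-in class constant
  // makes every log line point readers at the wrong source file.
  private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

  void report(long available, long size, long offset) {
    // One literal with {} placeholders: SLF4J defers formatting until the
    // level is enabled, and no string concatenation happens on the hot path.
    LOG.info("Available stream not enough for edit, available={}, entry size={} at offset={}",
      available, size, offset);
  }
}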
WALEntryStream.java
@@ -205,6 +205,15 @@ private void setCurrentPath(Path path) {
     this.currentPath = path;
   }

+  private void resetReader() throws IOException {
+    if (currentPositionOfEntry > 0) {
+      reader.resetTo(currentPositionOfEntry, state.resetCompression());
+    } else {
+      // we will read from the beginning so we should always clear the compression context
+      reader.resetTo(-1, true);
+    }
+  }
+
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
     justification = "HDFS-4380")
   private HasNext prepareReader() {
@@ -214,12 +223,7 @@ private HasNext prepareReader() {
LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath,
currentPositionOfEntry, state.resetCompression());
try {
if (currentPositionOfEntry > 0) {
reader.resetTo(currentPositionOfEntry, state.resetCompression());
} else {
// we will read from the beginning so we should always clear the compression context
reader.resetTo(-1, true);
}
resetReader();
return HasNext.YES;
} catch (FileNotFoundException e) {
// For now, this could happen only when reading meta wal for meta replicas.
@@ -295,7 +299,7 @@ private HasNext lastAttempt() {
LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath,
currentPositionOfEntry, state.resetCompression());
try {
reader.resetTo(currentPositionOfEntry, state.resetCompression());
resetReader();
} catch (IOException e) {
LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
currentPositionOfEntry, state.resetCompression(), e);
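
Reviewer note: extracting resetReader() is more than deduplication. Before this change, lastAttempt() always called reader.resetTo(currentPositionOfEntry, state.resetCompression()), so a final retry that restarts from the beginning of the file (currentPositionOfEntry == 0) never forced the compression context to be cleared, while prepareReader() already had the if/else. Both call sites now share one helper, which makes the invariant explicit. A condensed, stubbed sketch of that invariant (the interface here is illustrative, not HBase's):

import java.io.IOException;

// Stub of the reset contract as the patch uses it: a negative position means
// "seek back to the beginning", and the flag asks the reader to rebuild its
// compression context before re-reading.
interface TailingReaderStub {
  void resetTo(long position, boolean resetCompression) throws IOException;
}

class ResetSketch {
  private final TailingReaderStub reader;
  private long currentPositionOfEntry; // stays 0 until an entry is fully read
  private boolean resetCompression;    // stands in for state.resetCompression()

  ResetSketch(TailingReaderStub reader) {
    this.reader = reader;
  }

  // The single reset path now shared by prepareReader() and lastAttempt():
  // restarting from the beginning always clears the compression context,
  // because any dictionary state built from earlier reads is stale.
  private void resetReader() throws IOException {
    if (currentPositionOfEntry > 0) {
      reader.resetTo(currentPositionOfEntry, resetCompression);
    } else {
      reader.resetTo(-1, true);
    }
  }
}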
TestBasicWALEntryStream.java
@@ -29,6 +29,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,7 +46,9 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -56,14 +59,14 @@
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.HasNext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -76,6 +79,7 @@
 import org.mockito.Mockito;

 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;

 public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {

@@ -93,7 +97,7 @@ public void setUp() throws Exception {
     initWAL();
   }

-  private Entry next(WALEntryStream entryStream) {
+  private WAL.Entry next(WALEntryStream entryStream) {
     assertEquals(HasNext.YES, entryStream.hasNext());
     return entryStream.next();
   }
@@ -562,7 +566,7 @@ private WALEntryFilter getDummyFilter() {
     return new WALEntryFilter() {

       @Override
-      public Entry filter(Entry entry) {
+      public WAL.Entry filter(WAL.Entry entry) {
         return entry;
       }
     };
@@ -581,7 +585,7 @@ public FailingWALEntryFilter(int numFailuresInFilter) {
     }

     @Override
-    public Entry filter(Entry entry) {
+    public WAL.Entry filter(WAL.Entry entry) {
       if (countFailures == numFailures) {
         return entry;
       }
@@ -839,6 +843,44 @@ public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() thr
     assertNull(reader.poll(10));
   }

+  // testcase for HBASE-28748
+  @Test
+  public void testWALEntryStreamEOFRightAfterHeader() throws Exception {
+    assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
+    AbstractFSWAL<?> abstractWAL = (AbstractFSWAL<?>) log;
+    Path emptyLogFile = abstractWAL.getCurrentFileName();
+    log.rollWriter(true);
+
+    // AsyncFSWAL and FSHLog both move the log from the WALs to the oldWALs directory
+    // asynchronously. Wait for the in-flight WAL close count to become 0; this makes sure
+    // the empty WAL has been moved to the oldWALs directory.
+    Waiter.waitFor(CONF, 5000,
+      (Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0);
+    // There will be 2 logs in the queue.
+    assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));
+    appendToLogAndSync();
+
+    Path archivedEmptyLogFile = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF);
+
+    // copy the WAL magic and the delimited header
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    bos.write(AbstractProtobufWALReader.PB_WAL_MAGIC);
+    try (FSDataInputStream in = fs.open(archivedEmptyLogFile)) {
+      IOUtils.skipFully(in, AbstractProtobufWALReader.PB_WAL_MAGIC.length);
+      WALHeader header = WALHeader.parseDelimitedFrom(in);
+      header.writeDelimitedTo(bos);
+    }
+    // rewrite the first, empty log so it ends right after the header and the stream hits EOF
+    try (FSDataOutputStream out = fs.create(archivedEmptyLogFile, true)) {
+      bos.writeTo(out);
+    }
+    try (WALEntryStream entryStream =
+      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
+      assertEquals(HasNext.RETRY_IMMEDIATELY, entryStream.hasNext());
+      assertNotNull(next(entryStream));
+    }
+  }
+
   private static class PartialWALEntryFailingWALEntryFilter implements WALEntryFilter {
     private int filteredWALEntryCount = -1;
     private int walEntryCount = 0;
@@ -851,7 +893,7 @@ public PartialWALEntryFailingWALEntryFilter(int throwExceptionLimit, int walEntr
     }

     @Override
-    public Entry filter(Entry entry) {
+    public WAL.Entry filter(WAL.Entry entry) {
       filteredWALEntryCount++;
       if (filteredWALEntryCount < walEntryCount - 1) {
         return entry;
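
Reviewer note on the new testWALEntryStreamEOFRightAfterHeader: a protobuf WAL begins with the PB_WAL_MAGIC preamble followed by a varint-length-delimited WALHeader, so rewriting the archived file with only those bytes leaves a WAL that ends immediately after its header. Per the assertions, the fixed behavior is that hasNext() reports RETRY_IMMEDIATELY instead of failing, after which the stream recovers and serves the entry appended to the newer WAL. The read-prefix-then-recreate truncation idiom generalizes; a self-contained sketch assuming a Hadoop FileSystem handle (helper name and signature are mine, not the patch's):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class TruncateSketch {

  // Rewrite `path` so only its first `keep` bytes survive, the same trick the
  // test uses to strand the archived WAL right after its header.
  static void keepPrefix(FileSystem fs, Path path, int keep) throws IOException {
    ByteArrayOutputStream prefix = new ByteArrayOutputStream();
    try (FSDataInputStream in = fs.open(path)) {
      // copy exactly `keep` bytes from the start of the stream
      IOUtils.copyLarge(in, prefix, 0, keep);
    }
    // create(path, true) overwrites in place, discarding the rest of the file
    try (FSDataOutputStream out = fs.create(path, true)) {
      prefix.writeTo(out);
    }
  }
}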