Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
private long totalBufferQuota;

private ReplicationSource source;
private ReplicationSourceManager manager;

/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
Expand Down Expand Up @@ -121,6 +122,7 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
this.metrics = metrics;
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
this.source = source;
this.manager = manager;
LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode()
+ ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId()
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
Expand Down Expand Up @@ -229,7 +231,10 @@ private void resetStream(WALEntryStream stream) throws IOException {
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
// enabled, then dump the log
private void handleEofException(Exception e) {
if (e.getCause() instanceof EOFException && logQueue.size() > 1
boolean isRecoveredSource = manager.getOldSources().contains(source);
// Dump the log even if logQueue size is 1 if the source is from recovered Source since we don't
// add current log to recovered source queue so it is safe to remove.
if (e.getCause() instanceof EOFException && (isRecoveredSource || logQueue.size() > 1)
&& conf.getBoolean("replication.source.eof.autorecovery", false)) {
try {
if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -49,6 +50,7 @@
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
Expand Down Expand Up @@ -692,4 +694,40 @@ public WALEntryBatch call() throws Exception {
assertEquals(walPath, entryBatch.getLastWalPath());
assertEquals(3, entryBatch.getNbRowKeys());
}

/*
Test removal of 0 length log from logQueue if the source is a recovered source and
size of logQueue is only 1.
*/
@Test
public void testEOFExceptionForRecoveredQueue() throws Exception {
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
// Create a 0 length log.
Path emptyLog = new Path("emptyLog");
FSDataOutputStream fsdos = fs.create(emptyLog);
fsdos.close();
assertEquals(0, fs.getFileStatus(emptyLog).getLen());
queue.add(emptyLog);

ReplicationSource source = Mockito.mock(ReplicationSource.class);

ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
// Make it look like the source is from recovered source.
when(mockSourceManager.getOldSources())
.thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source)));
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
// Override the max retries multiplier to fail fast.
conf.setInt("replication.source.maxretriesmultiplier", 1);
conf.setBoolean("replication.source.eof.autorecovery", true);
// Create a reader thread.
ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
queue, 0, fs, conf, getDummyFilter(),
new MetricsSource("1"), (ReplicationSource) source);
reader.run();
// ReplicationSourceWALReaderThread#handleEofException method will
// remove empty log from logQueue.
assertEquals(0, queue.size());
}
}