Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ public void enqueueLog(Path log) {
}
}

@InterfaceAudience.Private
public Map<String, PriorityBlockingQueue<Path>> getQueues() {
return logQueue.getQueues();
}

@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public ReplicationSourceManager(final ReplicationQueues replicationQueues,
* position. It will also clean old logs from the queue.
* @param log Path to the log currently being replicated from
* replication status in zookeeper. It will also delete older entries.
* @param id id of the peer cluster
* @param id id of the replication queue
* @param position current location in the log
* @param queueRecovered indicates if this queue comes from another region server
* @param holdLogInZK if true then the log is retained in ZK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
import org.apache.hadoop.hbase.wal.WALKey;

/**
* Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
* Reads and filters WAL entries, groups the filtered entries into batches,
* and puts the batches onto a queue
*
*/
@InterfaceAudience.Private
Expand Down Expand Up @@ -88,7 +89,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue.
* @param manager replication manager
* @param replicationQueueInfo
* @param replicationQueueInfo replication queue info
* @param logQueue The WAL queue to read off of
* @param startPosition position in the first WAL to start reading from
* @param fs the files system to use
Expand Down Expand Up @@ -135,71 +136,128 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
@Override
public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics, walGroupId)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
continue;
}
WALEntryBatch batch = new WALEntryBatch(replicationBatchCountCapacity);
boolean hasNext;
while ((hasNext = entryStream.hasNext()) == true) {
Entry entry = entryStream.next();
entry = filterEntry(entry);
if (entry != null) {
WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
long entrySize = getEntrySizeIncludeBulkLoad(entry);
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
batch.addEntry(entry, entrySize);
updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
// Stop if too many entries or too big
if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
WALEntryBatch batch = null;
WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics, walGroupId);
try {
while (isReaderRunning()) { // we only loop back here if something fatal happens to stream
try {
entryStream = new WALEntryStream(logQueue, fs, conf,
lastReadPosition, metrics, walGroupId);
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
continue;
}
batch = new WALEntryBatch(replicationBatchCountCapacity);
boolean hasNext = entryStream.hasNext();
while (hasNext) {
Entry entry = entryStream.next();
entry = filterEntry(entry);
if (entry != null) {
WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
long entrySize = getEntrySizeIncludeBulkLoad(entry);
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
batch.addEntry(entry, entrySize);
updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
// Stop if too many entries or too big
if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
|| batch.getNbEntries() >= replicationBatchCountCapacity) {
break;
break;
}
}
}
hasNext = entryStream.hasNext();
}
}

updateBatch(entryStream, batch, hasNext);
if (isShippable(batch)) {
sleepMultiplier = 1;
entryBatchQueue.put(batch);
if (!batch.hasMoreEntries()) {
// we're done with queue recovery, shut ourselves down
setReaderRunning(false);
// If the batch has data to max capacity or stream doesn't have anything
// try to ship it
if (updateBatchAndShippingQueue(entryStream, batch, hasNext, false)) {
sleepMultiplier = 1;
}
}
} catch (IOException | WALEntryStreamRuntimeException e) { // stream related
if (handleEofException(e, entryStream, batch)) {
sleepMultiplier = 1;
} else {
Thread.sleep(sleepForRetries);
if (sleepMultiplier < maxRetriesMultiplier) {
LOG.debug("Failed to read stream of replication entries: " + e);
sleepMultiplier++;
} else {
LOG.error("Failed to read stream of replication entries", e);
}
Threads.sleep(sleepForRetries * sleepMultiplier);
}
resetStream(entryStream);
}
} catch (IOException | WALEntryStreamRuntimeException e) { // stream related
if (sleepMultiplier < maxRetriesMultiplier) {
LOG.debug("Failed to read stream of replication entries: " + e);
sleepMultiplier++;
} else {
LOG.error("Failed to read stream of replication entries", e);
handleEofException(e);
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
} finally {
entryStream.close();
}
Threads.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
}
} catch (IOException e) {
if (sleepMultiplier < maxRetriesMultiplier) {
LOG.debug("Failed to read stream of replication entries: " + e);
sleepMultiplier++;
} else {
LOG.error("Failed to read stream of replication entries", e);
}
Threads.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
}
}

/**
* Update the batch try to ship and return true if shipped
* @param entryStream stream of the WALs
* @param batch Batch of entries to ship
* @param hasMoreData if the stream has more yet more data to read
* @param isEOFException if we have hit the EOF exception before this. For EOF exception,
* we do not want to reset the stream since entry stream doesn't
* have correct information.
* @return if batch is shipped successfully
* @throws InterruptedException throws interrupted exception
* @throws IOException throws io exception from stream
*/
private boolean updateBatchAndShippingQueue(WALEntryStream entryStream, WALEntryBatch batch,
boolean hasMoreData, boolean isEOFException) throws InterruptedException, IOException {
updateBatch(entryStream, batch, hasMoreData, isEOFException);
boolean isDataQueued = false;
if (isShippable(batch)) {
isDataQueued = true;
entryBatchQueue.put(batch);
if (!batch.hasMoreEntries()) {
// we're done with queue recovery, shut ourselves down
LOG.debug("Stopping the reader after recovering the queue");
setReaderRunning(false);
}
} else {
Thread.sleep(sleepForRetries);
}

if (!isEOFException) {
resetStream(entryStream);
}
return isDataQueued;
}

private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData) {
private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData,
boolean isEOFException) {
logMessage(batch);
batch.updatePosition(entryStream);
// In case of EOF exception we can utilize the last read path and position
// since we do not have the current information.
if (isEOFException) {
batch.updatePosition(lastReadPath, lastReadPosition);
} else {
batch.updatePosition(entryStream.getCurrentPath(), entryStream.getPosition());
}
batch.setMoreEntries(!replicationQueueInfo.isQueueRecovered() || moreData);
}

Expand Down Expand Up @@ -229,10 +287,18 @@ private void resetStream(WALEntryStream stream) throws IOException {
stream.reset(); // reuse stream
}

// if we get an EOF due to a zero-length log, and there are other logs in queue
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
// enabled, then dump the log
private void handleEofException(Exception e) {
/**
* This is to handle the EOFException from the WAL entry stream. EOFException should
* be handled carefully because there are chances of data loss because of never replicating
* the data.
* If EOFException happens on the last log in recovered queue, we can safely stop
* the reader.
* If EOException doesn't happen on the last log in recovered queue, we should
* not stop the reader.
* @return true only the IOE can be handled
*/
private boolean handleEofException(Exception e, WALEntryStream entryStream,
WALEntryBatch batch) throws InterruptedException {
boolean isRecoveredSource = manager.getOldSources().contains(source);
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered Source since we don't
Expand All @@ -245,11 +311,22 @@ private void handleEofException(Exception e) {
lastReadPath = queue.peek();
logQueue.remove(walGroupId);
lastReadPosition = 0;

// If it was on last log in the recovered queue,
// the stream doesn't have more data, we should stop the reader
boolean hasMoreData = !queue.isEmpty();
// After we removed the WAL from the queue, we should
// try shipping the existing batch of entries, we do not want to reset
// stream since entry stream doesn't have the correct data at this point
updateBatchAndShippingQueue(entryStream, batch, hasMoreData, true);
return true;
}
} catch (IOException ioe) {
LOG.warn("Couldn't get file length information about log " + queue.peek());
}
}

return false;
}

public Path getCurrentPath() {
Expand Down Expand Up @@ -299,7 +376,8 @@ public long getEntrySizeExcludeBulkLoad(Entry entry) {
return edit.heapSize() + key.estimatedSerializedSizeOf();
}

private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) {
private void updateBatchStats(WALEntryBatch batch, Entry entry,
long entryPosition, long entrySize) {
WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
batch.incrementHeapSize(entrySize);
Expand Down Expand Up @@ -409,7 +487,7 @@ public long getLastReadPosition() {
* Holds a batch of WAL entries to replicate, along with some statistics
*
*/
static class WALEntryBatch {
final static class WALEntryBatch {
private List<Pair<Entry, Long>> walEntriesWithSize;
// last WAL that was read
private Path lastWalPath;
Expand Down Expand Up @@ -515,9 +593,15 @@ public boolean isEmpty() {
return walEntriesWithSize.isEmpty();
}

public void updatePosition(WALEntryStream entryStream) {
lastWalPath = entryStream.getCurrentPath();
lastWalPosition = entryStream.getPosition();
/**
* Update the wal entry batch with latest wal and position which will be used by
* shipper to update the log position in ZK node
* @param currentPath the path of WAL
* @param currentPosition the position of the WAL
*/
public void updatePosition(Path currentPath, long currentPosition) {
lastWalPath = currentPath;
lastWalPosition = currentPosition;
}

public boolean hasMoreEntries() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,23 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
* @param conf {@link Configuration} to use to create {@link Reader} for this stream
* @param metrics replication metrics
* @param walGroupId wal prefix
* @throws IOException
*/
public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
MetricsSource metrics, String walGroupId)
throws IOException {
MetricsSource metrics, String walGroupId) {
this(logQueue, fs, conf, 0, metrics, walGroupId);
}

/**
* Create an entry stream over the given queue at the given start position
* @param logQueue the queue of WAL paths
* @param fs {@link FileSystem} to use to create {@link Reader} for this stream
* @param conf the {@link Configuration} to use to create {@link Reader} for this stream
* @param startPosition the position in the first WAL to start reading at
* @param metrics the replication metrics
* @param walGroupId wal prefix
* @throws IOException
*/
public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
long startPosition, MetricsSource metrics, String walGroupId) throws IOException {
long startPosition, MetricsSource metrics, String walGroupId) {
this.logQueue = logQueue;
this.fs = fs;
this.conf = conf;
Expand Down Expand Up @@ -122,7 +120,9 @@ public boolean hasNext() {
*/
@Override
public Entry next() {
if (!hasNext()) throw new NoSuchElementException();
if (!hasNext()) {
throw new NoSuchElementException();
}
Entry save = currentEntry;
currentEntry = null; // gets reloaded by hasNext()
return save;
Expand Down Expand Up @@ -180,7 +180,7 @@ private String getCurrentPathStat() {
/**
* Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
* false)
* @throws IOException
* @throws IOException io exception while resetting the reader
*/
public void reset() throws IOException {
if (reader != null && currentPath != null) {
Expand Down Expand Up @@ -306,7 +306,9 @@ private boolean openNextLog() throws IOException {
Path nextPath = queue.peek();
if (nextPath != null) {
openReader(nextPath);
if (reader != null) return true;
if (reader != null) {
return true;
}
}
return false;
}
Expand Down Expand Up @@ -349,7 +351,9 @@ private void openReader(Path path) throws IOException {
handleFileNotFound(path, fnfe);
} catch (RemoteException re) {
IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
if (!(ioe instanceof FileNotFoundException)) throw ioe;
if (!(ioe instanceof FileNotFoundException)) {
throw ioe;
}
handleFileNotFound(path, (FileNotFoundException)ioe);
} catch (LeaseNotRecoveredException lnre) {
// HBASE-15019 the WAL was not closed due to some hiccup.
Expand Down
Loading