Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,13 @@ public void run() {
}
}
} catch (IOException e) { // stream related
if (sleepMultiplier < maxRetriesMultiplier) {
LOG.debug("Failed to read stream of replication entries: " + e);
sleepMultiplier++;
} else {
LOG.error("Failed to read stream of replication entries", e);
handleEofException(e);
if (!handleEofException(e)) {
LOG.warn("Failed to read stream of replication entries", e);
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier ++;
}
Threads.sleep(sleepForRetries * sleepMultiplier);
}
Threads.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -245,25 +244,30 @@ private void handleEmptyWALEntryBatch() throws InterruptedException {
}
}

// if we get an EOF due to a zero-length log, and there are other logs in queue
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
// enabled, then dump the log
private void handleEofException(IOException e) {
/**
* if we get an EOF due to a zero-length log, and there are other logs in queue
* (highly likely we've closed the current log), and autorecovery is
* enabled, then dump the log
* @return true only the IOE can be handled
*/
private boolean handleEofException(IOException e) {
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered Source
// since we don't add current log to recovered source queue so it is safe to remove.
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
(source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
try {
if (fs.getFileStatus(queue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek());
LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek());
logQueue.remove(walGroupId);
currentPosition = 0;
return true;
}
} catch (IOException ioe) {
LOG.warn("Couldn't get file length information about log " + queue.peek());
LOG.warn("Couldn't get file length information about log {}", queue.peek());
}
}
return false;
}

public Path getCurrentPath() {
Expand Down