Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -328,34 +328,37 @@ public Void run() throws Exception {

@VisibleForTesting
public long doTailEdits() throws IOException, InterruptedException {
Collection<EditLogInputStream> streams;
FSImage image = namesystem.getFSImage();

long lastTxnId = image.getLastAppliedTxId();
LOG.debug("lastTxnId: {}", lastTxnId);
long startTime = timer.monotonicNow();
try {
streams = editLog.selectInputStreams(lastTxnId + 1, 0,
null, inProgressOk, true);
} catch (IOException ioe) {
// This is acceptable. If we try to tail edits in the middle of an edits
// log roll, i.e. the last one has been finalized but the new inprogress
// edits file hasn't been started yet.
LOG.warn("Edits tailer failed to find any streams. Will try again " +
"later.", ioe);
return 0;
} finally {
NameNode.getNameNodeMetrics().addEditLogFetchTime(
timer.monotonicNow() - startTime);
}
// Write lock needs to be interruptible here because the
// transitionToActive RPC takes the write lock before calling
// tailer.stop() -- so if we're not interruptible, it will
// deadlock.
namesystem.writeLockInterruptibly();
try {
FSImage image = namesystem.getFSImage();

long lastTxnId = image.getLastAppliedTxId();

if (LOG.isDebugEnabled()) {
LOG.debug("lastTxnId: " + lastTxnId);
}
Collection<EditLogInputStream> streams;
long startTime = timer.monotonicNow();
try {
streams = editLog.selectInputStreams(lastTxnId + 1, 0,
null, inProgressOk, true);
} catch (IOException ioe) {
// This is acceptable. If we try to tail edits in the middle of an edits
// log roll, i.e. the last one has been finalized but the new inprogress
// edits file hasn't been started yet.
LOG.warn("Edits tailer failed to find any streams. Will try again " +
"later.", ioe);
long currentLastTxnId = image.getLastAppliedTxId();
if (lastTxnId != currentLastTxnId) {
LOG.warn("The currentLastTxnId({}) is different from preLastTxtId({})",
currentLastTxnId, lastTxnId);
return 0;
} finally {
NameNode.getNameNodeMetrics().addEditLogFetchTime(
timer.monotonicNow() - startTime);
}
if (LOG.isDebugEnabled()) {
LOG.debug("edit streams to load from: " + streams.size());
Expand Down