Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ public FakeTimer() {
nowNanos = TimeUnit.MILLISECONDS.toNanos(1000);
}

/**
* FakeTimer constructor with milliseconds to keep as initial value.
*
* @param time time in millis.
*/
public FakeTimer(long time) {
now = time;
nowNanos = TimeUnit.MILLISECONDS.toNanos(time);
}

@Override
public long now() {
return now;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import org.apache.hadoop.thirdparty.com.google.common.collect.Iterators;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
Expand All @@ -55,12 +56,10 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;

import static org.apache.hadoop.util.Time.monotonicNow;
import static org.apache.hadoop.util.ExitUtil.terminate;

import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.util.Time;


/**
Expand Down Expand Up @@ -172,14 +171,21 @@ public class EditLogTailer {
*/
private final long maxTxnsPerLock;

/**
* Timer instance to be set only using constructor.
* Only tests can reassign this by using setTimerForTests().
* For source code, this timer instance should be treated as final.
*/
private Timer timer;

public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
this.tailerThread = new EditLogTailerThread();
this.conf = conf;
this.namesystem = namesystem;
this.timer = new Timer();
this.editLog = namesystem.getEditLog();

lastLoadTimeMs = monotonicNow();
lastRollTimeMs = monotonicNow();
this.lastLoadTimeMs = timer.monotonicNow();
this.lastRollTimeMs = timer.monotonicNow();

logRollPeriodMs = conf.getTimeDuration(
DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
Expand Down Expand Up @@ -301,7 +307,7 @@ public Void run() throws Exception {
long editsTailed = 0;
// Fully tail the journal to the end
do {
long startTime = Time.monotonicNow();
long startTime = timer.monotonicNow();
try {
NameNode.getNameNodeMetrics().addEditLogTailInterval(
startTime - lastLoadTimeMs);
Expand All @@ -312,7 +318,7 @@ public Void run() throws Exception {
throw new IOException(e);
} finally {
NameNode.getNameNodeMetrics().addEditLogTailTime(
Time.monotonicNow() - startTime);
timer.monotonicNow() - startTime);
}
} while(editsTailed > 0);
return null;
Expand All @@ -336,7 +342,7 @@ public long doTailEdits() throws IOException, InterruptedException {
LOG.debug("lastTxnId: " + lastTxnId);
}
Collection<EditLogInputStream> streams;
long startTime = Time.monotonicNow();
long startTime = timer.monotonicNow();
try {
streams = editLog.selectInputStreams(lastTxnId + 1, 0,
null, inProgressOk, true);
Expand All @@ -349,7 +355,7 @@ public long doTailEdits() throws IOException, InterruptedException {
return 0;
} finally {
NameNode.getNameNodeMetrics().addEditLogFetchTime(
Time.monotonicNow() - startTime);
timer.monotonicNow() - startTime);
}
if (LOG.isDebugEnabled()) {
LOG.debug("edit streams to load from: " + streams.size());
Expand All @@ -374,7 +380,7 @@ public long doTailEdits() throws IOException, InterruptedException {
}

if (editsLoaded > 0) {
lastLoadTimeMs = monotonicNow();
lastLoadTimeMs = timer.monotonicNow();
}
lastLoadedTxnId = image.getLastAppliedTxId();
return editsLoaded;
Expand All @@ -395,7 +401,7 @@ public long getLastLoadTimeMs() {
*/
private boolean tooLongSinceLastLoad() {
return logRollPeriodMs >= 0 &&
(monotonicNow() - lastRollTimeMs) > logRollPeriodMs;
(timer.monotonicNow() - lastRollTimeMs) > logRollPeriodMs;
}

/**
Expand Down Expand Up @@ -423,21 +429,40 @@ void triggerActiveLogRoll() {
try {
future = rollEditsRpcExecutor.submit(getNameNodeProxy());
future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS);
lastRollTimeMs = monotonicNow();
this.lastRollTimeMs = timer.monotonicNow();
lastRollTriggerTxId = lastLoadedTxnId;
} catch (ExecutionException e) {
} catch (ExecutionException | InterruptedException e) {
LOG.warn("Unable to trigger a roll of the active NN", e);
} catch (TimeoutException e) {
if (future != null) {
future.cancel(true);
}
LOG.warn(String.format(
"Unable to finish rolling edits in %d ms", rollEditsTimeoutMs));
} catch (InterruptedException e) {
LOG.warn("Unable to trigger a roll of the active NN", e);
}
}

/**
* This is only to be used by tests. For source code, the only way to
* set timer is by using EditLogTailer constructor.
*
* @param newTimer Timer instance provided by tests.
*/
@VisibleForTesting
void setTimerForTest(final Timer newTimer) {
this.timer = newTimer;
}

/**
* Used by tests. Return Timer instance used by EditLogTailer.
*
* @return Return Timer instance used by EditLogTailer.
*/
@VisibleForTesting
Timer getTimer() {
return timer;
}

@VisibleForTesting
void sleep(long sleepTimeMillis) throws InterruptedException {
Thread.sleep(sleepTimeMillis);
Expand Down Expand Up @@ -497,15 +522,15 @@ private void doWork() {
// name system lock will be acquired to further block even the block
// state updates.
namesystem.cpLockInterruptibly();
long startTime = Time.monotonicNow();
long startTime = timer.monotonicNow();
try {
NameNode.getNameNodeMetrics().addEditLogTailInterval(
startTime - lastLoadTimeMs);
editsTailed = doTailEdits();
} finally {
namesystem.cpUnlock();
NameNode.getNameNodeMetrics().addEditLogTailTime(
Time.monotonicNow() - startTime);
timer.monotonicNow() - startTime);
}
//Update NameDirSize Metric
if (triggeredLogRoll) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.FakeTimer;
import org.slf4j.event.Level;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -394,13 +395,15 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits()
// Time in seconds to wait before checking if edit logs are rolled while
// expecting no edit log roll
final int noLogRollWaitTime = 2;

// Time in seconds to wait before checking if edit logs are rolled while
// expecting edit log roll
// expecting edit log roll.
final int logRollWaitTime = 3;

final int logRollPeriod = standbyCatchupWaitTime + noLogRollWaitTime + 1;
final long logRollPeriodMs = TimeUnit.SECONDS.toMillis(logRollPeriod);
Configuration conf = getConf();
conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
standbyCatchupWaitTime + noLogRollWaitTime + 1);
conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, logRollPeriod);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);

Expand Down Expand Up @@ -429,19 +432,29 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits()
waitForStandbyToCatchUpWithInProgressEdits(standby, activeTxId,
standbyCatchupWaitTime);

long curTime = standby.getNamesystem().getEditLogTailer().getTimer()
.monotonicNow();
long insufficientTimeForLogRoll = logRollPeriodMs / 3;
final FakeTimer testTimer =
new FakeTimer(curTime + insufficientTimeForLogRoll);
standby.getNamesystem().getEditLogTailer().setTimerForTest(testTimer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a thought. it would be great if we can refactor the MiniDfsCluster, the NameNode, FSNamesystem and EditLogTailer such that they take a FakeTimer as a parameter during initialization. If all the tests adopt the way of FakeTimer we wouldn't have so many flaky tests. But I reckon it's out of scope of this change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice idea, I think we can target this as follow up work. Similar to EditLogTailer, we should introduce Timer instance such that we keep using Timer's default version of now, monotonicNow etc utilities but tests would get a way to inject FakeTimer.

Thread.sleep(2000);

for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; i++) {
NameNodeAdapter.mkdirs(active, getDirPath(i),
new PermissionStatus("test", "test",
new FsPermission((short)00755)), true);
}

boolean exceptionThrown = false;
try {
checkForLogRoll(active, origTxId, noLogRollWaitTime);
fail("Expected to timeout");
} catch (TimeoutException e) {
exceptionThrown = true;
// expected
}
assertTrue(exceptionThrown);

long sufficientTimeForLogRoll = logRollPeriodMs * 3;
testTimer.advance(sufficientTimeForLogRoll);

checkForLogRoll(active, origTxId, logRollWaitTime);
} finally {
Expand All @@ -452,26 +465,20 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits()
private static void waitForStandbyToCatchUpWithInProgressEdits(
final NameNode standby, final long activeTxId,
int maxWaitSec) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
long standbyTxId = standby.getNamesystem().getFSImage()
.getLastAppliedTxId();
return (standbyTxId >= activeTxId);
}
}, 100, maxWaitSec * 1000);
GenericTestUtils.waitFor(() -> {
long standbyTxId = standby.getNamesystem().getFSImage()
.getLastAppliedTxId();
return (standbyTxId >= activeTxId);
}, 100, TimeUnit.SECONDS.toMillis(maxWaitSec));
}

private static void checkForLogRoll(final NameNode active,
final long origTxId, int maxWaitSec) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
long curSegmentTxId = active.getNamesystem().getFSImage().getEditLog()
.getCurSegmentTxId();
return (origTxId != curSegmentTxId);
}
}, 100, maxWaitSec * 1000);
GenericTestUtils.waitFor(() -> {
long curSegmentTxId = active.getNamesystem().getFSImage().getEditLog()
.getCurSegmentTxId();
return (origTxId != curSegmentTxId);
}, 100, TimeUnit.SECONDS.toMillis(maxWaitSec));
}

private static MiniDFSCluster createMiniDFSCluster(Configuration conf,
Expand All @@ -488,4 +495,5 @@ private static MiniDFSCluster createMiniDFSCluster(Configuration conf,
.build();
return cluster;
}

}