Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
Expand Down Expand Up @@ -323,10 +324,10 @@ private void connectToNNAndHandshake() throws IOException {
void triggerBlockReportForTests() {
synchronized (ibrManager) {
scheduler.scheduleHeartbeat();
long oldBlockReportTime = scheduler.nextBlockReportTime;
long oldBlockReportTime = scheduler.getNextBlockReportTime();
scheduler.forceFullBlockReportNow();
ibrManager.notifyAll();
while (oldBlockReportTime == scheduler.nextBlockReportTime) {
while (oldBlockReportTime == scheduler.getNextBlockReportTime()) {
try {
ibrManager.wait(100);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -1163,8 +1164,8 @@ static class Scheduler {
// nextBlockReportTime and nextHeartbeatTime may be assigned/read
// by testing threads (through BPServiceActor#triggerXXX), while also
// assigned/read by the actor thread.
@VisibleForTesting
volatile long nextBlockReportTime = monotonicNow();
private final AtomicLong nextBlockReportTime =
new AtomicLong(monotonicNow());

@VisibleForTesting
volatile long nextHeartbeatTime = monotonicNow();
Expand Down Expand Up @@ -1257,7 +1258,7 @@ boolean isLifelineDue(long startTime) {
}

boolean isBlockReportDue(long curTime) {
return nextBlockReportTime - curTime <= 0;
return nextBlockReportTime.get() - curTime <= 0;
}

boolean isOutliersReportDue(long curTime) {
Expand All @@ -1281,15 +1282,15 @@ void forceFullBlockReportNow() {
long scheduleBlockReport(long delay, boolean isRegistration) {
if (delay > 0) { // send BR after random delay
// Numerical overflow is possible here and is okay.
nextBlockReportTime =
monotonicNow() + ThreadLocalRandom.current().nextInt((int) (delay));
nextBlockReportTime.getAndSet(
monotonicNow() + ThreadLocalRandom.current().nextInt((int) (delay)));
} else { // send at next heartbeat
nextBlockReportTime = monotonicNow();
nextBlockReportTime.getAndSet(monotonicNow());
}
resetBlockReportTime = isRegistration; // reset future BRs for
// randomness, post first block report to avoid regular BRs from all
// DN's coming at one time.
return nextBlockReportTime;
return nextBlockReportTime.get();
}

/**
Expand All @@ -1302,8 +1303,8 @@ void scheduleNextBlockReport() {
// If we have sent the first set of block reports, then wait a random
// time before we start the periodic block reports.
if (resetBlockReportTime) {
nextBlockReportTime = monotonicNow() +
ThreadLocalRandom.current().nextInt((int)(blockReportIntervalMs));
nextBlockReportTime.getAndSet(monotonicNow() +
ThreadLocalRandom.current().nextInt((int) (blockReportIntervalMs)));
resetBlockReportTime = false;
} else {
/* say the last block report was at 8:20:14. The current report
Expand All @@ -1313,17 +1314,16 @@ void scheduleNextBlockReport() {
* 2) unexpected like 21:35:43, next report should be at 2:20:14
* on the next day.
*/
long factor =
(monotonicNow() - nextBlockReportTime + blockReportIntervalMs)
/ blockReportIntervalMs;
long factor = (monotonicNow() - nextBlockReportTime.get()
+ blockReportIntervalMs) / blockReportIntervalMs;
if (factor != 0) {
nextBlockReportTime += factor * blockReportIntervalMs;
nextBlockReportTime.getAndAdd(factor * blockReportIntervalMs);
} else {
// If the difference between the present time and the scheduled
// time is very less, the factor can be 0, so in that case, we can
// ignore that negligible time, spent while sending the BRss and
// schedule the next BR after the blockReportInterval.
nextBlockReportTime += blockReportIntervalMs;
nextBlockReportTime.getAndAdd(blockReportIntervalMs);
}
}
}
Expand All @@ -1336,6 +1336,16 @@ long getLifelineWaitTime() {
return nextLifelineTime - monotonicNow();
}

@VisibleForTesting
long getNextBlockReportTime() {
return nextBlockReportTime.get();
}

@VisibleForTesting
void setNextBlockReportTime(long nextBlockReportTime) {
this.nextBlockReportTime.getAndSet(nextBlockReportTime);
}

/**
* Wrapped for testing.
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import static java.lang.Math.abs;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -70,7 +71,7 @@ public void testScheduleBlockReportImmediate() {
Scheduler scheduler = makeMockScheduler(now);
scheduler.scheduleBlockReport(0, true);
assertTrue(scheduler.resetBlockReportTime);
assertThat(scheduler.nextBlockReportTime, is(now));
assertThat(scheduler.getNextBlockReportTime(), is(now));
}
}

Expand All @@ -81,8 +82,8 @@ public void testScheduleBlockReportDelayed() {
final long delayMs = 10;
scheduler.scheduleBlockReport(delayMs, true);
assertTrue(scheduler.resetBlockReportTime);
assertTrue(scheduler.nextBlockReportTime - now >= 0);
assertTrue(scheduler.nextBlockReportTime - (now + delayMs) < 0);
assertTrue(scheduler.getNextBlockReportTime() - now >= 0);
assertTrue(scheduler.getNextBlockReportTime() - (now + delayMs) < 0);
}
}

Expand All @@ -96,7 +97,8 @@ public void testScheduleNextBlockReport() {
Scheduler scheduler = makeMockScheduler(now);
assertTrue(scheduler.resetBlockReportTime);
scheduler.scheduleNextBlockReport();
assertTrue(scheduler.nextBlockReportTime - (now + BLOCK_REPORT_INTERVAL_MS) < 0);
assertTrue(scheduler.getNextBlockReportTime()
- (now + BLOCK_REPORT_INTERVAL_MS) < 0);
}
}

Expand All @@ -110,7 +112,8 @@ public void testScheduleNextBlockReport2() {
Scheduler scheduler = makeMockScheduler(now);
scheduler.resetBlockReportTime = false;
scheduler.scheduleNextBlockReport();
assertThat(scheduler.nextBlockReportTime, is(now + BLOCK_REPORT_INTERVAL_MS));
assertThat(scheduler.getNextBlockReportTime(),
is(now + BLOCK_REPORT_INTERVAL_MS));
}
}

Expand All @@ -129,10 +132,12 @@ public void testScheduleNextBlockReport3() {
final long blockReportDelay =
BLOCK_REPORT_INTERVAL_MS + random.nextInt(2 * (int) BLOCK_REPORT_INTERVAL_MS);
final long origBlockReportTime = now - blockReportDelay;
scheduler.nextBlockReportTime = origBlockReportTime;
scheduler.setNextBlockReportTime(origBlockReportTime);
scheduler.scheduleNextBlockReport();
assertTrue(scheduler.nextBlockReportTime - now < BLOCK_REPORT_INTERVAL_MS);
assertTrue(((scheduler.nextBlockReportTime - origBlockReportTime) % BLOCK_REPORT_INTERVAL_MS) == 0);
assertTrue((scheduler.getNextBlockReportTime() - now)
< BLOCK_REPORT_INTERVAL_MS);
assertEquals(0, ((scheduler.getNextBlockReportTime() - origBlockReportTime)
% BLOCK_REPORT_INTERVAL_MS));
}
}

Expand Down Expand Up @@ -201,7 +206,7 @@ private Scheduler makeMockScheduler(long now) {
HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS,
BLOCK_REPORT_INTERVAL_MS, OUTLIER_REPORT_INTERVAL_MS));
doReturn(now).when(mockScheduler).monotonicNow();
mockScheduler.nextBlockReportTime = now;
mockScheduler.setNextBlockReportTime(now);
mockScheduler.nextHeartbeatTime = now;
mockScheduler.nextOutliersReportTime = now;
return mockScheduler;
Expand Down