Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.regionserver;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -66,6 +67,10 @@ protected void scheduleFlush(String encodedRegionName, List<byte[]> families) {

@VisibleForTesting
Map<WAL, Boolean> getWalNeedsRoll() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exposed for test cases only. This is a private class.. So its ok to change the return type. The new RollController gives clear idea whether a wal instance needs roll because of periodic roll or being explicitly asked for. So that is better.
A return type of Map<WAL, RollController>
Any way then we dont need synchronized block. Else, if we have to do as what is being done below in patch, we would need synchronized block

return this.walNeedsRoll;
Map<WAL, Boolean> walNeedsRoll = new HashMap<>();
for (Map.Entry<WAL, RollController> entry : this.walNeedsRoll.entrySet()) {
walNeedsRoll.put(entry.getKey(), entry.getValue().isRequestRoll());
}
return walNeedsRoll;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -58,14 +57,13 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread

protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";

protected final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
protected final ConcurrentMap<WAL, RollController> walNeedsRoll = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We add all WAL instances into this once it is created. We can just call it wals?

protected final T abortable;
private volatile long lastRollTime = System.currentTimeMillis();
// Period to roll log.
private final long rollPeriod;
private final int threadWakeFrequency;
// The interval to check low replication on hlog's pipeline
private long checkLowReplicationInterval;
private final long checkLowReplicationInterval;

private volatile boolean running = true;

Expand All @@ -76,13 +74,14 @@ public void addWAL(WAL wal) {
}
// this is to avoid race between addWAL and requestRollAll.
synchronized (this) {
if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
if (walNeedsRoll.putIfAbsent(wal, new RollController()) == null) {
wal.registerWALActionsListener(new WALActionsListener() {
@Override
public void logRollRequested(WALActionsListener.RollRequestReason reason) {
// TODO logs will contend with each other here, replace with e.g. DelayedQueue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this talking about what this PR is trying to do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the todo is pre-existing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ramkrish86 the purpose of this PR is to make each wal separate roll when using multiwal. thanks review.

synchronized (AbstractWALRoller.this) {
walNeedsRoll.put(wal, Boolean.TRUE);
RollController controller = walNeedsRoll.computeIfAbsent(wal, rc -> new RollController());
controller.requestRoll();
AbstractWALRoller.this.notifyAll();
}
}
Expand All @@ -93,9 +92,8 @@ public void logRollRequested(WALActionsListener.RollRequestReason reason) {

public void requestRollAll() {
synchronized (this) {
List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet());
for (WAL wal : wals) {
walNeedsRoll.put(wal, Boolean.TRUE);
for (RollController controller : walNeedsRoll.values()) {
controller.requestRoll();
}
notifyAll();
}
Expand All @@ -115,9 +113,9 @@ protected AbstractWALRoller(String name, Configuration conf, T abortable) {
*/
private void checkLowReplication(long now) {
try {
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
for (Entry<WAL, RollController> entry : walNeedsRoll.entrySet()) {
WAL wal = entry.getKey();
boolean needRollAlready = entry.getValue();
boolean needRollAlready = entry.getValue().isRequestRoll;
if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
continue;
}
Expand Down Expand Up @@ -148,16 +146,14 @@ private void abort(String reason, Throwable cause) {
@Override
public void run() {
while (running) {
boolean periodic = false;
long now = System.currentTimeMillis();
checkLowReplication(now);
periodic = (now - this.lastRollTime) > this.rollPeriod;
if (periodic) {
if (walNeedsRoll.values().stream().anyMatch(rc -> rc.isPeriodRoll(now))) {
// Time for periodic roll, fall through
LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log is not much value added now as we dont say for which log(s) need this periodic roll. We can clearly say WAL roll period {} elapsed for one of the WAL.
Below we can make sure we log which wal(s) are getting rolled for what purpose

} else {
synchronized (this) {
if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
if (walNeedsRoll.values().stream().anyMatch(rc -> rc.isRequestRoll)) {
// WAL roll requested, fall through
LOG.debug("WAL roll requested");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above comment

} else {
Expand All @@ -174,18 +170,21 @@ public void run() {
}
}
try {
this.lastRollTime = System.currentTimeMillis();
for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
.hasNext();) {
Entry<WAL, Boolean> entry = iter.next();
for (Iterator<Entry<WAL, RollController>> iter = walNeedsRoll.entrySet().iterator();
iter.hasNext();) {
Entry<WAL, RollController> entry = iter.next();
RollController controller = entry.getValue();
if (!controller.isRequestRoll && !controller.isPeriodRoll(now)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can have a single method in RollController which says this WAL needs roll? RollController#needsRoll().

continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is they key. We ensure we only roll if one of the condition is met (if either size reach caused a log roll ) or the time elapsed. That is also tracked per wal.

}
WAL wal = entry.getKey();
// reset the flag in front to avoid missing roll request before we return from rollWriter.
walNeedsRoll.put(wal, Boolean.FALSE);
controller.finishRoll();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we need this call also within a synchronized block. This is an existing issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much for your tips

Map<byte[], List<byte[]>> regionsToFlush = null;
try {
// Force the roll if the logroll.period is elapsed or if a roll was requested.
// The returned value is an collection of actual region and family names.
regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
regionsToFlush = wal.rollWriter(true);
} catch (WALClosedException e) {
LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e);
iter.remove();
Expand Down Expand Up @@ -232,7 +231,7 @@ private boolean isWaiting() {
* @return true if all WAL roll finished
*/
public boolean walRollFinished() {
return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) && isWaiting();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking its usage, I think this API impl is already buggy. This just checks the status of the boolean. Once we start a roll on a WAL, we reset the boolean (Even before this patch). So it is not clearly telling anything abt the roll status. This can return true even while an active wal roll is going on. We can keep it as an another jira and fix (if required).. Just add some TODO comments here.
We might need another boolean in Controller which clearly tracks whether we are ongoing a roll. So this really need to check that status as well as a requested roll status.

return walNeedsRoll.values().stream().noneMatch(rc -> rc.isRequestRoll) && isWaiting();
}

/**
Expand All @@ -249,4 +248,35 @@ public void close() {
running = false;
interrupt();
}

/**
* Independently control the roll of each wal. When use multiwal,
* can avoid all wal roll together. see HBASE-24665 for detail
*/
protected class RollController {
boolean isRequestRoll;
long lastRollTime;

RollController() {
this.isRequestRoll = false;
this.lastRollTime = System.currentTimeMillis();
}

void requestRoll() {
this.isRequestRoll = true;
}

void finishRoll() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name is bit confusing. This is not called once roll is finished. We can just call this resetStatus()? Give proper comment that this resets rollReq status as well as lastRollTime. We can pass the ts as param 'lastRollTime' so that this is clear.

this.isRequestRoll = false;
this.lastRollTime = System.currentTimeMillis();
}

public boolean isRequestRoll() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isRollRequested() can be the better name?

return isRequestRoll;
}

boolean isPeriodRoll(long now) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A better name ? needsPeriodicRoll ?

return (now - lastRollTime) > rollPeriod;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
Expand All @@ -35,6 +38,8 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.HashMap;
import java.util.Map;

@Category({RegionServerTests.class, MediumTests.class})
public class TestLogRoller {
Expand All @@ -43,7 +48,7 @@ public class TestLogRoller {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLogRoller.class);

private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

private static final int logRollPeriod = 20 * 1000;

Expand Down Expand Up @@ -92,4 +97,41 @@ public void testRemoveClosedWAL() throws Exception {

assertEquals(originalSize, logRoller.getWalNeedsRoll().size());
}

/**
* verify that each wal roll separately
*/
@Test
public void testRequestRollWithMultiWal() throws Exception {
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
Configuration conf = rs.getConfiguration();
LogRoller logRoller = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWalRoller();
FileSystem fs = rs.getFileSystem();
// add multiple wal
Map<FSHLog, Path> wals = new HashMap<>();
for (int i = 1; i <= 3; i++) {
FSHLog wal = new FSHLog(fs, rs.getWALRootDir(),
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()),
AbstractFSWALProvider.getWALArchiveDirectoryName(conf, rs.getServerName().getServerName()),
conf, null, true, "wal-test", "." + i);
wal.init();
wals.put(wal, wal.getCurrentFileName());
logRoller.addWAL(wal);
Thread.sleep(3000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a 3 sec sleep here? !

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want 3 WALs will different periodic roll time period? Why we need? If so, can we limit the time to be 1 sec or lesser?

}

// request roll
Map.Entry<FSHLog, Path> rollWal = wals.entrySet().iterator().next();
rollWal.getKey().requestLogRoll();
Thread.sleep(5000);
assertNotEquals(rollWal.getValue(), rollWal.getKey().getCurrentFileName());
wals.put(rollWal.getKey(), rollWal.getKey().getCurrentFileName());

// period roll
Thread.sleep(logRollPeriod + 5000);
for (Map.Entry<FSHLog, Path> entry : wals.entrySet()) {
assertNotEquals(entry.getValue(), entry.getKey().getCurrentFileName());
entry.getKey().close();
}
}
}