Merged (changes from 3 commits)
@@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.HConstants.HREGION_OLDLOGDIR_NAME;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,8 +40,8 @@
* roller logic by our own.
* <p/>
* We can reuse most of the code for normal wal roller, the only difference is that there is only
- * one region, so in {@link #scheduleFlush(String)} method we can just schedule flush for the master
- * local region.
+ * one region, so in {@link #scheduleFlush(String, List)} method we can just schedule flush
+ * for the master local region.
*/
@InterfaceAudience.Private
public final class MasterRegionWALRoller extends AbstractWALRoller<Abortable> {
@@ -79,7 +80,7 @@ protected void afterRoll(WAL wal) {
}

@Override
- protected void scheduleFlush(String encodedRegionName) {
+ protected void scheduleFlush(String encodedRegionName, List<byte[]> families) {
MasterRegionFlusherAndCompactor flusher = this.flusherAndCompactor;
if (flusher != null) {
flusher.requestFlush();
@@ -17,7 +17,10 @@
*/
package org.apache.hadoop.hbase.regionserver;

import java.util.ArrayList;
Contributor:
Should we remove these imports? It seems we added a method in this class, then moved it out but forgot to remove these imports?

Contributor Author:
Oh, you are right, fixed.
Thanks.

import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configured;
import org.apache.yetus.audience.InterfaceAudience;
@@ -46,4 +49,16 @@ protected void configureForRegion(HRegion region) {
*/
public abstract Collection<HStore> selectStoresToFlush();

/**
* Select stores which match the specified families.
*
* @return the stores that need to be flushed.
*/
public Collection<HStore> selectStoresToFlush(Map<byte[], HStore> stores, List<byte[]> families) {
Contributor:
Could be static?

Contributor Author:
Yeah, if it is still kept as part of FlushPolicy after the discussion.

Collection<HStore> specificStoresToFlush = new ArrayList<>();
for (byte[] family : families) {
specificStoresToFlush.add(stores.get(family));
}
return specificStoresToFlush;
}
}
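For reference, a minimal sketch of the static variant discussed above (illustrative only, not the committed patch): the helper reads nothing but its arguments, so it could be declared static. The null check on the store lookup is an extra defensive step that is not in the diff.

```java
static Collection<HStore> selectStoresToFlush(Map<byte[], HStore> stores,
    List<byte[]> families) {
  Collection<HStore> selected = new ArrayList<>();
  for (byte[] family : families) {
    HStore store = stores.get(family);
    if (store != null) { // ignore families the region does not (or no longer) carry
      selected.add(store);
    }
  }
  return selected;
}
```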
@@ -19,6 +19,8 @@

package org.apache.hadoop.hbase.regionserver;

import java.util.List;

import org.apache.yetus.audience.InterfaceAudience;

/**
@@ -36,6 +38,18 @@ public interface FlushRequester {
*/
boolean requestFlush(HRegion region, boolean forceFlushAllStores, FlushLifeCycleTracker tracker);

/**
* Tell the listener the cache needs to be flushed.
*
* @param region the Region requesting the cache flush
* @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log
* rolling.
* @param families stores of region to flush.
* @return true if our region is added into the queue, false otherwise
*/
boolean requestFlush(HRegion region, boolean forceFlushAllStores, List<byte[]> families,
FlushLifeCycleTracker tracker);

/**
* Tell the listener the cache needs to be flushed after a delay
*
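Illustrative only, not part of the patch: a caller holding a RegionServerServices reference might use the new overload roughly as below. The family names and the getFlushRequester() accessor are assumptions for this sketch.

```java
// Hypothetical usage: flush only the 'cf1' and 'cf2' stores of a region,
// without forcing a flush of every store.
List<byte[]> families = Arrays.asList(Bytes.toBytes("cf1"), Bytes.toBytes("cf2"));
FlushRequester requester = regionServerServices.getFlushRequester();
boolean queued = requester.requestFlush(region, false, families, FlushLifeCycleTracker.DUMMY);
```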
@@ -2386,6 +2386,11 @@ enum Result {
boolean isCompactionNeeded();
}

public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker,
FlushLifeCycleTracker tracker) throws IOException {
return this.flushcache(forceFlushAllStores, null, writeFlushRequestWalMarker, tracker);
}
Contributor:
Good


/**
* Flush the cache.
*
@@ -2400,6 +2405,7 @@ enum Result {
* <p>This method may block for some time, so it should not be called from a
* time-sensitive thread.
* @param forceFlushAllStores whether we want to flush all stores
* @param families stores of region to flush.
* @param writeFlushRequestWalMarker whether to write the flush request marker to WAL
* @param tracker used to track the life cycle of this flush
* @return whether the flush is success and whether the region needs compacting
@@ -2409,8 +2415,8 @@ enum Result {
* because a Snapshot was not properly persisted. The region is put in closing mode, and the
* caller MUST abort after this.
*/
- public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker,
- FlushLifeCycleTracker tracker) throws IOException {
+ public FlushResultImpl flushcache(boolean forceFlushAllStores, List<byte[]> families,
+ boolean writeFlushRequestWalMarker, FlushLifeCycleTracker tracker) throws IOException {
// fail-fast instead of waiting on the lock
if (this.closing.get()) {
String msg = "Skipping flush on " + this + " because closing";
@@ -2457,8 +2463,13 @@ public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlus
}

try {
- Collection<HStore> specificStoresToFlush =
+ Collection<HStore> specificStoresToFlush = null;
+ if (!forceFlushAllStores && families != null) {
+ specificStoresToFlush = flushPolicy.selectStoresToFlush(stores, families);
Contributor:
So, looking at the flushPolicy implementations we currently have, I see two:

FlushAllStoresPolicy

and

FlushLargeStoresPolicy

Given the current implementations, passing a subset of families doesn't seem appropriate at least for the FlushAllStoresPolicy -- if we flush some families only, then we are not flushing AllStores -- and perhaps for the FlushLargeStoresPolicy since we are not flushing the large but the passed families.

Do you need to create a new flush policy? One that does the subset of families passed? If user configures the FlushAllStoresPolicy and instead we only flush a subset, then they will be confused.

Why do we pass in stores into selectStoresToFlush? We do not need the list of stores in the flushAllStoresPolicy. It has access to 'this' so can figure what Stores are in the Region.

Contributor Author:
> Why do we pass in stores into selectStoresToFlush? We do not need the list of stores in the flushAllStoresPolicy. It has access to 'this' so can figure what Stores are in the Region.

As in the comment below, maybe it should not be part of flushPolicy?

+ } else {
+ specificStoresToFlush =
+ forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
+ }
FlushResultImpl fs =
internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker, tracker);

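To restate the selection logic introduced above (a simplified paraphrase for readability; chooseStores is a hypothetical helper name, not patch code):

```java
// Which stores end up in specificStoresToFlush:
Collection<HStore> chooseStores(boolean forceFlushAllStores, List<byte[]> families,
    Map<byte[], HStore> stores, FlushPolicy flushPolicy) {
  if (forceFlushAllStores) {
    return stores.values();                                   // flush every store
  }
  if (families != null) {
    return flushPolicy.selectStoresToFlush(stores, families); // targeted flush, e.g. from WAL rolling
  }
  return flushPolicy.selectStoresToFlush();                   // let the configured policy decide
}
```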
@@ -18,7 +18,9 @@
*/
package org.apache.hadoop.hbase.regionserver;

import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.wal.AbstractWALRoller;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.yetus.audience.InterfaceAudience;
@@ -45,7 +47,7 @@ public LogRoller(RegionServerServices services) {
super("LogRoller", services.getConfiguration(), services);
}

- protected void scheduleFlush(String encodedRegionName) {
+ protected void scheduleFlush(String encodedRegionName, List<byte[]> families) {
RegionServerServices services = this.abortable;
HRegion r = (HRegion) services.getRegion(encodedRegionName);
if (r == null) {
@@ -58,8 +60,8 @@ protected void scheduleFlush(String encodedRegionName) {
encodedRegionName, r);
return;
}
- // force flushing all stores to clean old logs
- requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
+ // force flushing specified stores to clean old logs
+ requester.requestFlush(r, false, families, FlushLifeCycleTracker.DUMMY);
Contributor:
We are in here because a force roll was requested. You change the true to false. Isn't that changing behavior at least for the default flush all stores case?

Contributor Author:
Yeah, it changes behavior, and only in the case where there are too many WALs.

Contributor:
Are you sure? You change the force from true to false in the line above.

Contributor Author:
Errr, could this line be executed in any case other than too many WALs?
I checked the code again, and it seems not. Correct me if I missed something.

Contributor (@saintstack, Jun 25, 2020):
The flag is set when we are in a critical condition -- the WAL count is in excess of our WAL limit. The flag's intent IIRC is that we flush all stores regardless of what determination is made at flush time as to which stores are in need of flush or not; the old edit may actually be hanging out in a store that is small and not in need of flush normally or in accordance w/ some flush policy. The flag says 'force' the flush. My understanding is that this is a FlushRequest usually but the flag changes the request to a demand.

Here you are passing a set of stores instead where the stores chosen are the ones that will free up WALs. So, this flag is now redundant? We should purge it. Now it will confuse since the 'force' instead is a list of families to flush -- else it's null if old behavior.

Contributor:
Looking around, this is the only time forceFlushAllStores is set to true -- when we have too many WALs and we need to clear the old ones out.

}

@VisibleForTesting
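For orientation, the path this thread is discussing, sketched below. This is a paraphrase of the roller loop under this patch's signatures, not the literal patch code:

```java
// After rolling the writer, the WAL reports which stores still pin un-archivable old files.
Map<byte[], List<byte[]>> regionsToFlush = wal.rollWriter(true);
if (regionsToFlush != null) {
  for (Map.Entry<byte[], List<byte[]>> entry : regionsToFlush.entrySet()) {
    // LogRoller.scheduleFlush then asks for a targeted (non-forced) flush of just those stores,
    // instead of force-flushing every store of the region as before.
    scheduleFlush(Bytes.toString(entry.getKey()), entry.getValue());
  }
}
```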
@@ -286,7 +286,7 @@ private boolean flushOneForGlobalPressure(FlushType flushType) {
server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
", Region memstore size=" +
TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
- flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY);
+ flushedOne = flushRegion(regionToFlush, true, false, null, FlushLifeCycleTracker.DUMMY);

if (!flushedOne) {
LOG.info("Excluding unflushable region " + regionToFlush +
@@ -459,12 +459,18 @@ private FlushType isAboveLowWaterMark() {

@Override
public boolean requestFlush(HRegion r, boolean forceFlushAllStores,
- FlushLifeCycleTracker tracker) {
+ FlushLifeCycleTracker tracker) {
+ return this.requestFlush(r, forceFlushAllStores, null, tracker);
+ }
+
+ @Override
+ public boolean requestFlush(HRegion r, boolean forceFlushAllStores, List<byte[]> families,
+ FlushLifeCycleTracker tracker) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has no delay so it will be added at the top of the flush
// queue. It'll come out near immediately.
- FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker);
+ FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, families, tracker);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
r.incrementFlushesQueuedCount();
@@ -482,7 +488,7 @@ public boolean requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllS
if (!regionsInQueue.containsKey(r)) {
// This entry has some delay
FlushRegionEntry fqe =
- new FlushRegionEntry(r, forceFlushAllStores, FlushLifeCycleTracker.DUMMY);
+ new FlushRegionEntry(r, forceFlushAllStores, null, FlushLifeCycleTracker.DUMMY);
fqe.requeue(delay);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
@@ -581,7 +587,7 @@ private boolean flushRegion(final FlushRegionEntry fqe) {
return true;
}
}
- return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.getTracker());
+ return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.families, fqe.getTracker());
}

/**
@@ -592,12 +598,13 @@ private boolean flushRegion(final FlushRegionEntry fqe) {
* from the main flusher run loop and we got the entry to flush by calling
* poll on the flush queue (which removed it).
* @param forceFlushAllStores whether we want to flush all store.
* @param families stores of region to flush.
* @return true if the region was successfully flushed, false otherwise. If
* false, there will be accompanying log messages explaining why the region was
* not flushed.
*/
private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores,
- FlushLifeCycleTracker tracker) {
+ List<byte[]> families, FlushLifeCycleTracker tracker) {
synchronized (this.regionsInQueue) {
FlushRegionEntry fqe = this.regionsInQueue.remove(region);
// Use the start time of the FlushRegionEntry if available
@@ -612,7 +619,7 @@ private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forc
lock.readLock().lock();
try {
notifyFlushRequest(region, emergencyFlush);
- FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker);
+ FlushResult flushResult = region.flushcache(forceFlushAllStores, families, false, tracker);
boolean shouldCompact = flushResult.isCompactionNeeded();
// We just want to check the size
boolean shouldSplit = region.checkSplit() != null;
@@ -846,14 +853,17 @@ static class FlushRegionEntry implements FlushQueueEntry {
private int requeueCount = 0;

private final boolean forceFlushAllStores;
private final List<byte[]> families;

private final FlushLifeCycleTracker tracker;

- FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
+ FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, List<byte[]> families,
+ FlushLifeCycleTracker tracker) {
this.region = r;
this.createTime = EnvironmentEdgeManager.currentTime();
this.whenToExpire = this.createTime;
this.forceFlushAllStores = forceFlushAllStores;
this.families = families;
this.tracker = tracker;
}

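Putting the MemStoreFlusher changes together (a condensed paraphrase of the flow above, not extra patch code): the families list rides along in the FlushRegionEntry and is handed back to the region when the entry is dequeued.

```java
// Enqueue: the caller's family list is captured in the queue entry.
FlushRegionEntry fqe = new FlushRegionEntry(region, forceFlushAllStores, families, tracker);
flushQueue.add(fqe);

// Dequeue: the stored list is passed through to the region's flush.
region.flushcache(fqe.isForceFlushAllStores(), fqe.families, false, fqe.getTracker());
```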
@@ -46,6 +46,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -544,7 +545,7 @@ public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyNam
}

@Override
- public byte[][] rollWriter() throws FailedLogCloseException, IOException {
+ public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {
return rollWriter(false);
}

@@ -639,26 +640,31 @@ public int getNumLogFiles() {
* If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed,
* check the first (oldest) WAL, and return those regions which should be flushed so that
* it can be let-go/'archived'.
- * @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
+ * @return stores of regions (encodedRegionNames) to flush in order to archive oldest WAL file.
*/
- byte[][] findRegionsToForceFlush() throws IOException {
- byte[][] regions = null;
+ Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
+ Map<byte[], List<byte[]>> regions = null;
int logCount = getNumRolledLogFiles();
if (logCount > this.maxLogs && logCount > 0) {
Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
regions =
this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
}
if (regions != null) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < regions.length; i++) {
- if (i > 0) {
- sb.append(", ");
+ List<String> listForPrint = new ArrayList();
+ for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
+ StringBuilder families = new StringBuilder();
+ for (int i = 0; i < r.getValue().size(); i++) {
+ if (i > 0) {
+ families.append(",");
+ }
+ families.append(Bytes.toString(r.getValue().get(i)));
+ }
- sb.append(Bytes.toStringBinary(regions[i]));
+ listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");
}
LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
"; forcing flush of " + regions.length + " regions(s): " + sb.toString());
"; forcing (partial) flush of " + regions.size() + " region(s): " +
StringUtils.join(",", listForPrint));
}
return regions;
}
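With made-up values, the message produced by this block would read roughly as follows (region names and families are hypothetical):

```
Too many WALs; count=34, max=32; forcing (partial) flush of 2 region(s): a3b4c5d6e7f8[info],f1e2d3c4b5a6[cf1,cf2]
```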
@@ -820,7 +826,7 @@ private IOException convertInterruptedExceptionToIOException(final InterruptedEx
}

@Override
- public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
+ public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
rollWriterLock.lock();
try {
if (this.closed) {
@@ -830,7 +836,7 @@ public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOExce
if (!force && this.writer != null && this.numEntries.get() <= 0) {
return null;
}
- byte[][] regionsToFlush = null;
+ Map<byte[], List<byte[]>> regionsToFlush = null;
try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) {
Path oldPath = getOldPath();
Path newPath = getNewPath();
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
@@ -440,27 +441,28 @@ boolean areAllLower(Map<byte[], Long> sequenceids, Collection<byte[]> keysBlocki
* {@link #lowestUnflushedSequenceIds} has a sequence id less than that passed in
* <code>sequenceids</code> then return it.
* @param sequenceids Sequenceids keyed by encoded region name.
- * @return regions found in this instance with sequence ids less than those passed in.
+ * @return stores of regions found in this instance with sequence ids less than those passed in.
*/
- byte[][] findLower(Map<byte[], Long> sequenceids) {
- List<byte[]> toFlush = null;
+ Map<byte[], List<byte[]>> findLower(Map<byte[], Long> sequenceids) {
+ Map<byte[], List<byte[]>> toFlush = null;
// Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
synchronized (tieLock) {
for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {
Map<ImmutableByteArray, Long> m = this.lowestUnflushedSequenceIds.get(e.getKey());
if (m == null) {
continue;
}
- // The lowest sequence id outstanding for this region.
- long lowest = getLowestSequenceId(m);
- if (lowest != HConstants.NO_SEQNUM && lowest <= e.getValue()) {
- if (toFlush == null) {
- toFlush = new ArrayList<>();
+ for (Map.Entry<ImmutableByteArray, Long> me : m.entrySet()) {
+ if (me.getValue() <= e.getValue()) {
+ if (toFlush == null) {
+ toFlush = new TreeMap(Bytes.BYTES_COMPARATOR);
+ }
+ toFlush.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
+ .add(Bytes.toBytes(me.getKey().toString()));
+ }
- toFlush.add(e.getKey());
}
}
}
- return toFlush == null ? null : toFlush.toArray(new byte[0][]);
+ return toFlush;
}
}
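A worked example of the new per-store result (made-up names and numbers): if the oldest WAL's highest sequence id for region R is 120, and R's lowest unflushed sequence ids are info=100 and cf2=300, the old code returned R and the region then flushed every store; the new code returns {R -> [info]}, since only the info store still holds edits at or below 120 and therefore pins the old WAL.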