diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegionWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegionWALRoller.java index e18aa0cc9942..ef3dd121133b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegionWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegionWALRoller.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.HConstants.HREGION_OLDLOGDIR_NAME; import java.io.IOException; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -39,8 +40,8 @@ * roller logic by our own. *

* We can reuse most of the code for normal wal roller, the only difference is that there is only - * one region, so in {@link #scheduleFlush(String)} method we can just schedule flush for the master - * local region. + * one region, so in {@link #scheduleFlush(String, List)} method we can just schedule flush + * for the master local region. */ @InterfaceAudience.Private public final class MasterRegionWALRoller extends AbstractWALRoller { @@ -79,7 +80,7 @@ protected void afterRoll(WAL wal) { } @Override - protected void scheduleFlush(String encodedRegionName) { + protected void scheduleFlush(String encodedRegionName, List families) { MasterRegionFlusherAndCompactor flusher = this.flusherAndCompactor; if (flusher != null) { flusher.requestFlush(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java index fecbd2f283d5..66bd095a7a27 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java @@ -45,5 +45,4 @@ protected void configureForRegion(HRegion region) { * @return the stores need to be flushed. */ public abstract Collection selectStoresToFlush(); - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java index 4191fbfb6b5d..92aed2bb9ebd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.List; + import org.apache.yetus.audience.InterfaceAudience; /** @@ -30,22 +32,28 @@ public interface FlushRequester { * Tell the listener the cache needs to be flushed. * * @param region the Region requesting the cache flush - * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log - * rolling. * @return true if our region is added into the queue, false otherwise */ - boolean requestFlush(HRegion region, boolean forceFlushAllStores, FlushLifeCycleTracker tracker); + boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker); + + /** + * Tell the listener the cache needs to be flushed. + * + * @param region the Region requesting the cache flush + * @param families stores of region to flush, if null then use flush policy + * @return true if our region is added into the queue, false otherwise + */ + boolean requestFlush(HRegion region, List families, + FlushLifeCycleTracker tracker); /** * Tell the listener the cache needs to be flushed after a delay * * @param region the Region requesting the cache flush * @param delay after how much time should the flush happen - * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log - * rolling. 
* @return true if our region is added into the queue, false otherwise */ - boolean requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores); + boolean requestDelayedFlush(HRegion region, long delay); /** * Register a FlushRequestListener diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 1f73fd44269c..30c489175833 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2354,7 +2354,7 @@ public boolean compact(CompactionContext compaction, HStore store, * *

This method may block for some time, so it should not be called from a * time-sensitive thread. - * @param force whether we want to force a flush of all stores + * @param flushAllStores whether we want to force a flush of all stores * @return FlushResult indicating whether the flush was successful or not and if * the region needs compacting * @@ -2362,8 +2362,8 @@ public boolean compact(CompactionContext compaction, HStore store, * because a snapshot was not properly persisted. */ // TODO HBASE-18905. We might have to expose a requestFlush API for CPs - public FlushResult flush(boolean force) throws IOException { - return flushcache(force, false, FlushLifeCycleTracker.DUMMY); + public FlushResult flush(boolean flushAllStores) throws IOException { + return flushcache(flushAllStores, false, FlushLifeCycleTracker.DUMMY); } public interface FlushResult { @@ -2386,6 +2386,16 @@ enum Result { boolean isCompactionNeeded(); } + public FlushResultImpl flushcache(boolean flushAllStores, boolean writeFlushRequestWalMarker, + FlushLifeCycleTracker tracker) throws IOException { + List families = null; + if (flushAllStores) { + families = new ArrayList(); + families.addAll(this.getTableDescriptor().getColumnFamilyNames()); + } + return this.flushcache(families, writeFlushRequestWalMarker, tracker); + } + /** * Flush the cache. * @@ -2399,7 +2409,7 @@ enum Result { * *

This method may block for some time, so it should not be called from a * time-sensitive thread. - * @param forceFlushAllStores whether we want to flush all stores + * @param families stores of region to flush. * @param writeFlushRequestWalMarker whether to write the flush request marker to WAL * @param tracker used to track the life cycle of this flush * @return whether the flush is success and whether the region needs compacting @@ -2409,8 +2419,8 @@ enum Result { * because a Snapshot was not properly persisted. The region is put in closing mode, and the * caller MUST abort after this. */ - public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker, - FlushLifeCycleTracker tracker) throws IOException { + public FlushResultImpl flushcache(List families, + boolean writeFlushRequestWalMarker, FlushLifeCycleTracker tracker) throws IOException { // fail-fast instead of waiting on the lock if (this.closing.get()) { String msg = "Skipping flush on " + this + " because closing"; @@ -2457,8 +2467,15 @@ public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlus } try { - Collection specificStoresToFlush = - forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush(); + // The reason that we do not always use flushPolicy is, when the flush is + // caused by logRoller, we should select stores which must be flushed + // rather than could be flushed. + Collection specificStoresToFlush = null; + if (families != null) { + specificStoresToFlush = getSpecificStores(families); + } else { + specificStoresToFlush = flushPolicy.selectStoresToFlush(); + } FlushResultImpl fs = internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker, tracker); @@ -2488,6 +2505,19 @@ public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlus } } + /** + * get stores which matches the specified families + * + * @return the stores need to be flushed. + */ + private Collection getSpecificStores(List families) { + Collection specificStoresToFlush = new ArrayList<>(); + for (byte[] family : families) { + specificStoresToFlush.add(stores.get(family)); + } + return specificStoresToFlush; + } + /** * Should the store be flushed because it is old enough. *

@@ -8963,7 +8993,7 @@ private void requestFlush0(FlushLifeCycleTracker tracker) { } if (shouldFlush) { // Make request outside of synchronize block; HBASE-818. - this.rsServices.getFlushRequester().requestFlush(this, false, tracker); + this.rsServices.getFlushRequester().requestFlush(this, tracker); if (LOG.isDebugEnabled()) { LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 34052bbfba3b..13d0513e4bd2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1837,7 +1837,7 @@ protected void chore() { //Throttle the flushes by putting a delay. If we don't throttle, and there //is a balanced write-load on the regions in a table, we might end up //overwhelming the filesystem with too many flushes at once. - if (requester.requestDelayedFlush(r, randomDelay, false)) { + if (requester.requestDelayedFlush(r, randomDelay)) { LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(), r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), randomDelay); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index f5049c9fcfbb..58ac82ee6cd7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -18,7 +18,9 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.util.List; import java.util.Map; + import org.apache.hadoop.hbase.wal.AbstractWALRoller; import org.apache.hadoop.hbase.wal.WAL; import org.apache.yetus.audience.InterfaceAudience; @@ -45,7 +47,7 @@ public LogRoller(RegionServerServices services) { super("LogRoller", services.getConfiguration(), services); } - protected void scheduleFlush(String encodedRegionName) { + protected void scheduleFlush(String encodedRegionName, List families) { RegionServerServices services = this.abortable; HRegion r = (HRegion) services.getRegion(encodedRegionName); if (r == null) { @@ -58,8 +60,8 @@ protected void scheduleFlush(String encodedRegionName) { encodedRegionName, r); return; } - // force flushing all stores to clean old logs - requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY); + // flush specified stores to clean old logs + requester.requestFlush(r, families, FlushLifeCycleTracker.DUMMY); } @VisibleForTesting diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index e191f0442a28..c133a7a6429d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -286,7 +286,7 @@ private boolean flushOneForGlobalPressure(FlushType flushType) { server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) + ", Region memstore size=" + TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1)); - flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY); + flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY); 
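// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): how a caller uses the new
// per-family overload of FlushRequester introduced above. The diff rendering
// drops generic parameters; the signature added by the patch is
//   boolean requestFlush(HRegion region, List<byte[]> families, FlushLifeCycleTracker tracker)
// Passing null families (or using the two-argument overload) keeps the old
// behaviour of letting the region's FlushPolicy choose the stores. The class,
// method, and family names below are hypothetical.
// ---------------------------------------------------------------------------
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;

public class PerFamilyFlushRequestSketch {
  static boolean requestPartialFlush(FlushRequester requester, HRegion region) {
    // Flush only cf1 and cf2 of this region, e.g. because they pin the oldest WAL.
    List<byte[]> families = Arrays.asList(Bytes.toBytes("cf1"), Bytes.toBytes("cf2"));
    return requester.requestFlush(region, families, FlushLifeCycleTracker.DUMMY);
  }
}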
if (!flushedOne) { LOG.info("Excluding unflushable region " + regionToFlush + @@ -458,13 +458,18 @@ private FlushType isAboveLowWaterMark() { } @Override - public boolean requestFlush(HRegion r, boolean forceFlushAllStores, - FlushLifeCycleTracker tracker) { + public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) { + return this.requestFlush(r, null, tracker); + } + + @Override + public boolean requestFlush(HRegion r, List families, + FlushLifeCycleTracker tracker) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has no delay so it will be added at the top of the flush // queue. It'll come out near immediately. - FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker); + FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker); this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); r.incrementFlushesQueuedCount(); @@ -477,12 +482,12 @@ public boolean requestFlush(HRegion r, boolean forceFlushAllStores, } @Override - public boolean requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) { + public boolean requestDelayedFlush(HRegion r, long delay) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has some delay FlushRegionEntry fqe = - new FlushRegionEntry(r, forceFlushAllStores, FlushLifeCycleTracker.DUMMY); + new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY); fqe.requeue(delay); this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); @@ -581,7 +586,7 @@ private boolean flushRegion(final FlushRegionEntry fqe) { return true; } } - return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.getTracker()); + return flushRegion(region, false, fqe.families, fqe.getTracker()); } /** @@ -591,13 +596,13 @@ private boolean flushRegion(final FlushRegionEntry fqe) { * needs to be removed from the flush queue. If false, when we were called * from the main flusher run loop and we got the entry to flush by calling * poll on the flush queue (which removed it). - * @param forceFlushAllStores whether we want to flush all store. + * @param families stores of region to flush. * @return true if the region was successfully flushed, false otherwise. If * false, there will be accompanying log messages explaining why the region was * not flushed. 
*/ - private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores, - FlushLifeCycleTracker tracker) { + private boolean flushRegion(HRegion region, boolean emergencyFlush, + List families, FlushLifeCycleTracker tracker) { synchronized (this.regionsInQueue) { FlushRegionEntry fqe = this.regionsInQueue.remove(region); // Use the start time of the FlushRegionEntry if available @@ -612,7 +617,7 @@ private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forc lock.readLock().lock(); try { notifyFlushRequest(region, emergencyFlush); - FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker); + FlushResult flushResult = region.flushcache(families, false, tracker); boolean shouldCompact = flushResult.isCompactionNeeded(); // We just want to check the size boolean shouldSplit = region.checkSplit() != null; @@ -845,15 +850,16 @@ static class FlushRegionEntry implements FlushQueueEntry { private long whenToExpire; private int requeueCount = 0; - private final boolean forceFlushAllStores; + private final List families; private final FlushLifeCycleTracker tracker; - FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) { + FlushRegionEntry(final HRegion r, List families, + FlushLifeCycleTracker tracker) { this.region = r; this.createTime = EnvironmentEdgeManager.currentTime(); this.whenToExpire = this.createTime; - this.forceFlushAllStores = forceFlushAllStores; + this.families = families; this.tracker = tracker; } @@ -873,13 +879,6 @@ public int getRequeueCount() { return this.requeueCount; } - /** - * @return whether we need to flush all stores. - */ - public boolean isForceFlushAllStores() { - return forceFlushAllStores; - } - public FlushLifeCycleTracker getTracker() { return tracker; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index f6531c09a866..12ab9037ad20 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; + import org.apache.commons.lang3.mutable.MutableLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -544,7 +545,7 @@ public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyNam } @Override - public byte[][] rollWriter() throws FailedLogCloseException, IOException { + public Map> rollWriter() throws FailedLogCloseException, IOException { return rollWriter(false); } @@ -639,10 +640,10 @@ public int getNumLogFiles() { * If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed, * check the first (oldest) WAL, and return those regions which should be flushed so that * it can be let-go/'archived'. - * @return regions (encodedRegionNames) to flush in order to archive oldest WAL file. + * @return stores of regions (encodedRegionNames) to flush in order to archive oldest WAL file. 
*/ - byte[][] findRegionsToForceFlush() throws IOException { - byte[][] regions = null; + Map> findRegionsToForceFlush() throws IOException { + Map> regions = null; int logCount = getNumRolledLogFiles(); if (logCount > this.maxLogs && logCount > 0) { Map.Entry firstWALEntry = this.walFile2Props.firstEntry(); @@ -650,15 +651,20 @@ byte[][] findRegionsToForceFlush() throws IOException { this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId); } if (regions != null) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < regions.length; i++) { - if (i > 0) { - sb.append(", "); + List listForPrint = new ArrayList(); + for (Map.Entry> r : regions.entrySet()) { + StringBuilder families = new StringBuilder(); + for (int i = 0; i < r.getValue().size(); i++) { + if (i > 0) { + families.append(","); + } + families.append(Bytes.toString(r.getValue().get(i))); } - sb.append(Bytes.toStringBinary(regions[i])); + listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]"); } LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs + - "; forcing flush of " + regions.length + " regions(s): " + sb.toString()); + "; forcing (partial) flush of " + regions.size() + " region(s): " + + StringUtils.join(",", listForPrint)); } return regions; } @@ -820,7 +826,7 @@ private IOException convertInterruptedExceptionToIOException(final InterruptedEx } @Override - public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { + public Map> rollWriter(boolean force) throws IOException { rollWriterLock.lock(); try { if (this.closed) { @@ -830,7 +836,7 @@ public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOExce if (!force && this.writer != null && this.numEntries.get() <= 0) { return null; } - byte[][] regionsToFlush = null; + Map> regionsToFlush = null; try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) { Path oldPath = getOldPath(); Path newPath = getNewPath(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java index 7e667ced45cd..986a10f68dd5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; @@ -440,10 +441,10 @@ boolean areAllLower(Map sequenceids, Collection keysBlocki * {@link #lowestUnflushedSequenceIds} has a sequence id less than that passed in * sequenceids then return it. * @param sequenceids Sequenceids keyed by encoded region name. - * @return regions found in this instance with sequence ids less than those passed in. + * @return stores of regions found in this instance with sequence ids less than those passed in. */ - byte[][] findLower(Map sequenceids) { - List toFlush = null; + Map> findLower(Map sequenceids) { + Map> toFlush = null; // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock. 
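// Reviewer note on the loop below (descriptive comment; the return type, with
// generics that the diff rendering strips, is Map<byte[], List<byte[]>> keyed by
// encoded region name). The old code compared a single "lowest unflushed sequence
// id of the whole region" against the id passed in and returned only region names.
// The new code compares each family's lowest unflushed sequence id individually,
// so the result names exactly the stores that still pin the oldest WAL; families
// whose edits are already persisted beyond that WAL are left out, which is what
// enables the partial flush issued by LogRoller.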
synchronized (tieLock) { for (Map.Entry e : sequenceids.entrySet()) { @@ -451,16 +452,17 @@ byte[][] findLower(Map sequenceids) { if (m == null) { continue; } - // The lowest sequence id outstanding for this region. - long lowest = getLowestSequenceId(m); - if (lowest != HConstants.NO_SEQNUM && lowest <= e.getValue()) { - if (toFlush == null) { - toFlush = new ArrayList<>(); + for (Map.Entry me : m.entrySet()) { + if (me.getValue() <= e.getValue()) { + if (toFlush == null) { + toFlush = new TreeMap(Bytes.BYTES_COMPARATOR); + } + toFlush.computeIfAbsent(e.getKey(), k -> new ArrayList<>()) + .add(Bytes.toBytes(me.getKey().toString())); } - toFlush.add(e.getKey()); } } } - return toFlush == null ? null : toFlush.toArray(new byte[0][]); + return toFlush; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java index d668cf1ae8c1..d2b67170409f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -45,8 +46,8 @@ * NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when * there is something to do, rather than the Chore sleep time which is invariant. *

- * The {@link #scheduleFlush(String)} is abstract here, as sometimes we hold a region without a - * region server but we still want to roll its WAL. + * The {@link #scheduleFlush(String, List)} method is abstract here, + * as sometimes we hold a region without a region server but we still want to roll its WAL. *

* TODO: change to a pool of threads */ @@ -180,18 +181,18 @@ public void run() { WAL wal = entry.getKey(); // reset the flag in front to avoid missing roll request before we return from rollWriter. walNeedsRoll.put(wal, Boolean.FALSE); - byte[][] regionsToFlush = null; + Map> regionsToFlush = null; try { // Force the roll if the logroll.period is elapsed or if a roll was requested. - // The returned value is an array of actual region names. + // The returned value is an collection of actual region and family names. regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue()); } catch (WALClosedException e) { LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e); iter.remove(); } if (regionsToFlush != null) { - for (byte[] r : regionsToFlush) { - scheduleFlush(Bytes.toString(r)); + for (Map.Entry> r : regionsToFlush.entrySet()) { + scheduleFlush(Bytes.toString(r.getKey()), r.getValue()); } } afterRoll(wal); @@ -218,8 +219,9 @@ protected void afterRoll(WAL wal) { /** * @param encodedRegionName Encoded name of region to flush. + * @param families stores of region to flush. */ - protected abstract void scheduleFlush(String encodedRegionName); + protected abstract void scheduleFlush(String encodedRegionName, List families); private boolean isWaiting() { Thread.State state = getState(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 98773c205af2..dbc08cc84828 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -115,7 +115,7 @@ public boolean unregisterWALActionsListener(final WALActionsListener listener) { } @Override - public byte[][] rollWriter() { + public Map> rollWriter() { if (!listeners.isEmpty()) { for (WALActionsListener listener : listeners) { listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR); @@ -139,7 +139,7 @@ public byte[][] rollWriter() { } @Override - public byte[][] rollWriter(boolean force) { + public Map> rollWriter(boolean force) { return rollWriter(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index d57784f6ca06..902ca6d398a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -19,6 +19,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.hadoop.hbase.HConstants; @@ -60,11 +61,11 @@ public interface WAL extends Closeable, WALFileLengthProvider { * The implementation is synchronized in order to make sure there's one rollWriter * running at any given time. * - * @return If lots of logs, flush the returned regions so next time through we + * @return If lots of logs, flush the stores of returned regions so next time through we * can clean logs. Returns null if nothing to flush. Names are actual * region names as returned by {@link RegionInfo#getEncodedName()} */ - byte[][] rollWriter() throws FailedLogCloseException, IOException; + Map> rollWriter() throws FailedLogCloseException, IOException; /** * Roll the log writer. That is, start writing log messages to a new file. 
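Since the diff rendering above strips the generic parameters, here is a minimal, hypothetical consumer of the new rollWriter contract (the WAL instance is assumed to come from elsewhere): rollWriter now returns a Map<byte[], List<byte[]>> keyed by encoded region name whose values are the families that must be flushed before the oldest WAL can be archived, or null when nothing needs flushing.

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;

public class RollWriterUsageSketch {
  // Hypothetical helper: rolls the WAL and reports which stores block archival of old logs.
  static void rollAndReport(WAL wal) throws IOException {
    Map<byte[], List<byte[]>> regionsToFlush = wal.rollWriter(false);
    if (regionsToFlush == null) {
      return; // nothing needs flushing to let old WALs go
    }
    for (Map.Entry<byte[], List<byte[]>> e : regionsToFlush.entrySet()) {
      System.out.println("region " + Bytes.toStringBinary(e.getKey())
        + " must flush " + e.getValue().size() + " store(s)");
    }
  }
}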
@@ -76,11 +77,11 @@ public interface WAL extends Closeable, WALFileLengthProvider { * @param force * If true, force creation of a new writer even if no entries have * been written to the current writer - * @return If lots of logs, flush the returned regions so next time through we + * @return If lots of logs, flush the stores of returned regions so next time through we * can clean logs. Returns null if nothing to flush. Names are actual * region names as returned by {@link RegionInfo#getEncodedName()} */ - byte[][] rollWriter(boolean force) throws IOException; + Map> rollWriter(boolean force) throws IOException; /** * Stop accepting new writes. If we have unsynced writes still in buffer, sync them. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index c7145e399d3b..464afad37147 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -23,6 +23,8 @@ import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -113,8 +115,9 @@ public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf) } @Override - public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { - byte[][] regions = super.rollWriter(force); + public Map> rollWriter(boolean force) + throws FailedLogCloseException, IOException { + Map> regions = super.rollWriter(force); rolls.getAndIncrement(); return regions; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java index e91ff12c9a40..d273501d8eff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java @@ -66,8 +66,8 @@ public void testFlushRegionEntryEquality() { HRegion r = mock(HRegion.class); doReturn(hri).when(r).getRegionInfo(); - FlushRegionEntry entry = new FlushRegionEntry(r, true, FlushLifeCycleTracker.DUMMY); - FlushRegionEntry other = new FlushRegionEntry(r, true, FlushLifeCycleTracker.DUMMY); + FlushRegionEntry entry = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY); + FlushRegionEntry other = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY); assertEquals(entry.hashCode(), other.hashCode()); assertEquals(entry, other); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index eba138a1cc05..622a09f351f7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.Iterator; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.ChoreService; @@ -139,11 +141,11 @@ 
public void testWhenClusterIsWriteHeavyWithEmptyMemstore() throws Exception { final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK; - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run once and do necessary memory up Thread.sleep(1500); // No changes should be made by tuner as we already have lot of empty space @@ -182,10 +184,10 @@ public void testHeapMemoryManagerWhenOffheapFlushesHappenUnderReadHeavyCase() th // do some offheap flushes also. So there should be decrease in memstore but // not as that when we don't have offheap flushes memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK; - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run once and do necessary memory up waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); @@ -230,10 +232,10 @@ public void testHeapMemoryManagerWithOffheapMemstoreAndMixedWorkload() throws Ex // do some offheap flushes also. So there should be decrease in memstore but // not as that when we don't have offheap flushes memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK; - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run once and do necessary memory up waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); @@ -246,10 +248,10 @@ public void testHeapMemoryManagerWithOffheapMemstoreAndMixedWorkload() throws Ex // flushes are due to onheap overhead. 
This should once again call for increase in // memstore size but that increase should be to the safe size memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK; - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run once and do necessary memory up waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); assertHeapSpaceDelta(maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); @@ -312,10 +314,10 @@ public void testWhenClusterIsWriteHeavy() throws Exception { final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run once and do necessary memory up waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize, @@ -326,8 +328,8 @@ public void testWhenClusterIsWriteHeavy() throws Exception { oldBlockCacheSize = blockCache.maxSize; // Do some more flushes before the next run of HeapMemoryTuner memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run once and do necessary memory up waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize, @@ -361,10 +363,10 @@ public void testWhenClusterIsWriteHeavyWithOffheapMemstore() throws Exception { heapMemoryManager.start(choreService); // this should not change anything with onheap memstore memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK; - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run 
once and do necessary memory up Thread.sleep(1500); // No changes should be made by tuner as we already have lot of empty space @@ -448,9 +450,9 @@ public void testWhenClusterIsHavingMoreWritesThanReads() throws Exception { final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); blockCache.evictBlock(null); // Allow the tuner to run once and do necessary memory up Thread.sleep(1500); @@ -459,9 +461,9 @@ public void testWhenClusterIsHavingMoreWritesThanReads() throws Exception { assertEquals(oldBlockCacheSize, blockCache.maxSize); // Do some more flushes before the next run of HeapMemoryTuner memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run once and do necessary memory up waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize, @@ -494,9 +496,9 @@ public void testBlockedFlushesIncreaseMemstoreInSteadyState() throws Exception { final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); blockCache.evictBlock(null); blockCache.evictBlock(null); // Allow the tuner to run once and do necessary memory up @@ -506,7 +508,7 @@ public void testBlockedFlushesIncreaseMemstoreInSteadyState() throws Exception { assertEquals(oldBlockCacheSize, blockCache.maxSize); // Flushes that block updates memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK; - memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY); blockCache.evictBlock(null); blockCache.evictBlock(null); blockCache.evictBlock(null); @@ -752,14 +754,19 @@ public MemstoreFlusherStub(long memstoreSize) { } @Override - public boolean requestFlush(HRegion region, boolean forceFlushAllStores, - FlushLifeCycleTracker tracker) { + public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) { this.listener.flushRequested(flushType, region); return true; } @Override - public boolean requestDelayedFlush(HRegion region, long delay, boolean 
forceFlushAllStores) { + public boolean requestFlush(HRegion region, List families, + FlushLifeCycleTracker tracker) { + return true; + } + + @Override + public boolean requestDelayedFlush(HRegion region, long delay) { return true; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java index 0de5cb0d8c6f..9cd0ec2a16b9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java @@ -141,7 +141,7 @@ Matchers. any(), Matchers. any(), long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family); LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore); assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM); - rs.getMemStoreFlusher().requestFlush(spiedRegion, false, FlushLifeCycleTracker.DUMMY); + rs.getMemStoreFlusher().requestFlush(spiedRegion, FlushLifeCycleTracker.DUMMY); synchronized (flushed) { while (!flushed.booleanValue()) { flushed.wait(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java index 75dcad111e2e..7c491dc2583d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -168,9 +170,9 @@ public void testWALCoprocessorLoaded() throws Exception { } protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times, - MultiVersionConcurrencyControl mvcc, NavigableMap scopes) + MultiVersionConcurrencyControl mvcc, NavigableMap scopes, String cf) throws IOException { - final byte[] row = Bytes.toBytes("row"); + final byte[] row = Bytes.toBytes(cf); for (int i = 0; i < times; i++) { long timestamp = System.currentTimeMillis(); WALEdit cols = new WALEdit(); @@ -252,8 +254,8 @@ public void testWALComparator() throws Exception { * regions which should be flushed in order to archive the oldest wal file. *

* This method tests this behavior by inserting edits and rolling the wal enough times to reach - * the max number of logs threshold. It checks whether we get the "right regions" for flush on - * rolling the wal. + * the max number of logs threshold. It checks whether we get the "right regions and stores" for + * flush on rolling the wal. * @throws Exception */ @Test @@ -263,12 +265,23 @@ public void testFindMemStoresEligibleForFlush() throws Exception { conf1.setInt("hbase.regionserver.maxlogs", 1); AbstractFSWAL wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null); + String cf1 = "cf1"; + String cf2 = "cf2"; + String cf3 = "cf3"; TableDescriptor t1 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1")) - .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build(); TableDescriptor t2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t2")) - .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build(); RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build(); RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build(); + + List cfs = new ArrayList(); + cfs.add(ColumnFamilyDescriptorBuilder.of(cf1)); + cfs.add(ColumnFamilyDescriptorBuilder.of(cf2)); + TableDescriptor t3 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t3")) + .setColumnFamilies(cfs).build(); + RegionInfo hri3 = RegionInfoBuilder.newBuilder(t3.getTableName()).build(); + // add edits and roll the wal MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); NavigableMap scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR); @@ -279,26 +292,30 @@ public void testFindMemStoresEligibleForFlush() throws Exception { for (byte[] fam : t2.getColumnFamilyNames()) { scopes2.put(fam, 0); } + NavigableMap scopes3 = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (byte[] fam : t3.getColumnFamilyNames()) { + scopes3.put(fam, 0); + } try { - addEdits(wal, hri1, t1, 2, mvcc, scopes1); + addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1); wal.rollWriter(); // add some more edits and roll the wal. This would reach the log number threshold - addEdits(wal, hri1, t1, 2, mvcc, scopes1); + addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1); wal.rollWriter(); // with above rollWriter call, the max logs limit is reached. assertTrue(wal.getNumRolledLogFiles() == 2); // get the regions to flush; since there is only one region in the oldest wal, it should // return only one region. - byte[][] regionsToFlush = wal.findRegionsToForceFlush(); - assertEquals(1, regionsToFlush.length); - assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]); + Map> regionsToFlush = wal.findRegionsToForceFlush(); + assertEquals(1, regionsToFlush.size()); + assertEquals(hri1.getEncodedNameAsBytes(), (byte[])regionsToFlush.keySet().toArray()[0]); // insert edits in second region - addEdits(wal, hri2, t2, 2, mvcc, scopes2); + addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1); // get the regions to flush, it should still read region1. regionsToFlush = wal.findRegionsToForceFlush(); - assertEquals(1, regionsToFlush.length); - assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]); + assertEquals(1, regionsToFlush.size()); + assertEquals(hri1.getEncodedNameAsBytes(), (byte[])regionsToFlush.keySet().toArray()[0]); // flush region 1, and roll the wal file. 
Only last wal which has entries for region1 should // remain. flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); @@ -311,29 +328,50 @@ public void testFindMemStoresEligibleForFlush() throws Exception { // no wal should remain now. assertEquals(0, wal.getNumRolledLogFiles()); // add edits both to region 1 and region 2, and roll. - addEdits(wal, hri1, t1, 2, mvcc, scopes1); - addEdits(wal, hri2, t2, 2, mvcc, scopes2); + addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1); + addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1); wal.rollWriter(); // add edits and roll the writer, to reach the max logs limit. assertEquals(1, wal.getNumRolledLogFiles()); - addEdits(wal, hri1, t1, 2, mvcc, scopes1); + addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1); wal.rollWriter(); // it should return two regions to flush, as the oldest wal file has entries // for both regions. regionsToFlush = wal.findRegionsToForceFlush(); - assertEquals(2, regionsToFlush.length); + assertEquals(2, regionsToFlush.size()); // flush both regions flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames()); wal.rollWriter(true); assertEquals(0, wal.getNumRolledLogFiles()); // Add an edit to region1, and roll the wal. - addEdits(wal, hri1, t1, 2, mvcc, scopes1); + addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1); // tests partial flush: roll on a partial flush, and ensure that wal is not archived. wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); wal.rollWriter(); wal.completeCacheFlush(hri1.getEncodedNameAsBytes()); assertEquals(1, wal.getNumRolledLogFiles()); + + // clear test data + flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); + wal.rollWriter(true); + // add edits for three familes + addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1); + addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf2); + addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf3); + wal.rollWriter(); + addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1); + wal.rollWriter(); + assertEquals(2, wal.getNumRolledLogFiles()); + // flush one family before archive oldest wal + Set flushedFamilyNames = new HashSet<>(); + flushedFamilyNames.add(Bytes.toBytes(cf1)); + flushRegion(wal, hri3.getEncodedNameAsBytes(), flushedFamilyNames); + regionsToFlush = wal.findRegionsToForceFlush(); + // then only two family need to be flushed when archive oldest wal + assertEquals(1, regionsToFlush.size()); + assertEquals(hri3.getEncodedNameAsBytes(), (byte[])regionsToFlush.keySet().toArray()[0]); + assertEquals(2, regionsToFlush.get(hri3.getEncodedNameAsBytes()).size()); } finally { if (wal != null) { wal.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index cb851b9b0583..5fdac7291c7f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -1117,9 +1117,9 @@ static class TestFlusher implements FlushRequester { private HRegion r; @Override - public boolean requestFlush(HRegion region, boolean force, FlushLifeCycleTracker tracker) { + public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) { try { - r.flush(force); + r.flush(false); return true; } catch (IOException e) { throw new RuntimeException("Exception flushing", 
e); @@ -1127,7 +1127,13 @@ public boolean requestFlush(HRegion region, boolean force, FlushLifeCycleTracker } @Override - public boolean requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) { + public boolean requestFlush(HRegion region, List families, + FlushLifeCycleTracker tracker) { + return true; + } + + @Override + public boolean requestDelayedFlush(HRegion region, long when) { return true; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index 87bec9ac0607..d1a5a0aa7f4e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -124,7 +124,7 @@ public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldExcepti RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); for (int i = 0; i < 10; i++) { - addEdits(log, hri, htd, 1, mvcc, scopes); + addEdits(log, hri, htd, 1, mvcc, scopes, "row"); } } finally { log.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java index ff20ab5849fc..22b24abfb95f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java @@ -137,7 +137,7 @@ public void testFindLower() { sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); assertTrue(sida.findLower(m) == null); m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME)); - assertTrue(sida.findLower(m).length == 1); + assertTrue(sida.findLower(m).size() == 1); m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME) - 1); assertTrue(sida.findLower(m) == null); }
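To round out the picture, a short, hypothetical usage sketch of the two HRegion.flushcache entry points after this patch (generics restored; the column family name is made up): the boolean overload is kept for compatibility and simply expands true into the full family list, while the List overload flushes only the named stores and falls back to the region's FlushPolicy when given null.

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;

public class FlushcacheUsageSketch {
  static void flushExamples(HRegion region) throws IOException {
    // Compatibility call: true is expanded to every column family of the table
    // descriptor before delegating to the List-based overload.
    region.flushcache(true, false, FlushLifeCycleTracker.DUMMY);

    // New-style call: flush only the named stores; passing null instead of a
    // list lets the region's FlushPolicy decide, as before.
    List<byte[]> families = Collections.singletonList(Bytes.toBytes("cf1"));
    region.flushcache(families, false, FlushLifeCycleTracker.DUMMY);
  }
}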