diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index 4a880ed6fc35..391fd857ac54 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -256,9 +256,10 @@ default boolean isCacheEnabled() {
   }
 
   /**
-   * Wait for the bucket cache to be enabled while server restart
-   * @param timeout time to wait for the bucket cache to be enable
-   * @return boolean true if the bucket cache is enabled, false otherwise
+   * Wait for the block cache implementation to be completely enabled. Some block cache
+   * implementations may take longer to initialise, and this initialisation may be asynchronous.
+   * @param timeout time to wait for the cache to become enabled.
+   * @return boolean true if the cache is enabled, false otherwise.
    */
   default boolean waitForCacheInitialization(long timeout) {
     return true;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
index 6956d584d92a..744a6bbf012d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
@@ -22,11 +22,13 @@ import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.ForkJoinPool;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -83,7 +85,8 @@ public final class BlockCacheFactory {
   private BlockCacheFactory() {
   }
 
-  public static BlockCache createBlockCache(Configuration conf) {
+  public static BlockCache createBlockCache(Configuration conf,
+    Map<String, HRegion> onlineRegions) {
     FirstLevelBlockCache l1Cache = createFirstLevelCache(conf);
     if (l1Cache == null) {
       return null;
@@ -96,7 +99,7 @@ public static BlockCache createBlockCache(Configuration conf) {
         : new InclusiveCombinedBlockCache(l1Cache, l2CacheInstance);
     } else {
       // otherwise use the bucket cache.
-      BucketCache bucketCache = createBucketCache(conf);
+      BucketCache bucketCache = createBucketCache(conf, onlineRegions);
       if (!conf.getBoolean("hbase.bucketcache.combinedcache.enabled", true)) {
         // Non combined mode is off from 2.0
         LOG.warn(
@@ -106,6 +109,10 @@ public static BlockCache createBlockCache(Configuration conf) {
     }
   }
 
+  public static BlockCache createBlockCache(Configuration conf) {
+    return createBlockCache(conf, null);
+  }
+
   private static FirstLevelBlockCache createFirstLevelCache(final Configuration c) {
     final long cacheSize = MemorySizeUtil.getOnHeapCacheSize(c);
     if (cacheSize < 0) {
@@ -179,7 +186,8 @@ private static BlockCache createExternalBlockcache(Configuration c) {
 
   }
 
-  private static BucketCache createBucketCache(Configuration c) {
+  private static BucketCache createBucketCache(Configuration c,
+    Map<String, HRegion> onlineRegions) {
     // Check for L2. ioengine name must be non-null.
     String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null);
     if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) {
@@ -225,7 +233,8 @@ private static BucketCache createBucketCache(Configuration c) {
         BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
       // Bucket cache logs its stats on creation internal to the constructor.
       bucketCache = new BucketCache(bucketCacheIOEngineName, bucketCacheSize, blockSize,
-        bucketSizes, writerThreads, writerQueueLen, persistentPath, ioErrorsTolerationDuration, c);
+        bucketSizes, writerThreads, writerQueueLen, persistentPath, ioErrorsTolerationDuration, c,
+        onlineRegions);
     } catch (IOException ioex) {
       LOG.error("Can't instantiate bucket cache", ioex);
       throw new RuntimeException(ioex);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
index 3d4698b0047e..7324701efe58 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
@@ -21,13 +21,17 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.metrics.impl.FastLongHistogram;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.GsonUtil;
@@ -242,6 +246,16 @@ public static boolean shouldReplaceExistingCacheBlock(BlockCache blockCache,
     }
   }
 
+  public static Set<String> listAllFilesNames(Map<String, HRegion> onlineRegions) {
+    Set<String> files = new HashSet<>();
+    onlineRegions.values().forEach(r -> {
+      r.getStores().forEach(s -> {
+        s.getStorefiles().forEach(f -> files.add(f.getPath().getName()));
+      });
+    });
+    return files;
+  }
+
   private static final int DEFAULT_MAX = 1000000;
 
   public static int getMaxCachedBlocksByFile(Configuration conf) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 3ea6602b4fd7..688db638e745 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -78,6 +78,7 @@
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -125,6 +126,12 @@ public class BucketCache implements BlockCache, HeapSize {
   static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE =
     "hbase.bucketcache.persistence.chunksize";
 
+  /** Minimum age of a cached block before we check if its file belongs to any online region. */
+  static final String BLOCK_ORPHAN_GRACE_PERIOD =
+    "hbase.bucketcache.block.orphan.evictgraceperiod.seconds";
+
+  static final long BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT = 24 * 60 * 60 * 1000L;
+
   /** Priority buckets */
   static final float DEFAULT_SINGLE_FACTOR = 0.25f;
   static final float DEFAULT_MULTI_FACTOR = 0.50f;
@@ -292,6 +299,10 @@ protected enum CacheState {
   private long allocFailLogPrevTs; // time of previous log event for allocation failure.
   private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.
 
+  private Map<String, HRegion> onlineRegions;
+
+  private long orphanBlockGracePeriod = 0;
+
   public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
     int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
     this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
@@ -301,11 +312,21 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
   public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
     int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
     Configuration conf) throws IOException {
+    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
+      persistencePath, ioErrorsTolerationDuration, conf, null);
+  }
+
+  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
+    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
+    Configuration conf, Map<String, HRegion> onlineRegions) throws IOException {
     Preconditions.checkArgument(blockSize > 0,
       "BucketCache capacity is set to " + blockSize + ", can not be less than 0");
     this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
     this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
     this.writerThreads = new WriterThread[writerThreadNum];
+    this.onlineRegions = onlineRegions;
+    this.orphanBlockGracePeriod =
+      conf.getLong(BLOCK_ORPHAN_GRACE_PERIOD, BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT);
     long blockNumCapacity = capacity / blockSize;
     if (blockNumCapacity >= Integer.MAX_VALUE) {
       // Enough for about 32TB of cache!
@@ -1019,6 +1040,29 @@ private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
     }
   }
 
+  private long calculateBytesToFree(StringBuilder msgBuffer) {
+    long bytesToFreeWithoutExtra = 0;
+    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
+    long[] bytesToFreeForBucket = new long[stats.length];
+    for (int i = 0; i < stats.length; i++) {
+      bytesToFreeForBucket[i] = 0;
+      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
+      freeGoal = Math.max(freeGoal, 1);
+      if (stats[i].freeCount() < freeGoal) {
+        bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
+        bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
+        if (msgBuffer != null) {
+          msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
+            + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
+        }
+      }
+    }
+    if (msgBuffer != null) {
+      msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
+    }
+    return bytesToFreeWithoutExtra;
+  }
+
   /**
    * Free the space if the used size reaches acceptableSize() or one size block couldn't be
    * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some
@@ -1035,43 +1079,21 @@ void freeSpace(final String why) {
     }
     try {
       freeInProgress = true;
-      long bytesToFreeWithoutExtra = 0;
-      // Calculate free byte for each bucketSizeinfo
       StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
-      BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
-      long[] bytesToFreeForBucket = new long[stats.length];
-      for (int i = 0; i < stats.length; i++) {
-        bytesToFreeForBucket[i] = 0;
-        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
-        freeGoal = Math.max(freeGoal, 1);
-        if (stats[i].freeCount() < freeGoal) {
-          bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
-          bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
-          if (msgBuffer != null) {
-            msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
-              + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
-          }
-        }
-      }
-      if (msgBuffer != null) {
-        msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
-      }
-
+      long bytesToFreeWithoutExtra = calculateBytesToFree(msgBuffer);
       if (bytesToFreeWithoutExtra <= 0) {
         return;
       }
       long currentSize = bucketAllocator.getUsedSize();
       long totalSize = bucketAllocator.getTotalSize();
       if (LOG.isDebugEnabled() && msgBuffer != null) {
-        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString()
-          + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize="
+        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer + " of current used="
+          + StringUtils.byteDesc(currentSize) + ", actual cacheSize="
           + StringUtils.byteDesc(realCacheSize.sum()) + ", total=" + StringUtils.byteDesc(totalSize));
       }
-
       long bytesToFreeWithExtra =
         (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));
-
       // Instantiate priority buckets
       BucketEntryGroup bucketSingle =
         new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
@@ -1080,10 +1102,48 @@ void freeSpace(final String why) {
       BucketEntryGroup bucketMemory =
         new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));
 
+      Set<String> allValidFiles = null;
+      // We need the region/stores/files tree in order to figure out whether a block is "orphan"
+      // or not. See further comments below for more details.
+      if (onlineRegions != null) {
+        allValidFiles = BlockCacheUtil.listAllFilesNames(onlineRegions);
+      }
+      // The cached time is recorded in nanos, so we need to convert the grace period accordingly.
+      long orphanGracePeriodNanos = orphanBlockGracePeriod * 1000000;
+      long bytesFreed = 0;
       // Scan entire map putting bucket entry into appropriate bucket entry
       // group
       for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
-        switch (bucketEntryWithKey.getValue().getPriority()) {
+        BlockCacheKey key = bucketEntryWithKey.getKey();
+        BucketEntry entry = bucketEntryWithKey.getValue();
+        // Under certain conditions, blocks for regions not hosted by the current region server
+        // might remain in the cache. For example, when using the persistent cache feature, if the
+        // RS crashes and the same regions are not all assigned back once it is online again,
+        // blocks for the previously online regions would be recovered and stay in the cache.
+        // These would be "orphan" blocks, as the files these blocks belong to are not in any of
+        // the online regions.
+        // "Orphan" blocks are a pure waste of cache space and should be evicted first during
+        // the freespace run.
+        // Compactions and flushes may cache blocks before their files are completely written. In
+        // these cases the file won't be found in any of the online regions' stores, but the block
+        // shouldn't be evicted. To avoid this, we defined the
+        // hbase.bucketcache.block.orphan.evictgraceperiod.seconds property, to account for a
+        // grace period (default 24 hours) before a block is checked for being an orphan block.
+        if (
+          allValidFiles != null
+            && entry.getCachedTime() < (System.nanoTime() - orphanGracePeriodNanos)
+        ) {
+          if (!allValidFiles.contains(key.getHfileName())) {
+            if (evictBucketEntryIfNoRpcReferenced(key, entry)) {
+              // We calculate the freed bytes, but we don't stop if the goal was reached because
+              // these are orphan blocks anyway, so let's leverage this run of freeSpace
+              // to get rid of all orphans at once.
+              bytesFreed += entry.getLength();
+              continue;
+            }
+          }
+        }
+        switch (entry.getPriority()) {
           case SINGLE: {
             bucketSingle.add(bucketEntryWithKey);
             break;
@@ -1098,7 +1158,6 @@ void freeSpace(final String why) {
           }
         }
       }
-
       PriorityQueue<BucketEntryGroup> bucketQueue =
         new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));
@@ -1107,7 +1166,6 @@ void freeSpace(final String why) {
       bucketQueue.add(bucketMemory);
 
       int remainingBuckets = bucketQueue.size();
-      long bytesFreed = 0;
 
       BucketEntryGroup bucketGroup;
       while ((bucketGroup = bucketQueue.poll()) != null) {
@@ -1124,18 +1182,15 @@ void freeSpace(final String why) {
       if (bucketSizesAboveThresholdCount(minFactor) > 0) {
         bucketQueue.clear();
         remainingBuckets = 3;
-
         bucketQueue.add(bucketSingle);
         bucketQueue.add(bucketMulti);
         bucketQueue.add(bucketMemory);
-
         while ((bucketGroup = bucketQueue.poll()) != null) {
           long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
           bytesFreed += bucketGroup.free(bucketBytesToFree);
           remainingBuckets--;
         }
       }
-
       // Even after the above free we might still need freeing because of the
       // De-fragmentation of the buckets (also called Slab Calcification problem), i.e
       // there might be some buckets where the occupancy is very sparse and thus are not
@@ -1154,7 +1209,6 @@ void freeSpace(final String why) {
           + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory));
       }
     }
-
     } catch (Throwable t) {
       LOG.warn("Failed freeing space", t);
     } finally {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 5f0f956eaec0..dabab82805fa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -17,12 +17,17 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
@@ -30,6 +35,8 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -56,6 +63,9 @@
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -464,7 +474,7 @@ public void testGetPartitionSize() throws IOException {
       BucketCache.DEFAULT_MIN_FACTOR);
 
     Configuration conf = HBaseConfiguration.create();
-    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
+    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
     conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
     conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
     conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
@@ -500,9 +510,9 @@ public void testCacheSizeCapacity() throws IOException {
   @Test
   public void testValidBucketCacheConfigs() throws IOException {
     Configuration conf = HBaseConfiguration.create();
-    conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
-    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
-    conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
+    conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
+    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
+    conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
     conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
     conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
     conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
@@ -511,11 +521,10 @@ public void testValidBucketCacheConfigs() throws IOException {
       constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
     assertTrue(cache.waitForCacheInitialization(10000));
 
-    assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
+    assertEquals(ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
       cache.getAcceptableFactor(), 0);
-    assertEquals(BucketCache.MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
-      cache.getMinFactor(), 0);
-    assertEquals(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
+    assertEquals(MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, cache.getMinFactor(), 0);
+    assertEquals(EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
       cache.getExtraFreeFactor(), 0);
     assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f,
       cache.getSingleFactor(), 0);
@@ -529,8 +538,7 @@ public void testInvalidAcceptFactorConfig() throws IOException {
     float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
     boolean[] expectedOutcomes = { false, false, true, false };
-    Map<String, float[]> configMappings =
-      ImmutableMap.of(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, configValues);
+    Map<String, float[]> configMappings = ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues);
     Configuration conf = HBaseConfiguration.create();
     checkConfigValues(conf, configMappings, expectedOutcomes);
   }
 
@@ -540,8 +548,7 @@ public void testInvalidMinFactorConfig() throws IOException {
     float[] configValues = { -1f, 0f, 0.96f, 1.05f };
     // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0
     boolean[] expectedOutcomes = { false, true, false, false };
-    Map<String, float[]> configMappings =
-      ImmutableMap.of(BucketCache.MIN_FACTOR_CONFIG_NAME, configValues);
+    Map<String, float[]> configMappings = ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues);
     Configuration conf = HBaseConfiguration.create();
     checkConfigValues(conf, configMappings, expectedOutcomes);
   }
@@ -552,7 +559,7 @@ public void testInvalidExtraFreeFactorConfig() throws IOException {
     // throws due to <0, in expected range, in expected range, config can be > 1.0
     boolean[] expectedOutcomes = { false, true, true, true };
     Map<String, float[]> configMappings =
-      ImmutableMap.of(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
+      ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
     Configuration conf = HBaseConfiguration.create();
     checkConfigValues(conf, configMappings, expectedOutcomes);
   }
@@ -970,4 +977,63 @@ private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath,
       totalBlocksToCheck * constructedBlockSize);
     return bucketCache;
   }
+
+  @Test
+  public void testEvictOrphansOutOfGracePeriod() throws Exception {
+    BucketCache bucketCache = testEvictOrphans(0);
+    assertEquals(10, bucketCache.getBackingMap().size());
+    assertEquals(0, bucketCache.blocksByHFile.stream()
+      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count());
+  }
+
+  @Test
+  public void testEvictOrphansWithinGracePeriod() throws Exception {
+    BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L);
+    assertEquals(18, bucketCache.getBackingMap().size());
+    assertTrue(bucketCache.blocksByHFile.stream()
+      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0);
+  }
+
+  private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws Exception {
+    Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-valid");
+    Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-orphan");
+    Map<String, HRegion> onlineRegions = new HashMap<>();
+    List<HStore> stores = new ArrayList<>();
+    Collection<HStoreFile> storeFiles = new ArrayList<>();
+    HRegion mockedRegion = mock(HRegion.class);
+    HStore mockedStore = mock(HStore.class);
+    HStoreFile mockedStoreFile = mock(HStoreFile.class);
+    when(mockedStoreFile.getPath()).thenReturn(validFile);
+    storeFiles.add(mockedStoreFile);
+    when(mockedStore.getStorefiles()).thenReturn(storeFiles);
+    stores.add(mockedStore);
+    when(mockedRegion.getStores()).thenReturn(stores);
+    onlineRegions.put("mocked_region", mockedRegion);
+    HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
+    HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
+    HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
+    HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD,
+      orphanEvictionGracePeriod);
+    BucketCache bucketCache = new BucketCache(ioEngineName, (constructedBlockSize + 1024) * 21,
+      constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000,
+      HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions);
+    HFileBlockPair[] validBlockPairs =
+      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile);
+    HFileBlockPair[] orphanBlockPairs =
+      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile);
+    for (HFileBlockPair pair : validBlockPairs) {
+      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
+    }
+    waitUntilAllFlushedToBucket(bucketCache);
+    assertEquals(10, bucketCache.getBackingMap().size());
+    bucketCache.freeSpace("test");
+    assertEquals(10, bucketCache.getBackingMap().size());
+    for (HFileBlockPair pair : orphanBlockPairs) {
+      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
+    }
+    waitUntilAllFlushedToBucket(bucketCache);
+    assertEquals(20, bucketCache.getBackingMap().size());
+    bucketCache.freeSpace("test");
+    return bucketCache;
+  }
 }
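
Note on the eviction logic in freeSpace() above: the orphan check reduces to a single predicate over a block's cached time and its file name. The sketch below restates that predicate in isolation, assuming the same inputs the patch uses (BucketEntry#getCachedTime(), BlockCacheKey#getHfileName(), the allValidFiles set built by BlockCacheUtil.listAllFilesNames, and the grace period already converted to nanoseconds). The class and method names here are illustrative only and are not part of the patch.

import java.util.Set;

/** Illustrative restatement of the orphan test applied per backing-map entry in freeSpace(). */
final class OrphanCheckSketch {
  /**
   * @param cachedTimeNanos  the value BucketEntry#getCachedTime() returns, in nanoseconds
   * @param hfileName        the value BlockCacheKey#getHfileName() returns
   * @param allValidFiles    store file names of all online regions, or null when regions are unknown
   * @param gracePeriodNanos configured orphan grace period, converted to nanoseconds
   */
  static boolean isEvictableOrphan(long cachedTimeNanos, String hfileName,
    Set<String> allValidFiles, long gracePeriodNanos) {
    // Blocks younger than the grace period are never treated as orphans, so blocks cached by an
    // in-flight flush or compaction (whose files are not yet in the store file lists) survive.
    boolean pastGracePeriod = cachedTimeNanos < System.nanoTime() - gracePeriodNanos;
    return allValidFiles != null && pastGracePeriod && !allValidFiles.contains(hfileName);
  }
}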
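For completeness, this is roughly how the new wiring could be exercised from configuration. The grace period key added by the patch is hbase.bucketcache.block.orphan.evictgraceperiod.seconds and, as the default value (24 * 60 * 60 * 1000L) and the nanosecond conversion above suggest, the patch reads it as a millisecond value. The snippet below is a hypothetical usage sketch, not code from the patch; the ioengine and size settings are placeholders.

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.regionserver.HRegion;

final class OrphanEvictionWiringSketch {
  static BlockCache buildCache(Map<String, HRegion> onlineRegions) {
    Configuration conf = HBaseConfiguration.create();
    // Placeholder bucket cache settings (hypothetical values).
    conf.set("hbase.bucketcache.ioengine", "offheap");
    conf.setInt("hbase.bucketcache.size", 1024);
    // One hour grace period, expressed in milliseconds as in testEvictOrphans above.
    conf.setLong("hbase.bucketcache.block.orphan.evictgraceperiod.seconds", 60 * 60 * 1000L);
    // Passing the online-regions map lets BucketCache.freeSpace() resolve which store files are
    // still live and evict blocks belonging to files no longer referenced by any online region.
    return BlockCacheFactory.createBlockCache(conf, onlineRegions);
  }
}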