Merged
@@ -256,9 +256,10 @@ default boolean isCacheEnabled() {
}

/**
* Wait for the bucket cache to be enabled while server restart
* @param timeout time to wait for the bucket cache to be enable
* @return boolean true if the bucket cache is enabled, false otherwise
* Wait for the block cache implementation to be completely enabled. Some block cache
* implementations may take longer to initialise, and this initialisation may be asynchronous.
* @param timeout time to wait for the cache to become enabled.
* @return boolean true if the cache is enabled, false otherwise.
*/
default boolean waitForCacheInitialization(long timeout) {
return true;
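A caller can use this hook to gate work on cache readiness. Below is a minimal sketch of such a caller; the class, the helper name, and the one-minute timeout are illustrative assumptions, not part of this change:

import org.apache.hadoop.hbase.io.hfile.BlockCache;

public class CacheReadinessSketch {
  // Hypothetical helper: wait briefly for the cache to report itself enabled.
  // A persistent BucketCache, for instance, may still be rebuilding its backing
  // map from the persisted index after a restart.
  static boolean ensureCacheReady(BlockCache cache) {
    // Wait up to one minute (illustrative value) for initialization to finish.
    return cache.waitForCacheInitialization(60_000L);
  }
}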
@@ -22,11 +22,13 @@
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -83,7 +85,8 @@ public final class BlockCacheFactory {
private BlockCacheFactory() {
}

public static BlockCache createBlockCache(Configuration conf) {
public static BlockCache createBlockCache(Configuration conf,
Map<String, HRegion> onlineRegions) {
FirstLevelBlockCache l1Cache = createFirstLevelCache(conf);
if (l1Cache == null) {
return null;
@@ -96,7 +99,7 @@ public static BlockCache createBlockCache(Configuration conf) {
: new InclusiveCombinedBlockCache(l1Cache, l2CacheInstance);
} else {
// otherwise use the bucket cache.
BucketCache bucketCache = createBucketCache(conf);
BucketCache bucketCache = createBucketCache(conf, onlineRegions);
if (!conf.getBoolean("hbase.bucketcache.combinedcache.enabled", true)) {
// Non combined mode is off from 2.0
LOG.warn(
@@ -106,6 +109,10 @@ public static BlockCache createBlockCache(Configuration conf) {
}
}

public static BlockCache createBlockCache(Configuration conf) {
return createBlockCache(conf, null);
}

private static FirstLevelBlockCache createFirstLevelCache(final Configuration c) {
final long cacheSize = MemorySizeUtil.getOnHeapCacheSize(c);
if (cacheSize < 0) {
@@ -179,7 +186,8 @@ private static BlockCache createExternalBlockcache(Configuration c) {

}

private static BucketCache createBucketCache(Configuration c) {
private static BucketCache createBucketCache(Configuration c,
Map<String, HRegion> onlineRegions) {
// Check for L2. ioengine name must be non-null.
String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null);
if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) {
@@ -225,7 +233,8 @@ private static BucketCache createBucketCache(Configuration c) {
BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
// Bucket cache logs its stats on creation internal to the constructor.
bucketCache = new BucketCache(bucketCacheIOEngineName, bucketCacheSize, blockSize,
bucketSizes, writerThreads, writerQueueLen, persistentPath, ioErrorsTolerationDuration, c);
bucketSizes, writerThreads, writerQueueLen, persistentPath, ioErrorsTolerationDuration, c,
onlineRegions);
} catch (IOException ioex) {
LOG.error("Can't instantiate bucket cache", ioex);
throw new RuntimeException(ioex);
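With both entry points in place, a region server that supplies its online-regions map opts into orphan-aware eviction, while existing callers keep the old behavior. A rough sketch of the wiring, under the assumption that the server tracks regions in a ConcurrentHashMap (variable and class names are illustrative):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.regionserver.HRegion;

public class CacheWiringSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Kept up to date by the region server as regions open and close.
    Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
    // Region-server path: eviction can check blocks against currently online regions.
    BlockCache cache = BlockCacheFactory.createBlockCache(conf, onlineRegions);
    // Tooling/test path: no region context, so orphan checks are skipped.
    BlockCache standalone = BlockCacheFactory.createBlockCache(conf);
  }
}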
@@ -21,13 +21,17 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.metrics.impl.FastLongHistogram;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.GsonUtil;
@@ -242,6 +246,16 @@ public static boolean shouldReplaceExistingCacheBlock(BlockCache blockCache,
}
}

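/**
 * Returns the names of all store files referenced by the given online regions. The bucket
 * cache uses this during freeSpace to decide whether a cached block is an "orphan", i.e.
 * belongs to a file that no online region references.
 */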
public static Set<String> listAllFilesNames(Map<String, HRegion> onlineRegions) {
Set<String> files = new HashSet<>();
onlineRegions.values().forEach(r -> {
r.getStores().forEach(s -> {
s.getStorefiles().forEach(f -> files.add(f.getPath().getName()));
});
});
return files;
}

private static final int DEFAULT_MAX = 1000000;

public static int getMaxCachedBlocksByFile(Configuration conf) {
@@ -78,6 +78,7 @@
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -125,6 +126,12 @@ public class BucketCache implements BlockCache, HeapSize {
static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE =
"hbase.bucketcache.persistence.chunksize";

/** Grace period (in milliseconds) a cached block must age before we check whether its backing file is still referenced by any online region. */
static final String BLOCK_ORPHAN_GRACE_PERIOD =
"hbase.bucketcache.block.orphan.evictgraceperiod.seconds";

static final long BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT = 24 * 60 * 60 * 1000L;

/** Priority buckets */
static final float DEFAULT_SINGLE_FACTOR = 0.25f;
static final float DEFAULT_MULTI_FACTOR = 0.50f;
@@ -292,6 +299,10 @@ protected enum CacheState {
private long allocFailLogPrevTs; // time of previous log event for allocation failure.
private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.

private Map<String, HRegion> onlineRegions;

private long orphanBlockGracePeriod = 0;

public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
@@ -301,11 +312,21 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
Configuration conf) throws IOException {
this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
persistencePath, ioErrorsTolerationDuration, conf, null);
}

public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
Configuration conf, Map<String, HRegion> onlineRegions) throws IOException {
Preconditions.checkArgument(blockSize > 0,
"BucketCache block size is set to " + blockSize + ", must be greater than 0");
this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
this.writerThreads = new WriterThread[writerThreadNum];
this.onlineRegions = onlineRegions;
this.orphanBlockGracePeriod =
conf.getLong(BLOCK_ORPHAN_GRACE_PERIOD, BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT);
long blockNumCapacity = capacity / blockSize;
if (blockNumCapacity >= Integer.MAX_VALUE) {
// Enough for about 32TB of cache!
@@ -1019,6 +1040,29 @@ private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
}
}

private long calculateBytesToFree(StringBuilder msgBuffer) {
long bytesToFreeWithoutExtra = 0;
BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
long[] bytesToFreeForBucket = new long[stats.length];
for (int i = 0; i < stats.length; i++) {
bytesToFreeForBucket[i] = 0;
long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
freeGoal = Math.max(freeGoal, 1);
if (stats[i].freeCount() < freeGoal) {
bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
if (msgBuffer != null) {
msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
+ StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
}
}
}
if (msgBuffer != null) {
msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
}
return bytesToFreeWithoutExtra;
}
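To make the free-goal arithmetic concrete, here is a self-contained example with hypothetical numbers, assuming a minFactor of 0.85 (the real value comes from configuration):

public class FreeGoalExample {
  public static void main(String[] args) {
    long totalCount = 1000;    // slots of this bucket size
    long freeCount = 50;       // currently free slots
    long itemSize = 64 * 1024; // 64 KiB per slot
    double minFactor = 0.85;   // assumed configuration value
    // At least 15% of the slots should be free: floor(1000 * 0.15) = 150.
    long freeGoal = Math.max((long) Math.floor(totalCount * (1 - minFactor)), 1);
    // 50 free < 150 goal, so 100 slots' worth of bytes must be freed.
    long bytesToFree = freeCount < freeGoal ? itemSize * (freeGoal - freeCount) : 0;
    System.out.println(bytesToFree); // 6553600 bytes, about 6.25 MiB
  }
}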

/**
* Free the space if the used size reaches acceptableSize() or one size block couldn't be
* allocated. When freeing the space, we use the LRU algorithm and ensure there must be some
@@ -1035,43 +1079,21 @@ void freeSpace(final String why) {
}
try {
freeInProgress = true;
long bytesToFreeWithoutExtra = 0;
// Calculate free byte for each bucketSizeinfo
StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
long[] bytesToFreeForBucket = new long[stats.length];
for (int i = 0; i < stats.length; i++) {
bytesToFreeForBucket[i] = 0;
long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
freeGoal = Math.max(freeGoal, 1);
if (stats[i].freeCount() < freeGoal) {
bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
if (msgBuffer != null) {
msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
+ StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
}
}
}
if (msgBuffer != null) {
msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
}

long bytesToFreeWithoutExtra = calculateBytesToFree(msgBuffer);
if (bytesToFreeWithoutExtra <= 0) {
return;
}
long currentSize = bucketAllocator.getUsedSize();
long totalSize = bucketAllocator.getTotalSize();
if (LOG.isDebugEnabled() && msgBuffer != null) {
LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString()
+ " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize="
LOG.debug("Free started because \"" + why + "\"; " + msgBuffer + " of current used="
+ StringUtils.byteDesc(currentSize) + ", actual cacheSize="
+ StringUtils.byteDesc(realCacheSize.sum()) + ", total="
+ StringUtils.byteDesc(totalSize));
}

long bytesToFreeWithExtra =
(long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));

// Instantiate priority buckets
BucketEntryGroup bucketSingle =
new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
@@ -1080,10 +1102,48 @@
BucketEntryGroup bucketMemory =
new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));

Set<String> allValidFiles = null;
// We need the region/stores/files tree, in order to figure out if a block is "orphan" or not.
// See further comments below for more details.
if (onlineRegions != null) {
allValidFiles = BlockCacheUtil.listAllFilesNames(onlineRegions);
}
// the cached time is recorded in nanos, so we need to convert the grace period accordingly
long orphanGracePeriodNanos = orphanBlockGracePeriod * 1000000;
long bytesFreed = 0;
// Scan entire map putting bucket entry into appropriate bucket entry
// group
for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
switch (bucketEntryWithKey.getValue().getPriority()) {
BlockCacheKey key = bucketEntryWithKey.getKey();
BucketEntry entry = bucketEntryWithKey.getValue();
// Under certain conditions, blocks for regions not hosted on the current region server
// might linger in the cache. For example, when using the persistent cache feature, if the
// RS crashes and different regions are assigned to it once it is back online, blocks for
// the previously online regions would be recovered and remain in the cache. These are
// "orphan" blocks, as the files they belong to are not part of any online region.
// Orphan blocks are a pure waste of cache space and should be evicted first during
// the freeSpace run.
// Compactions and flushes may cache blocks before their files are completely written. In
// such cases the file won't yet be found in any of the online region stores, but the
// block shouldn't be evicted. To cover this, the
// hbase.bucketcache.block.orphan.evictgraceperiod.seconds property defines a grace
// period (default 24 hours) that must elapse before a block is checked for orphan
// status.
if (
allValidFiles != null
&& entry.getCachedTime() < (System.nanoTime() - orphanGracePeriodNanos)
) {
if (!allValidFiles.contains(key.getHfileName())) {
if (evictBucketEntryIfNoRpcReferenced(key, entry)) {
// We count the freed bytes but don't stop once the goal is reached: these are
// orphan blocks anyway, so leverage this freeSpace run to get rid of all
// orphans at once.
bytesFreed += entry.getLength();
continue;
}
}
}
switch (entry.getPriority()) {
case SINGLE: {
bucketSingle.add(bucketEntryWithKey);
break;
Expand All @@ -1098,7 +1158,6 @@ void freeSpace(final String why) {
}
}
}

PriorityQueue<BucketEntryGroup> bucketQueue =
new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));

@@ -1107,7 +1166,6 @@
bucketQueue.add(bucketMemory);

int remainingBuckets = bucketQueue.size();
long bytesFreed = 0;

BucketEntryGroup bucketGroup;
while ((bucketGroup = bucketQueue.poll()) != null) {
@@ -1124,18 +1182,15 @@
if (bucketSizesAboveThresholdCount(minFactor) > 0) {
bucketQueue.clear();
remainingBuckets = 3;

bucketQueue.add(bucketSingle);
bucketQueue.add(bucketMulti);
bucketQueue.add(bucketMemory);

while ((bucketGroup = bucketQueue.poll()) != null) {
long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
bytesFreed += bucketGroup.free(bucketBytesToFree);
remainingBuckets--;
}
}

// Even after the above free we might still need freeing because of the
// De-fragmentation of the buckets (also called Slab Calcification problem), i.e
// there might be some buckets where the occupancy is very sparse and thus are not
Expand All @@ -1154,7 +1209,6 @@ void freeSpace(final String why) {
+ StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory));
}
}

} catch (Throwable t) {
LOG.warn("Failed freeing space", t);
} finally {
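Taken together, the orphan handling in freeSpace reduces to a simple predicate: evict a block early if its grace period has elapsed and its HFile is not referenced by any online region. The standalone sketch below mirrors that logic; the class and method names are illustrative, while the millisecond-to-nanosecond conversion and the System.nanoTime() comparison follow the code above:

import java.util.Set;

public class OrphanCheckSketch {
  // Returns true if a cached block should be evicted as an orphan.
  static boolean isEvictableOrphan(String hfileName, long cachedTimeNanos,
      long gracePeriodMillis, Set<String> allValidFiles) {
    if (allValidFiles == null) {
      return false; // no region context available, skip orphan checks
    }
    long gracePeriodNanos = gracePeriodMillis * 1_000_000L; // millis -> nanos
    boolean graceElapsed = cachedTimeNanos < (System.nanoTime() - gracePeriodNanos);
    return graceElapsed && !allValidFiles.contains(hfileName);
  }
}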