Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ public class CacheConfig {
public static final String BUCKETCACHE_PERSIST_INTERVAL_KEY =
"hbase.bucketcache.persist.intervalinmillis";

/**
* Configuration key to set the heap usage threshold beyond which prefetch threads should be
* interrupted.
*/
public static final String PREFETCH_HEAP_USAGE_THRESHOLD = "hbase.rs.prefetchheapusage";

// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
Expand All @@ -111,6 +117,7 @@ public class CacheConfig {
public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false;
public static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
public static final long DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD = Long.MAX_VALUE;
public static final double DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD = 1d;

/**
* Whether blocks should be cached on read (default is on if there is a cache but this can be
Expand Down Expand Up @@ -157,6 +164,8 @@ public class CacheConfig {

private final ByteBuffAllocator byteBuffAllocator;

private final double heapUsageThreshold;

/**
* Create a cache configuration using the specified configuration object and defaults for family
* level settings. Only use if no column family context.
Expand Down Expand Up @@ -201,6 +210,8 @@ public CacheConfig(Configuration conf, ColumnFamilyDescriptor family, BlockCache
this.cacheCompactedDataOnWrite =
conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE);
this.cacheCompactedDataOnWriteThreshold = getCacheCompactedBlocksOnWriteThreshold(conf);
this.heapUsageThreshold =
conf.getDouble(PREFETCH_HEAP_USAGE_THRESHOLD, DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD);
this.blockCache = blockCache;
this.byteBuffAllocator = byteBuffAllocator;
}
Expand All @@ -222,6 +233,7 @@ public CacheConfig(CacheConfig cacheConf) {
this.dropBehindCompaction = cacheConf.dropBehindCompaction;
this.blockCache = cacheConf.blockCache;
this.byteBuffAllocator = cacheConf.byteBuffAllocator;
this.heapUsageThreshold = cacheConf.heapUsageThreshold;
}

private CacheConfig() {
Expand All @@ -237,6 +249,7 @@ private CacheConfig() {
this.dropBehindCompaction = false;
this.blockCache = null;
this.byteBuffAllocator = ByteBuffAllocator.HEAP;
this.heapUsageThreshold = DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD;
}

/**
Expand Down Expand Up @@ -386,6 +399,17 @@ public boolean shouldReadBlockFromCache(BlockType blockType) {
return false;
}

/**
* Checks if the current heap usage is below the threshold configured by
* "hbase.rs.prefetchheapusage" (1.0 by default, i.e. prefetch is never interrupted unless a
* lower threshold is explicitly configured).
*/
public boolean isHeapUsageBelowThreshold() {
double total = Runtime.getRuntime().maxMemory();
double available = Runtime.getRuntime().freeMemory();
double usedRatio = 1d - (available / total);
return heapUsageThreshold > usedRatio;
Comment on lines +407 to +410
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] I found that HBase has MemorySizeUtil.safeGetHeapMemoryUsage() in the region server for getting heap usage (and other places use a single shared instance), although I think it still relies on these same two Runtime methods underneath.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't seem to get accurate values, especially when running in the UTs.

}

/**
* If we make sure the block could not be cached, we will not acquire the lock otherwise we will
* acquire lock
Expand Down Expand Up @@ -413,6 +437,10 @@ public ByteBuffAllocator getByteBuffAllocator() {
return this.byteBuffAllocator;
}

public double getHeapUsageThreshold() {
return heapUsageThreshold;
}

private long getCacheCompactedBlocksOnWriteThreshold(Configuration conf) {
long cacheCompactedBlocksOnWriteThreshold =
conf.getLong(CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,23 @@ public void run() {
HFileBlock block = prefetchStreamReader.readBlock(offset, onDiskSizeOfNextBlock,
/* cacheBlock= */true, /* pread= */false, false, false, null, null, true);
try {
if (!cacheConf.isInMemory() && !cache.blockFitsIntoTheCache(block).orElse(true)) {
LOG.warn(
"Interrupting prefetch for file {} because block {} of size {} "
+ "doesn't fit in the available cache space.",
path, cacheKey, block.getOnDiskSizeWithHeader());
interrupted = true;
break;
if (!cacheConf.isInMemory()) {
if (!cache.blockFitsIntoTheCache(block).orElse(true)) {
LOG.warn(
"Interrupting prefetch for file {} because block {} of size {} "
+ "doesn't fit in the available cache space.",
path, cacheKey, block.getOnDiskSizeWithHeader());
interrupted = true;
break;
}
if (!cacheConf.isHeapUsageBelowThreshold()) {
LOG.warn(
"Interrupting prefetch because heap usage is above the threshold: {} "
+ "configured via {}",
cacheConf.getHeapUsageThreshold(), CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD);
interrupted = true;
break;
}
}
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -156,6 +157,43 @@ public void testPrefetchBlockCacheDisabled() throws Exception {
poolExecutor.getCompletedTaskCount() + poolExecutor.getQueue().size());
}

@Test
public void testPrefetchHeapUsageAboveThreshold() throws Exception {
ColumnFamilyDescriptor columnFamilyDescriptor =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true)
.setBlockCacheEnabled(true).build();
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
Configuration newConf = new Configuration(conf);
newConf.setDouble(CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD, 0.1);
CacheConfig cacheConfig =
new CacheConfig(newConf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
Path storeFile = writeStoreFile("testPrefetchHeapUsageAboveThreshold", meta, cacheConfig);
MutableInt cachedCount = new MutableInt(0);
MutableInt unCachedCount = new MutableInt(0);
readStoreFile(storeFile, (r, o) -> {
HFileBlock block = null;
try {
block = r.readBlock(o, -1, false, true, false, true, null, null);
} catch (IOException e) {
fail(e.getMessage());
}
return block;
}, (key, block) -> {
boolean isCached = blockCache.getBlock(key, true, false, true) != null;
if (
block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
|| block.getBlockType() == BlockType.INTERMEDIATE_INDEX
) {
if (isCached) {
cachedCount.increment();
} else {
unCachedCount.increment();
}
}
}, cacheConfig);
assertTrue(unCachedCount.compareTo(cachedCount) > 0);
}

@Test
public void testPrefetch() throws Exception {
TraceUtil.trace(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ public Configuration setupBucketCacheConfig(long bucketCachePersistInterval) thr
return conf;
}

public BucketCache setupBucketCache(Configuration conf) throws IOException {
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
public BucketCache setupBucketCache(Configuration conf, String persistentCacheFile)
throws IOException {
BucketCache bucketCache = new BucketCache("file:" + testDir + "/" + persistentCacheFile,
capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", 60 * 1000, conf);
return bucketCache;
}
Expand All @@ -103,7 +104,7 @@ public void cleanupBucketCache(BucketCache bucketCache) throws IOException {
public void testPrefetchPersistenceCrash() throws Exception {
long bucketCachePersistInterval = 3000;
Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval);
BucketCache bucketCache = setupBucketCache(conf);
BucketCache bucketCache = setupBucketCache(conf, "testPrefetchPersistenceCrash");
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
FileSystem fs = HFileSystem.get(conf);
// Load Cache
Expand All @@ -121,7 +122,7 @@ public void testPrefetchPersistenceCrash() throws Exception {
public void testPrefetchPersistenceCrashNegative() throws Exception {
long bucketCachePersistInterval = Long.MAX_VALUE;
Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval);
BucketCache bucketCache = setupBucketCache(conf);
BucketCache bucketCache = setupBucketCache(conf, "testPrefetchPersistenceCrashNegative");
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
FileSystem fs = HFileSystem.get(conf);
// Load Cache
Expand All @@ -134,7 +135,7 @@ public void testPrefetchPersistenceCrashNegative() throws Exception {
@Test
public void testPrefetchListUponBlockEviction() throws Exception {
Configuration conf = setupBucketCacheConfig(200);
BucketCache bucketCache = setupBucketCache(conf);
BucketCache bucketCache = setupBucketCache(conf, "testPrefetchListUponBlockEviction");
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
FileSystem fs = HFileSystem.get(conf);
// Load Blocks in cache
Expand All @@ -156,7 +157,8 @@ public void testPrefetchListUponBlockEviction() throws Exception {
@Test
public void testPrefetchBlockEvictionWhilePrefetchRunning() throws Exception {
Configuration conf = setupBucketCacheConfig(200);
BucketCache bucketCache = setupBucketCache(conf);
BucketCache bucketCache =
setupBucketCache(conf, "testPrefetchBlockEvictionWhilePrefetchRunning");
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
FileSystem fs = HFileSystem.get(conf);
// Load Blocks in cache
Expand Down