hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -20,7 +20,9 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

@@ -29,7 +31,7 @@
* cache.
*/
@InterfaceAudience.Private
public interface BlockCache extends Iterable<CachedBlock> {
public interface BlockCache extends Iterable<CachedBlock>, ConfigurationObserver {
/**
* Add block to cache.
* @param cacheKey The block's cache key.
@@ -264,4 +266,15 @@ default boolean isCacheEnabled() {
default boolean waitForCacheInitialization(long timeout) {
return true;
}

/**
 * Allows BlockCache implementations to provide a means to refresh their configuration. Since
 * HBASE-29249, CacheConfig implements PropagatingConfigurationObserver and registers itself
 * together with the BlockCache implementation in use for notifications of dynamic configuration
 * changes. The default is a noop.
 * @param config the updated configuration to apply.
 */
default void onConfigurationChange(Configuration config) {
// noop
}
}
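
With this interface change, an implementation opts in to dynamic tuning simply by overriding the new default method. Below is a minimal, hypothetical sketch (the class name and the tunable key are illustrative, not part of this change); it is declared abstract so the remaining BlockCache methods can be elided while keeping the sketch compilable:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.BlockCache;

public abstract class TunableBlockCache implements BlockCache {
  // volatile so reader threads observe the new value without extra locking
  private volatile float acceptableFactor = 0.95f;

  @Override
  public void onConfigurationChange(Configuration config) {
    // Re-read the tunable from the updated configuration; the key name is hypothetical.
    this.acceptableFactor = config.getFloat("example.cache.accept.factor", acceptableFactor);
  }
}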
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -20,7 +20,8 @@
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.yetus.audience.InterfaceAudience;
@@ -31,7 +32,7 @@
* Stores all of the cache objects and configuration for a single HFile.
*/
@InterfaceAudience.Private
public class CacheConfig implements ConfigurationObserver {
public class CacheConfig implements PropagatingConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(CacheConfig.class.getName());

/**
@@ -480,4 +481,14 @@ public void onConfigurationChange(Configuration conf) {
+ "hbase.rs.evictblocksonclose is changed to {}",
cacheDataOnRead, cacheDataOnWrite, evictOnClose);
}

@Override
public void registerChildren(ConfigurationManager manager) {
manager.registerObserver(blockCache);
}

@Override
public void deregisterChildren(ConfigurationManager manager) {
manager.deregisterObserver(blockCache);
}
}
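
The two overrides above are what make the propagation work: when CacheConfig is registered with the server's ConfigurationManager, registerChildren() is invoked and the wrapped BlockCache is registered alongside it. A hedged sketch of that flow, using internal (@InterfaceAudience.Private) classes that a region server normally wires up itself; the standalone setup and CacheConfig's two-argument constructor usage here are illustrative assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

public class CacheConfigPropagationSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    BlockCache blockCache = BlockCacheFactory.createBlockCache(conf);
    CacheConfig cacheConfig = new CacheConfig(conf, blockCache);

    ConfigurationManager manager = new ConfigurationManager();
    // Registering the propagating observer also registers its children,
    // i.e. the BlockCache wired in above.
    manager.registerObserver(cacheConfig);

    // When the server reloads its configuration, every registered observer,
    // including the BlockCache, receives onConfigurationChange.
    manager.notifyAllObservers(conf);
  }
}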
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -22,6 +22,7 @@
import java.util.Optional;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
@@ -460,6 +461,12 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d

}

@Override
public void onConfigurationChange(Configuration config) {
l1Cache.onConfigurationChange(config);
l2Cache.onConfigurationChange(config);
}

@Override
public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
if (isMetaBlock(block.getBlockType())) {
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -138,11 +138,11 @@ public class BucketCache implements BlockCache, HeapSize {
static final float DEFAULT_MEMORY_FACTOR = 0.25f;
static final float DEFAULT_MIN_FACTOR = 0.85f;

private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
static final float DEFAULT_ACCEPT_FACTOR = 0.95f;

// Number of blocks to clear for each of the bucket size that is full
private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;
static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;

/** Statistics thread */
private static final int statThreadPeriod = 5 * 60;
@@ -284,7 +284,7 @@ protected enum CacheState {
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

public static final String QUEUE_ADDITION_WAIT_TIME = "hbase.bucketcache.queue.addition.waittime";
private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
private long queueAdditionWaitTime;
/**
* Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file
@@ -332,22 +332,8 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
}

this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
this.queueAdditionWaitTime =
conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
this.persistenceChunkSize =
conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE);
if (this.persistenceChunkSize <= 0) {
persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
}

sanityCheckConfigs();
// this sets all the dynamically configurable settings
this.onConfigurationChange(conf);

LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor
+ ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: "
@@ -437,6 +423,9 @@ private void sanityCheckConfigs() {
Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1,
SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and "
+ MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
if (this.persistenceChunkSize <= 0) {
persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
}
}

/**
@@ -898,6 +887,38 @@ boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEnt
return false;
}

/**
 * Since HBASE-29249, the following properties governing freeSpace behaviour and block priorities
 * were made dynamically configurable:
 * - hbase.bucketcache.acceptfactor
 * - hbase.bucketcache.minfactor
 * - hbase.bucketcache.extrafreefactor
 * - hbase.bucketcache.single.factor
 * - hbase.bucketcache.multi.factor
 * - hbase.bucketcache.memory.factor
 * The hbase.bucketcache.queue.addition.waittime property allows for introducing a delay in the
 * publishing of blocks for the cache writer threads during prefetch reads only (client reads
 * wouldn't get delayed); it has also been made dynamically configurable since HBASE-29249. The
 * hbase.bucketcache.persist.intervalinmillis property determines the frequency for saving the
 * persistent cache, and the hbase.bucketcache.persistence.chunksize property determines the size
 * of the persistent file splits (due to the limitation on the maximum allowed protobuf size);
 * both have likewise been made dynamically configurable since HBASE-29249.
 * @param conf the updated configuration to apply.
 */
@Override
public void onConfigurationChange(Configuration conf) {
this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
this.queueAdditionWaitTime =
conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
this.persistenceChunkSize =
conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE);
sanityCheckConfigs();
}

protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
return ramCache.remove(cacheKey, re -> {
if (re != null) {
@@ -2145,6 +2166,18 @@ float getMemoryFactor() {
return memoryFactor;
}

long getQueueAdditionWaitTime() {
return queueAdditionWaitTime;
}

long getPersistenceChunkSize() {
return persistenceChunkSize;
}

long getBucketcachePersistInterval() {
return bucketcachePersistInterval;
}

public String getPersistencePath() {
return persistencePath;
}
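Because the constructor now routes all dynamic settings through onConfigurationChange(), the same code path serves both startup and runtime updates. A hedged usage sketch of how an operator would push new values to a live cache, assuming the standard dynamic-configuration flow (edit hbase-site.xml on the server, then trigger a reload through the Admin API; the class and argument handling here are illustrative):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class ReloadBucketCacheSettings {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
      Admin admin = connection.getAdmin()) {
      // Reload the configuration on one region server ("host,port,startcode"), ...
      admin.updateConfiguration(ServerName.valueOf(args[0]));
      // ... or on every server in the cluster.
      admin.updateConfiguration();
    }
  }
}
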
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
@@ -98,6 +99,10 @@ public void testBlockEvictionOnRegionMove() throws Exception {
? cluster.getRegionServer(1)
: cluster.getRegionServer(0);
assertTrue(regionServingRS.getBlockCache().isPresent());

// Wait for any running prefetch threads to complete.
Waiter.waitFor(this.conf, 200, () -> PrefetchExecutor.getPrefetchFutures().isEmpty());

long oldUsedCacheSize =
regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
assertNotEquals(0, oldUsedCacheSize);
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -17,12 +17,19 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_MIN_FACTOR;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_SINGLE_FACTOR;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MEMORY_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MULTI_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.SINGLE_FACTOR_CONFIG_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -472,14 +479,13 @@ public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
@Test
public void testGetPartitionSize() throws IOException {
// Test default values
validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
BucketCache.DEFAULT_MIN_FACTOR);
validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);

Configuration conf = HBaseConfiguration.create();
conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);

BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
@@ -493,13 +499,12 @@ public void testGetPartitionSize() throws IOException {
@Test
public void testCacheSizeCapacity() throws IOException {
// Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
BucketCache.DEFAULT_MIN_FACTOR);
validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);
Configuration conf = HBaseConfiguration.create();
conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
try {
new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
writerQLen, null, 100, conf);
@@ -515,9 +520,9 @@ public void testValidBucketCacheConfigs() throws IOException {
conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);

BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
Expand All @@ -528,12 +533,12 @@ public void testValidBucketCacheConfigs() throws IOException {
assertEquals(MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, cache.getMinFactor(), 0);
assertEquals(EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
cache.getExtraFreeFactor(), 0);
assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f,
cache.getSingleFactor(), 0);
assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f,
cache.getMultiFactor(), 0);
assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f,
cache.getMemoryFactor(), 0);
assertEquals(SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f, cache.getSingleFactor(),
0);
assertEquals(MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f, cache.getMultiFactor(),
0);
assertEquals(MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f, cache.getMemoryFactor(),
0);
}

@Test
@@ -575,9 +580,9 @@ public void testInvalidCacheSplitFactorConfig() throws IOException {
// be negative, configs don't add to 1.0
boolean[] expectedOutcomes = { true, false, false, false };
Map<String,
float[]> configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME,
singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues,
BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues);
float[]> configMappings = ImmutableMap.of(SINGLE_FACTOR_CONFIG_NAME, singleFactorConfigValues,
MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, MEMORY_FACTOR_CONFIG_NAME,
memoryFactorConfigValues);
Configuration conf = HBaseConfiguration.create();
checkConfigValues(conf, configMappings, expectedOutcomes);
}
@@ -921,6 +926,51 @@ public void testBlockAdditionWaitWhenCache() throws Exception {
}
}

@Test
public void testOnConfigurationChange() throws Exception {
BucketCache bucketCache = null;
try {
final Path dataTestDir = createAndGetTestDir();

String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";

Configuration config = HBASE_TESTING_UTILITY.getConfiguration();

bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, 1, 1, null, DEFAULT_ERROR_TOLERATION_DURATION, config);

assertTrue(bucketCache.waitForCacheInitialization(10000));

config.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
config.setFloat(MIN_FACTOR_CONFIG_NAME, 0.8f);
config.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.15f);
config.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.2f);
config.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.6f);
config.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
config.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
config.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 500);
config.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, 1000);

bucketCache.onConfigurationChange(config);

assertEquals(0.9f, bucketCache.getAcceptableFactor(), 0.01);
assertEquals(0.8f, bucketCache.getMinFactor(), 0.01);
assertEquals(0.15f, bucketCache.getExtraFreeFactor(), 0.01);
assertEquals(0.2f, bucketCache.getSingleFactor(), 0.01);
assertEquals(0.6f, bucketCache.getMultiFactor(), 0.01);
assertEquals(0.2f, bucketCache.getMemoryFactor(), 0.01);
assertEquals(100L, bucketCache.getQueueAdditionWaitTime());
assertEquals(500L, bucketCache.getBucketcachePersistInterval());
assertEquals(1000L, bucketCache.getPersistenceChunkSize());

} finally {
if (bucketCache != null) {
bucketCache.shutdown();
}
HBASE_TESTING_UTILITY.cleanupTestDir();
}
}

@Test
public void testNotifyFileCachingCompletedSuccess() throws Exception {
BucketCache bucketCache = null;