Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
Expand All @@ -45,42 +49,75 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {

public static final String STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step";
public static final float DEFAULT_STEP_VALUE = 0.02f; // 2%
public static final int maxNumLookupPeriods = 20;

private static final TunerResult TUNER_RESULT = new TunerResult(true);
private static final TunerResult NO_OP_TUNER_RESULT = new TunerResult(false);

private Configuration conf;
private float step = DEFAULT_STEP_VALUE;
private Queue<Long> prevWriteCounts = new LinkedList<Long>();
private Queue<Long> prevReadCounts = new LinkedList<Long>();
private int lookupCounts = 0;

private float globalMemStorePercentMinRange;
private float globalMemStorePercentMaxRange;
private float blockCachePercentMinRange;
private float blockCachePercentMaxRange;

private boolean stepDirection; // true if last time tuner increased block cache size
private boolean isFirstTuning = true;
private long prevFlushCount;
private long prevEvictCount;


@Override
public TunerResult tune(TunerContext context) {
long blockedFlushCount = context.getBlockedFlushCount();
long unblockedFlushCount = context.getUnblockedFlushCount();
long evictCount = context.getEvictCount();
long writeRequestCount = context.getWriteRequestCount();
long readRequestCount = context.getReadRequestCount();
boolean memstoreSufficient = blockedFlushCount == 0 && unblockedFlushCount == 0;
boolean blockCacheSufficient = evictCount == 0;
boolean loadSenario = checkLoadSenario(writeRequestCount,readRequestCount);
if (memstoreSufficient && blockCacheSufficient) {
prevFlushCount = blockedFlushCount + unblockedFlushCount;
prevEvictCount = evictCount;
return NO_OP_TUNER_RESULT;
}
float newMemstoreSize;
float newBlockCacheSize;
if (memstoreSufficient) {
// Increase the block cache size and corresponding decrease in memstore size
newBlockCacheSize = context.getCurBlockCacheSize() + step;
newMemstoreSize = context.getCurMemStoreSize() - step;
stepDirection = true;
} else if (blockCacheSufficient) {
// Increase the memstore size and corresponding decrease in block cache size
newBlockCacheSize = context.getCurBlockCacheSize() - step;
newMemstoreSize = context.getCurMemStoreSize() + step;
stepDirection = false;
} else if(!isFirstTuning) {
float percentChangeInEvictCount = (float)(evictCount-prevEvictCount)/(float)(prevEvictCount);
float percentChangeInFlushes =
(float)(blockedFlushCount + unblockedFlushCount-prevFlushCount)/(float)(prevFlushCount);
//Negative is desirable , should repeat previous step
//if it is positive , we should move in opposite direction
if (percentChangeInEvictCount + percentChangeInFlushes > 0.0) {
//revert last step if it went wrong
stepDirection = !stepDirection;
} else {
//last step was useful, taking step based on current stats
stepDirection = loadSenario;
}
} else {
return NO_OP_TUNER_RESULT;
// As of now not making any tuning in write/read heavy scenario.
stepDirection = loadSenario;
}

if (stepDirection){
newBlockCacheSize = context.getCurBlockCacheSize() + step;
newMemstoreSize = context.getCurMemStoreSize() - step;
} else {
newBlockCacheSize = context.getCurBlockCacheSize() - step;
newMemstoreSize = context.getCurMemStoreSize() + step;
}
if (newMemstoreSize > globalMemStorePercentMaxRange) {
newMemstoreSize = globalMemStorePercentMaxRange;
} else if (newMemstoreSize < globalMemStorePercentMinRange) {
Expand All @@ -93,6 +130,9 @@ public TunerResult tune(TunerContext context) {
}
TUNER_RESULT.setBlockCacheSize(newBlockCacheSize);
TUNER_RESULT.setMemstoreSize(newMemstoreSize);
prevFlushCount = blockedFlushCount + unblockedFlushCount;
prevEvictCount = evictCount;
isFirstTuning = false;
return TUNER_RESULT;
}

Expand All @@ -114,4 +154,28 @@ public void setConf(Configuration conf) {
this.globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false));
}
/*
* @Returns true if read it seems its getting read heavy
* and need to increase block cache size
*/
private boolean checkLoadSenario(long writeRequestCount , long readRequestCount) {
lookupCounts++;
prevWriteCounts.offer(writeRequestCount);
prevReadCounts.offer(readRequestCount);
Iterator<Long> readCountIterator = prevReadCounts.iterator();
Iterator<Long> writeCountIterator = prevWriteCounts.iterator();
int loadCount = 0;
while(readCountIterator.hasNext() && writeCountIterator.hasNext()){
if (readCountIterator.next() > writeCountIterator.next()) {
loadCount++;
} else {
loadCount--;
}
}
if (lookupCounts > maxNumLookupPeriods){
prevWriteCounts.poll();
prevReadCounts.poll();
}
return (loadCount>=0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1113,13 +1113,23 @@ private boolean areAllUserRegionsOffline() {
/**
* @return Current write count for all online regions.
*/
private long getWriteRequestCount() {
int writeCount = 0;
public long getWriteRequestCount() {
long writeCount = 0;
for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
writeCount += e.getValue().getWriteRequestsCount();
}
return writeCount;
}
/**
* @return Current read count for all online regions.
*/
public long getReadRequestCount() {
long readCount = 0;
for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
readCount += e.getValue().getReadRequestsCount();
}
return readCount;
}

@VisibleForTesting
protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class HeapMemoryManager {
"hbase.regionserver.global.memstore.size.min.range";
public static final String HBASE_RS_HEAP_MEMORY_TUNER_PERIOD =
"hbase.regionserver.heapmemory.tuner.period";
public static final int HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD = 60 * 1000;
public static final int HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD = 10 * 1000;
public static final String HBASE_RS_HEAP_MEMORY_TUNER_CLASS =
"hbase.regionserver.heapmemory.tuner.class";

Expand All @@ -75,7 +75,7 @@ public class HeapMemoryManager {

private final ResizableBlockCache blockCache;
private final FlushRequester memStoreFlusher;
private final Server server;
private final HRegionServer server;

private HeapMemoryTunerChore heapMemTunerChore = null;
private final boolean tunerOn;
Expand All @@ -85,7 +85,7 @@ public class HeapMemoryManager {
private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();

public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
Server server) {
HRegionServer server) {
BlockCache blockCache = CacheConfig.instantiateBlockCache(conf);
if (blockCache instanceof ResizableBlockCache) {
return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server);
Expand All @@ -95,7 +95,7 @@ public static HeapMemoryManager create(Configuration conf, FlushRequester memSto

@VisibleForTesting
HeapMemoryManager(ResizableBlockCache blockCache, FlushRequester memStoreFlusher,
Server server) {
HRegionServer server) {
Configuration conf = server.getConfiguration();
this.blockCache = blockCache;
this.memStoreFlusher = memStoreFlusher;
Expand Down Expand Up @@ -181,6 +181,7 @@ private boolean doInit(Configuration conf) {
+ blockCachePercentMaxRange);
}
return true;

}

public void start(ChoreService service) {
Expand Down Expand Up @@ -216,7 +217,9 @@ private class HeapMemoryTunerChore extends ScheduledChore implements FlushReques
private HeapMemoryTuner heapMemTuner;
private AtomicLong blockedFlushCount = new AtomicLong();
private AtomicLong unblockedFlushCount = new AtomicLong();
private long evictCount = 0L;
private long evictCount = 0;
private long writeRequestCount = 0;
private long readRequestCount =0;
private TunerContext tunerContext = new TunerContext();
private boolean alarming = false;

Expand Down Expand Up @@ -264,12 +267,26 @@ protected void chore() {
}

private void tune() {
evictCount = blockCache.getStats().getEvictedCount() - evictCount;
//TODO check if we can increase the memory boundaries
//while remaining in the limits
long curEvictCount;
long curWriteRequestCount;
long curReadRequestCount;
curEvictCount = blockCache.getStats().getEvictedCount();
tunerContext.setEvictCount(curEvictCount - evictCount);
evictCount = curEvictCount;
curWriteRequestCount = server.getWriteRequestCount();
tunerContext.setWriteRequestCount(curWriteRequestCount - writeRequestCount);
writeRequestCount = curWriteRequestCount;
curReadRequestCount = server.getReadRequestCount();
tunerContext.setReadRequestCount(curReadRequestCount - readRequestCount);
readRequestCount = curReadRequestCount;
tunerContext.setBlockedFlushCount(blockedFlushCount.getAndSet(0));
tunerContext.setUnblockedFlushCount(unblockedFlushCount.getAndSet(0));
tunerContext.setEvictCount(evictCount);
tunerContext.setCurBlockCacheSize(blockCachePercent);
tunerContext.setCurMemStoreSize(globalMemStorePercent);
LOG.info("Data passed to HeapMemoryTuner : " + evictCount + " "
+ readRequestCount + " " + writeRequestCount);
TunerResult result = null;
try {
result = this.heapMemTuner.tune(tunerContext);
Expand Down Expand Up @@ -320,6 +337,8 @@ private void tune() {
globalMemStorePercent = memstoreSize;
memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize);
}
} else {
LOG.info("No changes made by HeapMemoryTuner.");
}
}

Expand Down Expand Up @@ -348,6 +367,8 @@ public static final class TunerContext {
private long blockedFlushCount;
private long unblockedFlushCount;
private long evictCount;
private long readRequestCount;
private long writeRequestCount;
private float curMemStoreSize;
private float curBlockCacheSize;

Expand Down Expand Up @@ -390,6 +411,22 @@ public float getCurBlockCacheSize() {
public void setCurBlockCacheSize(float curBlockCacheSize) {
this.curBlockCacheSize = curBlockCacheSize;
}

public long getReadRequestCount() {
return readRequestCount;
}

public void setReadRequestCount(long readRequestCount) {
this.readRequestCount = readRequestCount;
}

public long getWriteRequestCount() {
return writeRequestCount;
}

public void setWriteRequestCount(long writeRequestCount) {
this.writeRequestCount = writeRequestCount;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testAutoTunerShouldBeOffWhenMaxMinRangesForMemstoreIsNotGiven() thro
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0),
new MemstoreFlusherStub(0), new RegionServerStub(conf));
new MemstoreFlusherStub(0), new HRegionServer(conf));
assertFalse(manager.isTunerOn());
}

Expand All @@ -71,7 +71,7 @@ public void testAutoTunerShouldBeOffWhenMaxMinRangesForBlockCacheIsNotGiven() th
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.05f);
HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0),
new MemstoreFlusherStub(0), new RegionServerStub(conf));
new MemstoreFlusherStub(0), new HRegionServer(conf));
assertFalse(manager.isTunerOn());
}

Expand All @@ -83,15 +83,15 @@ public void testWhenMemstoreAndBlockCacheMaxMinChecksFails() throws Exception {
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.06f);
try {
new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf));
new HeapMemoryManager(blockCache, memStoreFlusher, new HRegionServer(conf));
fail();
} catch (RuntimeException e) {
}
conf = HBaseConfiguration.create();
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.2f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
try {
new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf));
new HeapMemoryManager(blockCache, memStoreFlusher, new HRegionServer(conf));
fail();
} catch (RuntimeException e) {
}
Expand All @@ -109,7 +109,7 @@ public void testWhenClusterIsWriteHeavy() throws Exception {
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
// Let the system start with default values for memstore heap and block cache size.
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
new HRegionServer(conf));
long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
Expand Down Expand Up @@ -150,7 +150,7 @@ public void testWhenClusterIsReadHeavy() throws Exception {
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
// Let the system start with default values for memstore heap and block cache size.
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
new HRegionServer(conf));
long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
Expand Down Expand Up @@ -188,7 +188,7 @@ public void testPluggingInHeapMemoryTuner() throws Exception {
HeapMemoryTuner.class);
// Let the system start with default values for memstore heap and block cache size.
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
new HRegionServer(conf));
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
// Now we wants to be in write mode. Set bigger memstore size from CustomHeapMemoryTuner
Expand Down Expand Up @@ -218,7 +218,7 @@ public void testWhenSizeGivenByHeapTunerGoesOutsideRange() throws Exception {
conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class,
HeapMemoryTuner.class);
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
new HRegionServer(conf));
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
CustomHeapMemoryTuner.memstoreSize = 0.78f;
Expand All @@ -243,7 +243,7 @@ public void testWhenCombinedHeapSizesFromTunerGoesOutSideMaxLimit() throws Excep
conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class,
HeapMemoryTuner.class);
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
new HRegionServer(conf));
long oldMemstoreSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
Expand Down Expand Up @@ -276,7 +276,7 @@ public void testWhenL2BlockCacheIsOnHeap() throws Exception {
HeapMemoryTuner.class);

try {
heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(
heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new HRegionServer(
conf));
fail("Should have failed as the collective heap memory need is above 80%");
} catch (Exception e) {
Expand All @@ -285,7 +285,7 @@ public void testWhenL2BlockCacheIsOnHeap() throws Exception {
// Change the max/min ranges for memstore and bock cache so as to pass the criteria check
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.6f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.6f);
heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(
heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new HRegionServer(
conf));
long oldMemstoreSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
Expand Down Expand Up @@ -440,6 +440,8 @@ public void setGlobalMemstoreLimit(long globalMemStoreSize) {
}
}


//Probably we dont need this class now
private static class RegionServerStub implements Server {
private Configuration conf;
private boolean stopped = false;
Expand Down