Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -566,13 +566,16 @@ public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
/**
* This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}
*/
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
boolean evictedByEvictionProcess) {
bucketEntry.markAsEvicted();
blocksByHFile.remove(cacheKey);
if (decrementBlockNumber) {
this.blockNumber.decrement();
}
cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
if (evictedByEvictionProcess) {
cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
}
}

/**
Expand Down Expand Up @@ -602,7 +605,7 @@ void freeBucketEntry(BucketEntry bucketEntry) {
*/
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
return doEvictBlock(cacheKey, null);
return doEvictBlock(cacheKey, null, false);
}

/**
Expand All @@ -614,7 +617,8 @@ public boolean evictBlock(BlockCacheKey cacheKey) {
* @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
* @return true to indicate whether we've evicted successfully or not.
*/
private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry) {
private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
boolean evictedByEvictionProcess) {
if (!cacheEnabled) {
return false;
}
Expand All @@ -625,14 +629,14 @@ private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry) {
final BucketEntry bucketEntryToUse = bucketEntry;

if (bucketEntryToUse == null) {
if (existedInRamCache) {
if (existedInRamCache && evictedByEvictionProcess) {
cacheStats.evicted(0, cacheKey.isPrimary());
}
return existedInRamCache;
} else {
return bucketEntryToUse.withWriteLock(offsetLock, () -> {
if (backingMap.remove(cacheKey, bucketEntryToUse)) {
blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache);
blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
return true;
}
return false;
Expand Down Expand Up @@ -682,7 +686,7 @@ public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
*/
boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
if (!bucketEntry.isRpcRef()) {
return doEvictBlock(blockCacheKey, bucketEntry);
return doEvictBlock(blockCacheKey, bucketEntry, true);
}
return false;
}
Expand Down Expand Up @@ -801,7 +805,7 @@ private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
* blocks evicted
* @param why Why we are being called
*/
private void freeSpace(final String why) {
void freeSpace(final String why) {
// Ensure only one freeSpace progress at a time
if (!freeSpaceLock.tryLock()) {
return;
Expand Down Expand Up @@ -995,7 +999,7 @@ protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
BucketEntry previousEntry = backingMap.put(key, bucketEntry);
if (previousEntry != null && previousEntry != bucketEntry) {
previousEntry.withWriteLock(offsetLock, () -> {
blockEvicted(key, previousEntry, false);
blockEvicted(key, previousEntry, false, false);
return null;
});
}
Expand Down Expand Up @@ -1145,7 +1149,7 @@ void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws Inte
final BucketEntry bucketEntry = bucketEntries[i];
bucketEntry.withWriteLock(offsetLock, () -> {
if (backingMap.remove(key, bucketEntry)) {
blockEvicted(key, bucketEntry, false);
blockEvicted(key, bucketEntry, false, false);
}
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public void run() {
};
evictThread.start();
cache.offsetLock.waitForWaiters(lockId, 1);
cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
assertEquals(0, cache.getBlockCount());
cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
new CacheTestUtils.ByteArrayCacheable(new byte[10]));
Expand Down Expand Up @@ -565,6 +565,56 @@ public void testOffsetProducesPositiveOutput() {
assertEquals(testValue, bucketEntry.offset());
}

@Test
public void testEvictionCount() throws InterruptedException {
int size = 100;
int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
HFileContext meta = new HFileContextBuilder().build();
ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0);
ByteBuffer actualBuffer = ByteBuffer.allocate(length);
ByteBuffer block1Buffer = ByteBuffer.allocate(length);
ByteBuffer block2Buffer = ByteBuffer.allocate(length);
blockWithNextBlockMetadata.serialize(block1Buffer, true);
blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

// Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
block1Buffer);

waitUntilFlushedToBucket(cache, key);

assertEquals(0, cache.getStats().getEvictionCount());

// evict call should return 1, but then eviction count be 0
assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount"));
assertEquals(0, cache.getStats().getEvictionCount());

// add back
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
block1Buffer);
waitUntilFlushedToBucket(cache, key);

// should not increment
assertTrue(cache.evictBlock(key));
assertEquals(0, cache.getStats().getEvictionCount());

// add back
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
block1Buffer);
waitUntilFlushedToBucket(cache, key);

// should finally increment eviction count
cache.freeSpace("testing");
assertEquals(1, cache.getStats().getEvictionCount());
}

@Test
public void testCacheBlockNextBlockMetadataMissing() throws Exception {
int size = 100;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,10 +557,10 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach
}

@Override
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry,
boolean decrementBlockNumber) {
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
boolean evictedByEvictionProcess) {
blockEvictCounter.incrementAndGet();
super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber);
super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
}

/**
Expand Down Expand Up @@ -709,16 +709,16 @@ protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
}

@Override
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry,
boolean decrementBlockNumber) {
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
boolean evictedByEvictionProcess) {
/**
* This is only invoked by {@link BucketCache.WriterThread}. {@link MyMyBucketCache2} create
* only one {@link BucketCache.WriterThread}.
*/
assertTrue(Thread.currentThread() == this.writerThreads[0]);

blockEvictCounter.incrementAndGet();
super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber);
super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess);
}

/**
Expand Down