Merged
Changes from 1 commit
BucketCache.java
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -77,6 +78,7 @@
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -122,6 +124,7 @@ public class BucketCache implements BlockCache, HeapSize {
static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE = "hbase.bucketcache.persistence.chunksize";

/** Use strong reference for offsetLock or not */
private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
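For reference, the hbase.bucketcache.persistence.chunksize key introduced just above is a plain long-valued setting, so it can be tuned like any other BucketCache option. A minimal sketch, assuming the Configuration is built in code (the 500000 value is purely illustrative, not a recommended default):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

Configuration conf = HBaseConfiguration.create();
// Cap each persisted backing-map protobuf chunk at 500,000 entries instead of the
// 10,000,000 default defined by DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE below.
conf.setLong("hbase.bucketcache.persistence.chunksize", 500000L);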
@@ -145,6 +148,10 @@ public class BucketCache implements BlockCache, HeapSize {
final static int DEFAULT_WRITER_THREADS = 3;
final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000000;

final static byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' };

// Store/read block data
transient final IOEngine ioEngine;

@@ -273,6 +280,8 @@ public class BucketCache implements BlockCache, HeapSize {
*/
private String algorithm;

private long persistenceChunkSize;

/* Tracing failed Bucket Cache allocations. */
private long allocFailLogPrevTs; // time of previous log event for allocation failure.
private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.
@@ -313,6 +322,11 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
this.queueAdditionWaitTime =
conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
this.persistenceChunkSize = conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE,
DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE);
if (this.persistenceChunkSize <= 0) {
persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
}

sanityCheckConfigs();

@@ -1358,8 +1372,8 @@ void persistToFile() throws IOException {
}
File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime());
try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) {
fos.write(ProtobufMagic.PB_MAGIC);
BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
LOG.debug("Persist in new chunked persistence format.");
persistChunkedBackingMap(fos);
} catch (IOException e) {
LOG.error("Failed to persist bucket cache to file", e);
throw e;
@@ -1405,16 +1419,23 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException {
throw new IOException("Incorrect number of bytes read while checking for protobuf magic "
+ "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath);
}
if (!ProtobufMagic.isPBMagicPrefix(pbuf)) {
if (ProtobufMagic.isPBMagicPrefix(pbuf)) {
LOG.info("Reading old format of persistence.");
// The old non-chunked version of backing map persistence.
parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
} else if (Arrays.equals(pbuf, PB_MAGIC_V2)) {
// The new persistence format of chunked persistence.
LOG.info("Reading new chunked format of persistence.");
retrieveChunkedBackingMap(in, bucketSizes);
} else {
// In 3.0 we have enough flexibility to dump the old cache data.
// TODO: In 2.x line, this might need to be filled in to support reading the old format
throw new IOException(
"Persistence file does not start with protobuf magic number. " + persistencePath);
}
parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
blockNumber.add(backingMap.size());
LOG.info("Bucket cache retrieved from file successfully");
LOG.info("Bucket cache retrieved from file successfully with size: {}", backingMap.size());
}
}

@@ -1457,6 +1478,75 @@ private void verifyCapacityAndClasses(long capacitySize, String ioclass, String
}
}

private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) {
try {
if (proto.hasChecksum()) {
((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
algorithm);
}
backingMapValidated.set(true);
} catch (IOException e) {
LOG.warn("Checksum for cache file failed. "
+ "We need to validate each cache key in the backing map. "
+ "This may take some time, so we'll do it in a background thread,");

Runnable cacheValidator = () -> {
while (bucketAllocator == null) {
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
long startTime = EnvironmentEdgeManager.currentTime();
int totalKeysOriginally = backingMap.size();
for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
try {
((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
} catch (IOException e1) {
LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
evictBlock(keyEntry.getKey());
fileNotFullyCached(keyEntry.getKey().getHfileName());
}
}
backingMapValidated.set(true);
LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
totalKeysOriginally, backingMap.size(),
(EnvironmentEdgeManager.currentTime() - startTime));
};
Thread t = new Thread(cacheValidator);
t.setDaemon(true);
t.start();
}
}

private void parsePB(BucketCacheProtos.BucketCacheEntry firstChunk,
List<BucketCacheProtos.BackingMap> chunks) throws IOException {
fullyCachedFiles.clear();
Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), firstChunk.getBackingMap(),
this::createRecycler);
backingMap.putAll(pair.getFirst());
blocksByHFile.addAll(pair.getSecond());
fullyCachedFiles.putAll(BucketProtoUtils.fromPB(firstChunk.getCachedFilesMap()));

LOG.debug("Number of blocks after first chunk: {}, blocksByHFile: {}",
backingMap.size(), fullyCachedFiles.size());
int i = 1;
for (BucketCacheProtos.BackingMap chunk : chunks) {
Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 =
BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), chunk,
this::createRecycler);
backingMap.putAll(pair2.getFirst());
blocksByHFile.addAll(pair2.getSecond());
LOG.debug("Number of blocks after {} reading chunk: {}, blocksByHFile: {}",
++i, backingMap.size(), fullyCachedFiles.size());
}
verifyFileIntegrity(firstChunk);
verifyCapacityAndClasses(firstChunk.getCacheCapacity(), firstChunk.getIoClass(), firstChunk.getMapClass());
updateRegionSizeMapWhileRetrievingFromFile();
}

private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
@@ -1465,52 +1555,54 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio
blocksByHFile = pair.getSecond();
fullyCachedFiles.clear();
fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap()));
if (proto.hasChecksum()) {
try {
((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
algorithm);
backingMapValidated.set(true);
} catch (IOException e) {
LOG.warn("Checksum for cache file failed. "
+ "We need to validate each cache key in the backing map. "
+ "This may take some time, so we'll do it in a background thread,");
Runnable cacheValidator = () -> {
while (bucketAllocator == null) {
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
long startTime = EnvironmentEdgeManager.currentTime();
int totalKeysOriginally = backingMap.size();
for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
try {
((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
} catch (IOException e1) {
LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
evictBlock(keyEntry.getKey());
fullyCachedFiles.remove(keyEntry.getKey().getHfileName());
}
}
backingMapValidated.set(true);
LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
totalKeysOriginally, backingMap.size(),
(EnvironmentEdgeManager.currentTime() - startTime));
};
Thread t = new Thread(cacheValidator);
t.setDaemon(true);
t.start();
}
} else {
// if has not checksum, it means the persistence file is old format
LOG.info("Persistent file is old format, it does not support verifying file integrity!");
backingMapValidated.set(true);
}
verifyFileIntegrity(proto);
updateRegionSizeMapWhileRetrievingFromFile();
verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
}

private void persistChunkedBackingMap(FileOutputStream fos) throws IOException {
fos.write(PB_MAGIC_V2);
long numChunks = backingMap.size() / persistenceChunkSize;
if (backingMap.size() % persistenceChunkSize != 0) {
numChunks += 1;
}

LOG.debug("persistToFile: before persisting backing map size: {}, "
+ "fullycachedFiles size: {}, chunkSize: {}, numberofChunks: {}",
backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize, numChunks);

fos.write(Bytes.toBytes(persistenceChunkSize));
fos.write(Bytes.toBytes(numChunks));
Contributor:

Since we are already passing the output stream to BucketProtoUtils, who's now also responsible for serialization, can we just move everything there?

Contributor Author:

ack
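To make the suggestion above concrete, here is a rough sketch of what fully delegating serialization to BucketProtoUtils could look like (the serializeAsPB name and the exact split of responsibilities are assumptions, not part of this commit; PB_MAGIC_V2 is reachable because both classes share a package):

// In BucketCache (sketch): only hand the stream over.
private void persistChunkedBackingMap(FileOutputStream fos) throws IOException {
  BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize);
}

// In BucketProtoUtils (sketch): own the magic bytes, the header and the chunked writes.
static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize)
  throws IOException {
  long numChunks = (cache.backingMap.size() + chunkSize - 1) / chunkSize; // ceiling division
  fos.write(BucketCache.PB_MAGIC_V2);  // V2 format marker
  fos.write(Bytes.toBytes(chunkSize)); // entries per chunk
  fos.write(Bytes.toBytes(numChunks)); // number of chunks that follow
  toPB(cache, fos, chunkSize);         // delimited protobuf chunks, as in this patch
}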

BucketProtoUtils.toPB(this, fos, persistenceChunkSize);

LOG.debug("persistToFile: after persisting backing map size: {}, "
+ "fullycachedFiles size: {}, numChunksPersisteed: {}",
backingMap.size(), fullyCachedFiles.size(), numChunks);
}

private void retrieveChunkedBackingMap(FileInputStream in, int[] bucketSizes) throws IOException {
byte[] bytes = new byte[Long.BYTES];
in.read(bytes);
long batchSize = Bytes.toLong(bytes, 0);
in.read(bytes);
long numChunks = Bytes.toLong(bytes, 0);

LOG.info("Number of chunks: {}, chunk size: {}", numChunks, batchSize);

ArrayList<BucketCacheProtos.BackingMap> bucketCacheMaps = new ArrayList<>();
// Read the first chunk that has all the details.
BucketCacheProtos.BucketCacheEntry firstChunk =
BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);

// Subsequent chunks have the backingMap entries.
for (int i = 1; i < numChunks; i++) {
LOG.info("Reading chunk no: {}", i+1);
bucketCacheMaps.add(BucketCacheProtos.BackingMap.parseDelimitedFrom(in));
LOG.info("Retrieved chunk: {}", i+1);
}
parsePB(firstChunk, bucketCacheMaps);
}
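Putting persistChunkedBackingMap and retrieveChunkedBackingMap side by side, the V2 layout is: a 4-byte 'V2UF' magic, an 8-byte chunk size, an 8-byte chunk count, then one delimited BucketCacheEntry message (metadata plus the first backing-map chunk) followed by numChunks - 1 delimited BackingMap messages. A minimal standalone sketch that reads only the fixed-size header (ChunkedHeaderReader is an illustrative class, not part of this patch; Bytes.toBytes(long) writes big-endian, so DataInputStream.readLong decodes it):

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;

final class ChunkedHeaderReader {
  static void readHeader(String persistencePath) throws IOException {
    try (DataInputStream in = new DataInputStream(new FileInputStream(persistencePath))) {
      byte[] magic = new byte[4];
      in.readFully(magic); // 'V', '2', 'U', 'F'
      if (!Arrays.equals(magic, new byte[] { 'V', '2', 'U', 'F' })) {
        throw new IOException("Not a chunked (V2) persistence file: " + persistencePath);
      }
      long chunkSize = in.readLong(); // entries per chunk
      long numChunks = in.readLong(); // chunks in the rest of the stream
      System.out.println("chunkSize=" + chunkSize + ", numChunks=" + numChunks);
      // What follows is 1 delimited BucketCacheEntry, then (numChunks - 1) delimited
      // BackingMap messages.
    }
  }
}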

/**
* Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors
* exceeds ioErrorsDurationTimeTolerated, we will disable the cache
BucketProtoUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
@@ -45,28 +46,45 @@ private BucketProtoUtils() {

}

static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) {
static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache, BucketCacheProtos.BackingMap backingMap) {
return BucketCacheProtos.BucketCacheEntry.newBuilder().setCacheCapacity(cache.getMaxSize())
.setIoClass(cache.ioEngine.getClass().getName())
.setMapClass(cache.backingMap.getClass().getName())
.putAllDeserializers(CacheableDeserializerIdManager.save())
.putAllCachedFiles(toCachedPB(cache.fullyCachedFiles))
.setBackingMap(BucketProtoUtils.toPB(cache.backingMap))
.setBackingMap(backingMap)
.setChecksum(ByteString
.copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm())))
.build();
}

private static BucketCacheProtos.BackingMap toPB(Map<BlockCacheKey, BucketEntry> backingMap) {
static void toPB(BucketCache cache, FileOutputStream fos, long chunkSize) throws IOException {
Contributor:

nit: We should rename it to serializeAsPB.

Contributor Author:

ack

int blockCount = 0;
int chunkCount = 0;
int backingMapSize = cache.backingMap.size();
BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder().setKey(toPB(entry.getKey()))
.setValue(toPB(entry.getValue())).build());
for (Map.Entry<BlockCacheKey, BucketEntry> entry : cache.backingMap.entrySet()) {
blockCount++;
builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder()
.setKey(BucketProtoUtils.toPB(entry.getKey()))
.setValue(BucketProtoUtils.toPB(entry.getValue())).build());
if (blockCount % chunkSize == 0 || (blockCount == backingMapSize)) {
chunkCount++;
if (chunkCount == 1) {
// Persist all details along with the first chunk into BucketCacheEntry
BucketProtoUtils.toPB(cache, builder.build()).writeDelimitedTo(fos);
} else {
// Directly persist subsequent backing-map chunks.
builder.build().writeDelimitedTo(fos);
}
if (blockCount < backingMapSize) {
builder = BucketCacheProtos.BackingMap.newBuilder();
}
}
}
return builder.build();
}
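A quick worked example of the flush condition in the loop above, matching the testMultipleChunks case added further down (numbers are purely illustrative):

public final class ChunkFlushExample {
  public static void main(String[] args) {
    long backingMapSize = 10; // cached blocks
    long chunkSize = 5;       // hbase.bucketcache.persistence.chunksize
    long numChunks = backingMapSize / chunkSize + (backingMapSize % chunkSize == 0 ? 0 : 1);
    // blockCount % chunkSize == 0 fires after entries 5 and 10, so two chunks are written:
    // chunk 1 wrapped in a BucketCacheEntry with all the metadata, chunk 2 as a bare BackingMap.
    System.out.println(numChunks); // prints 2
  }
}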

private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) {
static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) {
return BucketCacheProtos.BlockCacheKey.newBuilder().setHfilename(key.getHfileName())
.setOffset(key.getOffset()).setPrimaryReplicaBlock(key.isPrimary())
.setBlockType(toPB(key.getBlockType())).build();
@@ -103,7 +121,7 @@ private static BucketCacheProtos.BlockType toPB(BlockType blockType) {
}
}

private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) {
static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) {
return BucketCacheProtos.BucketEntry.newBuilder().setOffset(entry.offset())
.setCachedTime(entry.getCachedTime()).setLength(entry.getLength())
.setDiskSizeWithHeader(entry.getOnDiskSizeWithHeader())
@@ -19,6 +19,7 @@

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
@@ -350,6 +351,52 @@ public void testBucketCacheRecovery() throws Exception {
TEST_UTIL.cleanupTestDir();
}

@Test
public void testSingleChunk() throws Exception {
testChunkedBackingMapRecovery(5, 5);
}

@Test
public void testMultipleChunks() throws Exception {
testChunkedBackingMapRecovery(5, 10);
}

private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws Exception {
HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
Configuration conf = HBaseConfiguration.create();
conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize);

String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
DEFAULT_ERROR_TOLERATION_DURATION, conf);

CacheTestUtils.HFileBlockPair[] blocks =
CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);

for (int i = 0; i < numBlocks; i++) {
cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(), blocks[i].getBlock());
}

// saves the current state
bucketCache.persistToFile();

// Create a new bucket which reads from persistence file.
BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
DEFAULT_ERROR_TOLERATION_DURATION, conf);

assertEquals(numBlocks, newBucketCache.backingMap.size());

for (int i = 0; i < numBlocks; i++) {
assertEquals(blocks[i].getBlock(),
newBucketCache.getBlock(blocks[i].getBlockName(), false, false, false));
}
TEST_UTIL.cleanupTestDir();
}

private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
throws InterruptedException {
while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {