Merged
Changes from 1 commit
@@ -390,17 +390,17 @@ private void startPersistenceRetriever(int[] bucketSizes, long capacity) {
try {
retrieveFromFile(bucketSizes);
LOG.info("Persistent bucket cache recovery from {} is complete.", persistencePath);
} catch (IOException ioex) {
LOG.error("Can't restore from file[{}] because of ", persistencePath, ioex);
} catch (Throwable ex) {
LOG.error("Can't restore from file[{}] because of ", persistencePath, ex);
Contributor
nit: since we are handling the error, shouldn't this be a warn? And let's explain that the cache will be reset and a reload will happen.

Contributor Author
ack
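A minimal sketch of the kind of change being asked for here; the exact wording of the message is an assumption, not text taken from the PR:

```java
// Inside the catch (Throwable ex) block: the error is handled below, so warn and
// explain the recovery (the index is reset and blocks get re-cached on access).
LOG.warn("Can't restore from file[{}]. The cache will be reset and blocks will be "
  + "re-cached as they are accessed.", persistencePath, ex);
```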

backingMap.clear();
fullyCachedFiles.clear();
backingMapValidated.set(true);
regionCachedSize.clear();
try {
bucketAllocator = new BucketAllocator(capacity, bucketSizes);
} catch (BucketAllocatorException ex) {
LOG.error("Exception during Bucket Allocation", ex);
} catch (BucketAllocatorException allocatorException) {
LOG.error("Exception during Bucket Allocation", allocatorException);
}
regionCachedSize.clear();
} finally {
this.cacheState = CacheState.ENABLED;
startWriterThreads();
@@ -951,7 +951,8 @@ public void logStats() {
: (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
+ "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
+ cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction()
+ ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount());
+ ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount() + ", blocksCount="
+ backingMap.size());
cacheStats.reset();

bucketAllocator.logDebugStatistics();
@@ -1496,7 +1497,7 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException {
} else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) {
// The new persistence format of chunked persistence.
LOG.info("Reading new chunked format of persistence.");
retrieveChunkedBackingMap(in, bucketSizes);
retrieveChunkedBackingMap(in);
} else {
// In 3.0 we have enough flexibility to dump the old cache data.
// TODO: In 2.x line, this might need to be filled in to support reading the old format
@@ -1626,52 +1627,33 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException
}

private void persistChunkedBackingMap(FileOutputStream fos) throws IOException {
long numChunks = backingMap.size() / persistenceChunkSize;
if (backingMap.size() % persistenceChunkSize != 0) {
numChunks += 1;
}

LOG.debug(
"persistToFile: before persisting backing map size: {}, "
+ "fullycachedFiles size: {}, chunkSize: {}, numberofChunks: {}",
backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize, numChunks);
backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize);

BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize, numChunks);
BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize);

LOG.debug(
"persistToFile: after persisting backing map size: {}, "
+ "fullycachedFiles size: {}, numChunksPersisteed: {}",
backingMap.size(), fullyCachedFiles.size(), numChunks);
"persistToFile: after persisting backing map size: {}, " + "fullycachedFiles size: {}",
backingMap.size(), fullyCachedFiles.size());
}

private void retrieveChunkedBackingMap(FileInputStream in, int[] bucketSizes) throws IOException {
byte[] bytes = new byte[Long.BYTES];
int readSize = in.read(bytes);
if (readSize != Long.BYTES) {
throw new IOException("Invalid size of chunk-size read from persistence: " + readSize);
}
long batchSize = Bytes.toLong(bytes, 0);

readSize = in.read(bytes);
if (readSize != Long.BYTES) {
throw new IOException("Invalid size for number of chunks read from persistence: " + readSize);
}
long numChunks = Bytes.toLong(bytes, 0);

LOG.info("Number of chunks: {}, chunk size: {}", numChunks, batchSize);
private void retrieveChunkedBackingMap(FileInputStream in) throws IOException {

// Read the first chunk that has all the details.
BucketCacheProtos.BucketCacheEntry firstChunk =
BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);
parseFirstChunk(firstChunk);

// Subsequent chunks have the backingMap entries.
for (int i = 1; i < numChunks; i++) {
LOG.info("Reading chunk no: {}", i + 1);
int numChunks = 0;
while (in.available() > 0) {
parseChunkPB(BucketCacheProtos.BackingMap.parseDelimitedFrom(in),
firstChunk.getDeserializersMap());
LOG.info("Retrieved chunk: {}", i + 1);
numChunks++;
}
Contributor
If we change the way we write to the file as I suggested, so that we have only the BucketCacheProtos.BucketCacheEntry at the beginning followed by all BucketCacheProtos.BackingMap chunks, then we should change the naming here: firstChunk should now be entry. And these parseFirstChunk, parseChunkPB are not really parsing anything (parsing is delegated to the proto utils), but rather updating the cache index structures, so we should rename them to something like updateCacheIndex.

Also, looking at parseFirstChunk and parseChunk, we should replace the duplicate code inside parseFirstChunk with a call to parseChunk. Or, since parseFirstChunk is only used here, we could just get rid of it and simply do:

fullyCachedFiles.clear(); fullyCachedFiles.putAll(BucketProtoUtils.fromPB(entry.getCachedFilesMap()));

The backingMap and blocksByHfile, meanwhile, would all get updated in the loop.

Contributor Author
ack!
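Roughly, the read path being suggested in the comment above could look like the sketch below; entry replaces firstChunk and updateCacheIndex stands in for the proposed rename of parseChunkPB. This is a sketch against the surrounding diff, not the final code:

```java
private void retrieveChunkedBackingMap(FileInputStream in) throws IOException {
  // A single header entry precedes the chunks: capacity, io/map classes,
  // deserializers and the cached-files map.
  BucketCacheProtos.BucketCacheEntry entry =
    BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);

  fullyCachedFiles.clear();
  fullyCachedFiles.putAll(BucketProtoUtils.fromPB(entry.getCachedFilesMap()));

  // Every remaining delimited message is a BackingMap chunk; fold each one into
  // the in-memory index (backingMap / blocksByHfile).
  int numChunks = 0;
  while (in.available() > 0) {
    updateCacheIndex(BucketCacheProtos.BackingMap.parseDelimitedFrom(in),
      entry.getDeserializersMap());
    numChunks++;
  }

  LOG.info("Retrieved {} chunks with blockCount = {}.", numChunks, backingMap.size());
  verifyFileIntegrity(entry);
  verifyCapacityAndClasses(entry.getCacheCapacity(), entry.getIoClass(), entry.getMapClass());
}
```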

LOG.info("Retrieved {} of chunks with blockCount = {}.", numChunks, backingMap.size());
verifyFileIntegrity(firstChunk);
verifyCapacityAndClasses(firstChunk.getCacheCapacity(), firstChunk.getIoClass(),
firstChunk.getMapClass());
@@ -33,7 +33,6 @@
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

@@ -62,31 +61,29 @@ static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache,
.build();
}

public static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize,
long numChunks) throws IOException {
public static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize)
throws IOException {
// Write the new version of magic number.
fos.write(PB_MAGIC_V2);

int blockCount = 0;
int chunkCount = 0;
int backingMapSize = cache.backingMap.size();
BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();

fos.write(PB_MAGIC_V2);
fos.write(Bytes.toBytes(chunkSize));
fos.write(Bytes.toBytes(numChunks));

boolean firstChunkPersisted = false;
BucketCacheProtos.BackingMapEntry.Builder entryBuilder =
BucketCacheProtos.BackingMapEntry.newBuilder();

for (Map.Entry<BlockCacheKey, BucketEntry> entry : cache.backingMap.entrySet()) {
blockCount++;
entryBuilder.clear();
entryBuilder.setKey(BucketProtoUtils.toPB(entry.getKey()));
entryBuilder.setValue(BucketProtoUtils.toPB(entry.getValue()));
builder.addEntry(entryBuilder.build());

if (blockCount % chunkSize == 0 || (blockCount == backingMapSize)) {
chunkCount++;
if (chunkCount == 1) {
if (firstChunkPersisted == false) {
// Persist all details along with the first chunk into BucketCacheEntry
BucketProtoUtils.toPB(cache, builder.build()).writeDelimitedTo(fos);
firstChunkPersisted = true;
} else {
// Directly persist subsequent backing-map chunks.
builder.build().writeDelimitedTo(fos);
Contributor
Maybe, for consistency and simplicity, we should just write the BucketCacheProtos.BucketCacheEntry before the for loop, then write only the backing-map chunks within the loop. That way we wouldn't need this firstChunkPersisted flag.

Contributor Author
ack!
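For the write side, a sketch of the simplification the comment above is asking for: the BucketCacheEntry header goes out once before the loop, so the loop only ever emits BackingMap chunks and the firstChunkPersisted flag disappears. Passing an empty BackingMap to toPB for the header is an assumption about how the header would be built, not the final code:

```java
public static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize)
    throws IOException {
  // New-format magic number, then the header entry, then the backing-map chunks.
  fos.write(PB_MAGIC_V2);
  // Assumption: the header no longer embeds backing-map entries, so an empty
  // BackingMap (or a toPB overload without one) is enough here.
  BucketProtoUtils.toPB(cache, BucketCacheProtos.BackingMap.newBuilder().build())
    .writeDelimitedTo(fos);

  int blockCount = 0;
  int backingMapSize = cache.backingMap.size();
  BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
  BucketCacheProtos.BackingMapEntry.Builder entryBuilder =
    BucketCacheProtos.BackingMapEntry.newBuilder();

  for (Map.Entry<BlockCacheKey, BucketEntry> entry : cache.backingMap.entrySet()) {
    blockCount++;
    entryBuilder.clear();
    entryBuilder.setKey(BucketProtoUtils.toPB(entry.getKey()));
    entryBuilder.setValue(BucketProtoUtils.toPB(entry.getValue()));
    builder.addEntry(entryBuilder.build());
    // Flush a chunk every chunkSize blocks, plus once more for the final partial chunk.
    if (blockCount % chunkSize == 0 || blockCount == backingMapSize) {
      builder.build().writeDelimitedTo(fos);
      builder.clear();
    }
  }
}
```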
