File: HalfStoreFileReader.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -29,6 +30,7 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
@@ -64,6 +66,8 @@ public class HalfStoreFileReader extends StoreFileReader {

private boolean firstKeySeeked = false;

private AtomicBoolean closed = new AtomicBoolean(false);

/**
* Creates a half file reader for a hfile referred to by an hfilelink.
* @param context Reader context info
@@ -349,4 +353,35 @@ public long getFilterEntries() {
// Estimate the number of entries as half the original file; this may be wildly inaccurate.
return super.getFilterEntries() / 2;
}

@Override
public void close(boolean evictOnClose) throws IOException {
if (closed.compareAndSet(false, true)) {
if (evictOnClose && StoreFileInfo.isReference(this.reader.getPath())) {
Contributor: Do we really need to check isReference here? We will only use HalfStoreFileReader when the store file is a reference file?

Contributor (author): Yeah, I think there's no need for this extra check, as we'll always have a reference with HalfStoreFileReader. Let me remove this.

final HFileReaderImpl.HFileScannerImpl s =
Contributor: Usually it is not a good idea to use the implementation class directly at the upper layer. Why do we need to use HFileScannerImpl here?

Contributor (author): We need to seek to the split cell in order to calculate the offsets for the cache keys of the referred file to evict.

Contributor: But there is a seekTo method in the HFileScanner interface? I just mean, why do we need the cast here?

Contributor (author): I need HFileScannerImpl.getCurBlock to figure out the offset.

(HFileReaderImpl.HFileScannerImpl) super.getScanner(false, true, false);
final String reference = this.reader.getHFileInfo().getHFileContext().getHFileName();
final String referred = StoreFileInfo.getReferredToRegionAndFile(reference).getSecond();
s.seekTo(splitCell);
if (s.getCurBlock() != null) {
long offset = s.getCurBlock().getOffset();
LOG.trace("Seeking to split cell in reader: {} for file: {} top: {}, split offset: {}",
this, reference, top, offset);
((HFileReaderImpl) reader).getCacheConf().getBlockCache().ifPresent(cache -> {
int numEvictedReferred = top
? cache.evictBlocksRangeByHfileName(referred, offset, Long.MAX_VALUE)
: cache.evictBlocksRangeByHfileName(referred, 0, offset);
int numEvictedReference = cache.evictBlocksByHfileName(reference);
LOG.trace(
"Closing reference: {}; referred file: {}; was top? {}; evicted for referred: {};"
+ "evicted for reference: {}",
reference, referred, top, numEvictedReferred, numEvictedReference);
});
}
reader.close(false);
} else {
reader.close(evictOnClose);
}
}
}
}
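To make the range choice in close() above concrete: for the top half, the referred file's blocks are evicted from the split block's offset to the end of the file; for the bottom half, from offset 0 up to the split block's offset. A standalone sketch of just that decision (not HBase code; the class, method, and example offset below are made up for illustration):

// Standalone sketch of the eviction-range choice made in close() above.
// Assumption: "top" means this half reader covers the upper half of the parent hfile and
// splitBlockOffset is the file offset of the block containing the split cell.
public class EvictionRangeSketch {

  static long[] evictionRange(boolean top, long splitBlockOffset) {
    // Top half: evict referred-file blocks from the split block's offset to the end of the file.
    // Bottom half: evict referred-file blocks from the start of the file up to that offset.
    return top ? new long[] { splitBlockOffset, Long.MAX_VALUE }
      : new long[] { 0L, splitBlockOffset };
  }

  public static void main(String[] args) {
    long[] range = evictionRange(true, 4096L); // 4096 is an arbitrary example offset
    System.out.println("evict referred-file blocks in [" + range[0] + ", " + range[1] + "]");
  }
}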
File: BlockCache.java
@@ -250,4 +250,8 @@ default Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() {
default Optional<Map<String, Long>> getRegionCachedInfo() {
return Optional.empty();
}

default int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) {
return 0;
}
}
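The new evictBlocksRangeByHfileName default above returns 0, so existing BlockCache implementations keep compiling; caches that index blocks by file name and offset can override it. A standalone sketch of one possible override, assuming a simplified (file name, offset) key and inclusive bounds on both ends (the real key type and bound semantics may differ):

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Standalone sketch, not the real cache: blocks are indexed by a simplified (file name, offset) key.
public class RangeEvictionSketch {

  static final class Key {
    final String hfileName;
    final long offset;

    Key(String hfileName, long offset) {
      this.hfileName = hfileName;
      this.offset = offset;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return offset == other.offset && hfileName.equals(other.hfileName);
    }

    @Override
    public int hashCode() {
      return Objects.hash(hfileName, offset);
    }
  }

  private final ConcurrentHashMap<Key, byte[]> blocks = new ConcurrentHashMap<>();

  // Evict every cached block of hfileName whose offset falls within [initOffset, endOffset].
  public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) {
    int evicted = 0;
    for (Key key : blocks.keySet()) {
      if (key.hfileName.equals(hfileName) && key.offset >= initOffset && key.offset <= endOffset) {
        if (blocks.remove(key) != null) {
          evicted++;
        }
      }
    }
    return evicted;
  }
}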
File: BlockCacheUtil.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.hfile.HFileBlock.FILL_HEADER;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.NavigableMap;
@@ -25,7 +27,9 @@
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.metrics.impl.FastLongHistogram;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -244,6 +248,34 @@ public static int getMaxCachedBlocksByFile(Configuration conf) {
return conf == null ? DEFAULT_MAX : conf.getInt("hbase.ui.blockcache.by.file.max", DEFAULT_MAX);
}

public static HFileBlock getBlockForCaching(CacheConfig cacheConf, HFileBlock block) {
HFileContext newContext =
new HFileContextBuilder().withBlockSize(block.getHFileContext().getBlocksize())
.withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data
.withCompression(block.getHFileContext().getCompression())
.withDataBlockEncoding(block.getHFileContext().getDataBlockEncoding())
.withHBaseCheckSum(block.getHFileContext().isUseHBaseChecksum())
.withCompressTags(block.getHFileContext().isCompressTags())
.withIncludesMvcc(block.getHFileContext().isIncludesMvcc())
.withIncludesTags(block.getHFileContext().isIncludesTags())
.withColumnFamily(block.getHFileContext().getColumnFamily())
.withTableName(block.getHFileContext().getTableName()).build();
// Build the HFileBlock.
HFileBlockBuilder builder = new HFileBlockBuilder();
ByteBuff buff = block.getBufferReadOnly();
// Calculate how many bytes we need for checksum on the tail of the block.
int numBytes = (int) ChecksumUtil.numBytes(block.getOnDiskDataSizeWithHeader(),
block.getHFileContext().getBytesPerChecksum());
return builder.withBlockType(block.getBlockType())
.withOnDiskSizeWithoutHeader(block.getOnDiskSizeWithoutHeader())
.withUncompressedSizeWithoutHeader(block.getUncompressedSizeWithoutHeader())
.withPrevBlockOffset(block.getPrevBlockOffset()).withByteBuff(buff)
.withFillHeader(FILL_HEADER).withOffset(block.getOffset()).withNextBlockOnDiskSize(-1)
.withOnDiskDataSizeWithHeader(block.getOnDiskDataSizeWithHeader() + numBytes)
.withHFileContext(newContext).withByteBuffAllocator(cacheConf.getByteBuffAllocator())
.withShared(!buff.hasArray()).build();
}

/**
* Use one of these to keep a running account of cached blocks by file. Throw it away when done.
* This is different than metrics in that it is stats on current state of a cache. See
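A hedged usage sketch of the getBlockForCaching helper above, mirroring the pattern readBlock() in HFileReaderImpl follows further down in this diff; the wrapper class and method names here are illustrative only:

import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;

// Illustrative fragment: convert a freshly read block into its cache-friendly form
// (no checksums, cache allocator) before handing it to the configured block cache.
final class CacheBlockSketch {
  static void cacheReadBlock(CacheConfig cacheConf, BlockCacheKey cacheKey, HFileBlock readBlock,
    boolean cacheOnly) {
    HFileBlock toCache = BlockCacheUtil.getBlockForCaching(cacheConf, readBlock);
    cacheConf.getBlockCache()
      .ifPresent(cache -> cache.cacheBlock(cacheKey, toCache, cacheConf.isInMemory(), cacheOnly));
  }
}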
File: CacheConfig.java
@@ -72,6 +72,8 @@ public class CacheConfig implements ConfigurationObserver {
*/
public static final String EVICT_BLOCKS_ON_CLOSE_KEY = "hbase.rs.evictblocksonclose";

public static final String EVICT_BLOCKS_ON_SPLIT_KEY = "hbase.rs.evictblocksonsplit";

/**
* Configuration key to prefetch all blocks of a given file into the block cache when the file is
* opened.
@@ -113,6 +115,7 @@ public class CacheConfig implements ConfigurationObserver {
public static final boolean DEFAULT_CACHE_INDEXES_ON_WRITE = false;
public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false;
public static final boolean DEFAULT_EVICT_ON_CLOSE = false;
public static final boolean DEFAULT_EVICT_ON_SPLIT = true;
public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false;
public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false;
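A minimal sketch of how the new split-eviction flag above might be consumed. Whether CacheConfig itself grows a dedicated accessor for it is not shown in this excerpt, so the helper below is an assumption for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

// Illustration only: read hbase.rs.evictblocksonsplit with its default (true).
final class EvictOnSplitSketch {
  static boolean shouldEvictOnSplit(Configuration conf) {
    return conf.getBoolean(CacheConfig.EVICT_BLOCKS_ON_SPLIT_KEY, CacheConfig.DEFAULT_EVICT_ON_SPLIT);
  }
}

A split/close path could then pass the result down as the evictOnClose argument when closing the half-file readers.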
File: CombinedBlockCache.java
@@ -109,8 +109,6 @@ public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repea
}
} else {
if (existInL1) {
LOG.warn("Cache key {} had block type {}, but was found in L1 cache.", cacheKey,
cacheKey.getBlockType());
updateBlockMetrics(block, cacheKey, l1Cache, caching);
} else {
updateBlockMetrics(block, cacheKey, l2Cache, caching);
@@ -504,4 +502,9 @@ public Optional<Integer> getBlockSize(BlockCacheKey key) {
return l1Result.isPresent() ? l1Result : l2Cache.getBlockSize(key);
}

@Override
public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) {
return l1Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset)
+ l2Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset);
}
}
File: HFileBlock.java
@@ -697,7 +697,7 @@ public boolean isUnpacked() {
* when block is returned to the cache.
* @return the offset of this block in the file it was read from
*/
long getOffset() {
public long getOffset() {
if (offset < 0) {
throw new IllegalStateException("HFile block offset not initialized properly");
}
File: HFilePreadReader.java
@@ -45,7 +45,7 @@ public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig c
});

// Prefetch file blocks upon open if requested
if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff() && shouldCache.booleanValue()) {
if (cacheConf.shouldPrefetchOnOpen() && shouldCache.booleanValue()) {
Contributor: Why are we removing cacheIfCompactionsOff?

Contributor (author): Same reason as explained here: https://github.com/apache/hbase/pull/5906/files#r1606971440. We want the prefetch to run even for refs or links; the cache implementation should decide what to do with refs/links.

Contributor: Better add some comments to explain the history here?

PrefetchExecutor.request(path, new Runnable() {
@Override
public void run() {
File: HFileReaderImpl.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.BLOCK_CACHE_KEY_KEY;

import io.opentelemetry.api.common.Attributes;
@@ -42,14 +41,12 @@
import org.apache.hadoop.hbase.SizeCachedKeyValue;
import org.apache.hadoop.hbase.SizeCachedNoTagsByteBufferKeyValue;
import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IdLock;
@@ -159,6 +156,10 @@ public BlockIndexNotLoadedException(Path path) {
}
}

public CacheConfig getCacheConf() {
return cacheConf;
}

private Optional<String> toStringFirstKey() {
return getFirstKey().map(CellUtil::getCellKeyAsString);
}
@@ -307,7 +308,7 @@ public NotSeekedException(Path path) {
}
}

protected static class HFileScannerImpl implements HFileScanner {
public static class HFileScannerImpl implements HFileScanner {
private ByteBuff blockBuffer;
protected final boolean cacheBlocks;
protected final boolean pread;
@@ -331,6 +332,7 @@ protected static class HFileScannerImpl implements HFileScanner {
* loaded yet.
*/
protected Cell nextIndexedKey;

// Current block being used. NOTICE: DON't release curBlock separately except in shipped() or
// close() methods. Because the shipped() or close() will do the release finally, even if any
// exception occur the curBlock will be released by the close() method (see
@@ -340,6 +342,11 @@ protected static class HFileScannerImpl implements HFileScanner {
// Whether we returned a result for curBlock's size in recordBlockSize().
// gets reset whenever curBlock is changed.
private boolean providedCurrentBlockSize = false;

public HFileBlock getCurBlock() {
return curBlock;
}

// Previous blocks that were used in the course of the read
protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<>();

@@ -1293,8 +1300,6 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
new BlockCacheKey(path, dataBlockOffset, this.isPrimaryReplicaReader(), expectedBlockType);
Attributes attributes = Attributes.of(BLOCK_CACHE_KEY_KEY, cacheKey.toString());

boolean cacheable = cacheBlock && cacheIfCompactionsOff();

boolean useLock = false;
IdLock.Entry lockEntry = null;
final Span span = Span.current();
@@ -1336,7 +1341,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
return cachedBlock;
}

if (!useLock && cacheable && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
if (!useLock && cacheBlock && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
// check cache again with lock
useLock = true;
continue;
@@ -1346,8 +1351,9 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo

span.addEvent("block cache miss", attributes);
// Load block from filesystem.
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread,
!isCompaction, shouldUseHeap(expectedBlockType, cacheable));
HFileBlock hfileBlock =
BlockCacheUtil.getBlockForCaching(cacheConf, fsBlockReader.readBlockData(dataBlockOffset,
onDiskBlockSize, pread, !isCompaction, shouldUseHeap(expectedBlockType, cacheBlock)));
try {
validateBlockType(hfileBlock, expectedBlockType);
} catch (IOException e) {
@@ -1363,7 +1369,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
cacheConf.getBlockCache().ifPresent(cache -> {
LOG.debug("Skipping decompression of block {} in prefetch", cacheKey);
// Cache the block if necessary
if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly);
}
});
@@ -1376,7 +1382,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
// Using the wait on cache during compaction and prefetching.
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
cacheConf.isInMemory(), cacheOnly);
@@ -1712,9 +1718,4 @@ public int getMajorVersion() {
public void unbufferStream() {
fsBlockReader.unbufferStream();
}

protected boolean cacheIfCompactionsOff() {
return (!StoreFileInfo.isReference(name) && !HFileLink.isHFileLink(name))
|| !conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
}
}