RegionMetrics.java
@@ -138,4 +138,10 @@ default String getNameAsString() {

/** Returns the compaction state of this region */
CompactionState getCompactionState();

/** Returns the total size of the hfiles in the region */
Size getRegionSizeMB();

/** Returns current prefetch ratio of this region on this server */
float getCurrentRegionCachedRatio();
}
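
Taken together with the existing accessors on this interface, the two new getters let a client report the size and cache ratio of every online region through the Admin API. A minimal usage sketch (not part of this patch; the wrapper class name, connection setup, and output format are illustrative assumptions):

```java
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class RegionSizeReport {
  public static void main(String[] args) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
      Admin admin = connection.getAdmin()) {
      // Walk every live region server and dump the new per-region metrics.
      for (ServerName server : admin.getClusterMetrics().getLiveServerMetrics().keySet()) {
        for (RegionMetrics region : admin.getRegionMetrics(server)) {
          System.out.printf("%s: size=%.0f MB, cachedRatio=%.2f%n", region.getNameAsString(),
            region.getRegionSizeMB().get(Size.Unit.MEGABYTE),
            region.getCurrentRegionCachedRatio());
        }
      }
    }
  }
}
```

The printed values are the ones carried in the RegionLoad protobuf fields that the conversion code below populates.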
RegionMetricsBuilder.java
@@ -80,7 +80,8 @@ public static RegionMetrics toRegionMetrics(ClusterStatusProtos.RegionLoad regio
ClusterStatusProtos.StoreSequenceId::getSequenceId)))
.setUncompressedStoreFileSize(
new Size(regionLoadPB.getStoreUncompressedSizeMB(), Size.Unit.MEGABYTE))
.build();
.setRegionSizeMB(new Size(regionLoadPB.getRegionSizeMB(), Size.Unit.MEGABYTE))
.setCurrentRegionCachedRatio(regionLoadPB.getCurrentRegionCachedRatio()).build();
}

private static List<ClusterStatusProtos.StoreSequenceId>
@@ -120,7 +121,8 @@ public static ClusterStatusProtos.RegionLoad toRegionLoad(RegionMetrics regionMe
.addAllStoreCompleteSequenceId(toStoreSequenceId(regionMetrics.getStoreSequenceId()))
.setStoreUncompressedSizeMB(
(int) regionMetrics.getUncompressedStoreFileSize().get(Size.Unit.MEGABYTE))
.build();
.setRegionSizeMB((int) regionMetrics.getRegionSizeMB().get(Size.Unit.MEGABYTE))
.setCurrentRegionCachedRatio(regionMetrics.getCurrentRegionCachedRatio()).build();
}

public static RegionMetricsBuilder newBuilder(byte[] name) {
@@ -154,6 +156,8 @@ public static RegionMetricsBuilder newBuilder(byte[] name) {
private long blocksLocalWithSsdWeight;
private long blocksTotalWeight;
private CompactionState compactionState;
private Size regionSizeMB = Size.ZERO;
private float currentRegionCachedRatio;

private RegionMetricsBuilder(byte[] name) {
this.name = name;
@@ -289,14 +293,24 @@ public RegionMetricsBuilder setCompactionState(CompactionState compactionState)
return this;
}

public RegionMetricsBuilder setRegionSizeMB(Size value) {
this.regionSizeMB = value;
return this;
}

public RegionMetricsBuilder setCurrentRegionCachedRatio(float value) {
this.currentRegionCachedRatio = value;
return this;
}

public RegionMetrics build() {
return new RegionMetricsImpl(name, storeCount, storeFileCount, storeRefCount,
maxCompactedStoreFileRefCount, compactingCellCount, compactedCellCount, storeFileSize,
memStoreSize, indexSize, rootLevelIndexSize, uncompressedDataIndexSize, bloomFilterSize,
uncompressedStoreFileSize, writeRequestCount, readRequestCount, cpRequestCount,
filteredReadRequestCount, completedSequenceId, storeSequenceIds, dataLocality,
lastMajorCompactionTimestamp, dataLocalityForSsd, blocksLocalWeight, blocksLocalWithSsdWeight,
blocksTotalWeight, compactionState);
blocksTotalWeight, compactionState, regionSizeMB, currentRegionCachedRatio);
}

private static class RegionMetricsImpl implements RegionMetrics {
@@ -327,6 +341,8 @@ private static class RegionMetricsImpl implements RegionMetrics {
private final long blocksLocalWithSsdWeight;
private final long blocksTotalWeight;
private final CompactionState compactionState;
private final Size regionSizeMB;
private final float currentRegionCachedRatio;

RegionMetricsImpl(byte[] name, int storeCount, int storeFileCount, int storeRefCount,
int maxCompactedStoreFileRefCount, final long compactingCellCount, long compactedCellCount,
@@ -336,7 +352,7 @@ private static class RegionMetricsImpl implements RegionMetrics {
long filteredReadRequestCount, long completedSequenceId, Map<byte[], Long> storeSequenceIds,
float dataLocality, long lastMajorCompactionTimestamp, float dataLocalityForSsd,
long blocksLocalWeight, long blocksLocalWithSsdWeight, long blocksTotalWeight,
CompactionState compactionState) {
CompactionState compactionState, Size regionSizeMB, float currentRegionCachedRatio) {
this.name = Preconditions.checkNotNull(name);
this.storeCount = storeCount;
this.storeFileCount = storeFileCount;
@@ -364,6 +380,8 @@ private static class RegionMetricsImpl implements RegionMetrics {
this.blocksLocalWithSsdWeight = blocksLocalWithSsdWeight;
this.blocksTotalWeight = blocksTotalWeight;
this.compactionState = compactionState;
this.regionSizeMB = regionSizeMB;
this.currentRegionCachedRatio = currentRegionCachedRatio;
}

@Override
@@ -501,6 +519,16 @@ public CompactionState getCompactionState() {
return compactionState;
}

@Override
public Size getRegionSizeMB() {
return regionSizeMB;
}

@Override
public float getCurrentRegionCachedRatio() {
return currentRegionCachedRatio;
}

@Override
public String toString() {
StringBuilder sb =
@@ -541,6 +569,8 @@ public String toString() {
Strings.appendKeyValue(sb, "blocksLocalWithSsdWeight", blocksLocalWithSsdWeight);
Strings.appendKeyValue(sb, "blocksTotalWeight", blocksTotalWeight);
Strings.appendKeyValue(sb, "compactionState", compactionState);
Strings.appendKeyValue(sb, "regionSizeMB", regionSizeMB);
Strings.appendKeyValue(sb, "currentRegionCachedRatio", currentRegionCachedRatio);
return sb.toString();
}
}
ServerMetrics.java
@@ -106,4 +106,10 @@ default String getVersion() {
@Nullable
List<ServerTask> getTasks();

/**
* Returns the region cache information for the regions hosted on this server
* @return map of region encoded name and the size of the region cached on this region server
* rounded to MB
*/
Map<String, Integer> getRegionCachedInfo();
}
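
A caller can combine this with ClusterMetrics to see, per live region server, how many megabytes of each region are currently cached. A minimal sketch (not part of this patch; the wrapper class name and output format are illustrative assumptions):

```java
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class RegionCacheReport {
  public static void main(String[] args) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
      Admin admin = connection.getAdmin()) {
      // For each live region server, print how many MB of each hosted region it has cached.
      admin.getClusterMetrics().getLiveServerMetrics().forEach((serverName, serverMetrics) ->
        serverMetrics.getRegionCachedInfo().forEach((encodedRegionName, cachedMb) ->
          System.out.println(serverName + " caches " + cachedMb + " MB of region "
            + encodedRegionName)));
    }
  }
}
```

As the implementation below shows, ServerMetricsImpl hands out an unmodifiable view of this map, so callers get a read-only snapshot.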
ServerMetricsBuilder.java
@@ -85,6 +85,7 @@ public static ServerMetrics toServerMetrics(ServerName serverName, int versionNu
: null)
.setTasks(serverLoadPB.getTasksList().stream().map(ProtobufUtil::getServerTask)
.collect(Collectors.toList()))
.setRegionCachedInfo(serverLoadPB.getRegionCachedInfoMap())
.setReportTimestamp(serverLoadPB.getReportEndTime())
.setLastReportTimestamp(serverLoadPB.getReportStartTime()).setVersionNumber(versionNumber)
.setVersion(version).build();
@@ -111,6 +112,7 @@ public static ClusterStatusProtos.ServerLoad toServerLoad(ServerMetrics metrics)
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
.addAllTasks(
metrics.getTasks().stream().map(ProtobufUtil::toServerTask).collect(Collectors.toList()))
.putAllRegionCachedInfo(metrics.getRegionCachedInfo())
.setReportStartTime(metrics.getLastReportTimestamp())
.setReportEndTime(metrics.getReportTimestamp());
if (metrics.getReplicationLoadSink() != null) {
@@ -142,6 +144,7 @@ public static ServerMetricsBuilder newBuilder(ServerName sn) {
private long reportTimestamp = EnvironmentEdgeManager.currentTime();
private long lastReportTimestamp = 0;
private final List<ServerTask> tasks = new ArrayList<>();
private Map<String, Integer> regionCachedInfo = new HashMap<>();

private ServerMetricsBuilder(ServerName serverName) {
this.serverName = serverName;
@@ -232,11 +235,16 @@ public ServerMetricsBuilder setTasks(List<ServerTask> tasks) {
return this;
}

public ServerMetricsBuilder setRegionCachedInfo(Map<String, Integer> value) {
this.regionCachedInfo = value;
return this;
}

public ServerMetrics build() {
return new ServerMetricsImpl(serverName, versionNumber, version, requestCountPerSecond,
requestCount, readRequestCount, writeRequestCount, usedHeapSize, maxHeapSize, infoServerPort,
sources, sink, regionStatus, coprocessorNames, reportTimestamp, lastReportTimestamp,
userMetrics, tasks);
userMetrics, tasks, regionCachedInfo);
}

private static class ServerMetricsImpl implements ServerMetrics {
@@ -259,13 +267,15 @@ private static class ServerMetricsImpl implements ServerMetrics {
private final long lastReportTimestamp;
private final Map<byte[], UserMetrics> userMetrics;
private final List<ServerTask> tasks;
private final Map<String, Integer> regionCachedInfo;

ServerMetricsImpl(ServerName serverName, int versionNumber, String version,
long requestCountPerSecond, long requestCount, long readRequestsCount,
long writeRequestsCount, Size usedHeapSize, Size maxHeapSize, int infoServerPort,
List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp,
long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics, List<ServerTask> tasks) {
long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics, List<ServerTask> tasks,
Map<String, Integer> regionCachedInfo) {
this.serverName = Preconditions.checkNotNull(serverName);
this.versionNumber = versionNumber;
this.version = version;
@@ -284,6 +294,7 @@ private static class ServerMetricsImpl implements ServerMetrics {
this.reportTimestamp = reportTimestamp;
this.lastReportTimestamp = lastReportTimestamp;
this.tasks = tasks;
this.regionCachedInfo = regionCachedInfo;
}

@Override
@@ -386,6 +397,11 @@ public List<ServerTask> getTasks() {
return tasks;
}

@Override
public Map<String, Integer> getRegionCachedInfo() {
return Collections.unmodifiableMap(regionCachedInfo);
}

@Override
public String toString() {
int storeCount = 0;
11 changes: 11 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto
@@ -177,6 +177,12 @@ message RegionLoad {
MAJOR = 2;
MAJOR_AND_MINOR = 3;
}

/** Total region size in MB */
optional uint32 region_size_MB = 28;

/** Current region cache ratio on this server */
optional float current_region_cached_ratio = 29;
}

message UserLoad {
@@ -315,6 +321,11 @@ message ServerLoad {
* The active monitored tasks
*/
repeated ServerTask tasks = 15;

/**
* The metrics for region cached on this region server
*/
map<string, uint32> regionCachedInfo = 16;
}
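
At the wire level the new regionCachedInfo entry is an ordinary protobuf map field, so the generated ClusterStatusProtos.ServerLoad builder exposes the put/getRegionCachedInfoMap accessors that ServerMetricsBuilder calls above. A standalone sketch (not part of this patch; the encoded region names and MB values are made up, and it assumes the shaded generated classes are on the classpath):

```java
import java.util.Map;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;

public class RegionCachedInfoProtoDemo {
  public static void main(String[] args) {
    // Build a ServerLoad carrying the new regionCachedInfo map field.
    ClusterStatusProtos.ServerLoad serverLoad = ClusterStatusProtos.ServerLoad.newBuilder()
      .putRegionCachedInfo("1588230740", 512)
      .putRegionCachedInfo("d3a6f2b4c1e0987654321fedcba01234", 128)
      .build();
    // Read it back the same way ServerMetricsBuilder.toServerMetrics does.
    Map<String, Integer> cached = serverLoad.getRegionCachedInfoMap();
    cached.forEach((region, mb) -> System.out.println(region + " -> " + mb + " MB"));
  }
}
```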

message LiveServerInfo {
HFilePreadReader.java
@@ -40,7 +40,7 @@ public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig c
Configuration conf) throws IOException {
super(context, fileInfo, cacheConf, conf);
final MutableBoolean fileAlreadyCached = new MutableBoolean(false);
BucketCache.getBuckedCacheFromCacheConfig(cacheConf).ifPresent(bc -> fileAlreadyCached
BucketCache.getBucketCacheFromCacheConfig(cacheConf).ifPresent(bc -> fileAlreadyCached
.setValue(bc.getFullyCachedFiles().get(path.getName()) == null ? false : true));
// Prefetch file blocks upon open if requested
if (
@@ -66,7 +66,7 @@ public void run() {
LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
}
Optional<BucketCache> bucketCacheOptional =
BucketCache.getBuckedCacheFromCacheConfig(cacheConf);
BucketCache.getBucketCacheFromCacheConfig(cacheConf);
// Don't use BlockIterator here, because it's designed to read load-on-open section.
long onDiskSizeOfNextBlock = -1;
while (offset < end) {
@@ -112,8 +112,8 @@ public void run() {
}
}
final long fileSize = offset;
BucketCache.getBuckedCacheFromCacheConfig(cacheConf)
.ifPresent(bc -> bc.fileCacheCompleted(path,fileSize));
BucketCache.getBucketCacheFromCacheConfig(cacheConf)
.ifPresent(bc -> bc.fileCacheCompleted(path, fileSize));

} catch (IOException e) {
// IOExceptions are probably due to region closes (relocation, etc.)
BucketCache.java
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
@@ -152,14 +153,15 @@ public class BucketCache implements BlockCache, HeapSize {
transient Map<BlockCacheKey, BucketEntry> backingMap;
/**
* Map of hFile -> Region -> File size. This map is used to track all files completed prefetch,
* together with the region those belong to and the total cached size for the region.TestBlockEvictionOnRegionMovement
* together with the region those belong to and the total cached size for the
* region.TestBlockEvictionOnRegionMovement
*/
final Map<String,Pair<String, Long>> fullyCachedFiles = new ConcurrentHashMap<>();
final Map<String, Pair<String, Long>> fullyCachedFiles = new ConcurrentHashMap<>();
/**
* Map of region -> total size of the region prefetched on this region server. This is the total
* size of hFiles for this region prefetched on this region server
*/
final Map<String,Long> regionCachedSizeMap = new ConcurrentHashMap<>();
final Map<String, Long> regionCachedSizeMap = new ConcurrentHashMap<>();

private BucketCachePersister cachePersister;

@@ -1285,10 +1287,14 @@ void persistToFile() throws IOException {
}
}

private boolean isCachePersistent() {
public boolean isCachePersistent() {
return ioEngine.isPersistent() && persistencePath != null;
}

public Map<String, Long> getRegionCachedInfo() {
return Collections.unmodifiableMap(regionCachedSizeMap);
}

/**
* @see #persistToFile()
*/
@@ -1319,6 +1325,29 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException {
}
}

private void updateRegionSizeMapWhileRetrievingFromFile() {
// Update the regionCachedSizeMap with the region size while restarting the region server
if (LOG.isDebugEnabled()) {
LOG.debug("Updating region size map after retrieving cached file list");
dumpPrefetchList();
}
regionCachedSizeMap.clear();
fullyCachedFiles.forEach((hFileName, hFileSize) -> {
// Get the region name for each file
String regionEncodedName = hFileSize.getFirst();
long cachedFileSize = hFileSize.getSecond();
regionCachedSizeMap.merge(regionEncodedName, cachedFileSize,
(oldpf, fileSize) -> oldpf + fileSize);
});
}

private void dumpPrefetchList() {
for (Map.Entry<String, Pair<String, Long>> outerEntry : fullyCachedFiles.entrySet()) {
LOG.debug("Cached File Entry:<{},<{},{}>>", outerEntry.getKey(),
outerEntry.getValue().getFirst(), outerEntry.getValue().getSecond());
}
}

/**
* Create an input stream that deletes the file after reading it. Use in try-with-resources to
* avoid this pattern where an exception thrown from a finally block may mask earlier exceptions:
@@ -1410,6 +1439,7 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio
// if has not checksum, it means the persistence file is old format
LOG.info("Persistent file is old format, it does not support verifying file integrity!");
}
updateRegionSizeMapWhileRetrievingFromFile();
verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
}

@@ -1542,7 +1572,7 @@ protected String getAlgorithm() {
*/
@Override
public int evictBlocksByHfileName(String hfileName) {
this.fullyCachedFiles.remove(hfileName);
removeFileFromPrefetch(hfileName);
Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE),
true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);

@@ -1926,7 +1956,7 @@ public Map<String, Pair<String, Long>> getFullyCachedFiles() {
return fullyCachedFiles;
}

public static Optional<BucketCache> getBuckedCacheFromCacheConfig(CacheConfig cacheConf) {
public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) {
if (cacheConf.getBlockCache().isPresent()) {
BlockCache bc = cacheConf.getBlockCache().get();
if (bc instanceof CombinedBlockCache) {
@@ -1948,19 +1978,21 @@ private void removeFileFromPrefetch(String hfileName) {
String regionEncodedName = regionEntry.getFirst();
long filePrefetchSize = regionEntry.getSecond();
LOG.debug("Removing file {} for region {}", hfileName, regionEncodedName);
regionCachedSizeMap.computeIfPresent(regionEncodedName,
(rn, pf) -> pf - filePrefetchSize);
regionCachedSizeMap.computeIfPresent(regionEncodedName, (rn, pf) -> pf - filePrefetchSize);
// If all the blocks for a region are evicted from the cache, remove the entry for that region
if (regionCachedSizeMap.containsKey(regionEncodedName) &&
regionCachedSizeMap.get(regionEncodedName) == 0) {
if (
regionCachedSizeMap.containsKey(regionEncodedName)
&& regionCachedSizeMap.get(regionEncodedName) == 0
) {
regionCachedSizeMap.remove(regionEncodedName);
}
}
fullyCachedFiles.remove(hfileName);
}

public void fileCacheCompleted(Path filePath, long size) {
Pair<String,Long> pair = new Pair<>();
//sets the region name
Pair<String, Long> pair = new Pair<>();
// sets the region name
String regionName = filePath.getParent().getParent().getName();
pair.setFirst(regionName);
pair.setSecond(size);