From 4da1acadaa6413b4a46f47979f94b1c14ce8fd38 Mon Sep 17 00:00:00 2001 From: Rahul Agarkar Date: Wed, 2 Aug 2023 23:06:30 +0530 Subject: [PATCH 1/4] =?UTF-8?q?HBASE-27997=20Enhance=20prefetch=20executor?= =?UTF-8?q?=20to=20record=20region=20prefetch=20infor=E2=80=A6=20(#5339)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Wellington Chevreuil Reviewed-by: Kota-SH --- .../main/protobuf/PrefetchPersistence.proto | 7 ++++- .../hbase/io/hfile/HFilePreadReader.java | 12 ++++++++- .../hbase/io/hfile/PrefetchProtoUtils.java | 26 ++++++++++++++++--- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto index d1a2b4cfd1b7..a024b94baa62 100644 --- a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto +++ b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto @@ -27,5 +27,10 @@ option optimize_for = SPEED; message PrefetchedHfileName { - map<string, bool> prefetched_files = 1; + map<string, RegionFileSizeMap> prefetched_files = 1; +} + +message RegionFileSizeMap { + required string region_name = 1; + required uint64 region_prefetch_size = 2; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index f1579ea53b8e..c9768bab0345 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -132,13 +132,23 @@ public void run() { LOG.warn("Close prefetch stream reader failed, path: " + path, e); } } - PrefetchExecutor.complete(path); + String regionName = getRegionName(path); + PrefetchExecutor.complete(regionName, path, offset); } } }); } } + /* + * Get the region name for the given file path. A HFile is always kept under the <region dir>/<column family dir>/ directory. To find the region for a given hFile, just find the name of the grandparent + * directory. + */ + private static String getRegionName(Path path) { + return path.getParent().getParent().getName(); + } + private static String getPathOffsetEndStr(final Path path, final long offset, final long end) { return "path=" + path.toString() + ", offset=" + offset + ", end=" + end; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java index e75e8a6a6522..df67e4429a2d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos; @@ -26,8 +28,26 @@ private PrefetchProtoUtils() { } static PersistentPrefetchProtos.PrefetchedHfileName - toPB(Map<String, Boolean> prefetchedHfileNames) { - return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder() - .putAllPrefetchedFiles(prefetchedHfileNames).build(); + toPB(Map<String, Pair<String, Long>> prefetchedHfileNames) { + Map<String, PersistentPrefetchProtos.RegionFileSizeMap> tmpMap = new HashMap<>(); + prefetchedHfileNames.forEach((hFileName, regionPrefetchMap) -> { + PersistentPrefetchProtos.RegionFileSizeMap tmpRegionFileSize = + PersistentPrefetchProtos.RegionFileSizeMap.newBuilder() + .setRegionName(regionPrefetchMap.getFirst()) + .setRegionPrefetchSize(regionPrefetchMap.getSecond()).build(); + tmpMap.put(hFileName, tmpRegionFileSize); + }); + return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder().putAllPrefetchedFiles(tmpMap) + .build(); + } + + static Map<String, Pair<String, Long>> + fromPB(Map<String, PersistentPrefetchProtos.RegionFileSizeMap> prefetchHFileNames) { + Map<String, Pair<String, Long>> hFileMap = new HashMap<>(); + prefetchHFileNames.forEach((hFileName, regionPrefetchMap) -> { + hFileMap.put(hFileName, + new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionPrefetchSize())); + }); + return hFileMap; } }
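For illustration: patch 1 keys each prefetched HFile to its owning region purely by path layout, as getRegionName() above shows. A minimal, self-contained sketch of that grandparent-directory rule follows (the concrete path below is invented for illustration; only the getParent().getParent() logic comes from the patch):

    import org.apache.hadoop.fs.Path;

    public class RegionNameFromPath {
      public static void main(String[] args) {
        // Hypothetical HFile location: .../<table>/<region>/<column family>/<hfile>
        Path hFile = new Path("/hbase/data/default/t1/0e4bbe53dd7b3da77b2d03ae4a38d0f4/cf/abc123");
        // The region encoded name is the grandparent directory of the HFile
        String regionName = hFile.getParent().getParent().getName();
        System.out.println(regionName); // prints 0e4bbe53dd7b3da77b2d03ae4a38d0f4
      }
    }
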
From 373b6ce8be918f8b8b1a2133f420c128a9bdd2a6 Mon Sep 17 00:00:00 2001 From: Rahul Agarkar Date: Tue, 29 Aug 2023 15:36:23 +0530 Subject: [PATCH 2/4] =?UTF-8?q?HBASE-27998=20Enhance=20region=20metrics=20?= =?UTF-8?q?to=20include=20prefetch=20ratio=20for=20each=E2=80=A6=20(#5342)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Wellington Chevreuil --- .../apache/hadoop/hbase/RegionMetrics.java | 6 ++ .../hadoop/hbase/RegionMetricsBuilder.java | 38 ++++++++- .../apache/hadoop/hbase/ServerMetrics.java | 6 ++ .../hadoop/hbase/ServerMetricsBuilder.java | 20 ++++- .../main/protobuf/PrefetchPersistence.proto | 36 -------- .../main/protobuf/server/ClusterStatus.proto | 11 +++ .../protobuf/server/io/BucketCacheEntry.proto | 8 +- .../hadoop/hbase/io/hfile/BlockCache.java | 3 +- .../hbase/io/hfile/CombinedBlockCache.java | 3 +- .../hbase/io/hfile/HFilePreadReader.java | 6 +- .../hbase/io/hfile/PrefetchProtoUtils.java | 53 ------------ .../hbase/io/hfile/bucket/BucketCache.java | 85 ++++++++++++++++--- .../io/hfile/bucket/BucketProtoUtils.java | 26 +++++- .../hbase/regionserver/HRegionServer.java | 40 ++++++++- .../hadoop/hbase/TestServerMetrics.java | 18 ++-- .../master/TestRegionsRecoveryChore.java | 14 +++ 16 files changed, 251 insertions(+), 122 deletions(-) delete mode 100644 hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetrics.java index 47b36a7a1516..b029d0288564 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetrics.java @@ -138,4 +138,10 @@ default String getNameAsString() { /** Returns the compaction state of this region */ CompactionState getCompactionState(); + + /** Returns the total size of the hfiles in the region */ + Size getRegionSizeMB(); + + /** Returns current prefetch ratio of this region on this server */ + float getCurrentRegionCachedRatio(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetricsBuilder.java index 43b3a17aac17..d3361693079a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetricsBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetricsBuilder.java @@ -80,7 +80,8 @@ public static RegionMetrics toRegionMetrics(ClusterStatusProtos.RegionLoad regio ClusterStatusProtos.StoreSequenceId::getSequenceId))) .setUncompressedStoreFileSize( new Size(regionLoadPB.getStoreUncompressedSizeMB(), Size.Unit.MEGABYTE)) - .build(); + .setRegionSizeMB(new Size(regionLoadPB.getRegionSizeMB(), Size.Unit.MEGABYTE)) + .setCurrentRegionCachedRatio(regionLoadPB.getCurrentRegionCachedRatio()).build(); } private static List<ClusterStatusProtos.StoreSequenceId> @@ -120,7 +121,8 @@ public static ClusterStatusProtos.RegionLoad toRegionLoad(RegionMetrics regionMe .addAllStoreCompleteSequenceId(toStoreSequenceId(regionMetrics.getStoreSequenceId())) .setStoreUncompressedSizeMB( (int) regionMetrics.getUncompressedStoreFileSize().get(Size.Unit.MEGABYTE)) - .build(); + .setRegionSizeMB((int) regionMetrics.getRegionSizeMB().get(Size.Unit.MEGABYTE)) + .setCurrentRegionCachedRatio(regionMetrics.getCurrentRegionCachedRatio()).build(); } public static RegionMetricsBuilder newBuilder(byte[] name) { @@ -154,6 +156,8 @@ public static RegionMetricsBuilder newBuilder(byte[] name) { private long blocksLocalWithSsdWeight; private long blocksTotalWeight; private CompactionState compactionState; + private Size regionSizeMB = Size.ZERO; + private float currentRegionCachedRatio; private RegionMetricsBuilder(byte[] name) { this.name = name; @@ -289,6 +293,16 @@ public RegionMetricsBuilder setCompactionState(CompactionState compactionState) return this; } + public RegionMetricsBuilder setRegionSizeMB(Size value) { + this.regionSizeMB = value; + return this; + } + + public RegionMetricsBuilder setCurrentRegionCachedRatio(float value) { + this.currentRegionCachedRatio = value; + return this; + } + public RegionMetrics build() { return new RegionMetricsImpl(name, storeCount, storeFileCount, storeRefCount, maxCompactedStoreFileRefCount, compactingCellCount, compactedCellCount, storeFileSize, @@ -296,7 +310,7 @@ public RegionMetrics build() { uncompressedStoreFileSize, writeRequestCount, readRequestCount, cpRequestCount, filteredReadRequestCount, completedSequenceId, storeSequenceIds, dataLocality, lastMajorCompactionTimestamp, dataLocalityForSsd, blocksLocalWeight, blocksLocalWithSsdWeight, - blocksTotalWeight, compactionState); + blocksTotalWeight, compactionState, regionSizeMB, currentRegionCachedRatio); } private static class RegionMetricsImpl implements RegionMetrics { @@ -327,6 +341,8 @@ private static class RegionMetricsImpl implements RegionMetrics { private final long 
blocksLocalWithSsdWeight; private final long blocksTotalWeight; private final CompactionState compactionState; + private final Size regionSizeMB; + private final float currentRegionCachedRatio; RegionMetricsImpl(byte[] name, int storeCount, int storeFileCount, int storeRefCount, int maxCompactedStoreFileRefCount, final long compactingCellCount, long compactedCellCount, @@ -336,7 +352,7 @@ private static class RegionMetricsImpl implements RegionMetrics { long filteredReadRequestCount, long completedSequenceId, Map<byte[], Long> storeSequenceIds, float dataLocality, long lastMajorCompactionTimestamp, float dataLocalityForSsd, long blocksLocalWeight, long blocksLocalWithSsdWeight, long blocksTotalWeight, - CompactionState compactionState) { + CompactionState compactionState, Size regionSizeMB, float currentRegionCachedRatio) { this.name = Preconditions.checkNotNull(name); this.storeCount = storeCount; this.storeFileCount = storeFileCount; @@ -364,6 +380,8 @@ private static class RegionMetricsImpl implements RegionMetrics { this.blocksLocalWithSsdWeight = blocksLocalWithSsdWeight; this.blocksTotalWeight = blocksTotalWeight; this.compactionState = compactionState; + this.regionSizeMB = regionSizeMB; + this.currentRegionCachedRatio = currentRegionCachedRatio; } @Override @@ -501,6 +519,16 @@ public CompactionState getCompactionState() { return compactionState; } + @Override + public Size getRegionSizeMB() { + return regionSizeMB; + } + + @Override + public float getCurrentRegionCachedRatio() { + return currentRegionCachedRatio; + } + @Override public String toString() { StringBuilder sb = @@ -541,6 +569,8 @@ public String toString() { Strings.appendKeyValue(sb, "blocksLocalWithSsdWeight", blocksLocalWithSsdWeight); Strings.appendKeyValue(sb, "blocksTotalWeight", blocksTotalWeight); Strings.appendKeyValue(sb, "compactionState", compactionState); + Strings.appendKeyValue(sb, "regionSizeMB", regionSizeMB); + Strings.appendKeyValue(sb, "currentRegionCachedRatio", currentRegionCachedRatio); return sb.toString(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java index 2684886ba3d5..2cf55a1abdc0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java @@ -106,4 +106,10 @@ default String getVersion() { @Nullable List<ServerTask> getTasks(); + /** + * Returns the region cache information for the regions hosted on this server + * @return map of region encoded name and the size of the region cached on this region server + * rounded to MB + */ + Map<String, Integer> getRegionCachedInfo(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java index 7a0312f22fdc..c7aea21e845a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java @@ -85,6 +85,7 @@ public static ServerMetrics toServerMetrics(ServerName serverName, int versionNu : null) .setTasks(serverLoadPB.getTasksList().stream().map(ProtobufUtil::getServerTask) .collect(Collectors.toList())) + .setRegionCachedInfo(serverLoadPB.getRegionCachedInfoMap()) .setReportTimestamp(serverLoadPB.getReportEndTime()) .setLastReportTimestamp(serverLoadPB.getReportStartTime()).setVersionNumber(versionNumber) .setVersion(version).build(); @@ -111,6 +112,7 @@ public static 
ClusterStatusProtos.ServerLoad toServerLoad(ServerMetrics metrics) .map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList())) .addAllTasks( metrics.getTasks().stream().map(ProtobufUtil::toServerTask).collect(Collectors.toList())) + .putAllRegionCachedInfo(metrics.getRegionCachedInfo()) .setReportStartTime(metrics.getLastReportTimestamp()) .setReportEndTime(metrics.getReportTimestamp()); if (metrics.getReplicationLoadSink() != null) { @@ -142,6 +144,7 @@ public static ServerMetricsBuilder newBuilder(ServerName sn) { private long reportTimestamp = EnvironmentEdgeManager.currentTime(); private long lastReportTimestamp = 0; private final List<ServerTask> tasks = new ArrayList<>(); + private Map<String, Integer> regionCachedInfo = new HashMap<>(); private ServerMetricsBuilder(ServerName serverName) { this.serverName = serverName; @@ -232,11 +235,16 @@ public ServerMetricsBuilder setTasks(List<ServerTask> tasks) { return this; } + public ServerMetricsBuilder setRegionCachedInfo(Map<String, Integer> value) { + this.regionCachedInfo = value; + return this; + } + public ServerMetrics build() { return new ServerMetricsImpl(serverName, versionNumber, version, requestCountPerSecond, requestCount, readRequestCount, writeRequestCount, usedHeapSize, maxHeapSize, infoServerPort, sources, sink, regionStatus, coprocessorNames, reportTimestamp, lastReportTimestamp, - userMetrics, tasks); + userMetrics, tasks, regionCachedInfo); } private static class ServerMetricsImpl implements ServerMetrics { @@ -259,13 +267,15 @@ private static class ServerMetricsImpl implements ServerMetrics { private final long lastReportTimestamp; private final Map<byte[], UserMetrics> userMetrics; private final List<ServerTask> tasks; + private final Map<String, Integer> regionCachedInfo; ServerMetricsImpl(ServerName serverName, int versionNumber, String version, long requestCountPerSecond, long requestCount, long readRequestsCount, long writeRequestsCount, Size usedHeapSize, Size maxHeapSize, int infoServerPort, List<ReplicationLoadSource> sources, ReplicationLoadSink sink, Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp, - long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics, List<ServerTask> tasks) { + long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics, List<ServerTask> tasks, + Map<String, Integer> regionCachedInfo) { this.serverName = Preconditions.checkNotNull(serverName); this.versionNumber = versionNumber; this.version = version; @@ -284,6 +294,7 @@ private static class ServerMetricsImpl implements ServerMetrics { this.reportTimestamp = reportTimestamp; this.lastReportTimestamp = lastReportTimestamp; this.tasks = tasks; + this.regionCachedInfo = regionCachedInfo; } @Override @@ -386,6 +397,11 @@ public List<ServerTask> getTasks() { return tasks; } + @Override + public Map<String, Integer> getRegionCachedInfo() { + return Collections.unmodifiableMap(regionCachedInfo); + } + @Override public String toString() { int storeCount = 0; diff --git a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto deleted file mode 100644 index a024b94baa62..000000000000 --- a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -syntax = "proto2"; - -package hbase.pb; - -option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated"; -option java_outer_classname = "PersistentPrefetchProtos"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; -option optimize_for = SPEED; - - -message PrefetchedHfileName { - map<string, RegionFileSizeMap> prefetched_files = 1; -} - -message RegionFileSizeMap { - required string region_name = 1; - required uint64 region_prefetch_size = 2; -} diff --git a/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto index 28cc5a865c23..58fd3c8d2a5b 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto @@ -177,6 +177,12 @@ message RegionLoad { MAJOR = 2; MAJOR_AND_MINOR = 3; } + + /** Total region size in MB */ + optional uint32 region_size_MB = 28; + + /** Current region cache ratio on this server */ + optional float current_region_cached_ratio = 29; } message UserLoad { @@ -315,6 +321,11 @@ message ServerLoad { * The active monitored tasks */ repeated ServerTask tasks = 15; + + /** + * The metrics for region cached on this region server + */ + map<string, int32> regionCachedInfo = 16; } message LiveServerInfo { diff --git a/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto b/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto index ae1980fe51e6..80fc10ada786 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto @@ -32,7 +32,7 @@ message BucketCacheEntry { map<int32, string> deserializers = 4; required BackingMap backing_map = 5; optional bytes checksum = 6; - map<string, bool> prefetched_files = 7; + map<string, RegionFileSizeMap> cached_files = 7; } message BackingMap { @@ -81,3 +81,9 @@ enum BlockPriority { multi = 1; memory = 2; } + +message RegionFileSizeMap { + required string region_name = 1; + required uint64 region_cached_size = 2; +} + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index e480c9b5789b..91ebaaabd422 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Optional; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; /** @@ -167,7 +168,7 @@ default boolean isMetaBlock(BlockType blockType) { /** * Returns the list of fully cached files */ - default Optional<Map<String, Boolean>> getFullyCachedFiles() { + default Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() { return Optional.empty(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index a421dfc83aa0..1e0fe7709292 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -22,6 +22,7 @@ import java.util.Optional; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -431,7 +432,7 @@ public BlockCache[] getBlockCaches() { * Returns the list of fully cached files */ @Override - public Optional<Map<String, Boolean>> getFullyCachedFiles() { + public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() { return this.l2Cache.getFullyCachedFiles(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index c9768bab0345..7cdbd5aff486 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -115,7 +115,8 @@ public void run() { block.release(); } } - bucketCacheOptional.ifPresent(bc -> bc.fileCacheCompleted(path.getName())); + final long fileSize = offset; + bucketCacheOptional.ifPresent(bc -> bc.fileCacheCompleted(path, fileSize)); } catch (IOException e) { // IOExceptions are probably due to region closes (relocation, etc.) if (LOG.isTraceEnabled()) { @@ -132,8 +133,7 @@ public void run() { LOG.warn("Close prefetch stream reader failed, path: " + path, e); } } - String regionName = getRegionName(path); - PrefetchExecutor.complete(regionName, path, offset); + PrefetchExecutor.complete(path); } } }); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java deleted file mode 100644 index df67e4429a2d..000000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.io.hfile; - -import java.util.HashMap; -import java.util.Map; -import org.apache.hadoop.hbase.util.Pair; - -import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos; - -final class PrefetchProtoUtils { - private PrefetchProtoUtils() { - } - - static PersistentPrefetchProtos.PrefetchedHfileName - toPB(Map<String, Pair<String, Long>> prefetchedHfileNames) { - Map<String, PersistentPrefetchProtos.RegionFileSizeMap> tmpMap = new HashMap<>(); - prefetchedHfileNames.forEach((hFileName, regionPrefetchMap) -> { - PersistentPrefetchProtos.RegionFileSizeMap tmpRegionFileSize = - PersistentPrefetchProtos.RegionFileSizeMap.newBuilder() - .setRegionName(regionPrefetchMap.getFirst()) - .setRegionPrefetchSize(regionPrefetchMap.getSecond()).build(); - tmpMap.put(hFileName, tmpRegionFileSize); - }); - return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder().putAllPrefetchedFiles(tmpMap) - .build(); - } - - static Map<String, Pair<String, Long>> - fromPB(Map<String, PersistentPrefetchProtos.RegionFileSizeMap> prefetchHFileNames) { - Map<String, Pair<String, Long>> hFileMap = new HashMap<>(); - prefetchHFileNames.forEach((hFileName, regionPrefetchMap) -> { - hFileMap.put(hFileName, - new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionPrefetchSize())); - }); - return hFileMap; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index e3d740383085..eeed85ef0c3a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; @@ -51,6 +52,7 @@ import java.util.function.Consumer; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.TableName; @@ -79,6 +81,7 @@ import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef; import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool; import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -151,8 +154,17 @@ public class BucketCache implements BlockCache, HeapSize { private AtomicBoolean backingMapValidated = new AtomicBoolean(false); - /** Set of files for which prefetch is completed */ - final Map<String, Boolean> fullyCachedFiles = new ConcurrentHashMap<>(); + /** + * Map of hFile -> Region -> File size. This map is used to track all files completed prefetch, + * together with the region those belong to and the total cached size for the + * region. + */ + final Map<String, Pair<String, Long>> fullyCachedFiles = new ConcurrentHashMap<>(); + /** + * Map of region -> total size of the region prefetched on this region server. 
This is the total + * size of hFiles for this region prefetched on this region server + */ + final Map<String, Long> regionCachedSizeMap = new ConcurrentHashMap<>(); private BucketCachePersister cachePersister; @@ -546,7 +558,6 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach } else { this.blockNumber.increment(); this.heapSize.add(cachedItem.heapSize()); - blocksByHFile.add(cacheKey); } } @@ -636,14 +647,11 @@ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decre cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); } if (ioEngine.isPersistent()) { - fullyCachedFiles.remove(cacheKey.getHfileName()); + removeFileFromPrefetch(cacheKey.getHfileName()); setCacheInconsistent(true); } } - public void fileCacheCompleted(String fileName) { - fullyCachedFiles.put(fileName, true); - } /** * Free the {{@link BucketEntry} actually,which could only be invoked when the @@ -1300,6 +1308,10 @@ public boolean isCachePersistent() { return ioEngine.isPersistent() && persistencePath != null; } + public Map<String, Long> getRegionCachedInfo() { + return Collections.unmodifiableMap(regionCachedSizeMap); + } + /** * @see #persistToFile() */ @@ -1337,6 +1349,29 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException { } } + private void updateRegionSizeMapWhileRetrievingFromFile() { + // Update the regionCachedSizeMap with the region size while restarting the region server + if (LOG.isDebugEnabled()) { + LOG.debug("Updating region size map after retrieving cached file list"); + dumpPrefetchList(); + } + regionCachedSizeMap.clear(); + fullyCachedFiles.forEach((hFileName, hFileSize) -> { + // Get the region name for each file + String regionEncodedName = hFileSize.getFirst(); + long cachedFileSize = hFileSize.getSecond(); + regionCachedSizeMap.merge(regionEncodedName, cachedFileSize, + (oldpf, fileSize) -> oldpf + fileSize); + }); + } + + private void dumpPrefetchList() { + for (Map.Entry<String, Pair<String, Long>> outerEntry : fullyCachedFiles.entrySet()) { + LOG.debug("Cached File Entry:<{},<{},{}>>", outerEntry.getKey(), + outerEntry.getValue().getFirst(), outerEntry.getValue().getSecond()); + } + } + /** * Create an input stream that deletes the file after reading it. 
Use in try-with-resources to * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions: @@ -1401,7 +1436,7 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), this::createRecycler); fullyCachedFiles.clear(); - fullyCachedFiles.putAll(proto.getPrefetchedFilesMap()); + fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap())); if (proto.hasChecksum()) { try { ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), @@ -1444,6 +1479,7 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio LOG.info("Persistent file is old format, it does not support verifying file integrity!"); backingMapValidated.set(true); } + updateRegionSizeMapWhileRetrievingFromFile(); verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); } @@ -1581,7 +1617,7 @@ protected String getAlgorithm() { */ @Override public int evictBlocksByHfileName(String hfileName) { - this.fullyCachedFiles.remove(hfileName); + removeFileFromPrefetch(hfileName); Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true); @@ -1966,7 +2002,7 @@ public AtomicBoolean getBackingMapValidated() { } @Override - public Optional<Map<String, Boolean>> getFullyCachedFiles() { + public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() { return Optional.of(fullyCachedFiles); } @@ -1985,4 +2021,33 @@ public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig ca return Optional.empty(); } + private void removeFileFromPrefetch(String hfileName) { + // Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted + if (fullyCachedFiles.containsKey(hfileName)) { + Pair<String, Long> regionEntry = fullyCachedFiles.get(hfileName); + String regionEncodedName = regionEntry.getFirst(); + long filePrefetchSize = regionEntry.getSecond(); + LOG.debug("Removing file {} for region {}", hfileName, regionEncodedName); + regionCachedSizeMap.computeIfPresent(regionEncodedName, (rn, pf) -> pf - filePrefetchSize); + // If all the blocks for a region are evicted from the cache, remove the entry for that region + if ( + regionCachedSizeMap.containsKey(regionEncodedName) + && regionCachedSizeMap.get(regionEncodedName) == 0 + ) { + regionCachedSizeMap.remove(regionEncodedName); + } + } + fullyCachedFiles.remove(hfileName); + } + + public void fileCacheCompleted(Path filePath, long size) { + Pair<String, Long> pair = new Pair<>(); + // sets the region name + String regionName = filePath.getParent().getParent().getName(); + pair.setFirst(regionName); + pair.setSecond(size); + fullyCachedFiles.put(filePath.getName(), pair); + regionCachedSizeMap.merge(regionName, size, (oldpf, fileSize) -> oldpf + fileSize); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java index 8830e5d3255a..7cc5050506e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -28,6 +29,7 @@ 
import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; @@ -45,7 +47,7 @@ static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) { .setIoClass(cache.ioEngine.getClass().getName()) .setMapClass(cache.backingMap.getClass().getName()) .putAllDeserializers(CacheableDeserializerIdManager.save()) - .putAllPrefetchedFiles(cache.fullyCachedFiles) + .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles)) .setBackingMap(BucketProtoUtils.toPB(cache.backingMap)) .setChecksum(ByteString .copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm()))) @@ -185,4 +187,26 @@ private static BlockType fromPb(BucketCacheProtos.BlockType blockType) { throw new Error("Unrecognized BlockType."); } } + + static Map<String, BucketCacheProtos.RegionFileSizeMap> + toCachedPB(Map<String, Pair<String, Long>> prefetchedHfileNames) { + Map<String, BucketCacheProtos.RegionFileSizeMap> tmpMap = new HashMap<>(); + prefetchedHfileNames.forEach((hfileName, regionPrefetchMap) -> { + BucketCacheProtos.RegionFileSizeMap tmpRegionFileSize = + BucketCacheProtos.RegionFileSizeMap.newBuilder().setRegionName(regionPrefetchMap.getFirst()) + .setRegionCachedSize(regionPrefetchMap.getSecond()).build(); + tmpMap.put(hfileName, tmpRegionFileSize); + }); + return tmpMap; + } + + static Map<String, Pair<String, Long>> + fromPB(Map<String, BucketCacheProtos.RegionFileSizeMap> prefetchHFileNames) { + Map<String, Pair<String, Long>> hfileMap = new HashMap<>(); + prefetchHFileNames.forEach((hfileName, regionPrefetchMap) -> { + hfileMap.put(hfileName, + new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionCachedSize())); + }); + return hfileMap; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 85721a354977..3042a2eae451 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -65,10 +65,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; import java.util.stream.Collectors; import javax.management.MalformedObjectNameException; import javax.servlet.http.HttpServlet; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableFloat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -108,7 +110,9 @@ import org.apache.hadoop.hbase.http.InfoServer; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; +import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.RpcClient; @@ -239,6 +243,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices> private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class); + int unitMB = 1024 * 1024; + int unitKB = 1024; + /** * For testing only! Set to true to skip notifying region assignment to master . 
*/ @@ -1211,6 +1218,11 @@ private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, lon serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); } } + computeIfPersistentBucketCache(bc -> { + bc.getRegionCachedInfo().forEach((regionName, prefetchSize) -> { + serverLoad.putRegionCachedInfo(regionName, roundSize(prefetchSize, unitMB)); + }); + }); serverLoad.setReportStartTime(reportStartTime); serverLoad.setReportEndTime(reportEndTime); if (this.infoServer != null) { @@ -1510,6 +1522,15 @@ private static int roundSize(long sizeInByte, int sizeUnit) { } } + private void computeIfPersistentBucketCache(Consumer<BucketCache> computation) { + if (blockCache instanceof CombinedBlockCache) { + BlockCache l2 = ((CombinedBlockCache) blockCache).getSecondLevelCache(); + if (l2 instanceof BucketCache && ((BucketCache) l2).isCachePersistent()) { + computation.accept((BucketCache) l2); + } + } + } + /** * @param r Region to get RegionLoad for. * @param regionLoadBldr the RegionLoad.Builder, can be null @@ -1519,6 +1540,7 @@ private static int roundSize(long sizeInByte, int sizeUnit) { RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, RegionSpecifier.Builder regionSpecifier) throws IOException { byte[] name = r.getRegionInfo().getRegionName(); + String regionEncodedName = r.getRegionInfo().getEncodedName(); int stores = 0; int storefiles = 0; int storeRefCount = 0; @@ -1531,6 +1553,7 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, long totalStaticBloomSize = 0L; long totalCompactingKVs = 0L; long currentCompactedKVs = 0L; + long totalRegionSize = 0L; List<HStore> storeList = r.getStores(); stores += storeList.size(); for (HStore store : storeList) { @@ -1542,6 +1565,7 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount); storeUncompressedSize += store.getStoreSizeUncompressed(); storefileSize += store.getStorefilesSize(); + totalRegionSize += store.getHFilesSize(); // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB? storefileIndexSize += store.getStorefilesRootLevelIndexSize(); CompactionProgress progress = store.getCompactionProgress(); @@ -1554,9 +1578,6 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, totalStaticBloomSize += store.getTotalStaticBloomSize(); } - int unitMB = 1024 * 1024; - int unitKB = 1024; - int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB); int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB); int storefileSizeMB = roundSize(storefileSize, unitMB); @@ -1564,6 +1585,16 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB); int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB); int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB); + int regionSizeMB = roundSize(totalRegionSize, unitMB); + final MutableFloat currentRegionCachedRatio = new MutableFloat(0.0f); + computeIfPersistentBucketCache(bc -> { + if (bc.getRegionCachedInfo().containsKey(regionEncodedName)) { + currentRegionCachedRatio.setValue(regionSizeMB == 0 + ? 
0.0f + : (float) roundSize(bc.getRegionCachedInfo().get(regionEncodedName), unitMB) + / regionSizeMB); + } + }); HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution(); float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname()); @@ -1594,7 +1625,8 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight) .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight) .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState())) - .setLastMajorCompactionTs(r.getOldestHfileTs(true)); + .setLastMajorCompactionTs(r.getOldestHfileTs(true)).setRegionSizeMB(regionSizeMB) + .setCurrentRegionCachedRatio(currentRegionCachedRatio.floatValue()); r.setCompleteSequenceId(regionLoadBldr); return regionLoadBldr.build(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerMetrics.java index 8bcf3e600f88..8dfb8b1a4632 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerMetrics.java @@ -60,6 +60,10 @@ public void testRegionLoadAggregation() { metrics.getRegionMetrics().values().stream().mapToLong(v -> v.getCpRequestCount()).sum()); assertEquals(300, metrics.getRegionMetrics().values().stream() .mapToLong(v -> v.getFilteredReadRequestCount()).sum()); + assertEquals(2, metrics.getRegionMetrics().values().stream() + .mapToLong(v -> (long) v.getCurrentRegionCachedRatio()).count()); + assertEquals(150, metrics.getRegionMetrics().values().stream() + .mapToDouble(v -> v.getRegionSizeMB().get(Size.Unit.MEGABYTE)).sum(), 0); } @Test @@ -99,12 +103,14 @@ private ClusterStatusProtos.ServerLoad createServerLoadProto() { ClusterStatusProtos.RegionLoad.newBuilder().setRegionSpecifier(rSpecOne).setStores(10) .setStorefiles(101).setStoreUncompressedSizeMB(106).setStorefileSizeMB(520) .setFilteredReadRequestsCount(100).setStorefileIndexSizeKB(42).setRootIndexSizeKB(201) - .setReadRequestsCount(Integer.MAX_VALUE).setWriteRequestsCount(Integer.MAX_VALUE).build(); - ClusterStatusProtos.RegionLoad rlTwo = ClusterStatusProtos.RegionLoad.newBuilder() - .setRegionSpecifier(rSpecTwo).setStores(3).setStorefiles(13).setStoreUncompressedSizeMB(23) - .setStorefileSizeMB(300).setFilteredReadRequestsCount(200).setStorefileIndexSizeKB(40) - .setRootIndexSizeKB(303).setReadRequestsCount(Integer.MAX_VALUE) - .setWriteRequestsCount(Integer.MAX_VALUE).setCpRequestsCount(100).build(); + .setReadRequestsCount(Integer.MAX_VALUE).setWriteRequestsCount(Integer.MAX_VALUE) + .setRegionSizeMB(100).setCurrentRegionCachedRatio(0.9f).build(); + ClusterStatusProtos.RegionLoad rlTwo = + ClusterStatusProtos.RegionLoad.newBuilder().setRegionSpecifier(rSpecTwo).setStores(3) + .setStorefiles(13).setStoreUncompressedSizeMB(23).setStorefileSizeMB(300) + .setFilteredReadRequestsCount(200).setStorefileIndexSizeKB(40).setRootIndexSizeKB(303) + .setReadRequestsCount(Integer.MAX_VALUE).setWriteRequestsCount(Integer.MAX_VALUE) + .setCpRequestsCount(100).setRegionSizeMB(50).setCurrentRegionCachedRatio(1.0f).build(); ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder() .addRegionLoads(rlOne).addRegionLoads(rlTwo).build(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java index 594db4d7c303..31fcf9fd47f5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java @@ -400,6 +400,10 @@ public List<ServerTask> getTasks() { return null; } + @Override + public Map<String, Integer> getRegionCachedInfo() { + return new HashMap<>(); + } }; return serverMetrics; } @@ -541,6 +545,16 @@ public long getBlocksTotalWeight() { public CompactionState getCompactionState() { return null; } + + @Override + public Size getRegionSizeMB() { + return null; + } + + @Override + public float getCurrentRegionCachedRatio() { + return 0.0f; + } }; return regionMetrics; }
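For illustration: once patch 2 is in place, the new numbers surface through the public ServerMetrics/RegionMetrics client API. A minimal sketch of reading them follows, assuming a running cluster reachable with default configuration; the printed format is illustrative and not part of the patch:

    import org.apache.hadoop.hbase.ClusterMetrics;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.Size;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class RegionCacheMetricsExample {
      public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
          Admin admin = conn.getAdmin()) {
          ClusterMetrics cm = admin.getClusterMetrics();
          cm.getLiveServerMetrics().forEach((serverName, sm) -> {
            // Per-server map of region encoded name -> cached size in MB (new in this patch)
            sm.getRegionCachedInfo().forEach(
              (region, cachedMB) -> System.out.println(region + " cached " + cachedMB + " MB"));
            // Per-region total HFile size and current cache ratio (new in this patch)
            sm.getRegionMetrics().forEach((name, rm) -> System.out.println(rm.getNameAsString()
              + ": " + (long) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE) + " MB, cached ratio="
              + rm.getCurrentRegionCachedRatio()));
          });
        }
      }
    }
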
From 07a44b2319331e0344ae031eeccb347ac11e8c61 Mon Sep 17 00:00:00 2001 From: Rahul Agarkar Date: Tue, 19 Sep 2023 20:30:01 +0530 Subject: [PATCH 3/4] HBASE-27999 Implement cache prefetch aware load balancer (#5376) Signed-off-by: Wellington Chevreuil --- .../master/balancer/BalancerClusterState.java | 156 +++++- .../master/balancer/BalancerRegionLoad.java | 12 + .../master/balancer/BaseLoadBalancer.java | 3 +- .../balancer/CacheAwareLoadBalancer.java | 479 ++++++++++++++++++ .../balancer/StochasticLoadBalancer.java | 54 +- .../master/balancer/BalancerTestBase.java | 14 + .../balancer/TestStochasticLoadBalancer.java | 4 + .../org/apache/hadoop/hbase/HConstants.java | 12 + .../hbase/io/hfile/BlockCacheFactory.java | 13 +- .../hbase/io/hfile/bucket/BucketCache.java | 3 +- .../balancer/TestCacheAwareLoadBalancer.java | 397 +++++++++++++++ ...stCacheAwareLoadBalancerCostFunctions.java | 316 ++++++++++++ ...rWithStochasticLoadBalancerAsInternal.java | 2 + 13 files changed, 1429 insertions(+), 36 deletions(-) create mode 100644 hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancerCostFunctions.java diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java index a7ae8b4d1a5a..4b3809c107cb 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,6 +115,12 @@ class BalancerClusterState { private float[][] rackLocalities; // Maps localityType -> region -> [server|rack]Index with highest locality private int[][] regionsToMostLocalEntities; + // Maps region -> serverIndex -> regionCacheRatio of a region on a server + private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio; + // Maps regionIndex -> serverIndex with best region cache ratio + private int[] regionServerIndexWithBestRegionCachedRatio; + // Maps regionName -> oldServerName -> cache ratio of the region on the old server + Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap; static class DefaultRackManager extends RackManager { @Override @@ -125,13 +132,20 @@ public String getRack(ServerName server) { BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) { - this(null, clusterState, loads, regionFinder, rackManager); + this(null, clusterState, loads, regionFinder, rackManager, null); + } + + protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState, + Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder, + RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) { + this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio); } @SuppressWarnings("unchecked") BalancerClusterState(Collection<RegionInfo> unassignedRegions, Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads, - RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) { + RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager, + Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) { if (unassignedRegions == null) { unassignedRegions = Collections.emptyList(); } @@ -145,6 +159,8 @@ public String getRack(ServerName server) { tables = new ArrayList<>(); this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); + this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio; + numRegions = 0; List<List<ServerName>> serversPerHostList = new ArrayList<>(); @@ -541,6 +557,142 @@ private void computeCachedLocalities() { } + /** + * Returns the size of hFiles from the most recent RegionLoad for region + */ + public int getTotalRegionHFileSizeMB(int region) { + Deque<BalancerRegionLoad> load = regionLoads[region]; + if (load == null) { + // This means that the region has no actual data on disk + return 0; + } + return regionLoads[region].getLast().getRegionSizeMB(); + } + + /** + * Returns the weighted cache ratio of a region on the given region server + */ + public float getOrComputeWeightedRegionCacheRatio(int region, int server) { + return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server); + } + + /** + * Returns the amount by which a region is cached on a given region server. If the region is not + * currently hosted on the given region server, then find out if it was previously hosted there + * and return the old cache ratio. + */ + protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) { + float regionCacheRatio = 0.0f; + + // Get the current region cache ratio if the region is hosted on the server regionServerIndex + for (int regionIndex : regionsPerServer[regionServerIndex]) { + if (region != regionIndex) { + continue; + } + + Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex]; + + // The region is currently hosted on this region server. Get the region cache ratio for this + // region on this server + regionCacheRatio = + regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio(); + + return regionCacheRatio; + } + + // Region is not currently hosted on this server. Check if the region was cached on this + // server earlier. This can happen when the server was shutdown and the cache was persisted. + // Search using the region name and server name and not the index id and server id as these ids + // may change when a server is marked as dead or a new server is added. 
+ String regionEncodedName = regions[region].getEncodedName(); + ServerName serverName = servers[regionServerIndex]; + if ( + regionCacheRatioOnOldServerMap != null + && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName) + ) { + Pair<ServerName, Float> cacheRatioOfRegionOnServer = + regionCacheRatioOnOldServerMap.get(regionEncodedName); + if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) { + regionCacheRatio = cacheRatioOfRegionOnServer.getSecond(); + if (LOG.isDebugEnabled()) { + LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName, + serverName, regionCacheRatio); + } + } + } + return regionCacheRatio; + } + + /** + * Populate the maps containing information about how much a region is cached on a region server. + */ + private void computeRegionServerRegionCacheRatio() { + regionIndexServerIndexRegionCachedRatio = new HashMap<>(); + regionServerIndexWithBestRegionCachedRatio = new int[numRegions]; + + for (int region = 0; region < numRegions; region++) { + float bestRegionCacheRatio = 0.0f; + int serverWithBestRegionCacheRatio = 0; + for (int server = 0; server < numServers; server++) { + float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server); + if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) { + // A region with cache ratio 0 on a server means nothing. Hence, just make a note of + // cache ratio only if the cache ratio is greater than 0. + Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); + regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio); + } + if (regionCacheRatio > bestRegionCacheRatio) { + serverWithBestRegionCacheRatio = server; + // If the server currently hosting the region has equal cache ratio to a historical + // server, consider the current server to keep hosting the region + bestRegionCacheRatio = regionCacheRatio; + } else if ( + regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region] + ) { + // If two servers have same region cache ratio, then the server currently hosting the + // region + // should retain the region + serverWithBestRegionCacheRatio = server; + } + } + regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio; + Pair<Integer, Integer> regionServerPair = + new Pair<>(region, regionIndexToServerIndex[region]); + float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair); + if (tempRegionCacheRatio > bestRegionCacheRatio) { + LOG.warn( + "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the " + "best region cache ratio {} on server {}", + regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]], + tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]); + } + } + } + + protected float getOrComputeRegionCacheRatio(int region, int server) { + if ( + regionServerIndexWithBestRegionCachedRatio == null + || regionIndexServerIndexRegionCachedRatio.isEmpty() + ) { + computeRegionServerRegionCacheRatio(); + } + + Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); + return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair) + ? 
regionIndexServerIndexRegionCachedRatio.get(regionServerPair) + : 0.0f; + } + + public int[] getOrComputeServerWithBestRegionCachedRatio() { + if ( + regionServerIndexWithBestRegionCachedRatio == null + || regionIndexServerIndexRegionCachedRatio.isEmpty() + ) { + computeRegionServerRegionCacheRatio(); + } + return regionServerIndexWithBestRegionCachedRatio; + } + /** * Maps region index to rack index */ diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java index ffb36cb8ca1a..33d00e3de862 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java @@ -34,6 +34,8 @@ class BalancerRegionLoad { private final long writeRequestsCount; private final int memStoreSizeMB; private final int storefileSizeMB; + private final int regionSizeMB; + private final float currentRegionPrefetchRatio; BalancerRegionLoad(RegionMetrics regionMetrics) { readRequestsCount = regionMetrics.getReadRequestCount(); @@ -41,6 +43,8 @@ class BalancerRegionLoad { writeRequestsCount = regionMetrics.getWriteRequestCount(); memStoreSizeMB = (int) regionMetrics.getMemStoreSize().get(Size.Unit.MEGABYTE); storefileSizeMB = (int) regionMetrics.getStoreFileSize().get(Size.Unit.MEGABYTE); + regionSizeMB = (int) regionMetrics.getRegionSizeMB().get(Size.Unit.MEGABYTE); + currentRegionPrefetchRatio = regionMetrics.getCurrentRegionCachedRatio(); } public long getReadRequestsCount() { @@ -62,4 +66,12 @@ public int getMemStoreSizeMB() { public int getStorefileSizeMB() { return storefileSizeMB; } + + public int getRegionSizeMB() { + return regionSizeMB; + } + + public float getCurrentRegionCacheRatio() { + return currentRegionPrefetchRatio; + } } diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index a4560cc595a2..54516868a0a0 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -232,7 +232,8 @@ private BalancerClusterState createCluster(List<ServerName> servers, clusterState.put(server, Collections.emptyList()); } } - return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager); + return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager, + null); } private List<ServerName> findIdleServers(List<ServerName> servers) { diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java new file mode 100644 index 000000000000..d73769a3971b --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java @@ -0,0 +1,479 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +/** An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns regions + * based on the amount they are cached on a given server. A region can move across the region + * servers whenever a region server shuts down or crashes. The region server preserves the cache + * periodically and restores the cache when it is restarted. This balancer implements a mechanism + * where it maintains the amount by which a region is cached on a region server. During balancer + * run, a region plan is generated that takes into account this cache information and tries to + * move the regions so that the cache is minimally impacted. + */ + +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterMetrics; +import org.apache.hadoop.hbase.RegionMetrics; +import org.apache.hadoop.hbase.ServerMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Size; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class CacheAwareLoadBalancer extends StochasticLoadBalancer { + private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class); + + private Configuration configuration; + + public enum GeneratorFunctionType { + LOAD, + CACHE_RATIO + } + + @Override + public synchronized void loadConf(Configuration configuration) { + this.configuration = configuration; + this.costFunctions = new ArrayList<>(); + super.loadConf(configuration); + } + + @Override + protected List<CandidateGenerator> createCandidateGenerators() { + List<CandidateGenerator> candidateGenerators = new ArrayList<>(2); + candidateGenerators.add(GeneratorFunctionType.LOAD.ordinal(), + new CacheAwareSkewnessCandidateGenerator()); + candidateGenerators.add(GeneratorFunctionType.CACHE_RATIO.ordinal(), + new CacheAwareCandidateGenerator()); + return candidateGenerators; + } + + @Override + protected List<CostFunction> createCostFunctions(Configuration configuration) { + List<CostFunction> costFunctions = new ArrayList<>(); + addCostFunction(costFunctions, new CacheAwareRegionSkewnessCostFunction(configuration)); + addCostFunction(costFunctions, new CacheAwareCostFunction(configuration)); + return costFunctions; + } + + private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) { + if (costFunction.getMultiplier() > 0) { + costFunctions.add(costFunction); + } + } + + @Override + public void updateClusterMetrics(ClusterMetrics clusterMetrics) { + this.clusterStatus = clusterMetrics; + updateRegionLoad(); + } + + /** + * Collect the amount of region cached for all the regions from all the active region servers. 
+   */
+  private void updateRegionLoad() {
+    loads = new HashMap<>();
+    regionCacheRatioOnOldServerMap = new HashMap<>();
+    Map<String, Pair<ServerName, Integer>> regionCacheRatioOnCurrentServerMap = new HashMap<>();
+
+    // Build current region cache statistics
+    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
+      // Create a map of region and the server where it is currently hosted
+      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
+        String regionEncodedName = RegionInfo.encodeRegionName(regionName);
+
+        Deque<BalancerRegionLoad> rload = new ArrayDeque<>();
+
+        // Get the total size of the hFiles in this region
+        int regionSizeMB = (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE);
+
+        rload.add(new BalancerRegionLoad(rm));
+        // Maintain a map of region and its total size. This is needed to calculate the cache
+        // ratios for the regions cached on old region servers
+        regionCacheRatioOnCurrentServerMap.put(regionEncodedName, new Pair<>(sn, regionSizeMB));
+        loads.put(regionEncodedName, rload);
+      });
+    });
+
+    // Build cache statistics for the regions hosted previously on old region servers
+    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
+      // Find if a region was previously hosted on a server other than the one it is currently
+      // hosted on.
+      sm.getRegionCachedInfo().forEach((String regionEncodedName, Integer regionSizeInCache) -> {
+        // If the region is found in regionCacheRatioOnCurrentServerMap, it is currently hosted on
+        // this server
+        if (regionCacheRatioOnCurrentServerMap.containsKey(regionEncodedName)) {
+          ServerName currentServer =
+            regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getFirst();
+          if (!ServerName.isSameAddress(currentServer, sn)) {
+            int regionSizeMB =
+              regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getSecond();
+            float regionCacheRatioOnOldServer =
+              regionSizeMB == 0 ? 0.0f : (float) regionSizeInCache / regionSizeMB;
+            regionCacheRatioOnOldServerMap.put(regionEncodedName,
+              new Pair<>(sn, regionCacheRatioOnOldServer));
+          }
+        }
+      });
+    });
+  }
+
+  private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, String regionName) {
+    Optional<RegionInfo> regionInfoOptional =
+      Arrays.stream(cluster.regions).filter((RegionInfo ri) -> {
+        return regionName.equals(ri.getEncodedName());
+      }).findFirst();
+
+    if (regionInfoOptional.isPresent()) {
+      return regionInfoOptional.get();
+    }
+    return null;
+  }
+
+  private class CacheAwareCandidateGenerator extends CandidateGenerator {
+    @Override
+    protected BalanceAction generate(BalancerClusterState cluster) {
+      // Move the regions to the servers they were previously hosted on based on the cache ratio
+      if (
+        !regionCacheRatioOnOldServerMap.isEmpty()
+          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
+      ) {
+        Map.Entry<String, Pair<ServerName, Float>> regionCacheRatioServerMap =
+          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
+        // Get the server where this region was previously hosted
+        String regionEncodedName = regionCacheRatioServerMap.getKey();
+        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
+        if (regionInfo == null) {
+          LOG.warn("Region {} not found", regionEncodedName);
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+        int regionIndex = cluster.regionsToIndex.get(regionInfo);
+        int oldServerIndex = cluster.serversToIndex
+          .get(regionCacheRatioOnOldServerMap.get(regionEncodedName).getFirst().getAddress());
+        if (oldServerIndex < 0) {
+          LOG.warn("Server previously hosting region {} not found", regionEncodedName);
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+
+        float oldRegionCacheRatio =
+          cluster.getOrComputeRegionCacheRatio(regionIndex, oldServerIndex);
+        int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex];
+        float currentRegionCacheRatio =
+          cluster.getOrComputeRegionCacheRatio(regionIndex, currentServerIndex);
+
+        BalanceAction action = generatePlan(cluster, regionIndex, currentServerIndex,
+          currentRegionCacheRatio, oldServerIndex, oldRegionCacheRatio);
+        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+        return action;
+      }
+      return BalanceAction.NULL_ACTION;
+    }
+
+    private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex,
+      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
+      float cacheRatioOnOldServer) {
+      return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
+        cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
+          ? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
+          : BalanceAction.NULL_ACTION;
+    }
+
+    private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
+      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
+      float cacheRatioOnOldServer) {
+      // Find if the region has already been moved by comparing the current server index with the
+      // old server index. This can happen when another candidate generator has moved the region
+      if (currentServerIndex < 0 || oldServerIndex < 0) {
+        return false;
+      }
+
+      float cacheRatioDiffThreshold = 0.6f;
+
+      // Conditions for moving the region
+
+      // If the region is fully cached on the old server, move the region back
+      if (cacheRatioOnOldServer == 1.0f) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Region {} moved to the old server {} as it is fully cached there",
+            cluster.regions[regionIndex].getEncodedName(), cluster.servers[oldServerIndex]);
+        }
+        return true;
+      }
+
+      // Move the region back to the old server if it is cached equally on both the servers
+      if (cacheRatioOnCurrentServer == cacheRatioOnOldServer) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "Region {} moved from {} to {} as the region is cached {} equally on both servers",
+            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
+            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer);
+        }
+        return true;
+      }
+
+      // If the region is not fully cached on either of the servers, move the region back to the
+      // old server if the region cache ratio on the current server is still much less than the old
+      // server
+      if (
+        cacheRatioOnOldServer > 0.0f
+          && cacheRatioOnCurrentServer / cacheRatioOnOldServer < cacheRatioDiffThreshold
+      ) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "Region {} moved from {} to {} as region cache ratio {} is better than the current "
+              + "cache ratio {}",
+            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
+            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
+        }
+        return true;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
+          cluster.regions[regionIndex], cluster.servers[currentServerIndex],
+          cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
+      }
+      return false;
+    }
+  }
+
+  private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator {
+    @Override
+    BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
+      // First move all the regions which were hosted previously on some other server back to their
+      // old servers
+      if (
+        !regionCacheRatioOnOldServerMap.isEmpty()
+          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
+      ) {
+        // Get the first region index in the historical cache ratio list
+        Map.Entry<String, Pair<ServerName, Float>> regionEntry =
+          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
+        String regionEncodedName = regionEntry.getKey();
+
+        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
+        if (regionInfo == null) {
+          LOG.warn("Region {} does not exist", regionEncodedName);
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+
+        int regionIndex = cluster.regionsToIndex.get(regionInfo);
+
+        // Get the current host name for this region
+        thisServer = cluster.regionIndexToServerIndex[regionIndex];
+
+        // Get the old server index
+        otherServer = cluster.serversToIndex.get(regionEntry.getValue().getFirst().getAddress());
+
+        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+
+        if (otherServer < 0) {
+          // The old server has been moved to another host and hence, the region cannot be moved
+          // back to the old server
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+              "CacheAwareSkewnessCandidateGenerator: Region {} not moved to the old "
+                + "server {} as the server does not exist",
+              regionEncodedName, regionEntry.getValue().getFirst().getHostname());
+          }
+          return BalanceAction.NULL_ACTION;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it "
+              + "was hosted there earlier",
+            regionEncodedName, cluster.servers[thisServer].getHostname(),
+            cluster.servers[otherServer].getHostname());
+        }
+
+        return getAction(thisServer, regionIndex, otherServer, -1);
+      }
+
+      if (thisServer < 0 || otherServer < 0) {
+        return BalanceAction.NULL_ACTION;
+      }
+
+      int regionIndexToMove = pickLeastCachedRegion(cluster, thisServer);
+      if (regionIndexToMove < 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("CacheAwareSkewnessCandidateGenerator: No region found for movement");
+        }
+        return BalanceAction.NULL_ACTION;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it is "
+            + "least cached on current server",
+          cluster.regions[regionIndexToMove].getEncodedName(),
+          cluster.servers[thisServer].getHostname(), cluster.servers[otherServer].getHostname());
+      }
+      return getAction(thisServer, regionIndexToMove, otherServer, -1);
+    }
+
+    private int pickLeastCachedRegion(BalancerClusterState cluster, int thisServer) {
+      float minCacheRatio = Float.MAX_VALUE;
+      int leastCachedRegion = -1;
+      for (int i = 0; i < cluster.regionsPerServer[thisServer].length; i++) {
+        int regionIndex = cluster.regionsPerServer[thisServer][i];
+
+        float cacheRatioOnCurrentServer =
+          cluster.getOrComputeRegionCacheRatio(regionIndex, thisServer);
+        if (cacheRatioOnCurrentServer < minCacheRatio) {
+          minCacheRatio = cacheRatioOnCurrentServer;
+          leastCachedRegion = regionIndex;
+        }
+      }
+      return leastCachedRegion;
+    }
+  }
+
+  static class CacheAwareRegionSkewnessCostFunction extends CostFunction {
+    static final String REGION_COUNT_SKEW_COST_KEY =
+      "hbase.master.balancer.stochastic.regionCountCost";
+    static final float DEFAULT_REGION_COUNT_SKEW_COST = 20;
+    private final DoubleArrayCost cost = new DoubleArrayCost();
+
+    CacheAwareRegionSkewnessCostFunction(Configuration conf) {
+      // Load multiplier should be the greatest as it is the most general way to balance data.
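+      // A multiplier of 0 (set via REGION_COUNT_SKEW_COST_KEY) disables this cost function,
+      // since addCostFunction only registers cost functions with a positive multiplier.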
+ this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST)); + } + + @Override + void prepare(BalancerClusterState cluster) { + super.prepare(cluster); + cost.prepare(cluster.numServers); + cost.applyCostsChange(costs -> { + for (int i = 0; i < cluster.numServers; i++) { + costs[i] = cluster.regionsPerServer[i].length; + } + }); + } + + @Override + protected double cost() { + return cost.cost(); + } + + @Override + protected void regionMoved(int region, int oldServer, int newServer) { + cost.applyCostsChange(costs -> { + costs[oldServer] = cluster.regionsPerServer[oldServer].length; + costs[newServer] = cluster.regionsPerServer[newServer].length; + }); + } + + public final void updateWeight(double[] weights) { + weights[GeneratorFunctionType.LOAD.ordinal()] += cost(); + } + } + + static class CacheAwareCostFunction extends CostFunction { + private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost"; + private double cacheRatio; + private double bestCacheRatio; + + private static final float DEFAULT_CACHE_COST = 20; + + CacheAwareCostFunction(Configuration conf) { + boolean isPersistentCache = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY) != null; + // Disable the CacheAwareCostFunction if the cached file list persistence is not enabled + this.setMultiplier( + !isPersistentCache ? 0.0f : conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST)); + bestCacheRatio = 0.0; + cacheRatio = 0.0; + } + + @Override + void prepare(BalancerClusterState cluster) { + super.prepare(cluster); + cacheRatio = 0.0; + bestCacheRatio = 0.0; + + for (int region = 0; region < cluster.numRegions; region++) { + cacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region, + cluster.regionIndexToServerIndex[region]); + bestCacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region, + getServerWithBestCacheRatioForRegion(region)); + } + + cacheRatio = bestCacheRatio == 0 ? 1.0 : cacheRatio / bestCacheRatio; + if (LOG.isDebugEnabled()) { + LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio); + } + } + + @Override + protected double cost() { + return scale(0, 1, 1 - cacheRatio); + } + + @Override + protected void regionMoved(int region, int oldServer, int newServer) { + double regionCacheRatioOnOldServer = + cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer); + double regionCacheRatioOnNewServer = + cluster.getOrComputeWeightedRegionCacheRatio(region, newServer); + double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer; + double normalizedDelta = bestCacheRatio == 0.0 ? 
0.0 : cacheRatioDiff / bestCacheRatio;
+      cacheRatio += normalizedDelta;
+      if (LOG.isDebugEnabled() && (cacheRatio < 0.0 || cacheRatio > 1.0)) {
+        LOG.debug(
+          "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:regionCacheRatioOnOldServer:{}:"
+            + "regionCacheRatioOnNewServer:{}:bestRegionCacheRatio:{}:cacheRatio:{}",
+          cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(),
+          cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer,
+          regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio);
+      }
+    }
+
+    private int getServerWithBestCacheRatioForRegion(int region) {
+      return cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
+    }
+
+    @Override
+    public final void updateWeight(double[] weights) {
+      weights[GeneratorFunctionType.CACHE_RATIO.ordinal()] += cost();
+    }
+  }
+}
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index edf049e8a718..e5cd5446c5c8 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -136,8 +137,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME;
   private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS;
   private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE;
+  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap = new HashMap<>();
 
-  private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
+  protected List<CostFunction> costFunctions; // FindBugs: Wants this protected;
+                                              // IS2_INCONSISTENT_SYNC
 
   // To save currently configured sum of multiplier.
Defaulted at 1 for cases that carry high cost private float sumMultiplier; // to save and report costs to JMX @@ -224,6 +227,24 @@ protected List createCandidateGenerators() { return candidateGenerators; } + protected List createCostFunctions(Configuration conf) { + List costFunctions = new ArrayList<>(); + addCostFunction(costFunctions, new RegionCountSkewCostFunction(conf)); + addCostFunction(costFunctions, new PrimaryRegionCountSkewCostFunction(conf)); + addCostFunction(costFunctions, new MoveCostFunction(conf, provider)); + addCostFunction(costFunctions, localityCost); + addCostFunction(costFunctions, rackLocalityCost); + addCostFunction(costFunctions, new TableSkewCostFunction(conf)); + addCostFunction(costFunctions, regionReplicaHostCostFunction); + addCostFunction(costFunctions, regionReplicaRackCostFunction); + addCostFunction(costFunctions, new ReadRequestCostFunction(conf)); + addCostFunction(costFunctions, new CPRequestCostFunction(conf)); + addCostFunction(costFunctions, new WriteRequestCostFunction(conf)); + addCostFunction(costFunctions, new MemStoreSizeCostFunction(conf)); + addCostFunction(costFunctions, new StoreFileCostFunction(conf)); + return costFunctions; + } + @Override protected void loadConf(Configuration conf) { super.loadConf(conf); @@ -242,20 +263,7 @@ protected void loadConf(Configuration conf) { regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf); regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf); - costFunctions = new ArrayList<>(); - addCostFunction(new RegionCountSkewCostFunction(conf)); - addCostFunction(new PrimaryRegionCountSkewCostFunction(conf)); - addCostFunction(new MoveCostFunction(conf, provider)); - addCostFunction(localityCost); - addCostFunction(rackLocalityCost); - addCostFunction(new TableSkewCostFunction(conf)); - addCostFunction(regionReplicaHostCostFunction); - addCostFunction(regionReplicaRackCostFunction); - addCostFunction(new ReadRequestCostFunction(conf)); - addCostFunction(new CPRequestCostFunction(conf)); - addCostFunction(new WriteRequestCostFunction(conf)); - addCostFunction(new MemStoreSizeCostFunction(conf)); - addCostFunction(new StoreFileCostFunction(conf)); + this.costFunctions = createCostFunctions(conf); loadCustomCostFunctions(conf); curFunctionCosts = new double[costFunctions.size()]; @@ -459,8 +467,8 @@ protected List balanceTable(TableName tableName, // The clusterState that is given to this method contains the state // of all the regions in the table(s) (that's true today) // Keep track of servers to iterate through them. 
- BalancerClusterState cluster = - new BalancerClusterState(loadOfOneTable, loads, finder, rackManager); + BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder, + rackManager, regionCacheRatioOnOldServerMap); long startTime = EnvironmentEdgeManager.currentTime(); @@ -568,7 +576,7 @@ protected List balanceTable(TableName tableName, return null; } - private void sendRejectionReasonToRingBuffer(Supplier reason, + protected void sendRejectionReasonToRingBuffer(Supplier reason, List costFunctions) { provider.recordBalancerRejection(() -> { BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason.get()); @@ -627,14 +635,14 @@ private void updateStochasticCosts(TableName tableName, double overall, double[] } } - private void addCostFunction(CostFunction costFunction) { + private void addCostFunction(List costFunctions, CostFunction costFunction) { float multiplier = costFunction.getMultiplier(); if (multiplier > 0) { costFunctions.add(costFunction); } } - private String functionCost() { + protected String functionCost() { StringBuilder builder = new StringBuilder(); for (CostFunction c : costFunctions) { builder.append(c.getClass().getSimpleName()); @@ -655,6 +663,12 @@ private String functionCost() { return builder.toString(); } + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") + List getCostFunctions() { + return costFunctions; + } + private String totalCostsPerFunc() { StringBuilder builder = new StringBuilder(); for (CostFunction c : costFunctions) { diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java index 9ea1c94d1e09..4a996e7796f5 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -376,6 +377,19 @@ protected TreeMap> mockClusterServers(int[] mockClu return servers; } + protected Map> mockClusterServersUnsorted(int[] mockCluster, + int numTables) { + int numServers = mockCluster.length; + Map> servers = new LinkedHashMap<>(); + for (int i = 0; i < numServers; i++) { + int numRegions = mockCluster[i]; + ServerAndLoad sal = randomServer(0); + List regions = randomRegions(numRegions, numTables); + servers.put(sal.getServerName(), regions); + } + return servers; + } + protected TreeMap> mockUniformClusterServers(int[] mockCluster) { int numServers = mockCluster.length; TreeMap> servers = new TreeMap<>(); diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java index 21f3a3b66c9a..cc16cfe2ec83 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -139,6 +139,8 @@ private ServerMetrics mockServerMetricsWithCpRequests(List regionsOn when(rl.getWriteRequestCount()).thenReturn(0L); when(rl.getMemStoreSize()).thenReturn(Size.ZERO); 
when(rl.getStoreFileSize()).thenReturn(Size.ZERO); + when(rl.getRegionSizeMB()).thenReturn(Size.ZERO); + when(rl.getCurrentRegionCachedRatio()).thenReturn(0.0f); regionLoadMap.put(info.getRegionName(), rl); } when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap); @@ -213,6 +215,8 @@ public void testKeepRegionLoad() throws Exception { when(rl.getWriteRequestCount()).thenReturn(0L); when(rl.getMemStoreSize()).thenReturn(Size.ZERO); when(rl.getStoreFileSize()).thenReturn(new Size(i, Size.Unit.MEGABYTE)); + when(rl.getRegionSizeMB()).thenReturn(Size.ZERO); + when(rl.getCurrentRegionCachedRatio()).thenReturn(0.0f); Map regionLoadMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); regionLoadMap.put(Bytes.toBytes(REGION_KEY), rl); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 12479979b2ba..2aa9ecf69ec4 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1335,6 +1335,18 @@ public enum OperationStatusCode { */ public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size"; + /** + * If the chosen ioengine can persist its state across restarts, the path to the file to persist + * to. This file is NOT the data file. It is a file into which we will serialize the map of what + * is in the data file. For example, if you pass the following argument as + * BUCKET_CACHE_IOENGINE_KEY ("hbase.bucketcache.ioengine"), + * file:/tmp/bucketcache.data , then we will write the bucketcache data to the file + * /tmp/bucketcache.data but the metadata on where the data is in the supplied file + * is an in-memory map that needs to be persisted across restarts. Where to store this in-memory + * state is what you supply here: e.g. /tmp/bucketcache.map. + */ + public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path"; + /** * HConstants for fast fail on the client side follow */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java index 38a296aad523..6956d584d92a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY; import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; import java.io.IOException; @@ -47,18 +48,6 @@ public final class BlockCacheFactory { public static final String BLOCKCACHE_POLICY_KEY = "hfile.block.cache.policy"; public static final String BLOCKCACHE_POLICY_DEFAULT = "LRU"; - /** - * If the chosen ioengine can persist its state across restarts, the path to the file to persist - * to. This file is NOT the data file. It is a file into which we will serialize the map of what - * is in the data file. For example, if you pass the following argument as - * BUCKET_CACHE_IOENGINE_KEY ("hbase.bucketcache.ioengine"), - * file:/tmp/bucketcache.data , then we will write the bucketcache data to the file - * /tmp/bucketcache.data but the metadata on where the data is in the supplied file - * is an in-memory map that needs to be persisted across restarts. 
Where to store this in-memory - * state is what you supply here: e.g. /tmp/bucketcache.map. - */ - public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path"; - public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads"; public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = "hbase.bucketcache.writer.queuelength"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index eeed85ef0c3a..f321d034bc6b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -347,6 +347,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck fullyCachedFiles.clear(); backingMapValidated.set(true); bucketAllocator = new BucketAllocator(capacity, bucketSizes); + regionCachedSizeMap.clear(); } } else { bucketAllocator = new BucketAllocator(capacity, bucketSizes); @@ -652,7 +653,6 @@ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decre } } - /** * Free the {{@link BucketEntry} actually,which could only be invoked when the * {@link BucketEntry#refCnt} becoming 0. @@ -1518,6 +1518,7 @@ private void disableCache() { // If persistent ioengine and a path, we will serialize out the backingMap. this.backingMap.clear(); this.fullyCachedFiles.clear(); + this.regionCachedSizeMap.clear(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java new file mode 100644 index 000000000000..3ecd8dc7cfd0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterMetrics; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.RegionMetrics; +import org.apache.hadoop.hbase.ServerMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Size; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +@Category({ LargeTests.class }) +public class TestCacheAwareLoadBalancer extends BalancerTestBase { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCacheAwareLoadBalancer.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestCacheAwareLoadBalancer.class); + + private static CacheAwareLoadBalancer loadBalancer; + + static List servers; + + static List tableDescs; + + static Map tableMap = new HashMap<>(); + + static TableName[] tables = new TableName[] { TableName.valueOf("dt1"), TableName.valueOf("dt2"), + TableName.valueOf("dt3"), TableName.valueOf("dt4") }; + + private static List generateServers(int numServers) { + List servers = new ArrayList<>(numServers); + Random rand = ThreadLocalRandom.current(); + for (int i = 0; i < numServers; i++) { + String host = "server" + rand.nextInt(100000); + int port = rand.nextInt(60000); + servers.add(ServerName.valueOf(host, port, -1)); + } + return servers; + } + + private static List constructTableDesc(boolean hasBogusTable) { + List tds = Lists.newArrayList(); + for (int i = 0; i < tables.length; i++) { + TableDescriptor htd = TableDescriptorBuilder.newBuilder(tables[i]).build(); + tds.add(htd); + } + return tds; + } + + private ServerMetrics mockServerMetricsWithRegionCacheInfo(ServerName server, + List regionsOnServer, float currentCacheRatio, List oldRegionCacheInfo, + int oldRegionCachedSize, int regionSize) { + ServerMetrics serverMetrics = mock(ServerMetrics.class); + Map regionLoadMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (RegionInfo info : regionsOnServer) { + RegionMetrics rl = mock(RegionMetrics.class); + when(rl.getReadRequestCount()).thenReturn(0L); + when(rl.getWriteRequestCount()).thenReturn(0L); + when(rl.getMemStoreSize()).thenReturn(Size.ZERO); + when(rl.getStoreFileSize()).thenReturn(Size.ZERO); + when(rl.getCurrentRegionCachedRatio()).thenReturn(currentCacheRatio); + when(rl.getRegionSizeMB()).thenReturn(new Size(regionSize, 
Size.Unit.MEGABYTE)); + regionLoadMap.put(info.getRegionName(), rl); + } + when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap); + Map oldCacheRatioMap = new HashMap<>(); + for (RegionInfo info : oldRegionCacheInfo) { + oldCacheRatioMap.put(info.getEncodedName(), oldRegionCachedSize); + } + when(serverMetrics.getRegionCachedInfo()).thenReturn(oldCacheRatioMap); + return serverMetrics; + } + + @BeforeClass + public static void beforeAllTests() throws Exception { + servers = generateServers(3); + tableDescs = constructTableDesc(false); + Configuration conf = HBaseConfiguration.create(); + conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list"); + loadBalancer = new CacheAwareLoadBalancer(); + loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf)); + loadBalancer.loadConf(conf); + } + + @Test + public void testRegionsNotCachedOnOldServerAndCurrentServer() throws Exception { + // The regions are not cached on old server as well as the current server. This causes + // skewness in the region allocation which should be fixed by the balancer + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Simulate that the regions previously hosted by server1 are now hosted on server0 + List regionsOnServer0 = randomRegions(10); + List regionsOnServer1 = randomRegions(0); + List regionsOnServer2 = randomRegions(5); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock cluster metrics + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, + 0.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, + 0.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, + 0.0f, new ArrayList<>(), 0, 10)); + ClusterMetrics clusterMetrics = mock(ClusterMetrics.class); + when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.updateClusterMetrics(clusterMetrics); + + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(clusterState); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + Set regionsMovedFromServer0 = new HashSet<>(); + Map> targetServers = new HashMap<>(); + for (RegionPlan plan : plans) { + if (plan.getSource().equals(server0)) { + regionsMovedFromServer0.add(plan.getRegionInfo()); + if (!targetServers.containsKey(plan.getDestination())) { + targetServers.put(plan.getDestination(), new ArrayList<>()); + } + targetServers.get(plan.getDestination()).add(plan.getRegionInfo()); + } + } + // should move 5 regions from server0 to server 1 + assertEquals(5, regionsMovedFromServer0.size()); + assertEquals(5, targetServers.get(server1).size()); + } + + @Test + public void testRegionsPartiallyCachedOnOldServerAndNotCachedOnCurrentServer() throws Exception { + // The regions are partially cached on old server but not cached on the current server + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Simulate that the regions previously hosted by server1 are now hosted on server0 + List regionsOnServer0 = randomRegions(10); + List regionsOnServer1 = randomRegions(0); + List 
regionsOnServer2 = randomRegions(5); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock cluster metrics + + // Mock 5 regions from server0 were previously hosted on server1 + List oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1); + + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, + 0.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, + 0.0f, oldCachedRegions, 6, 10)); + serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, + 0.0f, new ArrayList<>(), 0, 10)); + ClusterMetrics clusterMetrics = mock(ClusterMetrics.class); + when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.updateClusterMetrics(clusterMetrics); + + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(clusterState); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + Set regionsMovedFromServer0 = new HashSet<>(); + Map> targetServers = new HashMap<>(); + for (RegionPlan plan : plans) { + if (plan.getSource().equals(server0)) { + regionsMovedFromServer0.add(plan.getRegionInfo()); + if (!targetServers.containsKey(plan.getDestination())) { + targetServers.put(plan.getDestination(), new ArrayList<>()); + } + targetServers.get(plan.getDestination()).add(plan.getRegionInfo()); + } + } + // should move 5 regions from server0 to server1 + assertEquals(5, regionsMovedFromServer0.size()); + assertEquals(5, targetServers.get(server1).size()); + assertTrue(targetServers.get(server1).containsAll(oldCachedRegions)); + } + + @Test + public void testRegionsFullyCachedOnOldServerAndNotCachedOnCurrentServers() throws Exception { + // The regions are fully cached on old server + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Simulate that the regions previously hosted by server1 are now hosted on server0 + List regionsOnServer0 = randomRegions(10); + List regionsOnServer1 = randomRegions(0); + List regionsOnServer2 = randomRegions(5); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock cluster metrics + + // Mock 5 regions from server0 were previously hosted on server1 + List oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1); + + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, + 0.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, + 0.0f, oldCachedRegions, 10, 10)); + serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, + 0.0f, new ArrayList<>(), 0, 10)); + ClusterMetrics clusterMetrics = mock(ClusterMetrics.class); + when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.updateClusterMetrics(clusterMetrics); + + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(clusterState); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + Set regionsMovedFromServer0 = new HashSet<>(); + Map> targetServers = new HashMap<>(); + for (RegionPlan plan : plans) { + if 
(plan.getSource().equals(server0)) { + regionsMovedFromServer0.add(plan.getRegionInfo()); + if (!targetServers.containsKey(plan.getDestination())) { + targetServers.put(plan.getDestination(), new ArrayList<>()); + } + targetServers.get(plan.getDestination()).add(plan.getRegionInfo()); + } + } + // should move 5 regions from server0 to server1 + assertEquals(5, regionsMovedFromServer0.size()); + assertEquals(5, targetServers.get(server1).size()); + assertTrue(targetServers.get(server1).containsAll(oldCachedRegions)); + } + + @Test + public void testRegionsFullyCachedOnOldAndCurrentServers() throws Exception { + // The regions are fully cached on old server + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Simulate that the regions previously hosted by server1 are now hosted on server0 + List regionsOnServer0 = randomRegions(10); + List regionsOnServer1 = randomRegions(0); + List regionsOnServer2 = randomRegions(5); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock cluster metrics + + // Mock 5 regions from server0 were previously hosted on server1 + List oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1); + + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, + 1.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, + 1.0f, oldCachedRegions, 10, 10)); + serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, + 1.0f, new ArrayList<>(), 0, 10)); + ClusterMetrics clusterMetrics = mock(ClusterMetrics.class); + when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.updateClusterMetrics(clusterMetrics); + + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(clusterState); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + Set regionsMovedFromServer0 = new HashSet<>(); + Map> targetServers = new HashMap<>(); + for (RegionPlan plan : plans) { + if (plan.getSource().equals(server0)) { + regionsMovedFromServer0.add(plan.getRegionInfo()); + if (!targetServers.containsKey(plan.getDestination())) { + targetServers.put(plan.getDestination(), new ArrayList<>()); + } + targetServers.get(plan.getDestination()).add(plan.getRegionInfo()); + } + } + // should move 5 regions from server0 to server1 + assertEquals(5, regionsMovedFromServer0.size()); + assertEquals(5, targetServers.get(server1).size()); + assertTrue(targetServers.get(server1).containsAll(oldCachedRegions)); + } + + @Test + public void testRegionsPartiallyCachedOnOldServerAndCurrentServer() throws Exception { + // The regions are partially cached on old server + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Simulate that the regions previously hosted by server1 are now hosted on server0 + List regionsOnServer0 = randomRegions(10); + List regionsOnServer1 = randomRegions(0); + List regionsOnServer2 = randomRegions(5); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock cluster metrics + + // Mock 5 regions from server0 were previously hosted on 
server1 + List oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1); + + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, + 0.2f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, + 0.0f, oldCachedRegions, 6, 10)); + serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, + 1.0f, new ArrayList<>(), 0, 10)); + ClusterMetrics clusterMetrics = mock(ClusterMetrics.class); + when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.updateClusterMetrics(clusterMetrics); + + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(clusterState); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + Set regionsMovedFromServer0 = new HashSet<>(); + Map> targetServers = new HashMap<>(); + for (RegionPlan plan : plans) { + if (plan.getSource().equals(server0)) { + regionsMovedFromServer0.add(plan.getRegionInfo()); + if (!targetServers.containsKey(plan.getDestination())) { + targetServers.put(plan.getDestination(), new ArrayList<>()); + } + targetServers.get(plan.getDestination()).add(plan.getRegionInfo()); + } + } + assertEquals(5, regionsMovedFromServer0.size()); + assertEquals(5, targetServers.get(server1).size()); + assertTrue(targetServers.get(server1).containsAll(oldCachedRegions)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancerCostFunctions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancerCostFunctions.java new file mode 100644 index 000000000000..448e576b1bc7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancerCostFunctions.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestCacheAwareLoadBalancerCostFunctions extends StochasticBalancerTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCacheAwareLoadBalancerCostFunctions.class); + + // Mapping of test -> expected cache cost + private final float[] expectedCacheCost = { 0.0f, 0.0f, 0.5f, 1.0f, 0.0f, 0.572f, 0.0f, 0.075f }; + + /** + * Data set to testCacheCost: [test][0][0] = mapping of server to number of regions it hosts + * [test][region + 1][0] = server that region is hosted on [test][region + 1][server + 1] = size + * of region cached on server + */ + private final int[][][] clusterRegionCacheRatioMocks = new int[][][] { + // Test 1: each region is entirely on server that hosts it + // Cost of moving the regions in this case should be high as the regions are fully cached + // on the server they are currently hosted on + new int[][] { new int[] { 2, 1, 1 }, // Server 0 has 2, server 1 has 1 and server 2 has 1 + // region(s) hosted respectively + new int[] { 0, 100, 0, 0 }, // region 0 is hosted and cached only on server 0 + new int[] { 0, 100, 0, 0 }, // region 1 is hosted and cached only on server 0 + new int[] { 1, 0, 100, 0 }, // region 2 is hosted and cached only on server 1 + new int[] { 2, 0, 0, 100 }, // region 3 is hosted and cached only on server 2 + }, + + // Test 2: each region is cached completely on the server it is currently hosted on, + // but it was also cached on some other server historically + // Cost of moving the regions in this case should be high as the regions are fully cached + // on the server they are currently hosted on. Although, the regions were previously hosted and + // cached on some other server, since they are completely cached on the new server, + // there is no need to move the regions back to the previously hosting cluster + new int[][] { new int[] { 1, 2, 1 }, // Server 0 has 1, server 1 has 2 and server 2 has 1 + // region(s) hosted respectively + new int[] { 0, 100, 0, 100 }, // region 0 is hosted and currently cached on server 0, + // but previously cached completely on server 2 + new int[] { 1, 100, 100, 0 }, // region 1 is hosted and currently cached on server 1, + // but previously cached completely on server 0 + new int[] { 1, 0, 100, 100 }, // region 2 is hosted and currently cached on server 1, + // but previously cached on server 2 + new int[] { 2, 0, 100, 100 }, // region 3 is hosted and currently cached on server 2, + // but previously cached on server 1 + }, + + // Test 3: The regions were hosted and fully cached on a server but later moved to other + // because of server crash procedure. 
The regions are partially cached on the server they + // are currently hosted on + new int[][] { new int[] { 1, 2, 1 }, new int[] { 0, 50, 0, 100 }, // Region 0 is currently + // hosted and partially + // cached on + // server 0, but was fully + // cached on server 2 + // previously + new int[] { 1, 100, 50, 0 }, // Region 1 is currently hosted and partially cached on + // server 1, but was fully cached on server 0 previously + new int[] { 1, 0, 50, 100 }, // Region 2 is currently hosted and partially cached on + // server 1, but was fully cached on server 2 previously + new int[] { 2, 0, 100, 50 }, // Region 3 is currently hosted and partially cached on + // server 2, but was fully cached on server 1 previously + }, + + // Test 4: The regions were hosted and fully cached on a server, but later moved to other + // server because of server crash procedure. The regions are not at all cached on the server + // they are currently hosted on + new int[][] { new int[] { 1, 1, 2 }, new int[] { 0, 0, 0, 100 }, // Region 0 is currently hosted + // but not cached on server + // 0, + // but was fully cached on + // server 2 previously + new int[] { 1, 100, 0, 0 }, // Region 1 is currently hosted but not cached on server 1, + // but was fully cached on server 0 previously + new int[] { 2, 0, 100, 0 }, // Region 2 is currently hosted but not cached on server 2, + // but was fully cached on server 1 previously + new int[] { 2, 100, 0, 0 }, // Region 3 is currently hosted but not cached on server 2, + // but was fully cached on server 1 previously + }, + + // Test 5: The regions were partially cached on old servers, before moving to the new server + // where also, they are partially cached + new int[][] { new int[] { 2, 1, 1 }, new int[] { 0, 50, 50, 0 }, // Region 0 is hosted and + // partially cached on + // server 0, but + // was previously hosted and + // partially cached on + // server 1 + new int[] { 0, 50, 0, 50 }, // Region 1 is hosted and partially cached on server 0, but + // was previously hosted and partially cached on server 2 + new int[] { 1, 0, 50, 50 }, // Region 2 is hosted and partially cached on server 1, but + // was previously hosted and partially cached on server 2 + new int[] { 2, 0, 50, 50 }, // Region 3 is hosted and partially cached on server 2, but + // was previously hosted and partially cached on server 1 + }, + + // Test 6: The regions are less cached on the new servers as compared to what they were + // cached on the server before they were moved to the new servers + new int[][] { new int[] { 1, 2, 1 }, new int[] { 0, 30, 70, 0 }, // Region 0 is hosted and + // cached 30% on server 0, + // but was + // previously hosted and + // cached 70% on server 1 + new int[] { 1, 70, 30, 0 }, // Region 1 is hosted and cached 30% on server 1, but was + // previously hosted and cached 70% on server 0 + new int[] { 1, 0, 30, 70 }, // Region 2 is hosted and cached 30% on server 1, but was + // previously hosted and cached 70% on server 2 + new int[] { 2, 0, 70, 30 }, // Region 3 is hosted and cached 30% on server 2, but was + // previously hosted and cached 70% on server 1 + }, + + // Test 7: The regions are more cached on the new servers as compared to what they were + // cached on the server before they were moved to the new servers + new int[][] { new int[] { 2, 1, 1 }, new int[] { 0, 80, 20, 0 }, // Region 0 is hosted and 80% + // cached on server 0, but + // was + // previously hosted and 20% + // cached on server 1 + new int[] { 0, 80, 0, 20 }, // Region 1 is hosted and 80% cached on 
server 0, but was + // previously hosted and 20% cached on server 2 + new int[] { 1, 20, 80, 0 }, // Region 2 is hosted and 80% cached on server 1, but was + // previously hosted and 20% cached on server 0 + new int[] { 2, 0, 20, 80 }, // Region 3 is hosted and 80% cached on server 2, but was + // previously hosted and 20% cached on server 1 + }, + + // Test 8: The regions are randomly assigned to the server with some regions historically + // hosted on other region servers + new int[][] { new int[] { 1, 2, 1 }, new int[] { 0, 34, 0, 58 }, // Region 0 is hosted and + // partially cached on + // server 0, + // but was previously hosted + // and partially cached on + // server 2 + // current cache ratio < + // historical cache ratio + new int[] { 1, 78, 100, 0 }, // Region 1 is hosted and fully cached on server 1, + // but was previously hosted and partially cached on server 0 + // current cache ratio > historical cache ratio + new int[] { 1, 66, 66, 0 }, // Region 2 is hosted and partially cached on server 1, + // but was previously hosted and partially cached on server 0 + // current cache ratio == historical cache ratio + new int[] { 2, 0, 0, 96 }, // Region 3 is hosted and partially cached on server 0 + // No historical cache ratio + }, }; + + private static Configuration storedConfiguration; + + private CacheAwareLoadBalancer loadBalancer = new CacheAwareLoadBalancer(); + + @BeforeClass + public static void saveInitialConfiguration() { + storedConfiguration = new Configuration(conf); + } + + @Before + public void beforeEachTest() { + conf = new Configuration(storedConfiguration); + loadBalancer.loadConf(conf); + } + + @Test + public void testVerifyCacheAwareSkewnessCostFunctionEnabled() { + CacheAwareLoadBalancer lb = new CacheAwareLoadBalancer(); + lb.loadConf(conf); + assertTrue(Arrays.asList(lb.getCostFunctionNames()) + .contains(CacheAwareLoadBalancer.CacheAwareRegionSkewnessCostFunction.class.getSimpleName())); + } + + @Test + public void testVerifyCacheAwareSkewnessCostFunctionDisabled() { + conf.setFloat( + CacheAwareLoadBalancer.CacheAwareRegionSkewnessCostFunction.REGION_COUNT_SKEW_COST_KEY, 0.0f); + + CacheAwareLoadBalancer lb = new CacheAwareLoadBalancer(); + lb.loadConf(conf); + + assertFalse(Arrays.asList(lb.getCostFunctionNames()) + .contains(CacheAwareLoadBalancer.CacheAwareRegionSkewnessCostFunction.class.getSimpleName())); + } + + @Test + public void testVerifyCacheCostFunctionEnabled() { + conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "/tmp/prefetch.persistence"); + + CacheAwareLoadBalancer lb = new CacheAwareLoadBalancer(); + lb.loadConf(conf); + + assertTrue(Arrays.asList(lb.getCostFunctionNames()) + .contains(CacheAwareLoadBalancer.CacheAwareCostFunction.class.getSimpleName())); + } + + @Test + public void testVerifyCacheCostFunctionDisabledByNoBucketCachePersistence() { + assertFalse(Arrays.asList(loadBalancer.getCostFunctionNames()) + .contains(CacheAwareLoadBalancer.CacheAwareCostFunction.class.getSimpleName())); + } + + @Test + public void testVerifyCacheCostFunctionDisabledByNoMultiplier() { + conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "/tmp/prefetch.persistence"); + conf.setFloat("hbase.master.balancer.stochastic.cacheCost", 0.0f); + assertFalse(Arrays.asList(loadBalancer.getCostFunctionNames()) + .contains(CacheAwareLoadBalancer.CacheAwareCostFunction.class.getSimpleName())); + } + + @Test + public void testCacheCost() { + conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "/tmp/prefetch.persistence"); + 
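+    // Setting the persistent path here is required: CacheAwareCostFunction disables itself
+    // (multiplier 0) when no cached-file-list persistence is configured.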
CacheAwareLoadBalancer.CacheAwareCostFunction costFunction = + new CacheAwareLoadBalancer.CacheAwareCostFunction(conf); + + for (int test = 0; test < clusterRegionCacheRatioMocks.length; test++) { + int[][] clusterRegionLocations = clusterRegionCacheRatioMocks[test]; + MockClusterForCacheCost cluster = new MockClusterForCacheCost(clusterRegionLocations); + costFunction.prepare(cluster); + double cost = costFunction.cost(); + assertEquals(expectedCacheCost[test], cost, 0.01); + } + } + + private class MockClusterForCacheCost extends BalancerClusterState { + private final Map, Float> regionServerCacheRatio = new HashMap<>(); + + public MockClusterForCacheCost(int[][] regionsArray) { + // regions[0] is an array where index = serverIndex and value = number of regions + super(mockClusterServersUnsorted(regionsArray[0], 1), null, null, null, null); + Map> oldCacheRatio = new HashMap<>(); + for (int i = 1; i < regionsArray.length; i++) { + int regionIndex = i - 1; + for (int j = 1; j < regionsArray[i].length; j++) { + int serverIndex = j - 1; + float cacheRatio = (float) regionsArray[i][j] / 100; + regionServerCacheRatio.put(new Pair<>(regionIndex, serverIndex), cacheRatio); + if (cacheRatio > 0.0f && serverIndex != regionsArray[i][0]) { + // This is the historical cacheRatio value + oldCacheRatio.put(regions[regionIndex].getEncodedName(), + new Pair<>(servers[serverIndex], cacheRatio)); + } + } + } + regionCacheRatioOnOldServerMap = oldCacheRatio; + } + + @Override + public int getTotalRegionHFileSizeMB(int region) { + return 1; + } + + @Override + protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) { + float cacheRatio = 0.0f; + + // Get the cache ratio if the region is currently hosted on this server + if (regionServerIndex == regionIndexToServerIndex[region]) { + return regionServerCacheRatio.get(new Pair<>(region, regionServerIndex)); + } + + // Region is not currently hosted on this server. Check if the region was cached on this + // server earlier. This can happen when the server was shutdown and the cache was persisted. + // Search using the index name and server name and not the index id and server id as these + // ids may change when a server is marked as dead or a new server is added. 
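+      // Hence the lookup below keys on the encoded region name and compares server addresses
+      // rather than indices.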
+      String regionEncodedName = regions[region].getEncodedName();
+      ServerName serverName = servers[regionServerIndex];
+      if (
+        regionCacheRatioOnOldServerMap != null
+          && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName)
+      ) {
+        Pair<ServerName, Float> serverCacheRatio =
+          regionCacheRatioOnOldServerMap.get(regionEncodedName);
+        if (ServerName.isSameAddress(serverName, serverCacheRatio.getFirst())) {
+          cacheRatio = serverCacheRatio.getSecond();
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+        }
+      }
+      return cacheRatio;
+    }
+  }
+}

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java
index 748045246b3b..67ef296da58b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java
@@ -86,6 +86,8 @@ private ServerMetrics mockServerMetricsWithReadRequests(ServerName server,
     when(rl.getWriteRequestCount()).thenReturn(0L);
     when(rl.getMemStoreSize()).thenReturn(Size.ZERO);
     when(rl.getStoreFileSize()).thenReturn(Size.ZERO);
+    when(rl.getRegionSizeMB()).thenReturn(Size.ZERO);
+    when(rl.getCurrentRegionCachedRatio()).thenReturn(0.0f);
     regionLoadMap.put(info.getRegionName(), rl);
   }
   when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap);

From d3f8d168fde8934981d5f861e9e65bdd2880d451 Mon Sep 17 00:00:00 2001
From: Rahul Agarkar
Date: Thu, 2 Nov 2023 22:15:38 +0530
Subject: [PATCH 4/4] =?UTF-8?q?HBASE-28097=20Add=20documentation=20section?=
 =?UTF-8?q?=20for=20the=20Cache=20Aware=20balancer=20fu=E2=80=A6=20(#5495)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Wellington Chevreuil
---
 src/main/asciidoc/_chapters/architecture.adoc | 43 +++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/src/main/asciidoc/_chapters/architecture.adoc b/src/main/asciidoc/_chapters/architecture.adoc
index 23d069c1d919..12bdc09ac764 100644
--- a/src/main/asciidoc/_chapters/architecture.adoc
+++ b/src/main/asciidoc/_chapters/architecture.adoc
@@ -1130,6 +1130,49 @@ For a RegionServer hosting data that can comfortably fit into cache, or if your
 The compressed BlockCache is disabled by default. To enable it, set `hbase.block.data.cachecompressed` to `true` in _hbase-site.xml_ on all RegionServers.
 
+==== Cache Aware Load Balancer
+
+Depending on the data size and the configured cache size, the cache warm-up can take anywhere from a few minutes to a few hours. This becomes even more critical for HBase deployments over cloud storage, where compute is separated from storage. Repeating this warm-up every time the region server starts can be very expensive. To avoid this, link:https://issues.apache.org/jira/browse/HBASE-27313[HBASE-27313] implemented the cache persistence feature, where the region servers periodically persist the set of blocks cached in the bucket cache. This persisted information is then used to resurrect the cache when a region server restarts, whether after a normal restart or a crash.
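+
+For instance, a minimal configuration sketch for a persistent bucket cache could look like the following; `hbase.bucketcache.ioengine`, `hbase.bucketcache.size` (in MB) and `hbase.bucketcache.persistent.path` are the standard bucket cache properties, and the file paths shown are purely illustrative:
+
+[source,xml]
+----
+<property>
+  <name>hbase.bucketcache.ioengine</name>
+  <value>file:/mnt/ssd/bucketcache.data</value>
+</property>
+<property>
+  <name>hbase.bucketcache.size</name>
+  <value>8192</value>
+</property>
+<property>
+  <name>hbase.bucketcache.persistent.path</name>
+  <value>/mnt/ssd/bucketcache.persistence</value>
+</property>
+----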
+
+link:https://issues.apache.org/jira/browse/HBASE-27999[HBASE-27999] implements the cache aware load balancer, which gives the load balancer the ability to consider the cache allocation of each region on the region servers when calculating a new assignment plan. It uses the region/region server cache allocation information reported by the region servers to calculate the percentage of HFiles cached for each region on its hosting server, and uses that ratio as a factor when deciding on an optimal, new assignment plan.
+
+The master node captures the caching information from all the region servers and uses it to decide on new region assignments while ensuring a minimal impact on the current cache allocation. A region is reassigned to another region server only if it has a better cache ratio there than on the server currently hosting it.
+
+The CacheAwareLoadBalancer uses two cost elements for deciding the region allocation. These are described below:
+
+. Cache Cost
++
+The cache cost is calculated as the percentage of data for a region cached on the region server where it is either currently hosted or was previously hosted. A region may have multiple HFiles, each of a different size. An HFile is considered fully prefetched when all of its data blocks are in the cache. The region server hosting the region calculates the ratio of the number of fully cached HFiles to the total number of HFiles in the region. This ratio varies from 0 (the region is hosted on this server, but none of its HFiles are cached) to 1 (the region is hosted on this server and all of its HFiles are cached).
++
+Every region server maintains this information for all the regions currently hosted there. In addition, this cache ratio is also maintained for the regions that were previously hosted on the region server, giving historical information about those regions.
+
+. Skewness Cost
++
+The skewness cost is based on the number of regions hosted by each region server in the cluster, and penalizes an uneven distribution of regions across the servers, analogous to the region count skew cost of the default stochastic load balancer.
+
+The cache aware balancer considers the cache cost together with the skewness cost when deciding on the region assignment plan, under the following conditions:
+
+. There is an idle server in the cluster. This can happen when an existing server is restarted or a new server is added to the cluster.
+
+. When the cost of maintaining the balance in the cluster is greater than the minimum threshold defined by the configuration _hbase.master.balancer.stochastic.minCostNeedBalance_.
+
+The CacheAwareLoadBalancer can be enabled by setting the following configuration properties in the master configuration:
+
+[source,xml]
+----
+<property>
+  <name>hbase.master.loadbalancer.class</name>
+  <value>org.apache.hadoop.hbase.master.balancer.CacheAwareLoadBalancer</value>
+</property>
+<property>
+  <name>hbase.bucketcache.persistent.path</name>
+  <value>/path/to/bucketcache_persistent_file</value>
+</property>
+----
+
+
 [[regionserver_splitting_implementation]]
 === RegionServer Splitting Implementation
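
To make the cache cost computation above concrete, the following sketch computes the per-region cache ratio described in the "Cache Cost" item. It is illustrative Java only, not actual HBase code: the class and method names are invented, and each HFile is reduced to a pair of block counts.

[source,java]
----
import java.util.Arrays;
import java.util.List;

/**
 * Illustrative sketch (invented names, not actual HBase code) of the per-region
 * cache ratio: the fraction of a region's HFiles that are fully prefetched.
 */
public final class RegionCacheRatioSketch {

  /** An HFile counts as fully prefetched once every data block is cached. */
  static boolean isFullyCached(long cachedDataBlocks, long totalDataBlocks) {
    return totalDataBlocks > 0 && cachedDataBlocks == totalDataBlocks;
  }

  /**
   * Ratio of fully cached HFiles to the total number of HFiles in the region:
   * 0.0 when nothing is cached, 1.0 when every HFile is fully cached.
   * Each entry is { cachedDataBlocks, totalDataBlocks } for one HFile.
   */
  static float regionCacheRatio(List<long[]> hFiles) {
    if (hFiles.isEmpty()) {
      return 0.0f;
    }
    int fullyCached = 0;
    for (long[] f : hFiles) {
      if (isFullyCached(f[0], f[1])) {
        fullyCached++;
      }
    }
    return (float) fullyCached / hFiles.size();
  }

  public static void main(String[] args) {
    // A region with 4 HFiles, 3 fully cached and 1 partially cached -> 0.75
    List<long[]> hFiles = Arrays.asList(new long[] { 10, 10 }, new long[] { 5, 5 },
      new long[] { 8, 8 }, new long[] { 2, 6 });
    System.out.println(regionCacheRatio(hFiles)); // prints 0.75
  }
}
----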