From fd177a6c115e975d8145fe5661b6df03638d74f0 Mon Sep 17 00:00:00 2001 From: Rahul Agarkar Date: Wed, 2 Aug 2023 23:06:30 +0530 Subject: [PATCH 1/3] =?UTF-8?q?HBASE-27997=20Enhance=20prefetch=20executor?= =?UTF-8?q?=20to=20record=20region=20prefetch=20infor=E2=80=A6=20(#5339)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Wellington Chevreuil Reviewed-by: Kota-SH --- .../main/protobuf/PrefetchPersistence.proto | 7 ++++- .../hbase/io/hfile/HFilePreadReader.java | 10 +++++++ .../hbase/io/hfile/PrefetchProtoUtils.java | 26 ++++++++++++++++--- 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto index d1a2b4cfd1b7..a024b94baa62 100644 --- a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto +++ b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto @@ -27,5 +27,10 @@ option optimize_for = SPEED; message PrefetchedHfileName { - map<string, bool> prefetched_files = 1; + map<string, RegionFileSizeMap> prefetched_files = 1; +} + +message RegionFileSizeMap { + required string region_name = 1; + required uint64 region_prefetch_size = 2; }
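The new RegionFileSizeMap message keys each fully prefetched hfile to its region and cached size, and the region itself is recovered from the hfile's location on disk by the getRegionName() helper added in the HFilePreadReader change just below. The lookup relies on the standard HBase layout, where an hfile lives at <root>/data/<namespace>/<table>/<region>/<column family>/<hfile>, so the region's encoded name is the grandparent directory. A minimal, self-contained sketch of that lookup, using java.nio.file.Path as a stand-in for org.apache.hadoop.fs.Path (the concrete path is made up for illustration):

import java.nio.file.Path;
import java.nio.file.Paths;

public class RegionNameFromPath {
  // Region encoded name = name of the hfile's grandparent directory.
  static String getRegionName(Path hFilePath) {
    return hFilePath.getParent().getParent().getFileName().toString();
  }

  public static void main(String[] args) {
    Path hFile =
      Paths.get("/hbase/data/default/usertable/d41d8cd98f00b204e9800998ecf8427e/cf/f1a2b3c4d5e6");
    System.out.println(getRegionName(hFile)); // d41d8cd98f00b204e9800998ecf8427e
  }
}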
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index f1579ea53b8e..2079dcafb65f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -132,6 +132,7 @@ public void run() { LOG.warn("Close prefetch stream reader failed, path: " + path, e); } } + String regionName = getRegionName(path); PrefetchExecutor.complete(path); } } @@ -139,6 +140,15 @@ public void run() { } } + /* + * Get the region name for the given file path. An HFile is always kept under <region>/<column family>/<hFile>. To find the region for a given hFile, just find the name of the grandparent + * directory. + */ + private static String getRegionName(Path path) { + return path.getParent().getParent().getName(); + } + private static String getPathOffsetEndStr(final Path path, final long offset, final long end) { return "path=" + path.toString() + ", offset=" + offset + ", end=" + end; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java index e75e8a6a6522..df67e4429a2d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos; @@ -26,8 +28,26 @@ private PrefetchProtoUtils() { } static PersistentPrefetchProtos.PrefetchedHfileName - toPB(Map<String, Boolean> prefetchedHfileNames) { - return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder() - .putAllPrefetchedFiles(prefetchedHfileNames).build(); + toPB(Map<String, Pair<String, Long>> prefetchedHfileNames) { + Map<String, PersistentPrefetchProtos.RegionFileSizeMap> tmpMap = new HashMap<>(); + prefetchedHfileNames.forEach((hFileName, regionPrefetchMap) -> { + PersistentPrefetchProtos.RegionFileSizeMap tmpRegionFileSize = + PersistentPrefetchProtos.RegionFileSizeMap.newBuilder() + .setRegionName(regionPrefetchMap.getFirst()) + .setRegionPrefetchSize(regionPrefetchMap.getSecond()).build(); + tmpMap.put(hFileName, tmpRegionFileSize); + }); + return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder().putAllPrefetchedFiles(tmpMap) + .build(); + } + + static Map<String, Pair<String, Long>> + fromPB(Map<String, PersistentPrefetchProtos.RegionFileSizeMap> prefetchHFileNames) { + Map<String, Pair<String, Long>> hFileMap = new HashMap<>(); + prefetchHFileNames.forEach((hFileName, regionPrefetchMap) -> { + hFileMap.put(hFileName, + new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionPrefetchSize())); + }); + return hFileMap; + } } From 766c9353b3e863cd5bba0a5a21fd06c725e5038a Mon Sep 17 00:00:00 2001 From: Rahul Agarkar Date: Tue, 29 Aug 2023 15:36:23 +0530 Subject: [PATCH 2/3] =?UTF-8?q?HBASE-27998=20Enhance=20region=20metrics=20?= =?UTF-8?q?to=20include=20prefetch=20ratio=20for=20each=E2=80=A6=20(#5342)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Wellington Chevreuil --- .../apache/hadoop/hbase/RegionMetrics.java | 6 ++ .../hadoop/hbase/RegionMetricsBuilder.java | 38 +++++++- .../apache/hadoop/hbase/ServerMetrics.java | 6 ++ .../hadoop/hbase/ServerMetricsBuilder.java | 20 ++++- .../main/protobuf/PrefetchPersistence.proto | 36 -------- .../main/protobuf/server/ClusterStatus.proto | 11 +++ .../protobuf/server/io/BucketCacheEntry.proto | 8 +- .../hadoop/hbase/io/hfile/BlockCache.java | 3 +- .../hbase/io/hfile/CombinedBlockCache.java | 3 +- .../hbase/io/hfile/HFilePreadReader.java | 4 +- .../hbase/io/hfile/PrefetchProtoUtils.java | 53 ------------ .../hbase/io/hfile/bucket/BucketCache.java | 86 ++++++++++++++++--- .../io/hfile/bucket/BucketProtoUtils.java | 26 +++++- .../hbase/regionserver/HRegionServer.java | 40 ++++++++- .../hadoop/hbase/TestServerMetrics.java | 18 ++-- .../master/TestRegionsRecoveryChore.java | 14 +++ 16 files changed, 250 insertions(+), 122 deletions(-) delete mode 100644 hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetrics.java index 47b36a7a1516..b029d0288564 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetrics.java @@ -138,4 +138,10 @@ default String getNameAsString() { /** Returns the compaction state of this region */ CompactionState getCompactionState(); + + /** Returns the total size of the hfiles in the region */ + Size getRegionSizeMB(); + + /** Returns current prefetch ratio of this region on this server */ + float getCurrentRegionCachedRatio(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetricsBuilder.java index 43b3a17aac17..d3361693079a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetricsBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetricsBuilder.java @@ -80,7 +80,8 @@ public static RegionMetrics toRegionMetrics(ClusterStatusProtos.RegionLoad regio ClusterStatusProtos.StoreSequenceId::getSequenceId))) .setUncompressedStoreFileSize( new Size(regionLoadPB.getStoreUncompressedSizeMB(), Size.Unit.MEGABYTE)) - .build(); + .setRegionSizeMB(new Size(regionLoadPB.getRegionSizeMB(), Size.Unit.MEGABYTE)) + .setCurrentRegionCachedRatio(regionLoadPB.getCurrentRegionCachedRatio()).build(); } private static List @@ -120,7 +121,8 @@ public static ClusterStatusProtos.RegionLoad toRegionLoad(RegionMetrics regionMe .addAllStoreCompleteSequenceId(toStoreSequenceId(regionMetrics.getStoreSequenceId())) .setStoreUncompressedSizeMB( (int) regionMetrics.getUncompressedStoreFileSize().get(Size.Unit.MEGABYTE)) - .build(); + .setRegionSizeMB((int) regionMetrics.getRegionSizeMB().get(Size.Unit.MEGABYTE)) + .setCurrentRegionCachedRatio(regionMetrics.getCurrentRegionCachedRatio()).build(); } public static RegionMetricsBuilder newBuilder(byte[] name) { @@ -154,6 +156,8 @@ public static RegionMetricsBuilder newBuilder(byte[] name) { private long blocksLocalWithSsdWeight; private long blocksTotalWeight; private CompactionState compactionState; + private Size regionSizeMB = Size.ZERO; + private float currentRegionCachedRatio; private RegionMetricsBuilder(byte[] name) { this.name = name; @@ -289,6 +293,16 @@ public RegionMetricsBuilder setCompactionState(CompactionState compactionState) return this; } + public RegionMetricsBuilder setRegionSizeMB(Size value) { + this.regionSizeMB = value; + return this; + } + + public RegionMetricsBuilder setCurrentRegionCachedRatio(float value) { + this.currentRegionCachedRatio = value; + return this; + } + public RegionMetrics build() { return new RegionMetricsImpl(name, storeCount, storeFileCount, storeRefCount, maxCompactedStoreFileRefCount, compactingCellCount, compactedCellCount, storeFileSize, @@ -296,7 +310,7 @@ public RegionMetrics build() { uncompressedStoreFileSize, writeRequestCount, readRequestCount, cpRequestCount, filteredReadRequestCount, completedSequenceId, storeSequenceIds, dataLocality, lastMajorCompactionTimestamp, dataLocalityForSsd, blocksLocalWeight, blocksLocalWithSsdWeight, - blocksTotalWeight, compactionState); + blocksTotalWeight, compactionState, regionSizeMB, currentRegionCachedRatio); } private static class RegionMetricsImpl implements RegionMetrics { @@ -327,6 +341,8 @@ private static class RegionMetricsImpl implements RegionMetrics { private final long 
blocksLocalWithSsdWeight; private final long blocksTotalWeight; private final CompactionState compactionState; + private final Size regionSizeMB; + private final float currentRegionCachedRatio; RegionMetricsImpl(byte[] name, int storeCount, int storeFileCount, int storeRefCount, int maxCompactedStoreFileRefCount, final long compactingCellCount, long compactedCellCount, @@ -336,7 +352,7 @@ private static class RegionMetricsImpl implements RegionMetrics { long filteredReadRequestCount, long completedSequenceId, Map storeSequenceIds, float dataLocality, long lastMajorCompactionTimestamp, float dataLocalityForSsd, long blocksLocalWeight, long blocksLocalWithSsdWeight, long blocksTotalWeight, - CompactionState compactionState) { + CompactionState compactionState, Size regionSizeMB, float currentRegionCachedRatio) { this.name = Preconditions.checkNotNull(name); this.storeCount = storeCount; this.storeFileCount = storeFileCount; @@ -364,6 +380,8 @@ private static class RegionMetricsImpl implements RegionMetrics { this.blocksLocalWithSsdWeight = blocksLocalWithSsdWeight; this.blocksTotalWeight = blocksTotalWeight; this.compactionState = compactionState; + this.regionSizeMB = regionSizeMB; + this.currentRegionCachedRatio = currentRegionCachedRatio; } @Override @@ -501,6 +519,16 @@ public CompactionState getCompactionState() { return compactionState; } + @Override + public Size getRegionSizeMB() { + return regionSizeMB; + } + + @Override + public float getCurrentRegionCachedRatio() { + return currentRegionCachedRatio; + } + @Override public String toString() { StringBuilder sb = @@ -541,6 +569,8 @@ public String toString() { Strings.appendKeyValue(sb, "blocksLocalWithSsdWeight", blocksLocalWithSsdWeight); Strings.appendKeyValue(sb, "blocksTotalWeight", blocksTotalWeight); Strings.appendKeyValue(sb, "compactionState", compactionState); + Strings.appendKeyValue(sb, "regionSizeMB", regionSizeMB); + Strings.appendKeyValue(sb, "currentRegionCachedRatio", currentRegionCachedRatio); return sb.toString(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java index 2684886ba3d5..2cf55a1abdc0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java @@ -106,4 +106,10 @@ default String getVersion() { @Nullable List getTasks(); + /** + * Returns the region cache information for the regions hosted on this server + * @return map of region encoded name and the size of the region cached on this region server + * rounded to MB + */ + Map getRegionCachedInfo(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java index 7a0312f22fdc..c7aea21e845a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java @@ -85,6 +85,7 @@ public static ServerMetrics toServerMetrics(ServerName serverName, int versionNu : null) .setTasks(serverLoadPB.getTasksList().stream().map(ProtobufUtil::getServerTask) .collect(Collectors.toList())) + .setRegionCachedInfo(serverLoadPB.getRegionCachedInfoMap()) .setReportTimestamp(serverLoadPB.getReportEndTime()) .setLastReportTimestamp(serverLoadPB.getReportStartTime()).setVersionNumber(versionNumber) .setVersion(version).build(); @@ -111,6 +112,7 @@ public static 
ClusterStatusProtos.ServerLoad toServerLoad(ServerMetrics metrics) .map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList())) .addAllTasks( metrics.getTasks().stream().map(ProtobufUtil::toServerTask).collect(Collectors.toList())) + .putAllRegionCachedInfo(metrics.getRegionCachedInfo()) .setReportStartTime(metrics.getLastReportTimestamp()) .setReportEndTime(metrics.getReportTimestamp()); if (metrics.getReplicationLoadSink() != null) { @@ -142,6 +144,7 @@ public static ServerMetricsBuilder newBuilder(ServerName sn) { private long reportTimestamp = EnvironmentEdgeManager.currentTime(); private long lastReportTimestamp = 0; private final List tasks = new ArrayList<>(); + private Map regionCachedInfo = new HashMap<>(); private ServerMetricsBuilder(ServerName serverName) { this.serverName = serverName; @@ -232,11 +235,16 @@ public ServerMetricsBuilder setTasks(List tasks) { return this; } + public ServerMetricsBuilder setRegionCachedInfo(Map value) { + this.regionCachedInfo = value; + return this; + } + public ServerMetrics build() { return new ServerMetricsImpl(serverName, versionNumber, version, requestCountPerSecond, requestCount, readRequestCount, writeRequestCount, usedHeapSize, maxHeapSize, infoServerPort, sources, sink, regionStatus, coprocessorNames, reportTimestamp, lastReportTimestamp, - userMetrics, tasks); + userMetrics, tasks, regionCachedInfo); } private static class ServerMetricsImpl implements ServerMetrics { @@ -259,13 +267,15 @@ private static class ServerMetricsImpl implements ServerMetrics { private final long lastReportTimestamp; private final Map userMetrics; private final List tasks; + private final Map regionCachedInfo; ServerMetricsImpl(ServerName serverName, int versionNumber, String version, long requestCountPerSecond, long requestCount, long readRequestsCount, long writeRequestsCount, Size usedHeapSize, Size maxHeapSize, int infoServerPort, List sources, ReplicationLoadSink sink, Map regionStatus, Set coprocessorNames, long reportTimestamp, - long lastReportTimestamp, Map userMetrics, List tasks) { + long lastReportTimestamp, Map userMetrics, List tasks, + Map regionCachedInfo) { this.serverName = Preconditions.checkNotNull(serverName); this.versionNumber = versionNumber; this.version = version; @@ -284,6 +294,7 @@ private static class ServerMetricsImpl implements ServerMetrics { this.reportTimestamp = reportTimestamp; this.lastReportTimestamp = lastReportTimestamp; this.tasks = tasks; + this.regionCachedInfo = regionCachedInfo; } @Override @@ -386,6 +397,11 @@ public List getTasks() { return tasks; } + @Override + public Map getRegionCachedInfo() { + return Collections.unmodifiableMap(regionCachedInfo); + } + @Override public String toString() { int storeCount = 0; diff --git a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto deleted file mode 100644 index a024b94baa62..000000000000 --- a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
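The new ServerMetrics#getRegionCachedInfo() added above surfaces the per-region cached sizes (encoded region name to MB) to any ClusterMetrics consumer. A hedged usage sketch — the connection setup is the usual HBase client boilerplate and is illustrative, not part of this patch:

import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class PrintRegionCachedInfo {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
      Admin admin = conn.getAdmin()) {
      ClusterMetrics cm = admin.getClusterMetrics();
      cm.getLiveServerMetrics().forEach((server, sm) ->
        // region encoded name -> cached size, rounded to MB by the region server
        sm.getRegionCachedInfo().forEach((region, mb) ->
          System.out.println(server + " caches " + mb + " MB of region " + region)));
    }
  }
}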
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -syntax = "proto2"; - -package hbase.pb; - -option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated"; -option java_outer_classname = "PersistentPrefetchProtos"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; -option optimize_for = SPEED; - - -message PrefetchedHfileName { - map<string, RegionFileSizeMap> prefetched_files = 1; -} - -message RegionFileSizeMap { - required string region_name = 1; - required uint64 region_prefetch_size = 2; -} diff --git a/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto index 28cc5a865c23..58fd3c8d2a5b 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto @@ -177,6 +177,12 @@ message RegionLoad { MAJOR = 2; MAJOR_AND_MINOR = 3; } + + /** Total region size in MB */ + optional uint32 region_size_MB = 28; + + /** Current region cache ratio on this server */ + optional float current_region_cached_ratio = 29; } message UserLoad { @@ -315,6 +321,11 @@ message ServerLoad { * The active monitored tasks */ repeated ServerTask tasks = 15; + + /** + * The metrics for region cached on this region server + */ + map<string, int32> regionCachedInfo = 16; } message LiveServerInfo { diff --git a/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto b/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto index ae1980fe51e6..80fc10ada786 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto @@ -32,7 +32,7 @@ message BucketCacheEntry { map<int32, string> deserializers = 4; required BackingMap backing_map = 5; optional bytes checksum = 6; - map<string, bool> prefetched_files = 7; + map<string, RegionFileSizeMap> cached_files = 7; } message BackingMap { @@ -81,3 +81,9 @@ enum BlockPriority { multi = 1; memory = 2; } + +message RegionFileSizeMap { + required string region_name = 1; + required uint64 region_cached_size = 2; +} + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index e480c9b5789b..91ebaaabd422 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Optional; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; /** @@ -167,7 +168,7 @@ default boolean isMetaBlock(BlockType blockType) { /** * Returns the list of fully cached files */ - default Optional<Map<String, Boolean>> getFullyCachedFiles() { + default Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() { return Optional.empty(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index a421dfc83aa0..1e0fe7709292 100644 ---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -22,6 +22,7 @@ import java.util.Optional; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -431,7 +432,7 @@ public BlockCache[] getBlockCaches() { * Returns the list of fully cached files */ @Override - public Optional<Map<String, Boolean>> getFullyCachedFiles() { + public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() { return this.l2Cache.getFullyCachedFiles(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index 2079dcafb65f..7cdbd5aff486 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -115,7 +115,8 @@ public void run() { block.release(); } } - bucketCacheOptional.ifPresent(bc -> bc.fileCacheCompleted(path.getName())); + final long fileSize = offset; + bucketCacheOptional.ifPresent(bc -> bc.fileCacheCompleted(path, fileSize)); } catch (IOException e) { // IOExceptions are probably due to region closes (relocation, etc.) if (LOG.isTraceEnabled()) { @@ -132,7 +133,6 @@ public void run() { LOG.warn("Close prefetch stream reader failed, path: " + path, e); } } - String regionName = getRegionName(path); PrefetchExecutor.complete(path); } }
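With the signature change above, getFullyCachedFiles() now reports, per fully cached hfile, the owning region and the cached byte count, and fileCacheCompleted() records both at prefetch completion. A sketch of folding that shape into a per-region total — essentially the bookkeeping BucketCache maintains further below — assuming HBase's org.apache.hadoop.hbase.util.Pair on the classpath (class and method names here are illustrative):

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.util.Pair;

public class CachedFilesView {
  // Fold file -> (regionEncodedName, cachedBytes) into region -> totalCachedBytes.
  static Map<String, Long> cachedBytesPerRegion(
    Optional<Map<String, Pair<String, Long>>> fullyCachedFiles) {
    Map<String, Long> perRegion = new HashMap<>();
    fullyCachedFiles.ifPresent(files -> files.values()
      .forEach(e -> perRegion.merge(e.getFirst(), e.getSecond(), Long::sum)));
    return perRegion;
  }
}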
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java deleted file mode 100644 index df67e4429a2d..000000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io.hfile; - -import java.util.HashMap; -import java.util.Map; -import org.apache.hadoop.hbase.util.Pair; - -import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos; - -final class PrefetchProtoUtils { - private PrefetchProtoUtils() { - } - - static PersistentPrefetchProtos.PrefetchedHfileName - toPB(Map<String, Pair<String, Long>> prefetchedHfileNames) { - Map<String, PersistentPrefetchProtos.RegionFileSizeMap> tmpMap = new HashMap<>(); - prefetchedHfileNames.forEach((hFileName, regionPrefetchMap) -> { - PersistentPrefetchProtos.RegionFileSizeMap tmpRegionFileSize = - PersistentPrefetchProtos.RegionFileSizeMap.newBuilder() - .setRegionName(regionPrefetchMap.getFirst()) - .setRegionPrefetchSize(regionPrefetchMap.getSecond()).build(); - tmpMap.put(hFileName, tmpRegionFileSize); - }); - return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder().putAllPrefetchedFiles(tmpMap) - .build(); - } - - static Map<String, Pair<String, Long>> - fromPB(Map<String, PersistentPrefetchProtos.RegionFileSizeMap> prefetchHFileNames) { - Map<String, Pair<String, Long>> hFileMap = new HashMap<>(); - prefetchHFileNames.forEach((hFileName, regionPrefetchMap) -> { - hFileMap.put(hFileName, - new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionPrefetchSize())); - }); - return hFileMap; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index e3d740383085..ca7750c92c56 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; @@ -51,6 +52,7 @@ import java.util.function.Consumer; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.TableName; @@ -79,6 +81,7 @@ import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef; import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool; import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -151,8 +154,17 @@ public class BucketCache implements BlockCache, HeapSize { private AtomicBoolean backingMapValidated = new AtomicBoolean(false); - /** Set of files for which prefetch is completed */ - final Map<String, Boolean> fullyCachedFiles = new ConcurrentHashMap<>(); + /** + * Map of hFile -> Region -> File size. This map is used to track all files completed prefetch, + * together with the region those belong to and the total cached size for the + * region. + */ + final Map<String, Pair<String, Long>> fullyCachedFiles = new ConcurrentHashMap<>(); + /** + * Map of region -> total size of the region prefetched on this region server.
This is the total + * size of hFiles for this region prefetched on this region server + */ + final Map<String, Long> regionCachedSizeMap = new ConcurrentHashMap<>(); private BucketCachePersister cachePersister; @@ -546,7 +558,6 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach } else { this.blockNumber.increment(); this.heapSize.add(cachedItem.heapSize()); - blocksByHFile.add(cacheKey); } } @@ -636,15 +647,11 @@ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decre cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); } if (ioEngine.isPersistent()) { - fullyCachedFiles.remove(cacheKey.getHfileName()); + removeFileFromPrefetch(cacheKey.getHfileName()); setCacheInconsistent(true); } } - public void fileCacheCompleted(String fileName) { - fullyCachedFiles.put(fileName, true); - } - /** * Free the {{@link BucketEntry} actually,which could only be invoked when the * {@link BucketEntry#refCnt} becoming 0. @@ -1300,6 +1307,10 @@ public boolean isCachePersistent() { return ioEngine.isPersistent() && persistencePath != null; } + public Map<String, Long> getRegionCachedInfo() { + return Collections.unmodifiableMap(regionCachedSizeMap); + } + /** * @see #persistToFile() */ @@ -1337,6 +1348,29 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException { } } + private void updateRegionSizeMapWhileRetrievingFromFile() { + // Update the regionCachedSizeMap with the region size while restarting the region server + if (LOG.isDebugEnabled()) { + LOG.debug("Updating region size map after retrieving cached file list"); + dumpPrefetchList(); + } + regionCachedSizeMap.clear(); + fullyCachedFiles.forEach((hFileName, hFileSize) -> { + // Get the region name for each file + String regionEncodedName = hFileSize.getFirst(); + long cachedFileSize = hFileSize.getSecond(); + regionCachedSizeMap.merge(regionEncodedName, cachedFileSize, + (oldpf, fileSize) -> oldpf + fileSize); + }); + } + + private void dumpPrefetchList() { + for (Map.Entry<String, Pair<String, Long>> outerEntry : fullyCachedFiles.entrySet()) { + LOG.debug("Cached File Entry:<{},<{},{}>>", outerEntry.getKey(), + outerEntry.getValue().getFirst(), outerEntry.getValue().getSecond()); + } + } + /** * Create an input stream that deletes the file after reading it.
Use in try-with-resources to * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions: @@ -1401,7 +1435,7 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), this::createRecycler); fullyCachedFiles.clear(); - fullyCachedFiles.putAll(proto.getPrefetchedFilesMap()); + fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap())); if (proto.hasChecksum()) { try { ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), @@ -1444,6 +1478,7 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio LOG.info("Persistent file is old format, it does not support verifying file integrity!"); backingMapValidated.set(true); } + updateRegionSizeMapWhileRetrievingFromFile(); verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); } @@ -1581,7 +1616,7 @@ protected String getAlgorithm() { */ @Override public int evictBlocksByHfileName(String hfileName) { - this.fullyCachedFiles.remove(hfileName); + removeFileFromPrefetch(hfileName); Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true); @@ -1966,7 +2001,7 @@ public AtomicBoolean getBackingMapValidated() { } @Override - public Optional<Map<String, Boolean>> getFullyCachedFiles() { + public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() { return Optional.of(fullyCachedFiles); } @@ -1985,4 +2020,33 @@ public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig ca return Optional.empty(); } + private void removeFileFromPrefetch(String hfileName) { + // Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted + if (fullyCachedFiles.containsKey(hfileName)) { + Pair<String, Long> regionEntry = fullyCachedFiles.get(hfileName); + String regionEncodedName = regionEntry.getFirst(); + long filePrefetchSize = regionEntry.getSecond(); + LOG.debug("Removing file {} for region {}", hfileName, regionEncodedName); + regionCachedSizeMap.computeIfPresent(regionEncodedName, (rn, pf) -> pf - filePrefetchSize); + // If all the blocks for a region are evicted from the cache, remove the entry for that region + if ( + regionCachedSizeMap.containsKey(regionEncodedName) + && regionCachedSizeMap.get(regionEncodedName) == 0 + ) { + regionCachedSizeMap.remove(regionEncodedName); + } + } + fullyCachedFiles.remove(hfileName); + } + + public void fileCacheCompleted(Path filePath, long size) { + Pair<String, Long> pair = new Pair<>(); + // sets the region name + String regionName = filePath.getParent().getParent().getName(); + pair.setFirst(regionName); + pair.setSecond(size); + fullyCachedFiles.put(filePath.getName(), pair); + regionCachedSizeMap.merge(regionName, size, (oldpf, fileSize) -> oldpf + fileSize); + } + }
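removeFileFromPrefetch() above decrements the region's running total and drops the entry once it reaches zero, using a containsKey/get pair after computeIfPresent. A slightly more compact equivalent — a sketch, not the patch's code — lets computeIfPresent itself remove the mapping by returning null:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RegionCacheAccounting {
  final Map<String, Long> regionCachedSizeMap = new ConcurrentHashMap<>();

  // Subtract an evicted file's bytes; returning null from the remapping
  // function removes the region entry atomically once nothing remains cached.
  void onFileEvicted(String regionEncodedName, long fileBytes) {
    regionCachedSizeMap.computeIfPresent(regionEncodedName,
      (region, total) -> (total - fileBytes <= 0) ? null : total - fileBytes);
  }
}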
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java index 8830e5d3255a..7cc5050506e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -28,6 +29,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; @@ -45,7 +47,7 @@ static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) { .setIoClass(cache.ioEngine.getClass().getName()) .setMapClass(cache.backingMap.getClass().getName()) .putAllDeserializers(CacheableDeserializerIdManager.save()) - .putAllPrefetchedFiles(cache.fullyCachedFiles) + .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles)) .setBackingMap(BucketProtoUtils.toPB(cache.backingMap)) .setChecksum(ByteString .copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm()))) @@ -185,4 +187,26 @@ private static BlockType fromPb(BucketCacheProtos.BlockType blockType) { throw new Error("Unrecognized BlockType."); } } + + static Map<String, BucketCacheProtos.RegionFileSizeMap> + toCachedPB(Map<String, Pair<String, Long>> prefetchedHfileNames) { + Map<String, BucketCacheProtos.RegionFileSizeMap> tmpMap = new HashMap<>(); + prefetchedHfileNames.forEach((hfileName, regionPrefetchMap) -> { + BucketCacheProtos.RegionFileSizeMap tmpRegionFileSize = + BucketCacheProtos.RegionFileSizeMap.newBuilder().setRegionName(regionPrefetchMap.getFirst()) + .setRegionCachedSize(regionPrefetchMap.getSecond()).build(); + tmpMap.put(hfileName, tmpRegionFileSize); + }); + return tmpMap; + } + + static Map<String, Pair<String, Long>> + fromPB(Map<String, BucketCacheProtos.RegionFileSizeMap> prefetchHFileNames) { + Map<String, Pair<String, Long>> hfileMap = new HashMap<>(); + prefetchHFileNames.forEach((hfileName, regionPrefetchMap) -> { + hfileMap.put(hfileName, + new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionCachedSize())); + }); + return hfileMap; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 85721a354977..3042a2eae451 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -65,10 +65,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; import java.util.stream.Collectors; import javax.management.MalformedObjectNameException; import javax.servlet.http.HttpServlet; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableFloat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -108,7 +110,9 @@ import org.apache.hadoop.hbase.http.InfoServer; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; +import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.RpcClient; @@ -239,6 +243,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices> private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class); + int unitMB = 1024 * 1024; + int unitKB = 1024; + /** * For testing only! Set to true to skip notifying region assignment to master .
*/ @@ -1211,6 +1218,11 @@ private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, lon serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); } } + computeIfPersistentBucketCache(bc -> { + bc.getRegionCachedInfo().forEach((regionName, prefetchSize) -> { + serverLoad.putRegionCachedInfo(regionName, roundSize(prefetchSize, unitMB)); + }); + }); serverLoad.setReportStartTime(reportStartTime); serverLoad.setReportEndTime(reportEndTime); if (this.infoServer != null) { @@ -1510,6 +1522,15 @@ private static int roundSize(long sizeInByte, int sizeUnit) { } } + private void computeIfPersistentBucketCache(Consumer computation) { + if (blockCache instanceof CombinedBlockCache) { + BlockCache l2 = ((CombinedBlockCache) blockCache).getSecondLevelCache(); + if (l2 instanceof BucketCache && ((BucketCache) l2).isCachePersistent()) { + computation.accept((BucketCache) l2); + } + } + } + /** * @param r Region to get RegionLoad for. * @param regionLoadBldr the RegionLoad.Builder, can be null @@ -1519,6 +1540,7 @@ private static int roundSize(long sizeInByte, int sizeUnit) { RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, RegionSpecifier.Builder regionSpecifier) throws IOException { byte[] name = r.getRegionInfo().getRegionName(); + String regionEncodedName = r.getRegionInfo().getEncodedName(); int stores = 0; int storefiles = 0; int storeRefCount = 0; @@ -1531,6 +1553,7 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, long totalStaticBloomSize = 0L; long totalCompactingKVs = 0L; long currentCompactedKVs = 0L; + long totalRegionSize = 0L; List storeList = r.getStores(); stores += storeList.size(); for (HStore store : storeList) { @@ -1542,6 +1565,7 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount); storeUncompressedSize += store.getStoreSizeUncompressed(); storefileSize += store.getStorefilesSize(); + totalRegionSize += store.getHFilesSize(); // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB? storefileIndexSize += store.getStorefilesRootLevelIndexSize(); CompactionProgress progress = store.getCompactionProgress(); @@ -1554,9 +1578,6 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, totalStaticBloomSize += store.getTotalStaticBloomSize(); } - int unitMB = 1024 * 1024; - int unitKB = 1024; - int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB); int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB); int storefileSizeMB = roundSize(storefileSize, unitMB); @@ -1564,6 +1585,16 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB); int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB); int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB); + int regionSizeMB = roundSize(totalRegionSize, unitMB); + final MutableFloat currentRegionCachedRatio = new MutableFloat(0.0f); + computeIfPersistentBucketCache(bc -> { + if (bc.getRegionCachedInfo().containsKey(regionEncodedName)) { + currentRegionCachedRatio.setValue(regionSizeMB == 0 + ? 
0.0f + : (float) roundSize(bc.getRegionCachedInfo().get(regionEncodedName), unitMB) + / regionSizeMB); + } + }); HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution(); float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname()); @@ -1594,7 +1625,8 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight) .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight) .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState())) - .setLastMajorCompactionTs(r.getOldestHfileTs(true)); + .setLastMajorCompactionTs(r.getOldestHfileTs(true)).setRegionSizeMB(regionSizeMB) + .setCurrentRegionCachedRatio(currentRegionCachedRatio.floatValue()); r.setCompleteSequenceId(regionLoadBldr); return regionLoadBldr.build(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerMetrics.java index 8bcf3e600f88..8dfb8b1a4632 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerMetrics.java @@ -60,6 +60,10 @@ public void testRegionLoadAggregation() { metrics.getRegionMetrics().values().stream().mapToLong(v -> v.getCpRequestCount()).sum()); assertEquals(300, metrics.getRegionMetrics().values().stream() .mapToLong(v -> v.getFilteredReadRequestCount()).sum()); + assertEquals(2, metrics.getRegionMetrics().values().stream() + .mapToLong(v -> (long) v.getCurrentRegionCachedRatio()).count()); + assertEquals(150, metrics.getRegionMetrics().values().stream() + .mapToDouble(v -> v.getRegionSizeMB().get(Size.Unit.MEGABYTE)).sum(), 0); } @Test @@ -99,12 +103,14 @@ private ClusterStatusProtos.ServerLoad createServerLoadProto() { ClusterStatusProtos.RegionLoad.newBuilder().setRegionSpecifier(rSpecOne).setStores(10) .setStorefiles(101).setStoreUncompressedSizeMB(106).setStorefileSizeMB(520) .setFilteredReadRequestsCount(100).setStorefileIndexSizeKB(42).setRootIndexSizeKB(201) - .setReadRequestsCount(Integer.MAX_VALUE).setWriteRequestsCount(Integer.MAX_VALUE).build(); - ClusterStatusProtos.RegionLoad rlTwo = ClusterStatusProtos.RegionLoad.newBuilder() - .setRegionSpecifier(rSpecTwo).setStores(3).setStorefiles(13).setStoreUncompressedSizeMB(23) - .setStorefileSizeMB(300).setFilteredReadRequestsCount(200).setStorefileIndexSizeKB(40) - .setRootIndexSizeKB(303).setReadRequestsCount(Integer.MAX_VALUE) - .setWriteRequestsCount(Integer.MAX_VALUE).setCpRequestsCount(100).build(); + .setReadRequestsCount(Integer.MAX_VALUE).setWriteRequestsCount(Integer.MAX_VALUE) + .setRegionSizeMB(100).setCurrentRegionCachedRatio(0.9f).build(); + ClusterStatusProtos.RegionLoad rlTwo = + ClusterStatusProtos.RegionLoad.newBuilder().setRegionSpecifier(rSpecTwo).setStores(3) + .setStorefiles(13).setStoreUncompressedSizeMB(23).setStorefileSizeMB(300) + .setFilteredReadRequestsCount(200).setStorefileIndexSizeKB(40).setRootIndexSizeKB(303) + .setReadRequestsCount(Integer.MAX_VALUE).setWriteRequestsCount(Integer.MAX_VALUE) + .setCpRequestsCount(100).setRegionSizeMB(50).setCurrentRegionCachedRatio(1.0f).build(); ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder() .addRegionLoads(rlOne).addRegionLoads(rlTwo).build(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java 
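The currentRegionCachedRatio computed above is cached-MB over total-region-MB, guarded so an empty region reports 0.0 rather than dividing by zero. Distilled into a standalone sketch (method and class names are illustrative):

public class CachedRatio {
  // Both sizes are first rounded to MB in createRegionLoad(); a zero-sized
  // region yields 0.0f instead of a division-by-zero.
  static float cachedRatio(int cachedSizeMB, int regionSizeMB) {
    return regionSizeMB == 0 ? 0.0f : (float) cachedSizeMB / regionSizeMB;
  }

  public static void main(String[] args) {
    System.out.println(cachedRatio(45, 100)); // 0.45
    System.out.println(cachedRatio(0, 0));    // 0.0 (empty region)
  }
}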
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java index 594db4d7c303..31fcf9fd47f5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java @@ -400,6 +400,10 @@ public List getTasks() { return null; } + @Override + public Map getRegionCachedInfo() { + return new HashMap<>(); + } }; return serverMetrics; } @@ -541,6 +545,16 @@ public long getBlocksTotalWeight() { public CompactionState getCompactionState() { return null; } + + @Override + public Size getRegionSizeMB() { + return null; + } + + @Override + public float getCurrentRegionCachedRatio() { + return 0.0f; + } }; return regionMetrics; } From 5bac2d74762b01d627eec38495417bb20d58b525 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Tue, 21 Nov 2023 18:42:42 +0000 Subject: [PATCH 3/3] HBASE-27999 Implement cache prefetch aware load balancer (#5527) this commit is part of the rebase of HBASE-28186 Signed-off-by: Wellington Chevreuil Signed-off-by: Tak Lon (Stephen) Wu Co-authored-by: Rahul Agarkar --- .../master/balancer/BalancerClusterState.java | 156 +++++- .../master/balancer/BalancerRegionLoad.java | 12 + .../master/balancer/BaseLoadBalancer.java | 3 +- .../balancer/CacheAwareLoadBalancer.java | 479 ++++++++++++++++++ .../balancer/StochasticLoadBalancer.java | 54 +- .../master/balancer/BalancerTestBase.java | 14 + .../balancer/TestStochasticLoadBalancer.java | 4 + .../org/apache/hadoop/hbase/HConstants.java | 12 + .../hbase/io/hfile/BlockCacheFactory.java | 13 +- .../hbase/io/hfile/bucket/BucketCache.java | 2 + .../io/hfile/bucket/PersistentIOEngine.java | 4 +- .../balancer/TestCacheAwareLoadBalancer.java | 397 +++++++++++++++ ...stCacheAwareLoadBalancerCostFunctions.java | 316 ++++++++++++ ...rWithStochasticLoadBalancerAsInternal.java | 2 + 14 files changed, 1432 insertions(+), 36 deletions(-) create mode 100644 hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancerCostFunctions.java diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java index a7ae8b4d1a5a..4b3809c107cb 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,6 +115,12 @@ class BalancerClusterState { private float[][] rackLocalities; // Maps localityType -> region -> [server|rack]Index with highest locality private int[][] regionsToMostLocalEntities; + // Maps region -> serverIndex -> regionCacheRatio of a region on a server + private Map, Float> regionIndexServerIndexRegionCachedRatio; + // Maps regionIndex -> serverIndex with best region cache ratio + private int[] 
regionServerIndexWithBestRegionCachedRatio; + // Maps regionName -> oldServerName -> cache ratio of the region on the old server + Map> regionCacheRatioOnOldServerMap; static class DefaultRackManager extends RackManager { @Override @@ -125,13 +132,20 @@ public String getRack(ServerName server) { BalancerClusterState(Map> clusterState, Map> loads, RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) { - this(null, clusterState, loads, regionFinder, rackManager); + this(null, clusterState, loads, regionFinder, rackManager, null); + } + + protected BalancerClusterState(Map> clusterState, + Map> loads, RegionHDFSBlockLocationFinder regionFinder, + RackManager rackManager, Map> oldRegionServerRegionCacheRatio) { + this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio); } @SuppressWarnings("unchecked") BalancerClusterState(Collection unassignedRegions, Map> clusterState, Map> loads, - RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) { + RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager, + Map> oldRegionServerRegionCacheRatio) { if (unassignedRegions == null) { unassignedRegions = Collections.emptyList(); } @@ -145,6 +159,8 @@ public String getRack(ServerName server) { tables = new ArrayList<>(); this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); + this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio; + numRegions = 0; List> serversPerHostList = new ArrayList<>(); @@ -541,6 +557,142 @@ private void computeCachedLocalities() { } + /** + * Returns the size of hFiles from the most recent RegionLoad for region + */ + public int getTotalRegionHFileSizeMB(int region) { + Deque load = regionLoads[region]; + if (load == null) { + // This means, that the region has no actual data on disk + return 0; + } + return regionLoads[region].getLast().getRegionSizeMB(); + } + + /** + * Returns the weighted cache ratio of a region on the given region server + */ + public float getOrComputeWeightedRegionCacheRatio(int region, int server) { + return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server); + } + + /** + * Returns the amount by which a region is cached on a given region server. If the region is not + * currently hosted on the given region server, then find out if it was previously hosted there + * and return the old cache ratio. + */ + protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) { + float regionCacheRatio = 0.0f; + + // Get the current region cache ratio if the region is hosted on the server regionServerIndex + for (int regionIndex : regionsPerServer[regionServerIndex]) { + if (region != regionIndex) { + continue; + } + + Deque regionLoadList = regionLoads[regionIndex]; + + // The region is currently hosted on this region server. Get the region cache ratio for this + // region on this server + regionCacheRatio = + regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio(); + + return regionCacheRatio; + } + + // Region is not currently hosted on this server. Check if the region was cached on this + // server earlier. This can happen when the server was shutdown and the cache was persisted. + // Search using the region name and server name and not the index id and server id as these ids + // may change when a server is marked as dead or a new server is added. 
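Note that getTotalRegionHFileSizeMB() above guards only against a null deque; a present-but-empty deque would still throw NoSuchElementException from getLast(). Callers appear to populate only non-empty deques, so this is a latent edge case rather than a live bug. A fully guarded sketch for comparison, with Deque<Integer> standing in for Deque<BalancerRegionLoad>:

import java.util.Deque;

public class LatestLoad {
  // Guarded "latest element" lookup over a region's load history.
  static int latestRegionSizeMB(Deque<Integer> regionSizeHistoryMB) {
    return (regionSizeHistoryMB == null || regionSizeHistoryMB.isEmpty())
      ? 0 : regionSizeHistoryMB.getLast();
  }
}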
+ String regionEncodedName = regions[region].getEncodedName(); + ServerName serverName = servers[regionServerIndex]; + if ( + regionCacheRatioOnOldServerMap != null + && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName) + ) { + Pair cacheRatioOfRegionOnServer = + regionCacheRatioOnOldServerMap.get(regionEncodedName); + if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) { + regionCacheRatio = cacheRatioOfRegionOnServer.getSecond(); + if (LOG.isDebugEnabled()) { + LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName, + serverName, regionCacheRatio); + } + } + } + return regionCacheRatio; + } + + /** + * Populate the maps containing information about how much a region is cached on a region server. + */ + private void computeRegionServerRegionCacheRatio() { + regionIndexServerIndexRegionCachedRatio = new HashMap<>(); + regionServerIndexWithBestRegionCachedRatio = new int[numRegions]; + + for (int region = 0; region < numRegions; region++) { + float bestRegionCacheRatio = 0.0f; + int serverWithBestRegionCacheRatio = 0; + for (int server = 0; server < numServers; server++) { + float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server); + if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) { + // A region with cache ratio 0 on a server means nothing. Hence, just make a note of + // cache ratio only if the cache ratio is greater than 0. + Pair regionServerPair = new Pair<>(region, server); + regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio); + } + if (regionCacheRatio > bestRegionCacheRatio) { + serverWithBestRegionCacheRatio = server; + // If the server currently hosting the region has equal cache ratio to a historical + // server, consider the current server to keep hosting the region + bestRegionCacheRatio = regionCacheRatio; + } else if ( + regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region] + ) { + // If two servers have same region cache ratio, then the server currently hosting the + // region + // should retain the region + serverWithBestRegionCacheRatio = server; + } + } + regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio; + Pair regionServerPair = + new Pair<>(region, regionIndexToServerIndex[region]); + float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair); + if (tempRegionCacheRatio > bestRegionCacheRatio) { + LOG.warn( + "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the " + + "best region cache ratio {} on server {}", + regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]], + tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]); + } + } + } + + protected float getOrComputeRegionCacheRatio(int region, int server) { + if ( + regionServerIndexWithBestRegionCachedRatio == null + || regionIndexServerIndexRegionCachedRatio.isEmpty() + ) { + computeRegionServerRegionCacheRatio(); + } + + Pair regionServerPair = new Pair<>(region, server); + return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair) + ? 
regionIndexServerIndexRegionCachedRatio.get(regionServerPair) + : 0.0f; + } + + public int[] getOrComputeServerWithBestRegionCachedRatio() { + if ( + regionServerIndexWithBestRegionCachedRatio == null + || regionIndexServerIndexRegionCachedRatio.isEmpty() + ) { + computeRegionServerRegionCacheRatio(); + } + return regionServerIndexWithBestRegionCachedRatio; + } + /** * Maps region index to rack index */ diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java index ffb36cb8ca1a..33d00e3de862 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java @@ -34,6 +34,8 @@ class BalancerRegionLoad { private final long writeRequestsCount; private final int memStoreSizeMB; private final int storefileSizeMB; + private final int regionSizeMB; + private final float currentRegionPrefetchRatio; BalancerRegionLoad(RegionMetrics regionMetrics) { readRequestsCount = regionMetrics.getReadRequestCount(); @@ -41,6 +43,8 @@ class BalancerRegionLoad { writeRequestsCount = regionMetrics.getWriteRequestCount(); memStoreSizeMB = (int) regionMetrics.getMemStoreSize().get(Size.Unit.MEGABYTE); storefileSizeMB = (int) regionMetrics.getStoreFileSize().get(Size.Unit.MEGABYTE); + regionSizeMB = (int) regionMetrics.getRegionSizeMB().get(Size.Unit.MEGABYTE); + currentRegionPrefetchRatio = regionMetrics.getCurrentRegionCachedRatio(); } public long getReadRequestsCount() { @@ -62,4 +66,12 @@ public int getMemStoreSizeMB() { public int getStorefileSizeMB() { return storefileSizeMB; } + + public int getRegionSizeMB() { + return regionSizeMB; + } + + public float getCurrentRegionCacheRatio() { + return currentRegionPrefetchRatio; + } } diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index a4560cc595a2..54516868a0a0 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -232,7 +232,8 @@ private BalancerClusterState createCluster(List servers, clusterState.put(server, Collections.emptyList()); } } - return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager); + return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager, + null); } private List findIdleServers(List servers) { diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java new file mode 100644 index 000000000000..d73769a3971b --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java @@ -0,0 +1,479 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +/** An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns regions + * based on the amount they are cached on a given server. A region can move across the region + * servers whenever a region server shuts down or crashes. The region server preserves the cache + * periodically and restores the cache when it is restarted. This balancer implements a mechanism + * where it maintains the amount by which a region is cached on a region server. During balancer + * run, a region plan is generated that takes into account this cache information and tries to + * move the regions so that the cache is minimally impacted. + */ + +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterMetrics; +import org.apache.hadoop.hbase.RegionMetrics; +import org.apache.hadoop.hbase.ServerMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Size; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class CacheAwareLoadBalancer extends StochasticLoadBalancer { + private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class); + + private Configuration configuration; + + public enum GeneratorFunctionType { + LOAD, + CACHE_RATIO + } + + @Override + public synchronized void loadConf(Configuration configuration) { + this.configuration = configuration; + this.costFunctions = new ArrayList<>(); + super.loadConf(configuration); + } + + @Override + protected List<CandidateGenerator> createCandidateGenerators() { + List<CandidateGenerator> candidateGenerators = new ArrayList<>(2); + candidateGenerators.add(GeneratorFunctionType.LOAD.ordinal(), + new CacheAwareSkewnessCandidateGenerator()); + candidateGenerators.add(GeneratorFunctionType.CACHE_RATIO.ordinal(), + new CacheAwareCandidateGenerator()); + return candidateGenerators; + } + + @Override + protected List<CostFunction> createCostFunctions(Configuration configuration) { + List<CostFunction> costFunctions = new ArrayList<>(); + addCostFunction(costFunctions, new CacheAwareRegionSkewnessCostFunction(configuration)); + addCostFunction(costFunctions, new CacheAwareCostFunction(configuration)); + return costFunctions; + } + + private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) { + if (costFunction.getMultiplier() > 0) { + costFunctions.add(costFunction); + } + } + + @Override + public void updateClusterMetrics(ClusterMetrics clusterMetrics) { + this.clusterStatus = clusterMetrics; + updateRegionLoad(); + } + + /** + * Collect the amount of region cached for all the regions from all the active region servers.
+   */
+  private void updateRegionLoad() {
+    loads = new HashMap<>();
+    regionCacheRatioOnOldServerMap = new HashMap<>();
+    Map<String, Pair<ServerName, Integer>> regionCacheRatioOnCurrentServerMap = new HashMap<>();
+
+    // Build current region cache statistics
+    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
+      // Create a map of region and the server where it is currently hosted
+      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
+        String regionEncodedName = RegionInfo.encodeRegionName(regionName);
+
+        Deque<BalancerRegionLoad> rload = new ArrayDeque<>();
+
+        // Get the total size of the hFiles in this region
+        int regionSizeMB = (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE);
+
+        rload.add(new BalancerRegionLoad(rm));
+        // Maintain a map of region and its total size. This is needed to calculate the cache
+        // ratios for the regions cached on old region servers
+        regionCacheRatioOnCurrentServerMap.put(regionEncodedName, new Pair<>(sn, regionSizeMB));
+        loads.put(regionEncodedName, rload);
+      });
+    });
+
+    // Build cache statistics for the regions hosted previously on old region servers
+    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
+      // Find if a region was previously hosted on a server other than the one it is currently
+      // hosted on.
+      sm.getRegionCachedInfo().forEach((String regionEncodedName, Integer regionSizeInCache) -> {
+        // If the region is found in regionCacheRatioOnCurrentServerMap, it is currently hosted on
+        // this server
+        if (regionCacheRatioOnCurrentServerMap.containsKey(regionEncodedName)) {
+          ServerName currentServer =
+            regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getFirst();
+          if (!ServerName.isSameAddress(currentServer, sn)) {
+            int regionSizeMB =
+              regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getSecond();
+            float regionCacheRatioOnOldServer =
+              regionSizeMB == 0 ?
0.0f : (float) regionSizeInCache / regionSizeMB;
+            regionCacheRatioOnOldServerMap.put(regionEncodedName,
+              new Pair<>(sn, regionCacheRatioOnOldServer));
+          }
+        }
+      });
+    });
+  }
+
+  private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, String regionName) {
+    Optional<RegionInfo> regionInfoOptional =
+      Arrays.stream(cluster.regions).filter((RegionInfo ri) -> {
+        return regionName.equals(ri.getEncodedName());
+      }).findFirst();
+
+    if (regionInfoOptional.isPresent()) {
+      return regionInfoOptional.get();
+    }
+    return null;
+  }
+
+  private class CacheAwareCandidateGenerator extends CandidateGenerator {
+    @Override
+    protected BalanceAction generate(BalancerClusterState cluster) {
+      // Move the regions to the servers they were previously hosted on, based on the cache ratio
+      if (
+        !regionCacheRatioOnOldServerMap.isEmpty()
+          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
+      ) {
+        Map.Entry<String, Pair<ServerName, Float>> regionCacheRatioServerMap =
+          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
+        // Get the server where this region was previously hosted
+        String regionEncodedName = regionCacheRatioServerMap.getKey();
+        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
+        if (regionInfo == null) {
+          LOG.warn("Region {} not found", regionEncodedName);
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+        int regionIndex = cluster.regionsToIndex.get(regionInfo);
+        int oldServerIndex = cluster.serversToIndex
+          .get(regionCacheRatioOnOldServerMap.get(regionEncodedName).getFirst().getAddress());
+        if (oldServerIndex < 0) {
+          LOG.warn("Server previously hosting region {} not found", regionEncodedName);
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+
+        float oldRegionCacheRatio =
+          cluster.getOrComputeRegionCacheRatio(regionIndex, oldServerIndex);
+        int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex];
+        float currentRegionCacheRatio =
+          cluster.getOrComputeRegionCacheRatio(regionIndex, currentServerIndex);
+
+        BalanceAction action = generatePlan(cluster, regionIndex, currentServerIndex,
+          currentRegionCacheRatio, oldServerIndex, oldRegionCacheRatio);
+        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+        return action;
+      }
+      return BalanceAction.NULL_ACTION;
+    }
+
+    private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex,
+      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
+      float cacheRatioOnOldServer) {
+      return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
+        cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
+          ? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
+          : BalanceAction.NULL_ACTION;
+    }
+
+    private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
+      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
+      float cacheRatioOnOldServer) {
+      // Find if the region has already been moved, by comparing the current server index with the
+      // old server index. This can happen when another candidate generator has already moved
+      // the region.
+      if (currentServerIndex < 0 || oldServerIndex < 0) {
+        return false;
+      }
+
+      float cacheRatioDiffThreshold = 0.6f;
+
+      // Conditions for moving the region
+
+      // If the region is fully cached on the old server, move the region back
+      if (cacheRatioOnOldServer == 1.0f) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Region {} moved to the old server {} as it is fully cached there",
+            cluster.regions[regionIndex].getEncodedName(), cluster.servers[oldServerIndex]);
+        }
+        return true;
+      }
+
+      // Move the region back to the old server if it is cached equally on both the servers
+      if (cacheRatioOnCurrentServer == cacheRatioOnOldServer) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "Region {} moved from {} to {} as the region is cached {} equally on both servers",
+            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
+            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer);
+        }
+        return true;
+      }
+
+      // If the region is not fully cached on either of the servers, move the region back to the
+      // old server if the region cache ratio on the current server is still much less than the
+      // old server
+      if (
+        cacheRatioOnOldServer > 0.0f
+          && cacheRatioOnCurrentServer / cacheRatioOnOldServer < cacheRatioDiffThreshold
+      ) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "Region {} moved from {} to {} as the old cache ratio {} is better than the current "
+              + "cache ratio {}",
+            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
+            cluster.servers[oldServerIndex], cacheRatioOnOldServer, cacheRatioOnCurrentServer);
+        }
+        return true;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
+          cluster.regions[regionIndex], cluster.servers[currentServerIndex],
+          cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
+      }
+      return false;
+    }
+  }
+
+  private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator {
+    @Override
+    BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
+      // First move all the regions which were previously hosted on some other server back to
+      // their old servers
+      if (
+        !regionCacheRatioOnOldServerMap.isEmpty()
+          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
+      ) {
+        // Get the first region index in the historical cache ratio list
+        Map.Entry<String, Pair<ServerName, Float>> regionEntry =
+          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
+        String regionEncodedName = regionEntry.getKey();
+
+        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
+        if (regionInfo == null) {
+          LOG.warn("Region {} does not exist", regionEncodedName);
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+
+        int regionIndex = cluster.regionsToIndex.get(regionInfo);
+
+        // Get the current host name for this region
+        thisServer = cluster.regionIndexToServerIndex[regionIndex];
+
+        // Get the old server index
+        otherServer = cluster.serversToIndex.get(regionEntry.getValue().getFirst().getAddress());
+
+        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+
+        if (otherServer < 0) {
+          // The old server is no longer part of the cluster and hence, the region
+          // cannot be moved back to the old server
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+              "CacheAwareSkewnessCandidateGenerator: Region {} not moved to the old "
+                + "server {} as the server does not exist",
+              regionEncodedName, regionEntry.getValue().getFirst().getHostname());
+          }
+          return BalanceAction.NULL_ACTION;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it "
+              + "was hosted there earlier",
+            regionEncodedName, cluster.servers[thisServer].getHostname(),
+            cluster.servers[otherServer].getHostname());
+        }
+
+        return getAction(thisServer, regionIndex, otherServer, -1);
+      }
+
+      if (thisServer < 0 || otherServer < 0) {
+        return BalanceAction.NULL_ACTION;
+      }
+
+      int regionIndexToMove = pickLeastCachedRegion(cluster, thisServer);
+      if (regionIndexToMove < 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("CacheAwareSkewnessCandidateGenerator: No region found for movement");
+        }
+        return BalanceAction.NULL_ACTION;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it is "
+            + "least cached on the current server",
+          cluster.regions[regionIndexToMove].getEncodedName(),
+          cluster.servers[thisServer].getHostname(), cluster.servers[otherServer].getHostname());
+      }
+      return getAction(thisServer, regionIndexToMove, otherServer, -1);
+    }
+
+    private int pickLeastCachedRegion(BalancerClusterState cluster, int thisServer) {
+      float minCacheRatio = Float.MAX_VALUE;
+      int leastCachedRegion = -1;
+      for (int i = 0; i < cluster.regionsPerServer[thisServer].length; i++) {
+        int regionIndex = cluster.regionsPerServer[thisServer][i];
+
+        float cacheRatioOnCurrentServer =
+          cluster.getOrComputeRegionCacheRatio(regionIndex, thisServer);
+        if (cacheRatioOnCurrentServer < minCacheRatio) {
+          minCacheRatio = cacheRatioOnCurrentServer;
+          leastCachedRegion = regionIndex;
+        }
+      }
+      return leastCachedRegion;
+    }
+  }
+
+  static class CacheAwareRegionSkewnessCostFunction extends CostFunction {
+    static final String REGION_COUNT_SKEW_COST_KEY =
+      "hbase.master.balancer.stochastic.regionCountCost";
+    static final float DEFAULT_REGION_COUNT_SKEW_COST = 20;
+    private final DoubleArrayCost cost = new DoubleArrayCost();
+
+    CacheAwareRegionSkewnessCostFunction(Configuration conf) {
+      // The load multiplier should be the greatest, as it is the most general way to balance data.
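+      // For instance, an operator who wants region-count skew to dominate could raise the weight
+      // (hypothetical value; the default of 20 applies when the key is unset):
+      //   conf.setFloat("hbase.master.balancer.stochastic.regionCountCost", 40.0f);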
+ this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST)); + } + + @Override + void prepare(BalancerClusterState cluster) { + super.prepare(cluster); + cost.prepare(cluster.numServers); + cost.applyCostsChange(costs -> { + for (int i = 0; i < cluster.numServers; i++) { + costs[i] = cluster.regionsPerServer[i].length; + } + }); + } + + @Override + protected double cost() { + return cost.cost(); + } + + @Override + protected void regionMoved(int region, int oldServer, int newServer) { + cost.applyCostsChange(costs -> { + costs[oldServer] = cluster.regionsPerServer[oldServer].length; + costs[newServer] = cluster.regionsPerServer[newServer].length; + }); + } + + public final void updateWeight(double[] weights) { + weights[GeneratorFunctionType.LOAD.ordinal()] += cost(); + } + } + + static class CacheAwareCostFunction extends CostFunction { + private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost"; + private double cacheRatio; + private double bestCacheRatio; + + private static final float DEFAULT_CACHE_COST = 20; + + CacheAwareCostFunction(Configuration conf) { + boolean isPersistentCache = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY) != null; + // Disable the CacheAwareCostFunction if the cached file list persistence is not enabled + this.setMultiplier( + !isPersistentCache ? 0.0f : conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST)); + bestCacheRatio = 0.0; + cacheRatio = 0.0; + } + + @Override + void prepare(BalancerClusterState cluster) { + super.prepare(cluster); + cacheRatio = 0.0; + bestCacheRatio = 0.0; + + for (int region = 0; region < cluster.numRegions; region++) { + cacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region, + cluster.regionIndexToServerIndex[region]); + bestCacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region, + getServerWithBestCacheRatioForRegion(region)); + } + + cacheRatio = bestCacheRatio == 0 ? 1.0 : cacheRatio / bestCacheRatio; + if (LOG.isDebugEnabled()) { + LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio); + } + } + + @Override + protected double cost() { + return scale(0, 1, 1 - cacheRatio); + } + + @Override + protected void regionMoved(int region, int oldServer, int newServer) { + double regionCacheRatioOnOldServer = + cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer); + double regionCacheRatioOnNewServer = + cluster.getOrComputeWeightedRegionCacheRatio(region, newServer); + double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer; + double normalizedDelta = bestCacheRatio == 0.0 ? 
0.0 : cacheRatioDiff / bestCacheRatio; + cacheRatio += normalizedDelta; + if (LOG.isDebugEnabled() && (cacheRatio < 0.0 || cacheRatio > 1.0)) { + LOG.debug( + "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:regionCacheRatioOnOldServer:{}:" + + "regionCacheRatioOnNewServer:{}:bestRegionCacheRatio:{}:cacheRatio:{}", + cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(), + cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer, + regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio); + } + } + + private int getServerWithBestCacheRatioForRegion(int region) { + return cluster.getOrComputeServerWithBestRegionCachedRatio()[region]; + } + + @Override + public final void updateWeight(double[] weights) { + weights[GeneratorFunctionType.CACHE_RATIO.ordinal()] += cost(); + } + } +} diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index edf049e8a718..e5cd5446c5c8 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -136,8 +137,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME; private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS; private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE; + Map> regionCacheRatioOnOldServerMap = new HashMap<>(); - private List costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC + protected List costFunctions; // FindBugs: Wants this protected; + // IS2_INCONSISTENT_SYNC // To save currently configed sum of multiplier. 
Defaulted at 1 for cases that carry high cost private float sumMultiplier; // to save and report costs to JMX @@ -224,6 +227,24 @@ protected List createCandidateGenerators() { return candidateGenerators; } + protected List createCostFunctions(Configuration conf) { + List costFunctions = new ArrayList<>(); + addCostFunction(costFunctions, new RegionCountSkewCostFunction(conf)); + addCostFunction(costFunctions, new PrimaryRegionCountSkewCostFunction(conf)); + addCostFunction(costFunctions, new MoveCostFunction(conf, provider)); + addCostFunction(costFunctions, localityCost); + addCostFunction(costFunctions, rackLocalityCost); + addCostFunction(costFunctions, new TableSkewCostFunction(conf)); + addCostFunction(costFunctions, regionReplicaHostCostFunction); + addCostFunction(costFunctions, regionReplicaRackCostFunction); + addCostFunction(costFunctions, new ReadRequestCostFunction(conf)); + addCostFunction(costFunctions, new CPRequestCostFunction(conf)); + addCostFunction(costFunctions, new WriteRequestCostFunction(conf)); + addCostFunction(costFunctions, new MemStoreSizeCostFunction(conf)); + addCostFunction(costFunctions, new StoreFileCostFunction(conf)); + return costFunctions; + } + @Override protected void loadConf(Configuration conf) { super.loadConf(conf); @@ -242,20 +263,7 @@ protected void loadConf(Configuration conf) { regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf); regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf); - costFunctions = new ArrayList<>(); - addCostFunction(new RegionCountSkewCostFunction(conf)); - addCostFunction(new PrimaryRegionCountSkewCostFunction(conf)); - addCostFunction(new MoveCostFunction(conf, provider)); - addCostFunction(localityCost); - addCostFunction(rackLocalityCost); - addCostFunction(new TableSkewCostFunction(conf)); - addCostFunction(regionReplicaHostCostFunction); - addCostFunction(regionReplicaRackCostFunction); - addCostFunction(new ReadRequestCostFunction(conf)); - addCostFunction(new CPRequestCostFunction(conf)); - addCostFunction(new WriteRequestCostFunction(conf)); - addCostFunction(new MemStoreSizeCostFunction(conf)); - addCostFunction(new StoreFileCostFunction(conf)); + this.costFunctions = createCostFunctions(conf); loadCustomCostFunctions(conf); curFunctionCosts = new double[costFunctions.size()]; @@ -459,8 +467,8 @@ protected List balanceTable(TableName tableName, // The clusterState that is given to this method contains the state // of all the regions in the table(s) (that's true today) // Keep track of servers to iterate through them. 
- BalancerClusterState cluster = - new BalancerClusterState(loadOfOneTable, loads, finder, rackManager); + BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder, + rackManager, regionCacheRatioOnOldServerMap); long startTime = EnvironmentEdgeManager.currentTime(); @@ -568,7 +576,7 @@ protected List balanceTable(TableName tableName, return null; } - private void sendRejectionReasonToRingBuffer(Supplier reason, + protected void sendRejectionReasonToRingBuffer(Supplier reason, List costFunctions) { provider.recordBalancerRejection(() -> { BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason.get()); @@ -627,14 +635,14 @@ private void updateStochasticCosts(TableName tableName, double overall, double[] } } - private void addCostFunction(CostFunction costFunction) { + private void addCostFunction(List costFunctions, CostFunction costFunction) { float multiplier = costFunction.getMultiplier(); if (multiplier > 0) { costFunctions.add(costFunction); } } - private String functionCost() { + protected String functionCost() { StringBuilder builder = new StringBuilder(); for (CostFunction c : costFunctions) { builder.append(c.getClass().getSimpleName()); @@ -655,6 +663,12 @@ private String functionCost() { return builder.toString(); } + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") + List getCostFunctions() { + return costFunctions; + } + private String totalCostsPerFunc() { StringBuilder builder = new StringBuilder(); for (CostFunction c : costFunctions) { diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java index 9ea1c94d1e09..4a996e7796f5 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -376,6 +377,19 @@ protected TreeMap> mockClusterServers(int[] mockClu return servers; } + protected Map> mockClusterServersUnsorted(int[] mockCluster, + int numTables) { + int numServers = mockCluster.length; + Map> servers = new LinkedHashMap<>(); + for (int i = 0; i < numServers; i++) { + int numRegions = mockCluster[i]; + ServerAndLoad sal = randomServer(0); + List regions = randomRegions(numRegions, numTables); + servers.put(sal.getServerName(), regions); + } + return servers; + } + protected TreeMap> mockUniformClusterServers(int[] mockCluster) { int numServers = mockCluster.length; TreeMap> servers = new TreeMap<>(); diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java index 21f3a3b66c9a..cc16cfe2ec83 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -139,6 +139,8 @@ private ServerMetrics mockServerMetricsWithCpRequests(List regionsOn when(rl.getWriteRequestCount()).thenReturn(0L); when(rl.getMemStoreSize()).thenReturn(Size.ZERO); 
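+ // The stubs added below back the new cache-aware metrics; BalancerRegionLoad now reads the
+ // region size and cache ratio for every region, so even tests that ignore caching must mock them.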
when(rl.getStoreFileSize()).thenReturn(Size.ZERO); + when(rl.getRegionSizeMB()).thenReturn(Size.ZERO); + when(rl.getCurrentRegionCachedRatio()).thenReturn(0.0f); regionLoadMap.put(info.getRegionName(), rl); } when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap); @@ -213,6 +215,8 @@ public void testKeepRegionLoad() throws Exception { when(rl.getWriteRequestCount()).thenReturn(0L); when(rl.getMemStoreSize()).thenReturn(Size.ZERO); when(rl.getStoreFileSize()).thenReturn(new Size(i, Size.Unit.MEGABYTE)); + when(rl.getRegionSizeMB()).thenReturn(Size.ZERO); + when(rl.getCurrentRegionCachedRatio()).thenReturn(0.0f); Map regionLoadMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); regionLoadMap.put(Bytes.toBytes(REGION_KEY), rl); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 12479979b2ba..2aa9ecf69ec4 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1335,6 +1335,18 @@ public enum OperationStatusCode { */ public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size"; + /** + * If the chosen ioengine can persist its state across restarts, the path to the file to persist + * to. This file is NOT the data file. It is a file into which we will serialize the map of what + * is in the data file. For example, if you pass the following argument as + * BUCKET_CACHE_IOENGINE_KEY ("hbase.bucketcache.ioengine"), + * file:/tmp/bucketcache.data , then we will write the bucketcache data to the file + * /tmp/bucketcache.data but the metadata on where the data is in the supplied file + * is an in-memory map that needs to be persisted across restarts. Where to store this in-memory + * state is what you supply here: e.g. /tmp/bucketcache.map. + */ + public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path"; + /** * HConstants for fast fail on the client side follow */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java index 38a296aad523..6956d584d92a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY; import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; import java.io.IOException; @@ -47,18 +48,6 @@ public final class BlockCacheFactory { public static final String BLOCKCACHE_POLICY_KEY = "hfile.block.cache.policy"; public static final String BLOCKCACHE_POLICY_DEFAULT = "LRU"; - /** - * If the chosen ioengine can persist its state across restarts, the path to the file to persist - * to. This file is NOT the data file. It is a file into which we will serialize the map of what - * is in the data file. For example, if you pass the following argument as - * BUCKET_CACHE_IOENGINE_KEY ("hbase.bucketcache.ioengine"), - * file:/tmp/bucketcache.data , then we will write the bucketcache data to the file - * /tmp/bucketcache.data but the metadata on where the data is in the supplied file - * is an in-memory map that needs to be persisted across restarts. 
Where to store this in-memory - * state is what you supply here: e.g. /tmp/bucketcache.map. - */ - public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path"; - public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads"; public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = "hbase.bucketcache.writer.queuelength"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index ca7750c92c56..f321d034bc6b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -347,6 +347,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck fullyCachedFiles.clear(); backingMapValidated.set(true); bucketAllocator = new BucketAllocator(capacity, bucketSizes); + regionCachedSizeMap.clear(); } } else { bucketAllocator = new BucketAllocator(capacity, bucketSizes); @@ -1517,6 +1518,7 @@ private void disableCache() { // If persistent ioengine and a path, we will serialize out the backingMap. this.backingMap.clear(); this.fullyCachedFiles.clear(); + this.regionCachedSizeMap.clear(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java index 495814fdc5fe..88b8d62923ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.Shell; import org.apache.yetus.audience.InterfaceAudience; @@ -91,7 +92,8 @@ protected byte[] calculateChecksum(String algorithm) { private static long getFileSize(String filePath) throws IOException { DU.setExecCommand(filePath); DU.execute(); - return Long.parseLong(DU.getOutput().split("\t")[0]); + String size = DU.getOutput().split("\t")[0]; + return StringUtils.isEmpty(size.trim()) ? 0 : Long.parseLong(size); } private static class DuFileCommand extends Shell.ShellCommandExecutor { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java new file mode 100644 index 000000000000..3ecd8dc7cfd0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterMetrics; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.RegionMetrics; +import org.apache.hadoop.hbase.ServerMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Size; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +@Category({ LargeTests.class }) +public class TestCacheAwareLoadBalancer extends BalancerTestBase { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCacheAwareLoadBalancer.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestCacheAwareLoadBalancer.class); + + private static CacheAwareLoadBalancer loadBalancer; + + static List servers; + + static List tableDescs; + + static Map tableMap = new HashMap<>(); + + static TableName[] tables = new TableName[] { TableName.valueOf("dt1"), TableName.valueOf("dt2"), + TableName.valueOf("dt3"), TableName.valueOf("dt4") }; + + private static List generateServers(int numServers) { + List servers = new ArrayList<>(numServers); + Random rand = ThreadLocalRandom.current(); + for (int i = 0; i < numServers; i++) { + String host = "server" + rand.nextInt(100000); + int port = rand.nextInt(60000); + servers.add(ServerName.valueOf(host, port, -1)); + } + return servers; + } + + private static List constructTableDesc(boolean hasBogusTable) { + List tds = Lists.newArrayList(); + for (int i = 0; i < tables.length; i++) { + TableDescriptor htd = TableDescriptorBuilder.newBuilder(tables[i]).build(); + tds.add(htd); + } + return tds; + } + + private ServerMetrics mockServerMetricsWithRegionCacheInfo(ServerName server, + List regionsOnServer, float currentCacheRatio, List oldRegionCacheInfo, + int oldRegionCachedSize, int regionSize) { + ServerMetrics serverMetrics = mock(ServerMetrics.class); + Map regionLoadMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (RegionInfo 
info : regionsOnServer) { + RegionMetrics rl = mock(RegionMetrics.class); + when(rl.getReadRequestCount()).thenReturn(0L); + when(rl.getWriteRequestCount()).thenReturn(0L); + when(rl.getMemStoreSize()).thenReturn(Size.ZERO); + when(rl.getStoreFileSize()).thenReturn(Size.ZERO); + when(rl.getCurrentRegionCachedRatio()).thenReturn(currentCacheRatio); + when(rl.getRegionSizeMB()).thenReturn(new Size(regionSize, Size.Unit.MEGABYTE)); + regionLoadMap.put(info.getRegionName(), rl); + } + when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap); + Map oldCacheRatioMap = new HashMap<>(); + for (RegionInfo info : oldRegionCacheInfo) { + oldCacheRatioMap.put(info.getEncodedName(), oldRegionCachedSize); + } + when(serverMetrics.getRegionCachedInfo()).thenReturn(oldCacheRatioMap); + return serverMetrics; + } + + @BeforeClass + public static void beforeAllTests() throws Exception { + servers = generateServers(3); + tableDescs = constructTableDesc(false); + Configuration conf = HBaseConfiguration.create(); + conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list"); + loadBalancer = new CacheAwareLoadBalancer(); + loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf)); + loadBalancer.loadConf(conf); + } + + @Test + public void testRegionsNotCachedOnOldServerAndCurrentServer() throws Exception { + // The regions are not cached on old server as well as the current server. This causes + // skewness in the region allocation which should be fixed by the balancer + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Simulate that the regions previously hosted by server1 are now hosted on server0 + List regionsOnServer0 = randomRegions(10); + List regionsOnServer1 = randomRegions(0); + List regionsOnServer2 = randomRegions(5); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock cluster metrics + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, + 0.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, + 0.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, + 0.0f, new ArrayList<>(), 0, 10)); + ClusterMetrics clusterMetrics = mock(ClusterMetrics.class); + when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.updateClusterMetrics(clusterMetrics); + + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(clusterState); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + Set regionsMovedFromServer0 = new HashSet<>(); + Map> targetServers = new HashMap<>(); + for (RegionPlan plan : plans) { + if (plan.getSource().equals(server0)) { + regionsMovedFromServer0.add(plan.getRegionInfo()); + if (!targetServers.containsKey(plan.getDestination())) { + targetServers.put(plan.getDestination(), new ArrayList<>()); + } + targetServers.get(plan.getDestination()).add(plan.getRegionInfo()); + } + } + // should move 5 regions from server0 to server 1 + assertEquals(5, regionsMovedFromServer0.size()); + assertEquals(5, targetServers.get(server1).size()); + } + + @Test + public void testRegionsPartiallyCachedOnOldServerAndNotCachedOnCurrentServer() throws Exception { + // The regions are partially 
cached on old server but not cached on the current server + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Simulate that the regions previously hosted by server1 are now hosted on server0 + List regionsOnServer0 = randomRegions(10); + List regionsOnServer1 = randomRegions(0); + List regionsOnServer2 = randomRegions(5); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock cluster metrics + + // Mock 5 regions from server0 were previously hosted on server1 + List oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1); + + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, + 0.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, + 0.0f, oldCachedRegions, 6, 10)); + serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, + 0.0f, new ArrayList<>(), 0, 10)); + ClusterMetrics clusterMetrics = mock(ClusterMetrics.class); + when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.updateClusterMetrics(clusterMetrics); + + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(clusterState); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + Set regionsMovedFromServer0 = new HashSet<>(); + Map> targetServers = new HashMap<>(); + for (RegionPlan plan : plans) { + if (plan.getSource().equals(server0)) { + regionsMovedFromServer0.add(plan.getRegionInfo()); + if (!targetServers.containsKey(plan.getDestination())) { + targetServers.put(plan.getDestination(), new ArrayList<>()); + } + targetServers.get(plan.getDestination()).add(plan.getRegionInfo()); + } + } + // should move 5 regions from server0 to server1 + assertEquals(5, regionsMovedFromServer0.size()); + assertEquals(5, targetServers.get(server1).size()); + assertTrue(targetServers.get(server1).containsAll(oldCachedRegions)); + } + + @Test + public void testRegionsFullyCachedOnOldServerAndNotCachedOnCurrentServers() throws Exception { + // The regions are fully cached on old server + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Simulate that the regions previously hosted by server1 are now hosted on server0 + List regionsOnServer0 = randomRegions(10); + List regionsOnServer1 = randomRegions(0); + List regionsOnServer2 = randomRegions(5); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock cluster metrics + + // Mock 5 regions from server0 were previously hosted on server1 + List oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1); + + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, + 0.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, + 0.0f, oldCachedRegions, 10, 10)); + serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, + 0.0f, new ArrayList<>(), 0, 10)); + ClusterMetrics clusterMetrics = mock(ClusterMetrics.class); + 
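// Wire the per-server metrics into the cluster-wide mock; server1 reports the regions it
+ // previously hosted as 6 MB cached out of 10 MB each, i.e. a historical cache ratio of 0.6.
+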
when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.updateClusterMetrics(clusterMetrics); + + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(clusterState); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + Set regionsMovedFromServer0 = new HashSet<>(); + Map> targetServers = new HashMap<>(); + for (RegionPlan plan : plans) { + if (plan.getSource().equals(server0)) { + regionsMovedFromServer0.add(plan.getRegionInfo()); + if (!targetServers.containsKey(plan.getDestination())) { + targetServers.put(plan.getDestination(), new ArrayList<>()); + } + targetServers.get(plan.getDestination()).add(plan.getRegionInfo()); + } + } + // should move 5 regions from server0 to server1 + assertEquals(5, regionsMovedFromServer0.size()); + assertEquals(5, targetServers.get(server1).size()); + assertTrue(targetServers.get(server1).containsAll(oldCachedRegions)); + } + + @Test + public void testRegionsFullyCachedOnOldAndCurrentServers() throws Exception { + // The regions are fully cached on old server + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Simulate that the regions previously hosted by server1 are now hosted on server0 + List regionsOnServer0 = randomRegions(10); + List regionsOnServer1 = randomRegions(0); + List regionsOnServer2 = randomRegions(5); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock cluster metrics + + // Mock 5 regions from server0 were previously hosted on server1 + List oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1); + + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, + 1.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, + 1.0f, oldCachedRegions, 10, 10)); + serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, + 1.0f, new ArrayList<>(), 0, 10)); + ClusterMetrics clusterMetrics = mock(ClusterMetrics.class); + when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.updateClusterMetrics(clusterMetrics); + + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(clusterState); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + Set regionsMovedFromServer0 = new HashSet<>(); + Map> targetServers = new HashMap<>(); + for (RegionPlan plan : plans) { + if (plan.getSource().equals(server0)) { + regionsMovedFromServer0.add(plan.getRegionInfo()); + if (!targetServers.containsKey(plan.getDestination())) { + targetServers.put(plan.getDestination(), new ArrayList<>()); + } + targetServers.get(plan.getDestination()).add(plan.getRegionInfo()); + } + } + // should move 5 regions from server0 to server1 + assertEquals(5, regionsMovedFromServer0.size()); + assertEquals(5, targetServers.get(server1).size()); + assertTrue(targetServers.get(server1).containsAll(oldCachedRegions)); + } + + @Test + public void testRegionsPartiallyCachedOnOldServerAndCurrentServer() throws Exception { + // The regions are partially cached on old server + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Simulate that the regions previously hosted by 
server1 are now hosted on server0 + List regionsOnServer0 = randomRegions(10); + List regionsOnServer1 = randomRegions(0); + List regionsOnServer2 = randomRegions(5); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock cluster metrics + + // Mock 5 regions from server0 were previously hosted on server1 + List oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1); + + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, + 0.2f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, + 0.0f, oldCachedRegions, 6, 10)); + serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, + 1.0f, new ArrayList<>(), 0, 10)); + ClusterMetrics clusterMetrics = mock(ClusterMetrics.class); + when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.updateClusterMetrics(clusterMetrics); + + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(clusterState); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + Set regionsMovedFromServer0 = new HashSet<>(); + Map> targetServers = new HashMap<>(); + for (RegionPlan plan : plans) { + if (plan.getSource().equals(server0)) { + regionsMovedFromServer0.add(plan.getRegionInfo()); + if (!targetServers.containsKey(plan.getDestination())) { + targetServers.put(plan.getDestination(), new ArrayList<>()); + } + targetServers.get(plan.getDestination()).add(plan.getRegionInfo()); + } + } + assertEquals(5, regionsMovedFromServer0.size()); + assertEquals(5, targetServers.get(server1).size()); + assertTrue(targetServers.get(server1).containsAll(oldCachedRegions)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancerCostFunctions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancerCostFunctions.java new file mode 100644 index 000000000000..448e576b1bc7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancerCostFunctions.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestCacheAwareLoadBalancerCostFunctions extends StochasticBalancerTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCacheAwareLoadBalancerCostFunctions.class); + + // Mapping of test -> expected cache cost + private final float[] expectedCacheCost = { 0.0f, 0.0f, 0.5f, 1.0f, 0.0f, 0.572f, 0.0f, 0.075f }; + + /** + * Data set to testCacheCost: [test][0][0] = mapping of server to number of regions it hosts + * [test][region + 1][0] = server that region is hosted on [test][region + 1][server + 1] = size + * of region cached on server + */ + private final int[][][] clusterRegionCacheRatioMocks = new int[][][] { + // Test 1: each region is entirely on server that hosts it + // Cost of moving the regions in this case should be high as the regions are fully cached + // on the server they are currently hosted on + new int[][] { new int[] { 2, 1, 1 }, // Server 0 has 2, server 1 has 1 and server 2 has 1 + // region(s) hosted respectively + new int[] { 0, 100, 0, 0 }, // region 0 is hosted and cached only on server 0 + new int[] { 0, 100, 0, 0 }, // region 1 is hosted and cached only on server 0 + new int[] { 1, 0, 100, 0 }, // region 2 is hosted and cached only on server 1 + new int[] { 2, 0, 0, 100 }, // region 3 is hosted and cached only on server 2 + }, + + // Test 2: each region is cached completely on the server it is currently hosted on, + // but it was also cached on some other server historically + // Cost of moving the regions in this case should be high as the regions are fully cached + // on the server they are currently hosted on. Although, the regions were previously hosted and + // cached on some other server, since they are completely cached on the new server, + // there is no need to move the regions back to the previously hosting cluster + new int[][] { new int[] { 1, 2, 1 }, // Server 0 has 1, server 1 has 2 and server 2 has 1 + // region(s) hosted respectively + new int[] { 0, 100, 0, 100 }, // region 0 is hosted and currently cached on server 0, + // but previously cached completely on server 2 + new int[] { 1, 100, 100, 0 }, // region 1 is hosted and currently cached on server 1, + // but previously cached completely on server 0 + new int[] { 1, 0, 100, 100 }, // region 2 is hosted and currently cached on server 1, + // but previously cached on server 2 + new int[] { 2, 0, 100, 100 }, // region 3 is hosted and currently cached on server 2, + // but previously cached on server 1 + }, + + // Test 3: The regions were hosted and fully cached on a server but later moved to other + // because of server crash procedure. 
The regions are partially cached on the server they + // are currently hosted on + new int[][] { new int[] { 1, 2, 1 }, new int[] { 0, 50, 0, 100 }, // Region 0 is currently + // hosted and partially + // cached on + // server 0, but was fully + // cached on server 2 + // previously + new int[] { 1, 100, 50, 0 }, // Region 1 is currently hosted and partially cached on + // server 1, but was fully cached on server 0 previously + new int[] { 1, 0, 50, 100 }, // Region 2 is currently hosted and partially cached on + // server 1, but was fully cached on server 2 previously + new int[] { 2, 0, 100, 50 }, // Region 3 is currently hosted and partially cached on + // server 2, but was fully cached on server 1 previously + }, + + // Test 4: The regions were hosted and fully cached on a server, but later moved to other + // server because of server crash procedure. The regions are not at all cached on the server + // they are currently hosted on + new int[][] { new int[] { 1, 1, 2 }, new int[] { 0, 0, 0, 100 }, // Region 0 is currently hosted + // but not cached on server + // 0, + // but was fully cached on + // server 2 previously + new int[] { 1, 100, 0, 0 }, // Region 1 is currently hosted but not cached on server 1, + // but was fully cached on server 0 previously + new int[] { 2, 0, 100, 0 }, // Region 2 is currently hosted but not cached on server 2, + // but was fully cached on server 1 previously + new int[] { 2, 100, 0, 0 }, // Region 3 is currently hosted but not cached on server 2, + // but was fully cached on server 1 previously + }, + + // Test 5: The regions were partially cached on old servers, before moving to the new server + // where also, they are partially cached + new int[][] { new int[] { 2, 1, 1 }, new int[] { 0, 50, 50, 0 }, // Region 0 is hosted and + // partially cached on + // server 0, but + // was previously hosted and + // partially cached on + // server 1 + new int[] { 0, 50, 0, 50 }, // Region 1 is hosted and partially cached on server 0, but + // was previously hosted and partially cached on server 2 + new int[] { 1, 0, 50, 50 }, // Region 2 is hosted and partially cached on server 1, but + // was previously hosted and partially cached on server 2 + new int[] { 2, 0, 50, 50 }, // Region 3 is hosted and partially cached on server 2, but + // was previously hosted and partially cached on server 1 + }, + + // Test 6: The regions are less cached on the new servers as compared to what they were + // cached on the server before they were moved to the new servers + new int[][] { new int[] { 1, 2, 1 }, new int[] { 0, 30, 70, 0 }, // Region 0 is hosted and + // cached 30% on server 0, + // but was + // previously hosted and + // cached 70% on server 1 + new int[] { 1, 70, 30, 0 }, // Region 1 is hosted and cached 30% on server 1, but was + // previously hosted and cached 70% on server 0 + new int[] { 1, 0, 30, 70 }, // Region 2 is hosted and cached 30% on server 1, but was + // previously hosted and cached 70% on server 2 + new int[] { 2, 0, 70, 30 }, // Region 3 is hosted and cached 30% on server 2, but was + // previously hosted and cached 70% on server 1 + }, + + // Test 7: The regions are more cached on the new servers as compared to what they were + // cached on the server before they were moved to the new servers + new int[][] { new int[] { 2, 1, 1 }, new int[] { 0, 80, 20, 0 }, // Region 0 is hosted and 80% + // cached on server 0, but + // was + // previously hosted and 20% + // cached on server 1 + new int[] { 0, 80, 0, 20 }, // Region 1 is hosted and 80% cached on 
server 0, but was + // previously hosted and 20% cached on server 2 + new int[] { 1, 20, 80, 0 }, // Region 2 is hosted and 80% cached on server 1, but was + // previously hosted and 20% cached on server 0 + new int[] { 2, 0, 20, 80 }, // Region 3 is hosted and 80% cached on server 2, but was + // previously hosted and 20% cached on server 1 + }, + + // Test 8: The regions are randomly assigned to the server with some regions historically + // hosted on other region servers + new int[][] { new int[] { 1, 2, 1 }, new int[] { 0, 34, 0, 58 }, // Region 0 is hosted and + // partially cached on + // server 0, + // but was previously hosted + // and partially cached on + // server 2 + // current cache ratio < + // historical cache ratio + new int[] { 1, 78, 100, 0 }, // Region 1 is hosted and fully cached on server 1, + // but was previously hosted and partially cached on server 0 + // current cache ratio > historical cache ratio + new int[] { 1, 66, 66, 0 }, // Region 2 is hosted and partially cached on server 1, + // but was previously hosted and partially cached on server 0 + // current cache ratio == historical cache ratio + new int[] { 2, 0, 0, 96 }, // Region 3 is hosted and partially cached on server 0 + // No historical cache ratio + }, }; + + private static Configuration storedConfiguration; + + private CacheAwareLoadBalancer loadBalancer = new CacheAwareLoadBalancer(); + + @BeforeClass + public static void saveInitialConfiguration() { + storedConfiguration = new Configuration(conf); + } + + @Before + public void beforeEachTest() { + conf = new Configuration(storedConfiguration); + loadBalancer.loadConf(conf); + } + + @Test + public void testVerifyCacheAwareSkewnessCostFunctionEnabled() { + CacheAwareLoadBalancer lb = new CacheAwareLoadBalancer(); + lb.loadConf(conf); + assertTrue(Arrays.asList(lb.getCostFunctionNames()) + .contains(CacheAwareLoadBalancer.CacheAwareRegionSkewnessCostFunction.class.getSimpleName())); + } + + @Test + public void testVerifyCacheAwareSkewnessCostFunctionDisabled() { + conf.setFloat( + CacheAwareLoadBalancer.CacheAwareRegionSkewnessCostFunction.REGION_COUNT_SKEW_COST_KEY, 0.0f); + + CacheAwareLoadBalancer lb = new CacheAwareLoadBalancer(); + lb.loadConf(conf); + + assertFalse(Arrays.asList(lb.getCostFunctionNames()) + .contains(CacheAwareLoadBalancer.CacheAwareRegionSkewnessCostFunction.class.getSimpleName())); + } + + @Test + public void testVerifyCacheCostFunctionEnabled() { + conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "/tmp/prefetch.persistence"); + + CacheAwareLoadBalancer lb = new CacheAwareLoadBalancer(); + lb.loadConf(conf); + + assertTrue(Arrays.asList(lb.getCostFunctionNames()) + .contains(CacheAwareLoadBalancer.CacheAwareCostFunction.class.getSimpleName())); + } + + @Test + public void testVerifyCacheCostFunctionDisabledByNoBucketCachePersistence() { + assertFalse(Arrays.asList(loadBalancer.getCostFunctionNames()) + .contains(CacheAwareLoadBalancer.CacheAwareCostFunction.class.getSimpleName())); + } + + @Test + public void testVerifyCacheCostFunctionDisabledByNoMultiplier() { + conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "/tmp/prefetch.persistence"); + conf.setFloat("hbase.master.balancer.stochastic.cacheCost", 0.0f); + assertFalse(Arrays.asList(loadBalancer.getCostFunctionNames()) + .contains(CacheAwareLoadBalancer.CacheAwareCostFunction.class.getSimpleName())); + } + + @Test + public void testCacheCost() { + conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "/tmp/prefetch.persistence"); + 
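// The persistent bucket cache path must be set first: CacheAwareCostFunction zeroes its
+ // multiplier when hbase.bucketcache.persistent.path is unset, and this test exercises the
+ // function as if cache persistence were enabled.
+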
CacheAwareLoadBalancer.CacheAwareCostFunction costFunction = + new CacheAwareLoadBalancer.CacheAwareCostFunction(conf); + + for (int test = 0; test < clusterRegionCacheRatioMocks.length; test++) { + int[][] clusterRegionLocations = clusterRegionCacheRatioMocks[test]; + MockClusterForCacheCost cluster = new MockClusterForCacheCost(clusterRegionLocations); + costFunction.prepare(cluster); + double cost = costFunction.cost(); + assertEquals(expectedCacheCost[test], cost, 0.01); + } + } + + private class MockClusterForCacheCost extends BalancerClusterState { + private final Map, Float> regionServerCacheRatio = new HashMap<>(); + + public MockClusterForCacheCost(int[][] regionsArray) { + // regions[0] is an array where index = serverIndex and value = number of regions + super(mockClusterServersUnsorted(regionsArray[0], 1), null, null, null, null); + Map> oldCacheRatio = new HashMap<>(); + for (int i = 1; i < regionsArray.length; i++) { + int regionIndex = i - 1; + for (int j = 1; j < regionsArray[i].length; j++) { + int serverIndex = j - 1; + float cacheRatio = (float) regionsArray[i][j] / 100; + regionServerCacheRatio.put(new Pair<>(regionIndex, serverIndex), cacheRatio); + if (cacheRatio > 0.0f && serverIndex != regionsArray[i][0]) { + // This is the historical cacheRatio value + oldCacheRatio.put(regions[regionIndex].getEncodedName(), + new Pair<>(servers[serverIndex], cacheRatio)); + } + } + } + regionCacheRatioOnOldServerMap = oldCacheRatio; + } + + @Override + public int getTotalRegionHFileSizeMB(int region) { + return 1; + } + + @Override + protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) { + float cacheRatio = 0.0f; + + // Get the cache ratio if the region is currently hosted on this server + if (regionServerIndex == regionIndexToServerIndex[region]) { + return regionServerCacheRatio.get(new Pair<>(region, regionServerIndex)); + } + + // Region is not currently hosted on this server. Check if the region was cached on this + // server earlier. This can happen when the server was shutdown and the cache was persisted. + // Search using the index name and server name and not the index id and server id as these + // ids may change when a server is marked as dead or a new server is added. 
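+ // For example, with the test-6 mock row { 1, 70, 30, 0 }, region 1 is currently hosted on
+ // server 1 with a ratio of 0.3f, while this historical map still answers 0.7f for server 0.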
+ String regionEncodedName = regions[region].getEncodedName(); + ServerName serverName = servers[regionServerIndex]; + if ( + regionCacheRatioOnOldServerMap != null + && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName) + ) { + Pair serverCacheRatio = + regionCacheRatioOnOldServerMap.get(regionEncodedName); + if (ServerName.isSameAddress(serverName, serverCacheRatio.getFirst())) { + cacheRatio = serverCacheRatio.getSecond(); + regionCacheRatioOnOldServerMap.remove(regionEncodedName); + } + } + return cacheRatio; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java index 748045246b3b..67ef296da58b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java @@ -86,6 +86,8 @@ private ServerMetrics mockServerMetricsWithReadRequests(ServerName server, when(rl.getWriteRequestCount()).thenReturn(0L); when(rl.getMemStoreSize()).thenReturn(Size.ZERO); when(rl.getStoreFileSize()).thenReturn(Size.ZERO); + when(rl.getRegionSizeMB()).thenReturn(Size.ZERO); + when(rl.getCurrentRegionCachedRatio()).thenReturn(0.0f); regionLoadMap.put(info.getRegionName(), rl); } when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap);