diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
index a7ae8b4d1a5a..4b3809c107cb 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,6 +115,12 @@ class BalancerClusterState {
   private float[][] rackLocalities;
   // Maps localityType -> region -> [server|rack]Index with highest locality
   private int[][] regionsToMostLocalEntities;
+  // Maps region -> serverIndex -> regionCacheRatio of a region on a server
+  private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio;
+  // Maps regionIndex -> serverIndex with best region cache ratio
+  private int[] regionServerIndexWithBestRegionCachedRatio;
+  // Maps regionName -> oldServerName -> cache ratio of the region on the old server
+  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;

   static class DefaultRackManager extends RackManager {
     @Override
@@ -125,13 +132,20 @@ public String getRack(ServerName server) {

   BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
     Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
     RackManager rackManager) {
-    this(null, clusterState, loads, regionFinder, rackManager);
+    this(null, clusterState, loads, regionFinder, rackManager, null);
+  }
+
+  protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
+    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
+    RackManager rackManager,
+    Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
+    this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio);
   }

   @SuppressWarnings("unchecked")
   BalancerClusterState(Collection<RegionInfo> unassignedRegions,
     Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
-    RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) {
+    RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager,
+    Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
     if (unassignedRegions == null) {
       unassignedRegions = Collections.emptyList();
     }
@@ -145,6 +159,8 @@ public String getRack(ServerName server) {
     tables = new ArrayList<>();
     this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();

+    this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio;
+
     numRegions = 0;

     List<List<Integer>> serversPerHostList = new ArrayList<>();
@@ -541,6 +557,142 @@ private void computeCachedLocalities() {
   }

+  /**
+   * Returns the size of the hFiles from the most recent RegionLoad for the region
+   */
+  public int getTotalRegionHFileSizeMB(int region) {
+    Deque<BalancerRegionLoad> load = regionLoads[region];
+    if (load == null) {
+      // This means that the region has no actual data on disk
+      return 0;
+    }
+    return regionLoads[region].getLast().getRegionSizeMB();
+  }
+
+  /**
+   * Returns the weighted cache ratio of a region on the given region server
+   */
+  public float getOrComputeWeightedRegionCacheRatio(int region, int server) {
+    return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server);
+  }
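A quick hedged sketch of what the weighting above means (the standalone class and the numbers are illustrative, not part of the patch): the cache ratio is scaled by the region's hFile footprint, so a large, mostly cached region dominates the accounting while a tiny fully cached region contributes little.

    // Illustrative only: mirrors getOrComputeWeightedRegionCacheRatio's arithmetic.
    public class WeightedCacheRatioExample {
      static float weightedCacheRatio(int regionHFileSizeMB, float cacheRatio) {
        return regionHFileSizeMB * cacheRatio; // same formula as the method above
      }

      public static void main(String[] args) {
        System.out.println(weightedCacheRatio(512, 0.75f)); // 384.0
        System.out.println(weightedCacheRatio(8, 1.0f));    // 8.0: small regions weigh little
      }
    }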
+  /**
+   * Returns the amount by which a region is cached on a given region server. If the region is not
+   * currently hosted on the given region server, then find out whether it was previously hosted
+   * there and return the old cache ratio.
+   */
+  protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) {
+    float regionCacheRatio = 0.0f;
+
+    // Get the current region cache ratio if the region is hosted on the server regionServerIndex
+    for (int regionIndex : regionsPerServer[regionServerIndex]) {
+      if (region != regionIndex) {
+        continue;
+      }
+
+      Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex];
+
+      // The region is currently hosted on this region server. Get the region cache ratio for this
+      // region on this server
+      regionCacheRatio =
+        regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio();
+
+      return regionCacheRatio;
+    }
+
+    // The region is not currently hosted on this server. Check whether it was cached on this
+    // server earlier; this can happen when the server was shut down and its cache was persisted.
+    // Search by region name and server name rather than by index, as the indexes may change when
+    // a server is marked as dead or a new server is added.
+    String regionEncodedName = regions[region].getEncodedName();
+    ServerName serverName = servers[regionServerIndex];
+    if (
+      regionCacheRatioOnOldServerMap != null
+        && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName)
+    ) {
+      Pair<ServerName, Float> cacheRatioOfRegionOnServer =
+        regionCacheRatioOnOldServerMap.get(regionEncodedName);
+      if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) {
+        regionCacheRatio = cacheRatioOfRegionOnServer.getSecond();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName,
+            serverName, regionCacheRatio);
+        }
+      }
+    }
+    return regionCacheRatio;
+  }
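A hedged usage sketch of the fallback above (the class, region name, and server are hypothetical; only the lookup pattern mirrors the method): when the live RegionLoad has no entry for a server, the persisted-cache map keyed by encoded region name supplies the historical ratio.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.util.Pair;

    // Illustrative only: how the old-server fallback resolves a cache ratio.
    public class OldServerRatioExample {
      public static void main(String[] args) {
        Map<String, Pair<ServerName, Float>> oldMap = new HashMap<>();
        ServerName oldServer = ServerName.valueOf("rs-old.example.com", 16020, 1L);
        oldMap.put("abcdef1234", new Pair<>(oldServer, 0.8f)); // persisted before restart

        // A candidate server matching the recorded address resolves to the old ratio
        Pair<ServerName, Float> hint = oldMap.get("abcdef1234");
        float ratio =
          ServerName.isSameAddress(hint.getFirst(), oldServer) ? hint.getSecond() : 0.0f;
        System.out.println(ratio); // 0.8
      }
    }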
+  /**
+   * Populate the maps containing information about how much a region is cached on a region
+   * server.
+   */
+  private void computeRegionServerRegionCacheRatio() {
+    regionIndexServerIndexRegionCachedRatio = new HashMap<>();
+    regionServerIndexWithBestRegionCachedRatio = new int[numRegions];
+
+    for (int region = 0; region < numRegions; region++) {
+      float bestRegionCacheRatio = 0.0f;
+      int serverWithBestRegionCacheRatio = 0;
+      for (int server = 0; server < numServers; server++) {
+        float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server);
+        if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) {
+          // A cache ratio of 0 on a server carries no information, so record an entry only when
+          // the ratio is greater than 0 or when the server currently hosts the region
+          Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
+          regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio);
+        }
+        if (regionCacheRatio > bestRegionCacheRatio) {
+          serverWithBestRegionCacheRatio = server;
+          bestRegionCacheRatio = regionCacheRatio;
+        } else if (
+          regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region]
+        ) {
+          // If two servers have the same region cache ratio, the server currently hosting the
+          // region should retain it
+          serverWithBestRegionCacheRatio = server;
+        }
+      }
+      regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio;
+      Pair<Integer, Integer> regionServerPair =
+        new Pair<>(region, regionIndexToServerIndex[region]);
+      float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair);
+      if (tempRegionCacheRatio > bestRegionCacheRatio) {
+        LOG.warn(
+          "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the "
+            + "best region cache ratio {} on server {}",
+          regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]],
+          tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]);
+      }
+    }
+  }
+
+  protected float getOrComputeRegionCacheRatio(int region, int server) {
+    if (
+      regionServerIndexWithBestRegionCachedRatio == null
+        || regionIndexServerIndexRegionCachedRatio.isEmpty()
+    ) {
+      computeRegionServerRegionCacheRatio();
+    }
+
+    Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
+    return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair)
+      ? regionIndexServerIndexRegionCachedRatio.get(regionServerPair)
+      : 0.0f;
+  }
+
+  public int[] getOrComputeServerWithBestRegionCachedRatio() {
+    if (
+      regionServerIndexWithBestRegionCachedRatio == null
+        || regionIndexServerIndexRegionCachedRatio.isEmpty()
+    ) {
+      computeRegionServerRegionCacheRatio();
+    }
+    return regionServerIndexWithBestRegionCachedRatio;
+  }
+
   /**
    * Maps region index to rack index
    */
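As a usage sketch of the two accessors just added (the helper below is hypothetical and not in the patch; it only combines public calls shown above), a cost function can ask where a region is best cached and how strongly:

    // Illustrative only: how a caller in the same package might rank servers for one region.
    static int bestServerFor(BalancerClusterState cluster, int region) {
      // Index of the server holding the largest (possibly historical) cache ratio for the region
      int best = cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
      // Weighted by hFile size, for comparing the benefit across regions of different sizes
      float weighted = cluster.getOrComputeWeightedRegionCacheRatio(region, best);
      return weighted > 0.0f ? best : cluster.regionIndexToServerIndex[region];
    }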
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java
index ffb36cb8ca1a..33d00e3de862 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java
@@ -34,6 +34,8 @@ class BalancerRegionLoad {
   private final long writeRequestsCount;
   private final int memStoreSizeMB;
   private final int storefileSizeMB;
+  private final int regionSizeMB;
+  private final float currentRegionPrefetchRatio;

   BalancerRegionLoad(RegionMetrics regionMetrics) {
     readRequestsCount = regionMetrics.getReadRequestCount();
@@ -41,6 +43,8 @@ class BalancerRegionLoad {
     writeRequestsCount = regionMetrics.getWriteRequestCount();
     memStoreSizeMB = (int) regionMetrics.getMemStoreSize().get(Size.Unit.MEGABYTE);
     storefileSizeMB = (int) regionMetrics.getStoreFileSize().get(Size.Unit.MEGABYTE);
+    regionSizeMB = (int) regionMetrics.getRegionSizeMB().get(Size.Unit.MEGABYTE);
+    currentRegionPrefetchRatio = regionMetrics.getCurrentRegionCachedRatio();
   }

   public long getReadRequestsCount() {
@@ -62,4 +66,12 @@ public int getMemStoreSizeMB() {
   public int getStorefileSizeMB() {
     return storefileSizeMB;
   }
+
+  public int getRegionSizeMB() {
+    return regionSizeMB;
+  }
+
+  public float getCurrentRegionCacheRatio() {
+    return currentRegionPrefetchRatio;
+  }
 }
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index a4560cc595a2..54516868a0a0 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -232,7 +232,8 @@ private BalancerClusterState createCluster(List<ServerName> servers,
       clusterState.put(server, Collections.emptyList());
     }
   }
-  return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager);
+  return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager,
+    null);
 }

 private List<ServerName> findIdleServers(List<ServerName> servers) {
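Before the new balancer class itself, a wiring sketch. This is an assumption on my part: the balancer class key is the standard hbase.master.loadbalancer.class, the two bucket-cache keys appear in this patch, and the values are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Illustrative only: the cache-aware cost function stays disabled unless the
    // bucket cache is persistent, so all three settings belong together.
    public class EnableCacheAwareBalancer {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.master.loadbalancer.class",
          "org.apache.hadoop.hbase.master.balancer.CacheAwareLoadBalancer");
        conf.set("hbase.bucketcache.ioengine", "file:/tmp/bucketcache.data");
        conf.set("hbase.bucketcache.persistent.path", "/tmp/bucketcache.map");
        System.out.println(conf.get("hbase.master.loadbalancer.class"));
      }
    }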
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
new file mode 100644
index 000000000000..d73769a3971b
--- /dev/null
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+/**
+ * An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns
+ * regions based on how much they are cached on a given server. A region can move across region
+ * servers whenever a region server shuts down or crashes. The region server preserves the cache
+ * periodically and restores it when it is restarted. This balancer maintains the amount by which
+ * a region is cached on a region server. During a balancer run, it generates a region plan that
+ * takes this cache information into account and tries to move the regions so that the cache is
+ * minimally impacted.
+ */
+
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
+  private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class);
+
+  private Configuration configuration;
+
+  public enum GeneratorFunctionType {
+    LOAD,
+    CACHE_RATIO
+  }
+
+  @Override
+  public synchronized void loadConf(Configuration configuration) {
+    this.configuration = configuration;
+    this.costFunctions = new ArrayList<>();
+    super.loadConf(configuration);
+  }
+
+  @Override
+  protected List<CandidateGenerator> createCandidateGenerators() {
+    List<CandidateGenerator> candidateGenerators = new ArrayList<>(2);
+    candidateGenerators.add(GeneratorFunctionType.LOAD.ordinal(),
+      new CacheAwareSkewnessCandidateGenerator());
+    candidateGenerators.add(GeneratorFunctionType.CACHE_RATIO.ordinal(),
+      new CacheAwareCandidateGenerator());
+    return candidateGenerators;
+  }
+
+  @Override
+  protected List<CostFunction> createCostFunctions(Configuration configuration) {
+    List<CostFunction> costFunctions = new ArrayList<>();
+    addCostFunction(costFunctions, new CacheAwareRegionSkewnessCostFunction(configuration));
+    addCostFunction(costFunctions, new CacheAwareCostFunction(configuration));
+    return costFunctions;
+  }
+
+  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
+    if (costFunction.getMultiplier() > 0) {
+      costFunctions.add(costFunction);
+    }
+  }
+
+  @Override
+  public void updateClusterMetrics(ClusterMetrics clusterMetrics) {
+    this.clusterStatus = clusterMetrics;
+    updateRegionLoad();
+  }
+
+  /**
+   * Collect the cached size of each region from all the active region servers.
+   */
+  private void updateRegionLoad() {
+    loads = new HashMap<>();
+    regionCacheRatioOnOldServerMap = new HashMap<>();
+    Map<String, Pair<ServerName, Integer>> regionCacheRatioOnCurrentServerMap = new HashMap<>();
+
+    // Build current region cache statistics
+    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
+      // Create a map of region and the server where it is currently hosted
+      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
+        String regionEncodedName = RegionInfo.encodeRegionName(regionName);
+
+        Deque<BalancerRegionLoad> rload = new ArrayDeque<>();
+
+        // Get the total size of the hFiles in this region
+        int regionSizeMB = (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE);
+
+        rload.add(new BalancerRegionLoad(rm));
+        // Maintain a map of each region and its total size.
This is needed to calculate the cache + // ratios for the regions cached on old region servers + regionCacheRatioOnCurrentServerMap.put(regionEncodedName, new Pair<>(sn, regionSizeMB)); + loads.put(regionEncodedName, rload); + }); + }); + + // Build cache statistics for the regions hosted previously on old region servers + clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> { + // Find if a region was previously hosted on a server other than the one it is currently + // hosted on. + sm.getRegionCachedInfo().forEach((String regionEncodedName, Integer regionSizeInCache) -> { + // If the region is found in regionCacheRatioOnCurrentServerMap, it is currently hosted on + // this server + if (regionCacheRatioOnCurrentServerMap.containsKey(regionEncodedName)) { + ServerName currentServer = + regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getFirst(); + if (!ServerName.isSameAddress(currentServer, sn)) { + int regionSizeMB = + regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getSecond(); + float regionCacheRatioOnOldServer = + regionSizeMB == 0 ? 0.0f : (float) regionSizeInCache / regionSizeMB; + regionCacheRatioOnOldServerMap.put(regionEncodedName, + new Pair<>(sn, regionCacheRatioOnOldServer)); + } + } + }); + }); + } + + private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, String regionName) { + Optional regionInfoOptional = + Arrays.stream(cluster.regions).filter((RegionInfo ri) -> { + return regionName.equals(ri.getEncodedName()); + }).findFirst(); + + if (regionInfoOptional.isPresent()) { + return regionInfoOptional.get(); + } + return null; + } + + private class CacheAwareCandidateGenerator extends CandidateGenerator { + @Override + protected BalanceAction generate(BalancerClusterState cluster) { + // Move the regions to the servers they were previously hosted on based on the cache ratio + if ( + !regionCacheRatioOnOldServerMap.isEmpty() + && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext() + ) { + Map.Entry> regionCacheRatioServerMap = + regionCacheRatioOnOldServerMap.entrySet().iterator().next(); + // Get the server where this region was previously hosted + String regionEncodedName = regionCacheRatioServerMap.getKey(); + RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName); + if (regionInfo == null) { + LOG.warn("Region {} not found", regionEncodedName); + regionCacheRatioOnOldServerMap.remove(regionEncodedName); + return BalanceAction.NULL_ACTION; + } + if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) { + regionCacheRatioOnOldServerMap.remove(regionEncodedName); + return BalanceAction.NULL_ACTION; + } + int regionIndex = cluster.regionsToIndex.get(regionInfo); + int oldServerIndex = cluster.serversToIndex + .get(regionCacheRatioOnOldServerMap.get(regionEncodedName).getFirst().getAddress()); + if (oldServerIndex < 0) { + LOG.warn("Server previously hosting region {} not found", regionEncodedName); + regionCacheRatioOnOldServerMap.remove(regionEncodedName); + return BalanceAction.NULL_ACTION; + } + + float oldRegionCacheRatio = + cluster.getOrComputeRegionCacheRatio(regionIndex, oldServerIndex); + int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex]; + float currentRegionCacheRatio = + cluster.getOrComputeRegionCacheRatio(regionIndex, currentServerIndex); + + BalanceAction action = generatePlan(cluster, regionIndex, currentServerIndex, + currentRegionCacheRatio, oldServerIndex, oldRegionCacheRatio); + 
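+          // Each old-server entry is consumed exactly once per balancer run: it is removed below
+          // whether or not the plan above is a real move, and it only reappears when
+          // updateClusterMetrics() rebuilds the map from fresh ServerMetrics.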
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return action;
+      }
+      return BalanceAction.NULL_ACTION;
+    }
+
+    private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex,
+      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
+      float cacheRatioOnOldServer) {
+      return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
+        cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
+          ? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
+          : BalanceAction.NULL_ACTION;
+    }
+
+    private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
+      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
+      float cacheRatioOnOldServer) {
+      // Check whether the region has already been moved away, for example by another candidate
+      // generator, by validating the server indexes
+      if (currentServerIndex < 0 || oldServerIndex < 0) {
+        return false;
+      }
+
+      float cacheRatioDiffThreshold = 0.6f;
+
+      // Conditions for moving the region
+
+      // If the region is fully cached on the old server, move the region back
+      if (cacheRatioOnOldServer == 1.0f) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Region {} moved to the old server {} as it is fully cached there",
+            cluster.regions[regionIndex].getEncodedName(), cluster.servers[oldServerIndex]);
+        }
+        return true;
+      }
+
+      // Move the region back to the old server if it is cached equally on both the servers
+      if (cacheRatioOnCurrentServer == cacheRatioOnOldServer) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "Region {} moved from {} to {} as the region is cached {} equally on both servers",
+            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
+            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer);
+        }
+        return true;
+      }
+
+      // If the region is not fully cached on either server, move it back to the old server when
+      // the cache ratio on the current server is still well below the old server's
+      if (
+        cacheRatioOnOldServer > 0.0f
+          && cacheRatioOnCurrentServer / cacheRatioOnOldServer < cacheRatioDiffThreshold
+      ) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "Region {} moved from {} to {} as the current cache ratio {} is well below the old "
+              + "cache ratio {}",
+            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
+            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
+        }
+        return true;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
+          cluster.regions[regionIndex], cluster.servers[currentServerIndex],
+          cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
+      }
+      return false;
+    }
+  }
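A worked illustration of the 0.6 threshold above, with hypothetical values: at old 0.8 and current 0.3, the ratio 0.3 / 0.8 = 0.375 falls below the threshold and the region moves back; at 0.5 / 0.7 (about 0.71) the warm-up on the current server is far enough along that the move is skipped. A minimal sketch of the same rule:

    // Illustrative only: the decision rule from moveRegionToOldServer, isolated.
    public class ThresholdExample {
      static boolean moveBack(float current, float old) {
        if (old == 1.0f || current == old) {
          return true;
        }
        return old > 0.0f && current / old < 0.6f;
      }

      public static void main(String[] args) {
        System.out.println(moveBack(0.3f, 0.8f)); // true: 0.375 < 0.6
        System.out.println(moveBack(0.5f, 0.7f)); // false: ~0.714 >= 0.6
      }
    }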
+  private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator {
+    @Override
+    BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer,
+      int otherServer) {
+      // First move all the regions which were previously hosted on some other server back to
+      // their old servers
+      if (
+        !regionCacheRatioOnOldServerMap.isEmpty()
+          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
+      ) {
+        // Get the first region index in the historical cache ratio list
+        Map.Entry<String, Pair<ServerName, Float>> regionEntry =
+          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
+        String regionEncodedName = regionEntry.getKey();
+
+        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
+        if (regionInfo == null) {
+          LOG.warn("Region {} does not exist", regionEncodedName);
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+
+        int regionIndex = cluster.regionsToIndex.get(regionInfo);
+
+        // Get the current host name for this region
+        thisServer = cluster.regionIndexToServerIndex[regionIndex];
+
+        // Get the old server index
+        otherServer = cluster.serversToIndex.get(regionEntry.getValue().getFirst().getAddress());
+
+        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+
+        if (otherServer < 0) {
+          // The old server is no longer part of the cluster, so the region cannot be moved back
+          // to it
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+              "CacheAwareSkewnessCandidateGenerator: Region {} not moved to the old "
+                + "server {} as the server does not exist",
+              regionEncodedName, regionEntry.getValue().getFirst().getHostname());
+          }
+          return BalanceAction.NULL_ACTION;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it "
+              + "was hosted there earlier",
+            regionEncodedName, cluster.servers[thisServer].getHostname(),
+            cluster.servers[otherServer].getHostname());
+        }
+
+        return getAction(thisServer, regionIndex, otherServer, -1);
+      }
+
+      if (thisServer < 0 || otherServer < 0) {
+        return BalanceAction.NULL_ACTION;
+      }
+
+      int regionIndexToMove = pickLeastCachedRegion(cluster, thisServer);
+      if (regionIndexToMove < 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("CacheAwareSkewnessCandidateGenerator: No region found for movement");
+        }
+        return BalanceAction.NULL_ACTION;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it is "
+            + "least cached on the current server",
+          cluster.regions[regionIndexToMove].getEncodedName(),
+          cluster.servers[thisServer].getHostname(), cluster.servers[otherServer].getHostname());
+      }
+      return getAction(thisServer, regionIndexToMove, otherServer, -1);
+    }
+
+    private int pickLeastCachedRegion(BalancerClusterState cluster, int thisServer) {
+      float minCacheRatio = Float.MAX_VALUE;
+      int leastCachedRegion = -1;
+      for (int i = 0; i < cluster.regionsPerServer[thisServer].length; i++) {
+        int regionIndex = cluster.regionsPerServer[thisServer][i];
+
+        float cacheRatioOnCurrentServer =
+          cluster.getOrComputeRegionCacheRatio(regionIndex, thisServer);
+        if (cacheRatioOnCurrentServer < minCacheRatio) {
+          minCacheRatio = cacheRatioOnCurrentServer;
+          leastCachedRegion = regionIndex;
+        }
+      }
+      return leastCachedRegion;
+    }
+  }
+
+  static class CacheAwareRegionSkewnessCostFunction extends CostFunction {
+    static final String REGION_COUNT_SKEW_COST_KEY =
+      "hbase.master.balancer.stochastic.regionCountCost";
+    static final float DEFAULT_REGION_COUNT_SKEW_COST = 20;
+    private final DoubleArrayCost cost = new DoubleArrayCost();
+
+    CacheAwareRegionSkewnessCostFunction(Configuration conf) {
+      // Load multiplier should be the greatest as it is the most general way to balance data.
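+      // Note: this reuses the stochastic balancer's regionCountCost key, so an existing
+      // operator override of that key applies to this cost function as well.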
+ this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST)); + } + + @Override + void prepare(BalancerClusterState cluster) { + super.prepare(cluster); + cost.prepare(cluster.numServers); + cost.applyCostsChange(costs -> { + for (int i = 0; i < cluster.numServers; i++) { + costs[i] = cluster.regionsPerServer[i].length; + } + }); + } + + @Override + protected double cost() { + return cost.cost(); + } + + @Override + protected void regionMoved(int region, int oldServer, int newServer) { + cost.applyCostsChange(costs -> { + costs[oldServer] = cluster.regionsPerServer[oldServer].length; + costs[newServer] = cluster.regionsPerServer[newServer].length; + }); + } + + public final void updateWeight(double[] weights) { + weights[GeneratorFunctionType.LOAD.ordinal()] += cost(); + } + } + + static class CacheAwareCostFunction extends CostFunction { + private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost"; + private double cacheRatio; + private double bestCacheRatio; + + private static final float DEFAULT_CACHE_COST = 20; + + CacheAwareCostFunction(Configuration conf) { + boolean isPersistentCache = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY) != null; + // Disable the CacheAwareCostFunction if the cached file list persistence is not enabled + this.setMultiplier( + !isPersistentCache ? 0.0f : conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST)); + bestCacheRatio = 0.0; + cacheRatio = 0.0; + } + + @Override + void prepare(BalancerClusterState cluster) { + super.prepare(cluster); + cacheRatio = 0.0; + bestCacheRatio = 0.0; + + for (int region = 0; region < cluster.numRegions; region++) { + cacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region, + cluster.regionIndexToServerIndex[region]); + bestCacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region, + getServerWithBestCacheRatioForRegion(region)); + } + + cacheRatio = bestCacheRatio == 0 ? 1.0 : cacheRatio / bestCacheRatio; + if (LOG.isDebugEnabled()) { + LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio); + } + } + + @Override + protected double cost() { + return scale(0, 1, 1 - cacheRatio); + } + + @Override + protected void regionMoved(int region, int oldServer, int newServer) { + double regionCacheRatioOnOldServer = + cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer); + double regionCacheRatioOnNewServer = + cluster.getOrComputeWeightedRegionCacheRatio(region, newServer); + double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer; + double normalizedDelta = bestCacheRatio == 0.0 ? 
0.0 : cacheRatioDiff / bestCacheRatio;
+      cacheRatio += normalizedDelta;
+      if (LOG.isDebugEnabled() && (cacheRatio < 0.0 || cacheRatio > 1.0)) {
+        LOG.debug(
+          "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:regionCacheRatioOnOldServer:{}:"
+            + "regionCacheRatioOnNewServer:{}:bestRegionCacheRatio:{}:cacheRatio:{}",
+          cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(),
+          cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer,
+          regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio);
+      }
+    }
+
+    private int getServerWithBestCacheRatioForRegion(int region) {
+      return cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
+    }
+
+    @Override
+    public final void updateWeight(double[] weights) {
+      weights[GeneratorFunctionType.CACHE_RATIO.ordinal()] += cost();
+    }
+  }
+}
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index edf049e8a718..e5cd5446c5c8 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -136,8 +137,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME;
   private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS;
   private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE;
+  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap = new HashMap<>();

-  private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
+  protected List<CostFunction> costFunctions; // FindBugs: Wants this protected;
+                                              // IS2_INCONSISTENT_SYNC
   // To save the currently configured sum of multiplier.
Defaulted at 1 for cases that carry high cost private float sumMultiplier; // to save and report costs to JMX @@ -224,6 +227,24 @@ protected List createCandidateGenerators() { return candidateGenerators; } + protected List createCostFunctions(Configuration conf) { + List costFunctions = new ArrayList<>(); + addCostFunction(costFunctions, new RegionCountSkewCostFunction(conf)); + addCostFunction(costFunctions, new PrimaryRegionCountSkewCostFunction(conf)); + addCostFunction(costFunctions, new MoveCostFunction(conf, provider)); + addCostFunction(costFunctions, localityCost); + addCostFunction(costFunctions, rackLocalityCost); + addCostFunction(costFunctions, new TableSkewCostFunction(conf)); + addCostFunction(costFunctions, regionReplicaHostCostFunction); + addCostFunction(costFunctions, regionReplicaRackCostFunction); + addCostFunction(costFunctions, new ReadRequestCostFunction(conf)); + addCostFunction(costFunctions, new CPRequestCostFunction(conf)); + addCostFunction(costFunctions, new WriteRequestCostFunction(conf)); + addCostFunction(costFunctions, new MemStoreSizeCostFunction(conf)); + addCostFunction(costFunctions, new StoreFileCostFunction(conf)); + return costFunctions; + } + @Override protected void loadConf(Configuration conf) { super.loadConf(conf); @@ -242,20 +263,7 @@ protected void loadConf(Configuration conf) { regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf); regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf); - costFunctions = new ArrayList<>(); - addCostFunction(new RegionCountSkewCostFunction(conf)); - addCostFunction(new PrimaryRegionCountSkewCostFunction(conf)); - addCostFunction(new MoveCostFunction(conf, provider)); - addCostFunction(localityCost); - addCostFunction(rackLocalityCost); - addCostFunction(new TableSkewCostFunction(conf)); - addCostFunction(regionReplicaHostCostFunction); - addCostFunction(regionReplicaRackCostFunction); - addCostFunction(new ReadRequestCostFunction(conf)); - addCostFunction(new CPRequestCostFunction(conf)); - addCostFunction(new WriteRequestCostFunction(conf)); - addCostFunction(new MemStoreSizeCostFunction(conf)); - addCostFunction(new StoreFileCostFunction(conf)); + this.costFunctions = createCostFunctions(conf); loadCustomCostFunctions(conf); curFunctionCosts = new double[costFunctions.size()]; @@ -459,8 +467,8 @@ protected List balanceTable(TableName tableName, // The clusterState that is given to this method contains the state // of all the regions in the table(s) (that's true today) // Keep track of servers to iterate through them. 
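The only functional change in the balanceTable() hunk below is threading regionCacheRatioOnOldServerMap into the BalancerClusterState constructor, so the old-server cache hints gathered in a subclass's updateRegionLoad() survive into the per-table cluster model.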
- BalancerClusterState cluster = - new BalancerClusterState(loadOfOneTable, loads, finder, rackManager); + BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder, + rackManager, regionCacheRatioOnOldServerMap); long startTime = EnvironmentEdgeManager.currentTime(); @@ -568,7 +576,7 @@ protected List balanceTable(TableName tableName, return null; } - private void sendRejectionReasonToRingBuffer(Supplier reason, + protected void sendRejectionReasonToRingBuffer(Supplier reason, List costFunctions) { provider.recordBalancerRejection(() -> { BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason.get()); @@ -627,14 +635,14 @@ private void updateStochasticCosts(TableName tableName, double overall, double[] } } - private void addCostFunction(CostFunction costFunction) { + private void addCostFunction(List costFunctions, CostFunction costFunction) { float multiplier = costFunction.getMultiplier(); if (multiplier > 0) { costFunctions.add(costFunction); } } - private String functionCost() { + protected String functionCost() { StringBuilder builder = new StringBuilder(); for (CostFunction c : costFunctions) { builder.append(c.getClass().getSimpleName()); @@ -655,6 +663,12 @@ private String functionCost() { return builder.toString(); } + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") + List getCostFunctions() { + return costFunctions; + } + private String totalCostsPerFunc() { StringBuilder builder = new StringBuilder(); for (CostFunction c : costFunctions) { diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java index 9ea1c94d1e09..4a996e7796f5 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -376,6 +377,19 @@ protected TreeMap> mockClusterServers(int[] mockClu return servers; } + protected Map> mockClusterServersUnsorted(int[] mockCluster, + int numTables) { + int numServers = mockCluster.length; + Map> servers = new LinkedHashMap<>(); + for (int i = 0; i < numServers; i++) { + int numRegions = mockCluster[i]; + ServerAndLoad sal = randomServer(0); + List regions = randomRegions(numRegions, numTables); + servers.put(sal.getServerName(), regions); + } + return servers; + } + protected TreeMap> mockUniformClusterServers(int[] mockCluster) { int numServers = mockCluster.length; TreeMap> servers = new TreeMap<>(); diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java index 21f3a3b66c9a..cc16cfe2ec83 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -139,6 +139,8 @@ private ServerMetrics mockServerMetricsWithCpRequests(List regionsOn when(rl.getWriteRequestCount()).thenReturn(0L); when(rl.getMemStoreSize()).thenReturn(Size.ZERO); 
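The two added stubs below are needed because BalancerRegionLoad's constructor now reads these metrics: with a Mockito mock, an unstubbed getRegionSizeMB() returns null and the (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE) call would throw a NullPointerException. getCurrentRegionCachedRatio() is stubbed for symmetry; a primitive float would already default to 0.0f.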
when(rl.getStoreFileSize()).thenReturn(Size.ZERO); + when(rl.getRegionSizeMB()).thenReturn(Size.ZERO); + when(rl.getCurrentRegionCachedRatio()).thenReturn(0.0f); regionLoadMap.put(info.getRegionName(), rl); } when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap); @@ -213,6 +215,8 @@ public void testKeepRegionLoad() throws Exception { when(rl.getWriteRequestCount()).thenReturn(0L); when(rl.getMemStoreSize()).thenReturn(Size.ZERO); when(rl.getStoreFileSize()).thenReturn(new Size(i, Size.Unit.MEGABYTE)); + when(rl.getRegionSizeMB()).thenReturn(Size.ZERO); + when(rl.getCurrentRegionCachedRatio()).thenReturn(0.0f); Map regionLoadMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); regionLoadMap.put(Bytes.toBytes(REGION_KEY), rl); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 12479979b2ba..2aa9ecf69ec4 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1335,6 +1335,18 @@ public enum OperationStatusCode { */ public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size"; + /** + * If the chosen ioengine can persist its state across restarts, the path to the file to persist + * to. This file is NOT the data file. It is a file into which we will serialize the map of what + * is in the data file. For example, if you pass the following argument as + * BUCKET_CACHE_IOENGINE_KEY ("hbase.bucketcache.ioengine"), + * file:/tmp/bucketcache.data , then we will write the bucketcache data to the file + * /tmp/bucketcache.data but the metadata on where the data is in the supplied file + * is an in-memory map that needs to be persisted across restarts. Where to store this in-memory + * state is what you supply here: e.g. /tmp/bucketcache.map. + */ + public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path"; + /** * HConstants for fast fail on the client side follow */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java index 38a296aad523..6956d584d92a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY; import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; import java.io.IOException; @@ -47,18 +48,6 @@ public final class BlockCacheFactory { public static final String BLOCKCACHE_POLICY_KEY = "hfile.block.cache.policy"; public static final String BLOCKCACHE_POLICY_DEFAULT = "LRU"; - /** - * If the chosen ioengine can persist its state across restarts, the path to the file to persist - * to. This file is NOT the data file. It is a file into which we will serialize the map of what - * is in the data file. For example, if you pass the following argument as - * BUCKET_CACHE_IOENGINE_KEY ("hbase.bucketcache.ioengine"), - * file:/tmp/bucketcache.data , then we will write the bucketcache data to the file - * /tmp/bucketcache.data but the metadata on where the data is in the supplied file - * is an in-memory map that needs to be persisted across restarts. 
Where to store this in-memory - * state is what you supply here: e.g. /tmp/bucketcache.map. - */ - public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path"; - public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads"; public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = "hbase.bucketcache.writer.queuelength"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 64162bb7464d..0faf510f5db6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -342,6 +342,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck } catch (IOException ioex) { backingMap.clear(); fullyCachedFiles.clear(); + regionCachedSizeMap.clear(); LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex); } } @@ -1477,6 +1478,7 @@ private void disableCache() { // If persistent ioengine and a path, we will serialize out the backingMap. this.backingMap.clear(); this.fullyCachedFiles.clear(); + this.regionCachedSizeMap.clear(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java new file mode 100644 index 000000000000..3ecd8dc7cfd0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterMetrics; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.RegionMetrics; +import org.apache.hadoop.hbase.ServerMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Size; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +@Category({ LargeTests.class }) +public class TestCacheAwareLoadBalancer extends BalancerTestBase { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCacheAwareLoadBalancer.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestCacheAwareLoadBalancer.class); + + private static CacheAwareLoadBalancer loadBalancer; + + static List servers; + + static List tableDescs; + + static Map tableMap = new HashMap<>(); + + static TableName[] tables = new TableName[] { TableName.valueOf("dt1"), TableName.valueOf("dt2"), + TableName.valueOf("dt3"), TableName.valueOf("dt4") }; + + private static List generateServers(int numServers) { + List servers = new ArrayList<>(numServers); + Random rand = ThreadLocalRandom.current(); + for (int i = 0; i < numServers; i++) { + String host = "server" + rand.nextInt(100000); + int port = rand.nextInt(60000); + servers.add(ServerName.valueOf(host, port, -1)); + } + return servers; + } + + private static List constructTableDesc(boolean hasBogusTable) { + List tds = Lists.newArrayList(); + for (int i = 0; i < tables.length; i++) { + TableDescriptor htd = TableDescriptorBuilder.newBuilder(tables[i]).build(); + tds.add(htd); + } + return tds; + } + + private ServerMetrics mockServerMetricsWithRegionCacheInfo(ServerName server, + List regionsOnServer, float currentCacheRatio, List oldRegionCacheInfo, + int oldRegionCachedSize, int regionSize) { + ServerMetrics serverMetrics = mock(ServerMetrics.class); + Map regionLoadMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (RegionInfo info : regionsOnServer) { + RegionMetrics rl = mock(RegionMetrics.class); + when(rl.getReadRequestCount()).thenReturn(0L); + when(rl.getWriteRequestCount()).thenReturn(0L); + when(rl.getMemStoreSize()).thenReturn(Size.ZERO); + when(rl.getStoreFileSize()).thenReturn(Size.ZERO); + when(rl.getCurrentRegionCachedRatio()).thenReturn(currentCacheRatio); + when(rl.getRegionSizeMB()).thenReturn(new Size(regionSize, 
Size.Unit.MEGABYTE)); + regionLoadMap.put(info.getRegionName(), rl); + } + when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap); + Map oldCacheRatioMap = new HashMap<>(); + for (RegionInfo info : oldRegionCacheInfo) { + oldCacheRatioMap.put(info.getEncodedName(), oldRegionCachedSize); + } + when(serverMetrics.getRegionCachedInfo()).thenReturn(oldCacheRatioMap); + return serverMetrics; + } + + @BeforeClass + public static void beforeAllTests() throws Exception { + servers = generateServers(3); + tableDescs = constructTableDesc(false); + Configuration conf = HBaseConfiguration.create(); + conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list"); + loadBalancer = new CacheAwareLoadBalancer(); + loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf)); + loadBalancer.loadConf(conf); + } + + @Test + public void testRegionsNotCachedOnOldServerAndCurrentServer() throws Exception { + // The regions are not cached on old server as well as the current server. This causes + // skewness in the region allocation which should be fixed by the balancer + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Simulate that the regions previously hosted by server1 are now hosted on server0 + List regionsOnServer0 = randomRegions(10); + List regionsOnServer1 = randomRegions(0); + List regionsOnServer2 = randomRegions(5); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock cluster metrics + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, + 0.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, + 0.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, + 0.0f, new ArrayList<>(), 0, 10)); + ClusterMetrics clusterMetrics = mock(ClusterMetrics.class); + when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.updateClusterMetrics(clusterMetrics); + + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(clusterState); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + Set regionsMovedFromServer0 = new HashSet<>(); + Map> targetServers = new HashMap<>(); + for (RegionPlan plan : plans) { + if (plan.getSource().equals(server0)) { + regionsMovedFromServer0.add(plan.getRegionInfo()); + if (!targetServers.containsKey(plan.getDestination())) { + targetServers.put(plan.getDestination(), new ArrayList<>()); + } + targetServers.get(plan.getDestination()).add(plan.getRegionInfo()); + } + } + // should move 5 regions from server0 to server 1 + assertEquals(5, regionsMovedFromServer0.size()); + assertEquals(5, targetServers.get(server1).size()); + } + + @Test + public void testRegionsPartiallyCachedOnOldServerAndNotCachedOnCurrentServer() throws Exception { + // The regions are partially cached on old server but not cached on the current server + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Simulate that the regions previously hosted by server1 are now hosted on server0 + List regionsOnServer0 = randomRegions(10); + List regionsOnServer1 = randomRegions(0); + List 
regionsOnServer2 = randomRegions(5); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock cluster metrics + + // Mock 5 regions from server0 were previously hosted on server1 + List oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1); + + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, + 0.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, + 0.0f, oldCachedRegions, 6, 10)); + serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, + 0.0f, new ArrayList<>(), 0, 10)); + ClusterMetrics clusterMetrics = mock(ClusterMetrics.class); + when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.updateClusterMetrics(clusterMetrics); + + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(clusterState); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + Set regionsMovedFromServer0 = new HashSet<>(); + Map> targetServers = new HashMap<>(); + for (RegionPlan plan : plans) { + if (plan.getSource().equals(server0)) { + regionsMovedFromServer0.add(plan.getRegionInfo()); + if (!targetServers.containsKey(plan.getDestination())) { + targetServers.put(plan.getDestination(), new ArrayList<>()); + } + targetServers.get(plan.getDestination()).add(plan.getRegionInfo()); + } + } + // should move 5 regions from server0 to server1 + assertEquals(5, regionsMovedFromServer0.size()); + assertEquals(5, targetServers.get(server1).size()); + assertTrue(targetServers.get(server1).containsAll(oldCachedRegions)); + } + + @Test + public void testRegionsFullyCachedOnOldServerAndNotCachedOnCurrentServers() throws Exception { + // The regions are fully cached on old server + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Simulate that the regions previously hosted by server1 are now hosted on server0 + List regionsOnServer0 = randomRegions(10); + List regionsOnServer1 = randomRegions(0); + List regionsOnServer2 = randomRegions(5); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock cluster metrics + + // Mock 5 regions from server0 were previously hosted on server1 + List oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1); + + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, + 0.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, + 0.0f, oldCachedRegions, 10, 10)); + serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, + 0.0f, new ArrayList<>(), 0, 10)); + ClusterMetrics clusterMetrics = mock(ClusterMetrics.class); + when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.updateClusterMetrics(clusterMetrics); + + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(clusterState); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + Set regionsMovedFromServer0 = new HashSet<>(); + Map> targetServers = new HashMap<>(); + for (RegionPlan plan : plans) { + if 
(plan.getSource().equals(server0)) { + regionsMovedFromServer0.add(plan.getRegionInfo()); + if (!targetServers.containsKey(plan.getDestination())) { + targetServers.put(plan.getDestination(), new ArrayList<>()); + } + targetServers.get(plan.getDestination()).add(plan.getRegionInfo()); + } + } + // should move 5 regions from server0 to server1 + assertEquals(5, regionsMovedFromServer0.size()); + assertEquals(5, targetServers.get(server1).size()); + assertTrue(targetServers.get(server1).containsAll(oldCachedRegions)); + } + + @Test + public void testRegionsFullyCachedOnOldAndCurrentServers() throws Exception { + // The regions are fully cached on old server + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Simulate that the regions previously hosted by server1 are now hosted on server0 + List regionsOnServer0 = randomRegions(10); + List regionsOnServer1 = randomRegions(0); + List regionsOnServer2 = randomRegions(5); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock cluster metrics + + // Mock 5 regions from server0 were previously hosted on server1 + List oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1); + + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, + 1.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, + 1.0f, oldCachedRegions, 10, 10)); + serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, + 1.0f, new ArrayList<>(), 0, 10)); + ClusterMetrics clusterMetrics = mock(ClusterMetrics.class); + when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.updateClusterMetrics(clusterMetrics); + + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(clusterState); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + Set regionsMovedFromServer0 = new HashSet<>(); + Map> targetServers = new HashMap<>(); + for (RegionPlan plan : plans) { + if (plan.getSource().equals(server0)) { + regionsMovedFromServer0.add(plan.getRegionInfo()); + if (!targetServers.containsKey(plan.getDestination())) { + targetServers.put(plan.getDestination(), new ArrayList<>()); + } + targetServers.get(plan.getDestination()).add(plan.getRegionInfo()); + } + } + // should move 5 regions from server0 to server1 + assertEquals(5, regionsMovedFromServer0.size()); + assertEquals(5, targetServers.get(server1).size()); + assertTrue(targetServers.get(server1).containsAll(oldCachedRegions)); + } + + @Test + public void testRegionsPartiallyCachedOnOldServerAndCurrentServer() throws Exception { + // The regions are partially cached on old server + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Simulate that the regions previously hosted by server1 are now hosted on server0 + List regionsOnServer0 = randomRegions(10); + List regionsOnServer1 = randomRegions(0); + List regionsOnServer2 = randomRegions(5); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock cluster metrics + + // Mock 5 regions from server0 were previously hosted on 
+
+  @Test
+  public void testRegionsFullyCachedOnOldAndCurrentServers() throws Exception {
+    // The regions are fully cached on both the old and the current servers
+
+    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
+    ServerName server0 = servers.get(0);
+    ServerName server1 = servers.get(1);
+    ServerName server2 = servers.get(2);
+
+    // Simulate that the regions previously hosted by server1 are now hosted on server0
+    List<RegionInfo> regionsOnServer0 = randomRegions(10);
+    List<RegionInfo> regionsOnServer1 = randomRegions(0);
+    List<RegionInfo> regionsOnServer2 = randomRegions(5);
+
+    clusterState.put(server0, regionsOnServer0);
+    clusterState.put(server1, regionsOnServer1);
+    clusterState.put(server2, regionsOnServer2);
+
+    // Mock cluster metrics
+
+    // Mock 5 regions from server0 were previously hosted on server1
+    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);
+
+    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
+    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
+      1.0f, new ArrayList<>(), 0, 10));
+    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
+      1.0f, oldCachedRegions, 10, 10));
+    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
+      1.0f, new ArrayList<>(), 0, 10));
+    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
+    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
+    loadBalancer.updateClusterMetrics(clusterMetrics);
+
+    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
+      (Map) mockClusterServersWithTables(clusterState);
+    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
+    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
+    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
+    for (RegionPlan plan : plans) {
+      if (plan.getSource().equals(server0)) {
+        regionsMovedFromServer0.add(plan.getRegionInfo());
+        if (!targetServers.containsKey(plan.getDestination())) {
+          targetServers.put(plan.getDestination(), new ArrayList<>());
+        }
+        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
+      }
+    }
+    // should move 5 regions from server0 to server1
+    assertEquals(5, regionsMovedFromServer0.size());
+    assertEquals(5, targetServers.get(server1).size());
+    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
+  }
+
+  @Test
+  public void testRegionsPartiallyCachedOnOldServerAndCurrentServer() throws Exception {
+    // The regions are partially cached on both the old and the current servers
+
+    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
+    ServerName server0 = servers.get(0);
+    ServerName server1 = servers.get(1);
+    ServerName server2 = servers.get(2);
+
+    // Simulate that the regions previously hosted by server1 are now hosted on server0
+    List<RegionInfo> regionsOnServer0 = randomRegions(10);
+    List<RegionInfo> regionsOnServer1 = randomRegions(0);
+    List<RegionInfo> regionsOnServer2 = randomRegions(5);
+
+    clusterState.put(server0, regionsOnServer0);
+    clusterState.put(server1, regionsOnServer1);
+    clusterState.put(server2, regionsOnServer2);
+
+    // Mock cluster metrics
+
+    // Mock 5 regions from server0 were previously hosted on server1
+    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);
+
+    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
+    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
+      0.2f, new ArrayList<>(), 0, 10));
+    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
+      0.0f, oldCachedRegions, 6, 10));
+    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
+      1.0f, new ArrayList<>(), 0, 10));
+    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
+    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
+    loadBalancer.updateClusterMetrics(clusterMetrics);
+
+    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
+      (Map) mockClusterServersWithTables(clusterState);
+    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
+    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
+    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
+    for (RegionPlan plan : plans) {
+      if (plan.getSource().equals(server0)) {
+        regionsMovedFromServer0.add(plan.getRegionInfo());
+        if (!targetServers.containsKey(plan.getDestination())) {
+          targetServers.put(plan.getDestination(), new ArrayList<>());
+        }
+        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
+      }
+    }
+    assertEquals(5, regionsMovedFromServer0.size());
+    assertEquals(5, targetServers.get(server1).size());
+    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancerCostFunctions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancerCostFunctions.java
new file mode 100644
index 000000000000..448e576b1bc7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancerCostFunctions.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestCacheAwareLoadBalancerCostFunctions extends StochasticBalancerTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestCacheAwareLoadBalancerCostFunctions.class);
+
+  // Mapping of test -> expected cache cost
+  private final float[] expectedCacheCost = { 0.0f, 0.0f, 0.5f, 1.0f, 0.0f, 0.572f, 0.0f, 0.075f };
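+
+  // For example, clusterRegionCacheRatioMocks[0] below encodes a cluster where servers 0, 1 and
+  // 2 host 2, 1 and 1 regions, and a row such as { 0, 100, 0, 0 } reads as "this region is
+  // hosted on server 0 and 100% cached there, with no cache footprint on servers 1 and 2".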
+
+  /**
+   * Data set for testCacheCost:
+   * [test][0][0] = mapping of server to number of regions it hosts
+   * [test][region + 1][0] = server that region is hosted on
+   * [test][region + 1][server + 1] = size of region cached on server
+   */
+  private final int[][][] clusterRegionCacheRatioMocks = new int[][][] {
+    // Test 1: each region is entirely on the server that hosts it.
+    // The cost of moving the regions in this case should be high, as the regions are fully
+    // cached on the server they are currently hosted on.
+    new int[][] {
+      new int[] { 2, 1, 1 }, // Servers 0, 1 and 2 host 2, 1 and 1 region(s) respectively
+      new int[] { 0, 100, 0, 0 }, // region 0 is hosted and cached only on server 0
+      new int[] { 0, 100, 0, 0 }, // region 1 is hosted and cached only on server 0
+      new int[] { 1, 0, 100, 0 }, // region 2 is hosted and cached only on server 1
+      new int[] { 2, 0, 0, 100 }, // region 3 is hosted and cached only on server 2
+    },
+
+    // Test 2: each region is cached completely on the server it is currently hosted on, but it
+    // was also cached on some other server historically.
+    // The cost of moving the regions in this case should be high, as the regions are fully
+    // cached on the server they are currently hosted on. Although the regions were previously
+    // hosted and cached on some other server, since they are completely cached on the new
+    // server, there is no need to move them back to the server that previously hosted them.
+    new int[][] {
+      new int[] { 1, 2, 1 }, // Servers 0, 1 and 2 host 1, 2 and 1 region(s) respectively
+      new int[] { 0, 100, 0, 100 }, // region 0 is hosted and currently cached on server 0,
+                                    // but was previously cached completely on server 2
+      new int[] { 1, 100, 100, 0 }, // region 1 is hosted and currently cached on server 1,
+                                    // but was previously cached completely on server 0
+      new int[] { 1, 0, 100, 100 }, // region 2 is hosted and currently cached on server 1,
+                                    // but was previously cached on server 2
+      new int[] { 2, 0, 100, 100 }, // region 3 is hosted and currently cached on server 2,
+                                    // but was previously cached on server 1
+    },
+
+    // Test 3: the regions were hosted and fully cached on a server, but later moved to another
+    // server because of a server crash procedure. The regions are partially cached on the
+    // server they are currently hosted on.
+    new int[][] {
+      new int[] { 1, 2, 1 },
+      new int[] { 0, 50, 0, 100 }, // region 0 is currently hosted and partially cached on
+                                   // server 0, but was fully cached on server 2 previously
+      new int[] { 1, 100, 50, 0 }, // region 1 is currently hosted and partially cached on
+                                   // server 1, but was fully cached on server 0 previously
+      new int[] { 1, 0, 50, 100 }, // region 2 is currently hosted and partially cached on
+                                   // server 1, but was fully cached on server 2 previously
+      new int[] { 2, 0, 100, 50 }, // region 3 is currently hosted and partially cached on
+                                   // server 2, but was fully cached on server 1 previously
+    },
+
+    // Test 4: the regions were hosted and fully cached on a server, but later moved to another
+    // server because of a server crash procedure. The regions are not cached at all on the
+    // server they are currently hosted on.
+    new int[][] {
+      new int[] { 1, 1, 2 },
+      new int[] { 0, 0, 0, 100 }, // region 0 is currently hosted but not cached on server 0,
+                                  // but was fully cached on server 2 previously
+      new int[] { 1, 100, 0, 0 }, // region 1 is currently hosted but not cached on server 1,
+                                  // but was fully cached on server 0 previously
+      new int[] { 2, 0, 100, 0 }, // region 2 is currently hosted but not cached on server 2,
+                                  // but was fully cached on server 1 previously
+      new int[] { 2, 100, 0, 0 }, // region 3 is currently hosted but not cached on server 2,
+                                  // but was fully cached on server 0 previously
+    },
+
+    // Test 5: the regions were partially cached on the old servers before moving to the new
+    // servers, where they are also partially cached.
+    new int[][] {
+      new int[] { 2, 1, 1 },
+      new int[] { 0, 50, 50, 0 }, // region 0 is hosted and partially cached on server 0, but
+                                  // was previously hosted and partially cached on server 1
+      new int[] { 0, 50, 0, 50 }, // region 1 is hosted and partially cached on server 0, but
+                                  // was previously hosted and partially cached on server 2
+      new int[] { 1, 0, 50, 50 }, // region 2 is hosted and partially cached on server 1, but
+                                  // was previously hosted and partially cached on server 2
+      new int[] { 2, 0, 50, 50 }, // region 3 is hosted and partially cached on server 2, but
+                                  // was previously hosted and partially cached on server 1
+    },
+
+    // Test 6: the regions are less cached on the new servers than they were on the servers
+    // that hosted them before the move.
+    new int[][] {
+      new int[] { 1, 2, 1 },
+      new int[] { 0, 30, 70, 0 }, // region 0 is hosted and cached 30% on server 0, but was
+                                  // previously hosted and cached 70% on server 1
+      new int[] { 1, 70, 30, 0 }, // region 1 is hosted and cached 30% on server 1, but was
+                                  // previously hosted and cached 70% on server 0
+      new int[] { 1, 0, 30, 70 }, // region 2 is hosted and cached 30% on server 1, but was
+                                  // previously hosted and cached 70% on server 2
+      new int[] { 2, 0, 70, 30 }, // region 3 is hosted and cached 30% on server 2, but was
+                                  // previously hosted and cached 70% on server 1
+    },
+
+    // Test 7: the regions are more cached on the new servers than they were on the servers
+    // that hosted them before the move.
+    new int[][] {
+      new int[] { 2, 1, 1 },
+      new int[] { 0, 80, 20, 0 }, // region 0 is hosted and 80% cached on server 0, but was
+                                  // previously hosted and 20% cached on server 1
+      new int[] { 0, 80, 0, 20 }, // region 1 is hosted and 80% cached on server 0, but was
+                                  // previously hosted and 20% cached on server 2
+      new int[] { 1, 20, 80, 0 }, // region 2 is hosted and 80% cached on server 1, but was
+                                  // previously hosted and 20% cached on server 0
+      new int[] { 2, 0, 20, 80 }, // region 3 is hosted and 80% cached on server 2, but was
+                                  // previously hosted and 20% cached on server 1
+    },
+
+    // Test 8: the regions are randomly assigned to servers, with some regions historically
+    // hosted on other region servers.
+    new int[][] {
+      new int[] { 1, 2, 1 },
+      new int[] { 0, 34, 0, 58 }, // region 0 is hosted and partially cached on server 0, but
+                                  // was previously hosted and partially cached on server 2;
+                                  // current cache ratio < historical cache ratio
+      new int[] { 1, 78, 100, 0 }, // region 1 is hosted and fully cached on server 1, but was
+                                   // previously hosted and partially cached on server 0;
+                                   // current cache ratio > historical cache ratio
+      new int[] { 1, 66, 66, 0 }, // region 2 is hosted and partially cached on server 1, but
+                                  // was previously hosted and partially cached on server 0;
+                                  // current cache ratio == historical cache ratio
+      new int[] { 2, 0, 0, 96 }, // region 3 is hosted and partially cached on server 2;
+                                 // no historical cache ratio
+    },
+  };
+
+  private static Configuration storedConfiguration;
+
+  private CacheAwareLoadBalancer loadBalancer = new CacheAwareLoadBalancer();
+
+  @BeforeClass
+  public static void saveInitialConfiguration() {
+    storedConfiguration = new Configuration(conf);
+  }
+
+  @Before
+  public void beforeEachTest() {
+    conf = new Configuration(storedConfiguration);
+    loadBalancer.loadConf(conf);
+  }
+
+  @Test
+  public void testVerifyCacheAwareSkewnessCostFunctionEnabled() {
+    CacheAwareLoadBalancer lb = new CacheAwareLoadBalancer();
+    lb.loadConf(conf);
+    assertTrue(Arrays.asList(lb.getCostFunctionNames())
+      .contains(CacheAwareLoadBalancer.CacheAwareRegionSkewnessCostFunction.class.getSimpleName()));
+  }
+
+  @Test
+  public void testVerifyCacheAwareSkewnessCostFunctionDisabled() {
+    conf.setFloat(
+      CacheAwareLoadBalancer.CacheAwareRegionSkewnessCostFunction.REGION_COUNT_SKEW_COST_KEY, 0.0f);
+
+    CacheAwareLoadBalancer lb = new CacheAwareLoadBalancer();
+    lb.loadConf(conf);
+
+    assertFalse(Arrays.asList(lb.getCostFunctionNames())
+      .contains(CacheAwareLoadBalancer.CacheAwareRegionSkewnessCostFunction.class.getSimpleName()));
+  }
+
+  @Test
+  public void testVerifyCacheCostFunctionEnabled() {
+    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "/tmp/prefetch.persistence");
+
+    CacheAwareLoadBalancer lb = new CacheAwareLoadBalancer();
+    lb.loadConf(conf);
+
+    assertTrue(Arrays.asList(lb.getCostFunctionNames())
+      .contains(CacheAwareLoadBalancer.CacheAwareCostFunction.class.getSimpleName()));
+  }
+
+  @Test
+  public void testVerifyCacheCostFunctionDisabledByNoBucketCachePersistence() {
+    assertFalse(Arrays.asList(loadBalancer.getCostFunctionNames())
+      .contains(CacheAwareLoadBalancer.CacheAwareCostFunction.class.getSimpleName()));
+  }
+
+  @Test
+  public void testVerifyCacheCostFunctionDisabledByNoMultiplier() {
+    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "/tmp/prefetch.persistence");
+    conf.setFloat("hbase.master.balancer.stochastic.cacheCost", 0.0f);
+    assertFalse(Arrays.asList(loadBalancer.getCostFunctionNames())
+      .contains(CacheAwareLoadBalancer.CacheAwareCostFunction.class.getSimpleName()));
+  }
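+
+  // Drives CacheAwareCostFunction over each scenario in clusterRegionCacheRatioMocks and checks
+  // the computed cost against the matching entry in expectedCacheCost, within a 0.01 tolerance.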
+
+  @Test
+  public void testCacheCost() {
+    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "/tmp/prefetch.persistence");
+    CacheAwareLoadBalancer.CacheAwareCostFunction costFunction =
+      new CacheAwareLoadBalancer.CacheAwareCostFunction(conf);
+
+    for (int test = 0; test < clusterRegionCacheRatioMocks.length; test++) {
+      int[][] clusterRegionLocations = clusterRegionCacheRatioMocks[test];
+      MockClusterForCacheCost cluster = new MockClusterForCacheCost(clusterRegionLocations);
+      costFunction.prepare(cluster);
+      double cost = costFunction.cost();
+      assertEquals(expectedCacheCost[test], cost, 0.01);
+    }
+  }
+
+  private class MockClusterForCacheCost extends BalancerClusterState {
+    private final Map<Pair<Integer, Integer>, Float> regionServerCacheRatio = new HashMap<>();
+
+    public MockClusterForCacheCost(int[][] regionsArray) {
+      // regionsArray[0] is an array where index = serverIndex and value = number of regions
+      super(mockClusterServersUnsorted(regionsArray[0], 1), null, null, null, null);
+      Map<String, Pair<ServerName, Float>> oldCacheRatio = new HashMap<>();
+      for (int i = 1; i < regionsArray.length; i++) {
+        int regionIndex = i - 1;
+        for (int j = 1; j < regionsArray[i].length; j++) {
+          int serverIndex = j - 1;
+          float cacheRatio = (float) regionsArray[i][j] / 100;
+          regionServerCacheRatio.put(new Pair<>(regionIndex, serverIndex), cacheRatio);
+          if (cacheRatio > 0.0f && serverIndex != regionsArray[i][0]) {
+            // This is the historical cacheRatio value
+            oldCacheRatio.put(regions[regionIndex].getEncodedName(),
+              new Pair<>(servers[serverIndex], cacheRatio));
+          }
+        }
+      }
+      regionCacheRatioOnOldServerMap = oldCacheRatio;
+    }
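+
+    // With every region reporting a fixed 1 MB hFile size, the weighted cache ratio used by the
+    // cost function (region size * cache ratio) reduces to the raw cache ratio, which keeps the
+    // expected costs above easy to derive from the mock data.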
+    @Override
+    public int getTotalRegionHFileSizeMB(int region) {
+      return 1;
+    }
+
+    @Override
+    protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) {
+      float cacheRatio = 0.0f;
+
+      // Get the cache ratio if the region is currently hosted on this server
+      if (regionServerIndex == regionIndexToServerIndex[region]) {
+        return regionServerCacheRatio.get(new Pair<>(region, regionServerIndex));
+      }
+
+      // The region is not currently hosted on this server. Check if the region was cached on
+      // this server earlier. This can happen when the server was shut down and the cache was
+      // persisted. Search using the region name and server name rather than the index ids, as
+      // these ids may change when a server is marked as dead or a new server is added.
+      String regionEncodedName = regions[region].getEncodedName();
+      ServerName serverName = servers[regionServerIndex];
+      if (
+        regionCacheRatioOnOldServerMap != null
+          && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName)
+      ) {
+        Pair<ServerName, Float> serverCacheRatio =
+          regionCacheRatioOnOldServerMap.get(regionEncodedName);
+        if (ServerName.isSameAddress(serverName, serverCacheRatio.getFirst())) {
+          cacheRatio = serverCacheRatio.getSecond();
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+        }
+      }
+      return cacheRatio;
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java
index 748045246b3b..67ef296da58b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java
@@ -86,6 +86,8 @@ private ServerMetrics mockServerMetricsWithReadRequests(ServerName server,
       when(rl.getWriteRequestCount()).thenReturn(0L);
       when(rl.getMemStoreSize()).thenReturn(Size.ZERO);
       when(rl.getStoreFileSize()).thenReturn(Size.ZERO);
+      when(rl.getRegionSizeMB()).thenReturn(Size.ZERO);
+      when(rl.getCurrentRegionCachedRatio()).thenReturn(0.0f);
       regionLoadMap.put(info.getRegionName(), rl);
     }
     when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap);