diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java index 382be2108f2c..ec906a5a830f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java @@ -23,13 +23,17 @@ import org.apache.hadoop.hdds.scm.container.ContainerManagerV2; import org.apache.hadoop.hdds.scm.container.ReplicationManager; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; public class ContainerBalancer { @@ -40,96 +44,347 @@ public class ContainerBalancer { private ContainerManagerV2 containerManager; private ReplicationManager replicationManager; private OzoneConfiguration ozoneConfiguration; + private final SCMContext scmContext; private double threshold; private int maxDatanodesToBalance; private long maxSizeToMove; - private boolean balancerRunning; - private List sourceNodes; - private List targetNodes; + private List unBalancedNodes; + private List overUtilizedNodes; + private List underUtilizedNodes; + private List withinThresholdUtilizedNodes; private ContainerBalancerConfiguration config; + private ContainerBalancerMetrics metrics; + private long clusterCapacity; + private long clusterUsed; + private long clusterRemaining; + private double clusterAvgUtilisation; + private final AtomicBoolean balancerRunning = new AtomicBoolean(false); + /** + * 
Constructs ContainerBalancer with the specified arguments. Initializes + * new ContainerBalancerConfiguration and ContainerBalancerMetrics. + * Container Balancer does not start on construction. + * + * @param nodeManager NodeManager + * @param containerManager ContainerManager + * @param replicationManager ReplicationManager + * @param ozoneConfiguration OzoneConfiguration + */ public ContainerBalancer( NodeManager nodeManager, ContainerManagerV2 containerManager, ReplicationManager replicationManager, - OzoneConfiguration ozoneConfiguration) { + OzoneConfiguration ozoneConfiguration, + final SCMContext scmContext) { this.nodeManager = nodeManager; this.containerManager = containerManager; this.replicationManager = replicationManager; this.ozoneConfiguration = ozoneConfiguration; - this.balancerRunning = false; this.config = new ContainerBalancerConfiguration(); + this.metrics = new ContainerBalancerMetrics(); + this.scmContext = scmContext; + + this.clusterCapacity = 0L; + this.clusterUsed = 0L; + this.clusterRemaining = 0L; + + this.overUtilizedNodes = new ArrayList<>(); + this.underUtilizedNodes = new ArrayList<>(); + this.unBalancedNodes = new ArrayList<>(); + this.withinThresholdUtilizedNodes = new ArrayList<>(); } /** - * Start ContainerBalancer. Current implementation is incomplete. + * Starts ContainerBalancer. Current implementation is incomplete. * * @param balancerConfiguration Configuration values. 
*/ - public void start(ContainerBalancerConfiguration balancerConfiguration) { - this.balancerRunning = true; + public boolean start(ContainerBalancerConfiguration balancerConfiguration) { + if (!balancerRunning.compareAndSet(false, true)) { + LOG.error("Container Balancer is already running."); + return false; + } ozoneConfiguration = new OzoneConfiguration(); - - // initialise configs this.config = balancerConfiguration; this.threshold = config.getThreshold(); - this.maxDatanodesToBalance = - config.getMaxDatanodesToBalance(); + this.maxDatanodesToBalance = config.getMaxDatanodesToBalance(); this.maxSizeToMove = config.getMaxSizeToMove(); + this.unBalancedNodes = new ArrayList<>(); + + LOG.info("Starting Container Balancer...{}", this); + balance(); + return true; + } + + /** + * Balances the cluster. + */ + private void balance() { + initializeIteration(); - LOG.info("Starting Container Balancer..."); + // unBalancedNodes is not cleared since the next iteration uses this + // iteration's unBalancedNodes to find out how many nodes were balanced + overUtilizedNodes.clear(); + underUtilizedNodes.clear(); + withinThresholdUtilizedNodes.clear(); + } + /** + * Initializes an iteration during balancing. Recognizes over, under, and + * within threshold utilized nodes. Decides whether balancing needs to + * continue or should be stopped. + * + * @return true if successfully initialized, otherwise false. + */ + private boolean initializeIteration() { + if (scmContext.isInSafeMode()) { + LOG.error("Container Balancer cannot operate while SCM is in Safe Mode."); + return false; + } // sorted list in order from most to least used - List nodes = nodeManager. 
- getMostOrLeastUsedDatanodes(true); - double avgUtilisation = calculateAvgUtilisation(nodes); + List datanodeUsageInfos = + nodeManager.getMostOrLeastUsedDatanodes(true); + if (datanodeUsageInfos.isEmpty()) { + LOG.info("Container Balancer could not retrieve nodes from Node " + + "Manager."); + stop(); + return false; + } + + clusterAvgUtilisation = calculateAvgUtilization(datanodeUsageInfos); + LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation); // under utilized nodes have utilization(that is, used / capacity) less // than lower limit - double lowerLimit = avgUtilisation - threshold; + double lowerLimit = clusterAvgUtilisation - threshold; // over utilized nodes have utilization(that is, used / capacity) greater // than upper limit - double upperLimit = avgUtilisation + threshold; - LOG.info("Lower limit for utilization is {}", lowerLimit); - LOG.info("Upper limit for utilization is {}", upperLimit); - - // find over utilised(source) and under utilised(target) nodes - sourceNodes = new ArrayList<>(); - targetNodes = new ArrayList<>(); -// for (DatanodeUsageInfo node : nodes) { -// SCMNodeStat stat = node.getScmNodeStat(); -// double utilization = stat.getScmUsed().get().doubleValue() / -// stat.getCapacity().get().doubleValue(); -// if (utilization > upperLimit) { -// sourceNodes.add(node); -// } else if (utilization < lowerLimit || utilization < avgUtilisation) { -// targetNodes.add(node); -// } -// } - } - - // calculate the average datanode utilisation across the cluster - private double calculateAvgUtilisation(List nodes) { + double upperLimit = clusterAvgUtilisation + threshold; + + LOG.info("Lower limit for utilization is {} and Upper limit for " + + "utilization is {}", lowerLimit, upperLimit); + + long countDatanodesToBalance = 0L; + double overLoadedBytes = 0D, underLoadedBytes = 0D; + + // find over and under utilized nodes + for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) { + double utilization = 
calculateUtilization(datanodeUsageInfo); + if (utilization > upperLimit) { + overUtilizedNodes.add(datanodeUsageInfo); + countDatanodesToBalance += 1; + + // amount of bytes greater than upper limit in this node + overLoadedBytes += ratioToBytes( + datanodeUsageInfo.getScmNodeStat().getCapacity().get(), + utilization) - ratioToBytes( + datanodeUsageInfo.getScmNodeStat().getCapacity().get(), + upperLimit); + } else if (utilization < lowerLimit) { + underUtilizedNodes.add(datanodeUsageInfo); + countDatanodesToBalance += 1; + + // amount of bytes lesser than lower limit in this node + underLoadedBytes += ratioToBytes( + datanodeUsageInfo.getScmNodeStat().getCapacity().get(), + lowerLimit) - ratioToBytes( + datanodeUsageInfo.getScmNodeStat().getCapacity().get(), + utilization); + } else { + withinThresholdUtilizedNodes.add(datanodeUsageInfo); + } + } + Collections.reverse(underUtilizedNodes); + + long countDatanodesBalanced = 0; + // count number of nodes that were balanced in previous iteration + for (DatanodeUsageInfo node : unBalancedNodes) { + if (!containsNode(overUtilizedNodes, node) && + !containsNode(underUtilizedNodes, node)) { + countDatanodesBalanced += 1; + } + } + // calculate total number of nodes that have been balanced so far + countDatanodesBalanced = + metrics.incrementDatanodesNumBalanced(countDatanodesBalanced); + + unBalancedNodes = new ArrayList<>( + overUtilizedNodes.size() + underUtilizedNodes.size()); + + if (countDatanodesBalanced + countDatanodesToBalance > + maxDatanodesToBalance) { + LOG.info("Approaching Max Datanodes To Balance limit in Container " + + "Balancer. 
Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      unBalancedNodes.addAll(overUtilizedNodes);
+      unBalancedNodes.addAll(underUtilizedNodes);
+
+      if (unBalancedNodes.isEmpty()) {
+        LOG.info("Did not find any unbalanced Datanodes.");
+        stop();
+        return false;
+      } else {
+        LOG.info("Container Balancer has identified Datanodes that need to be" +
+            " balanced.");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Performs binary search to determine if the specified listToSearch (sorted
+   * by getMostUsedByRemainingRatio in either order) contains the node.
+   *
+   * @param listToSearch List of DatanodeUsageInfo to be searched.
+   * @param node DatanodeUsageInfo to be searched for.
+   * @return true if the specified node is present in listToSearch, otherwise
+   * false.
+   */
+  private boolean containsNode(
+      List<DatanodeUsageInfo> listToSearch, DatanodeUsageInfo node) {
+    int size = listToSearch.size();
+    if (size == 0) {
+      return false;
+    }
+
+    Comparator<DatanodeUsageInfo> comparator =
+        DatanodeUsageInfo.getMostUsedByRemainingRatio();
+
+    // binarySearch requires the list to be ascending under the comparator it
+    // is given; first > last means the list is ordered by the reverse.
+    if (comparator.compare(listToSearch.get(0),
+        listToSearch.get(size - 1)) > 0) {
+      comparator = comparator.reversed();
+    }
+    int index = Collections.binarySearch(listToSearch, node, comparator);
+    return index >= 0 && listToSearch.get(index).equals(node);
+  }
+
+  /**
+   * Calculates the number of used bytes given capacity and utilization ratio.
+   *
+   * @param nodeCapacity capacity of the node.
+   * @param utilizationRatio used space by capacity ratio of the node.
+   * @return number of bytes
+   */
+  private double ratioToBytes(Long nodeCapacity, double utilizationRatio) {
+    return nodeCapacity * utilizationRatio;
+  }
+
+  /**
+   * Calculates the average utilization for the specified nodes.
+   * Utilization is used space divided by capacity. 
+   *
+   * @param nodes List of DatanodeUsageInfo to find the average utilization for
+   * @return Average utilization value; 0 if nodes is empty or capacity is 0
+   */
+  private double calculateAvgUtilization(List<DatanodeUsageInfo> nodes) {
+    if (nodes.isEmpty()) {
+      LOG.warn("No nodes to calculate average utilization for in " +
+          "ContainerBalancer.");
+      return 0;
+    }
     SCMNodeStat aggregatedStats = new SCMNodeStat(
         0, 0, 0);
     for (DatanodeUsageInfo node : nodes) {
       aggregatedStats.add(node.getScmNodeStat());
     }
-    return aggregatedStats.getScmUsed().get().doubleValue() /
-        aggregatedStats.getCapacity().get().doubleValue();
+    clusterCapacity = aggregatedStats.getCapacity().get();
+    clusterUsed = aggregatedStats.getScmUsed().get();
+    clusterRemaining = aggregatedStats.getRemaining().get();
+    // a total capacity of 0 would make the division below yield NaN
+    return clusterCapacity == 0 ? 0 : clusterUsed / (double) clusterCapacity;
+  }
+
+  /**
+   * Calculates the utilization, that is used space divided by capacity, for
+   * the given datanodeUsageInfo.
+   *
+   * @param datanodeUsageInfo DatanodeUsageInfo to calculate utilization for
+   * @return Utilization value (NaN or Infinity if the node's capacity is 0)
+   */
+  public static double calculateUtilization(
+      DatanodeUsageInfo datanodeUsageInfo) {
+    SCMNodeStat stat = datanodeUsageInfo.getScmNodeStat();
+
+    return stat.getScmUsed().get().doubleValue() /
+        stat.getCapacity().get().doubleValue();
   }
 
+  /**
+   * Stops ContainerBalancer. 
+ */ public void stop() { - LOG.info("Stopping Container Balancer..."); - balancerRunning = false; + balancerRunning.set(false); LOG.info("Container Balancer stopped."); } + public void setNodeManager(NodeManager nodeManager) { + this.nodeManager = nodeManager; + } + + public void setContainerManager( + ContainerManagerV2 containerManager) { + this.containerManager = containerManager; + } + + public void setReplicationManager( + ReplicationManager replicationManager) { + this.replicationManager = replicationManager; + } + + public void setOzoneConfiguration( + OzoneConfiguration ozoneConfiguration) { + this.ozoneConfiguration = ozoneConfiguration; + } + + /** + * Gets the average utilization of the cluster as calculated by + * ContainerBalancer. + * + * @return average utilization value + */ + public double getClusterAvgUtilisation() { + return clusterAvgUtilisation; + } + + /** + * Gets the list of unBalanced nodes, that is, the over and under utilized + * nodes in the cluster. + * + * @return List of DatanodeUsageInfo containing unBalanced nodes. + */ + public List getUnBalancedNodes() { + return unBalancedNodes; + } + + /** + * Sets the unBalanced nodes, that is, the over and under utilized nodes in + * the cluster. + * + * @param unBalancedNodes List of DatanodeUsageInfo + */ + public void setUnBalancedNodes( + List unBalancedNodes) { + this.unBalancedNodes = unBalancedNodes; + } + + /** + * Checks if ContainerBalancer is currently running. + * + * @return true if ContainerBalancer is running, false if not running. 
+   */
+  public boolean isBalancerRunning() {
+    return balancerRunning.get();
+  }
+
   @Override
   public String toString() {
-    String status = String.format("Container Balancer status:%n" +
+    String status = String.format("%nContainer Balancer status:%n" +
         "%-30s %s%n" +
-        "%-30s %b%n", "Key", "Value", "Running", balancerRunning);
+        "%-30s %b%n", "Key", "Value", "Running", balancerRunning.get());
     return status + config.toString();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
index c68c420cbac6..054968331f70 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
@@ -23,12 +23,18 @@
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;
 import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class contains configuration values for the ContainerBalancer.
  */
 @ConfigGroup(prefix = "hdds.container.balancer.")
 public final class ContainerBalancerConfiguration {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerBalancerConfiguration.class);
+
   @Config(key = "utilization.threshold", type = ConfigType.AUTO,
       defaultValue = "0.1", tags = {ConfigTag.BALANCER},
       description = "Threshold is a fraction in the range of 0 to 1. 
A " + @@ -36,7 +42,7 @@ public final class ContainerBalancerConfiguration { "utilization of the datanode (used space to capacity ratio) differs" + " from the utilization of the cluster (used space to capacity ratio" + " of the entire cluster) no more than the threshold value.") - private double threshold = 0.1; + private String threshold = "0.1"; @Config(key = "datanodes.balanced.max", type = ConfigType.INT, defaultValue = "5", tags = {ConfigTag.BALANCER}, description = "The " + @@ -44,31 +50,38 @@ public final class ContainerBalancerConfiguration { "Balancer will not balance more number of datanodes than this limit.") private int maxDatanodesToBalance = 5; - @Config(key = "size.moved.max", type = ConfigType.LONG, - defaultValue = "10737418240L", tags = {ConfigTag.BALANCER}, - description = "The maximum size of data in Bytes that will be moved " + - "by the Container Balancer.") - private long maxSizeToMove = 10737418240L; + @Config(key = "size.moved.max", type = ConfigType.SIZE, + defaultValue = "10GB", tags = {ConfigTag.BALANCER}, + description = "The maximum size of data in bytes that will be moved " + + "by Container Balancer.") + private long maxSizeToMove = 10 * OzoneConsts.GB; /** - * Get the threshold value for Container Balancer. + * Gets the threshold value for Container Balancer. + * * @return a fraction in the range 0 to 1 */ public double getThreshold() { - return threshold; + return Double.parseDouble(threshold); } /** - * Set the threshold value for Container Balancer. + * Sets the threshold value for Container Balancer. 
+   *
+   * @param threshold a fraction in the range 0 to 1
    */
   public void setThreshold(double threshold) {
-    this.threshold = threshold;
+    if (!(threshold >= 0 && threshold <= 1)) { // negated form also rejects NaN
+      throw new IllegalArgumentException(
+          "Threshold must be a fraction in the range 0 to 1.");
+    }
+    this.threshold = String.valueOf(threshold);
   }
 
   /**
-   * Get the value of maximum number of datanodes that will be balanced by
+   * Gets the value of maximum number of datanodes that will be balanced by
    * Container Balancer.
+   *
    * @return maximum number of datanodes
    */
   public int getMaxDatanodesToBalance() {
@@ -76,8 +89,9 @@ public int getMaxDatanodesToBalance() {
   }
 
   /**
-   * Set the value of maximum number of datanodes that will be balanced by
+   * Sets the value of maximum number of datanodes that will be balanced by
    * Container Balancer.
+   *
    * @param maxDatanodesToBalance maximum number of datanodes
    */
   public void setMaxDatanodesToBalance(int maxDatanodesToBalance) {
@@ -85,18 +99,18 @@ public void setMaxDatanodesToBalance(int maxDatanodesToBalance) {
   }
 
   /**
-   * Get the value of maximum number of bytes that will be moved by the
-   * Container Balancer.
-   * @return maximum number of bytes
+   * Gets the maximum size that will be moved by Container Balancer.
+   *
+   * @return maximum size in Bytes
    */
   public long getMaxSizeToMove() {
     return maxSizeToMove;
   }
 
   /**
-   * Set the value of maximum number of bytes that will be moved by
-   * Container Balancer.
-   * @param maxSizeToMove maximum number of bytes
+   * Sets the value of maximum size that will be moved by Container Balancer. 
+ * + * @param maxSizeToMove maximum number of Bytes */ public void setMaxSizeToMove(long maxSizeToMove) { this.maxSizeToMove = maxSizeToMove; @@ -106,7 +120,7 @@ public void setMaxSizeToMove(long maxSizeToMove) { public String toString() { return String.format("Container Balancer Configuration values:%n" + "%-30s %s%n" + - "%-30s %f%n" + + "%-30s %s%n" + "%-30s %d%n" + "%-30s %dB%n", "Key", "Value", "Threshold", threshold, "Max Datanodes to Balance", maxDatanodesToBalance, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java new file mode 100644 index 000000000000..1d8ab410b2ff --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.hadoop.hdds.scm.container.balancer; + +import org.apache.hadoop.hdds.scm.container.placement.metrics.LongMetric; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; + +@Metrics(name = "ContainerBalancer Metrics", about = "Metrics related to " + + "Container Balancer running in SCM", context = "SCM") +public final class ContainerBalancerMetrics { + + @Metric(about = "The total amount of used space in GigaBytes that needs to " + + "be balanced.") + private LongMetric dataSizeToBalanceGB; + + @Metric(about = "The amount of Giga Bytes that have been moved to achieve " + + "balance.") + private LongMetric dataSizeBalancedGB; + + @Metric(about = "Number of containers that Container Balancer has moved" + + " until now.") + private LongMetric movedContainersNum; + + @Metric(about = "The total number of datanodes that need to be balanced.") + private LongMetric datanodesNumToBalance; + + @Metric(about = "Number of datanodes that Container Balancer has balanced " + + "until now.") + private LongMetric datanodesNumBalanced; + + @Metric(about = "Utilisation value of the current maximum utilised datanode.") + private double maxDatanodeUtilizedRatio; + + /** + * Initialise metrics for ContainerBalancer. 
+ */ + public ContainerBalancerMetrics() { + dataSizeToBalanceGB = new LongMetric(0L); + dataSizeBalancedGB = new LongMetric(0L); + movedContainersNum = new LongMetric(0L); + datanodesNumToBalance = new LongMetric(0L); + datanodesNumBalanced = new LongMetric(0L); + maxDatanodeUtilizedRatio = 0D; + } + + public LongMetric getDataSizeToBalanceGB() { + return dataSizeToBalanceGB; + } + + public void setDataSizeToBalanceGB(long size) { + this.dataSizeToBalanceGB = new LongMetric(size); + } + + public LongMetric getDataSizeBalancedGB() { + return dataSizeBalancedGB; + } + + public void setDataSizeBalancedGB( + LongMetric dataSizeBalancedGB) { + this.dataSizeBalancedGB = dataSizeBalancedGB; + } + + public LongMetric getMovedContainersNum() { + return movedContainersNum; + } + + public void setMovedContainersNum( + LongMetric movedContainersNum) { + this.movedContainersNum = movedContainersNum; + } + + public LongMetric getDatanodesNumToBalance() { + return datanodesNumToBalance; + } + + public void setDatanodesNumToBalance( + LongMetric datanodesNumToBalance) { + this.datanodesNumToBalance = datanodesNumToBalance; + } + + public LongMetric getDatanodesNumBalanced() { + return datanodesNumBalanced; + } + + public void setDatanodesNumBalanced( + LongMetric datanodesNumBalanced) { + this.datanodesNumBalanced = datanodesNumBalanced; + } + + /** + * Add specified valueToAdd to datanodesNumBalanced. + * + * @param valueToAdd The value to add. + * @return The result after addition. 
+ */ + public long incrementDatanodesNumBalanced(long valueToAdd) { + datanodesNumBalanced.add(valueToAdd); + return datanodesNumBalanced.get(); + } + + public double getMaxDatanodeUtilizedRatio() { + return maxDatanodeUtilizedRatio; + } + + public void setMaxDatanodeUtilizedRatio( + double maxDatanodeUtilizedRatio) { + this.maxDatanodeUtilizedRatio = maxDatanodeUtilizedRatio; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index fdfc4ec9c260..bf9d1ca89b3c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -412,7 +412,7 @@ private StorageContainerManager(OzoneConfiguration conf, eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler); containerBalancer = new ContainerBalancer(scmNodeManager, - containerManager, replicationManager, configuration); + containerManager, replicationManager, configuration, scmContext); LOG.info(containerBalancer.toString()); // Emit initial safe mode status, as now handlers are registered. 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 949f7a1f23d7..7d97d9042d7a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -16,35 +16,36 @@ */ package org.apache.hadoop.hdds.scm.container; -import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.net.NetConstants; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; import org.apache.hadoop.hdds.scm.net.Node; import 
org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; -import org.apache.hadoop.hdds.scm.node.NodeStatus; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.pipeline.PipelineID; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.protocol.VersionResponse; @@ -57,6 +58,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -65,10 +67,11 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState - 
.HEALTHY; +import static org.apache.hadoop.hdds.protocol.proto + .HddsProtos.NodeState.HEALTHY; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; /** @@ -104,9 +107,7 @@ public class MockNodeManager implements NodeManager { private int numRaftLogDisksPerDatanode; private int numPipelinePerDatanode; - public MockNodeManager(NetworkTopologyImpl clusterMap, - List nodes, - boolean initializeFakeNodes, int nodeCount) { + { this.healthyNodes = new LinkedList<>(); this.staleNodes = new LinkedList<>(); this.deadNodes = new LinkedList<>(); @@ -115,6 +116,12 @@ public MockNodeManager(NetworkTopologyImpl clusterMap, this.node2ContainerMap = new Node2ContainerMap(); this.dnsToUuidMap = new ConcurrentHashMap<>(); this.aggregateStat = new SCMNodeStat(); + this.clusterMap = new NetworkTopologyImpl(new OzoneConfiguration()); + } + + public MockNodeManager(NetworkTopologyImpl clusterMap, + List nodes, + boolean initializeFakeNodes, int nodeCount) { this.clusterMap = clusterMap; if (!nodes.isEmpty()) { for (int x = 0; x < nodes.size(); x++) { @@ -143,6 +150,28 @@ public MockNodeManager(boolean initializeFakeNodes, int nodeCount) { initializeFakeNodes, nodeCount); } + public MockNodeManager(List nodes) + throws IllegalArgumentException { + if (!nodes.isEmpty()) { + for (DatanodeUsageInfo node : nodes) { + register(node.getDatanodeDetails(), null, null); + nodeMetricMap.put(node.getDatanodeDetails(), node.getScmNodeStat()); + aggregateStat.add(node.getScmNodeStat()); + healthyNodes.add(node.getDatanodeDetails()); + } + } else { + throw new IllegalArgumentException("The argument nodes list must not " + + "be empty"); + } + + safemode = false; + this.commandMap = new HashMap<>(); + numHealthyDisksPerDatanode = 1; + numRaftLogDisksPerDatanode = 1; + numPipelinePerDatanode = numRaftLogDisksPerDatanode * + NUM_PIPELINE_PER_METADATA_DISK; + } + /** * Invoked from ctor to create some node Metrics. 
* @@ -299,7 +328,22 @@ public Map getNodeStats() { @Override public List getMostOrLeastUsedDatanodes( boolean mostUsed) { - return null; + List datanodeDetailsList = + getNodes(NodeOperationalState.IN_SERVICE, HEALTHY); + if (datanodeDetailsList == null) { + return new ArrayList<>(); + } + Comparator comparator; + if (mostUsed) { + comparator = DatanodeUsageInfo.getMostUsedByRemainingRatio().reversed(); + } else { + comparator = DatanodeUsageInfo.getMostUsedByRemainingRatio(); + } + + return datanodeDetailsList.stream() + .map(node -> new DatanodeUsageInfo(node, nodeMetricMap.get(node))) + .sorted(comparator) + .collect(Collectors.toList()); } /** diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java new file mode 100644 index 000000000000..1fad575fda0b --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class TestContainerBalancer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestContainerBalancer.class);
+
+  private ReplicationManager replicationManager;
+  private ContainerManagerV2 containerManager;
+  private ContainerBalancer containerBalancer;
+  private MockNodeManager mockNodeManager;
+  private OzoneConfiguration conf;
+  private ContainerBalancerConfiguration balancerConfiguration;
+  private List<DatanodeUsageInfo> nodesInCluster;
+  private List<Double> nodeUtilizations;
+  private double averageUtilization;
+  private int numberOfNodes;
+
+  /**
+   * Sets up configuration values and creates a mock cluster.
+   */
+  @Before
+  public void setup() {
+    conf = new OzoneConfiguration();
+    containerManager = Mockito.mock(ContainerManagerV2.class);
+    replicationManager = Mockito.mock(ReplicationManager.class);
+
+    balancerConfiguration = new ContainerBalancerConfiguration();
+    balancerConfiguration.setThreshold(0.1);
+    balancerConfiguration.setMaxDatanodesToBalance(10);
+    balancerConfiguration.setMaxSizeToMove(500 * OzoneConsts.GB);
+    conf.setFromObject(balancerConfiguration);
+
+    // create datanodes with the generated nodeUtilization values
+    this.averageUtilization = createNodesInCluster();
+    mockNodeManager = new MockNodeManager(nodesInCluster);
+    containerBalancer = new ContainerBalancer(mockNodeManager, containerManager,
+        replicationManager, conf, SCMContext.emptyContext());
+  }
+
+  /**
+   * Checks whether ContainerBalancer is correctly updating the list of
+   * unBalanced nodes with varying values of Threshold.
+   */
+  @Test
+  public void
+      initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges() {
+    List<DatanodeUsageInfo> expectedUnBalancedNodes;
+    List<DatanodeUsageInfo> unBalancedNodesAccordingToBalancer;
+
+    // check for random threshold values
+    for (int i = 0; i < 50; i++) {
+      double randomThreshold = Math.random();
+
+      balancerConfiguration.setThreshold(randomThreshold);
+      containerBalancer.start(balancerConfiguration);
+      expectedUnBalancedNodes =
+          determineExpectedUnBalancedNodes(randomThreshold);
+      unBalancedNodesAccordingToBalancer =
+          containerBalancer.getUnBalancedNodes();
+
+      Assert.assertEquals(
+          expectedUnBalancedNodes.size(),
+          unBalancedNodesAccordingToBalancer.size());
+
+      for (int j = 0; j < expectedUnBalancedNodes.size(); j++) {
+        Assert.assertEquals(expectedUnBalancedNodes.get(j).getDatanodeDetails(),
+            unBalancedNodesAccordingToBalancer.get(j).getDatanodeDetails());
+      }
+      containerBalancer.stop();
+    }
+
+  }
+
+  /**
+   * Checks whether the list of unBalanced nodes is empty when the cluster is
+   * balanced.
+   */
+  @Test
+  public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced() {
+    balancerConfiguration.setThreshold(0.99);
+    containerBalancer.start(balancerConfiguration);
+
+    Assert.assertEquals(0, containerBalancer.getUnBalancedNodes().size());
+    containerBalancer.stop();
+  }
+
+  /**
+   * Checks whether ContainerBalancer stops when the limit of
+   * MaxDatanodesToBalance is reached.
+   */
+  @Test
+  public void containerBalancerShouldStopWhenMaxDatanodesToBalanceIsReached() {
+    balancerConfiguration.setMaxDatanodesToBalance(2);
+    balancerConfiguration.setThreshold(0);
+    containerBalancer.start(balancerConfiguration);
+
+    Assert.assertFalse(containerBalancer.isBalancerRunning());
+    containerBalancer.stop();
+  }
+
+  /**
+   * Determines unBalanced nodes, that is, over and under utilized nodes,
+   * according to the generated utilization values for nodes and the threshold.
+   *
+   * @param threshold A fraction from range 0 to 1.
+   * @return List of DatanodeUsageInfo containing the expected (correct)
+   * unBalanced nodes.
+   */
+  private List<DatanodeUsageInfo> determineExpectedUnBalancedNodes(
+      double threshold) {
+    double lowerLimit = averageUtilization - threshold;
+    double upperLimit = averageUtilization + threshold;
+
+    // use node utilizations to determine over and under utilized nodes
+    List<DatanodeUsageInfo> expectedUnBalancedNodes = new ArrayList<>();
+    for (int i = 0; i < numberOfNodes; i++) {
+      if (nodeUtilizations.get(numberOfNodes - i - 1) > upperLimit) {
+        expectedUnBalancedNodes.add(nodesInCluster.get(numberOfNodes - i - 1));
+      }
+    }
+    for (int i = 0; i < numberOfNodes; i++) {
+      if (nodeUtilizations.get(i) < lowerLimit) {
+        expectedUnBalancedNodes.add(nodesInCluster.get(i));
+      }
+    }
+    return expectedUnBalancedNodes;
+  }
+
+  /**
+   * Generates a range of equally spaced utilization (that is, used / capacity)
+   * values from 0 to 1.
+   *
+   * @param count Number of values to generate. Count must be greater than or
+   *              equal to 1.
+   * @throws IllegalArgumentException If the value of the parameter count is
+   * less than 1.
+   */
+  private void generateUtilizations(int count) throws IllegalArgumentException {
+    if (count < 1) {
+      LOG.warn("The value of argument count is {}. However, count must be " +
+          "greater than 0.", count);
+      throw new IllegalArgumentException("count must be >= 1: " + count);
+    }
+    nodeUtilizations = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      nodeUtilizations.add(i / (double) count);
+    }
+  }
+
+  /**
+   * Creates DatanodeUsageInfo nodes using the generated utilization values.
+   * Capacities are chosen randomly from a list.
+   *
+   * @return Average utilization of the created cluster.
+   */
+  private double createNodesInCluster() {
+    this.numberOfNodes = 10;
+    generateUtilizations(numberOfNodes);
+    nodesInCluster = new ArrayList<>(nodeUtilizations.size());
+    long[] capacities = {1000000, 2000000, 3000000, 4000000, 5000000};
+    double totalUsed = 0, totalCapacity = 0;
+
+    for (double utilization : nodeUtilizations) {
+      // select a random index in the range [0, capacities.length)
+      int index = ThreadLocalRandom.current().nextInt(0, capacities.length);
+      long capacity = capacities[index];
+      long used = (long) (capacity * utilization);
+      totalCapacity += capacity;
+      totalUsed += used;
+      SCMNodeStat stat = new SCMNodeStat(capacity, used, capacity - used);
+
+      nodesInCluster.add(
+          new DatanodeUsageInfo(MockDatanodeDetails.randomDatanodeDetails(),
+              stat));
+    }
+    return totalUsed / totalCapacity;
+  }
+
+}