package org.apache.hadoop.hdds.scm.container.balancer;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * Container balancer is a service in SCM that identifies over- and
 * under-utilized datanodes so that containers can be moved between them.
 * Current implementation only classifies datanodes; it does not yet move
 * any containers.
 */
public class ContainerBalancer {

  private static final Logger LOG =
      LoggerFactory.getLogger(ContainerBalancer.class);

  private final NodeManager nodeManager;
  private final ContainerManagerV2 containerManager;
  private final ReplicationManager replicationManager;
  private final OzoneConfiguration ozoneConfiguration;
  private double threshold;
  private int maxDatanodesToBalance;
  private long maxSizeToMove;
  // volatile: start()/stop() may be invoked from different threads
  private volatile boolean balancerRunning;
  private List<DatanodeUsageInfo> sourceNodes;
  private List<DatanodeUsageInfo> targetNodes;
  private ContainerBalancerConfiguration config;

  /**
   * Constructs a ContainerBalancer with the specified arguments.
   *
   * @param nodeManager        NodeManager used to query datanode usage
   * @param containerManager   ContainerManagerV2 (not used yet)
   * @param replicationManager ReplicationManager (not used yet)
   * @param ozoneConfiguration cluster configuration
   */
  public ContainerBalancer(
      NodeManager nodeManager,
      ContainerManagerV2 containerManager,
      ReplicationManager replicationManager,
      OzoneConfiguration ozoneConfiguration) {
    this.nodeManager = nodeManager;
    this.containerManager = containerManager;
    this.replicationManager = replicationManager;
    this.ozoneConfiguration = ozoneConfiguration;
    this.balancerRunning = false;
    this.config = new ContainerBalancerConfiguration();
  }

  /**
   * Starts ContainerBalancer and classifies datanodes as over- or
   * under-utilized relative to the cluster average. Current implementation
   * is incomplete: no containers are moved yet.
   *
   * @param balancerConfiguration configuration values for this run
   */
  public void start(ContainerBalancerConfiguration balancerConfiguration) {
    // Guard against concurrent/duplicate starts.
    if (balancerRunning) {
      LOG.warn("Container Balancer is already running.");
      return;
    }
    this.balancerRunning = true;

    // NOTE: the injected ozoneConfiguration is intentionally kept; the
    // original code replaced it with a fresh OzoneConfiguration here,
    // silently discarding the configuration passed to the constructor.

    // initialise per-run configs
    this.config = balancerConfiguration;
    this.threshold = config.getThreshold();
    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
    this.maxSizeToMove = config.getMaxSizeToMove();

    LOG.info("Starting Container Balancer...");

    // sorted list in order from most to least used
    List<DatanodeUsageInfo> nodes =
        nodeManager.getMostOrLeastUsedDatanodes(true);
    double avgUtilisation = calculateAvgUtilisation(nodes);

    // under utilized nodes have utilization (that is, used / capacity) less
    // than the lower limit
    double lowerLimit = avgUtilisation - threshold;

    // over utilized nodes have utilization (that is, used / capacity)
    // greater than the upper limit
    double upperLimit = avgUtilisation + threshold;
    LOG.info("Lower limit for utilization is {}", lowerLimit);
    LOG.info("Upper limit for utilization is {}", upperLimit);

    // find over utilised (source) and under utilised (target) nodes
    sourceNodes = new ArrayList<>();
    targetNodes = new ArrayList<>();
    for (DatanodeUsageInfo node : nodes) {
      SCMNodeStat stat = node.getScmNodeStat();
      double utilization = stat.getScmUsed().get().doubleValue() /
          stat.getCapacity().get().doubleValue();
      if (utilization > upperLimit) {
        sourceNodes.add(node);
      } else if (utilization < lowerLimit) {
        // NOTE(review): the original (commented-out) draft also added nodes
        // with utilization < avgUtilisation as targets — confirm intended
        // classification before containers are actually moved.
        targetNodes.add(node);
      }
    }
  }

  /**
   * Calculates the average datanode utilisation (used space / capacity)
   * across the cluster by aggregating the stats of all given nodes.
   *
   * @param nodes list of datanode usage info to aggregate
   * @return average utilisation as a fraction, or 0 if {@code nodes} is empty
   */
  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
    // Avoid 0/0 = NaN when no datanodes have reported usage yet.
    if (nodes.isEmpty()) {
      LOG.warn("No datanode usage info to calculate average utilisation.");
      return 0;
    }
    SCMNodeStat aggregatedStats = new SCMNodeStat(0, 0, 0);
    for (DatanodeUsageInfo node : nodes) {
      aggregatedStats.add(node.getScmNodeStat());
    }
    return aggregatedStats.getScmUsed().get().doubleValue() /
        aggregatedStats.getCapacity().get().doubleValue();
  }

  /**
   * Stops ContainerBalancer.
   */
  public void stop() {
    LOG.info("Stopping Container Balancer...");
    balancerRunning = false;
    LOG.info("Container Balancer stopped.");
  }

  @Override
  public String toString() {
    String status = String.format("Container Balancer status:%n" +
        "%-30s %s%n" +
        "%-30s %b%n", "Key", "Value", "Running", balancerRunning);
    return status + config.toString();
  }
}
+ */ +@ConfigGroup(prefix = "hdds.container.balancer.") +public final class ContainerBalancerConfiguration { + @Config(key = "utilization.threshold", type = ConfigType.AUTO, defaultValue = + "0.1", tags = {ConfigTag.BALANCER}, + description = "Threshold is a fraction in the range of 0 to 1. A " + + "cluster is considered balanced if for each datanode, the " + + "utilization of the datanode (used space to capacity ratio) differs" + + " from the utilization of the cluster (used space to capacity ratio" + + " of the entire cluster) no more than the threshold value.") + private double threshold = 0.1; + + @Config(key = "datanodes.balanced.max", type = ConfigType.INT, + defaultValue = "5", tags = {ConfigTag.BALANCER}, description = "The " + + "maximum number of datanodes that should be balanced. Container " + + "Balancer will not balance more number of datanodes than this limit.") + private int maxDatanodesToBalance = 5; + + @Config(key = "size.moved.max", type = ConfigType.LONG, + defaultValue = "10737418240L", tags = {ConfigTag.BALANCER}, + description = "The maximum size of data in Bytes that will be moved " + + "by the Container Balancer.") + private long maxSizeToMove = 10737418240L; + + /** + * Get the threshold value for Container Balancer. + * @return a fraction in the range 0 to 1 + */ + public double getThreshold() { + return threshold; + } + + /** + * Set the threshold value for Container Balancer. + * @param threshold a fraction in the range 0 to 1 + */ + public void setThreshold(double threshold) { + this.threshold = threshold; + } + + /** + * Get the value of maximum number of datanodes that will be balanced by + * Container Balancer. + * @return maximum number of datanodes + */ + public int getMaxDatanodesToBalance() { + return maxDatanodesToBalance; + } + + /** + * Set the value of maximum number of datanodes that will be balanced by + * Container Balancer. 
+ * @param maxDatanodesToBalance maximum number of datanodes + */ + public void setMaxDatanodesToBalance(int maxDatanodesToBalance) { + this.maxDatanodesToBalance = maxDatanodesToBalance; + } + + /** + * Get the value of maximum number of bytes that will be moved by the + * Container Balancer. + * @return maximum number of bytes + */ + public long getMaxSizeToMove() { + return maxSizeToMove; + } + + /** + * Set the value of maximum number of bytes that will be moved by + * Container Balancer. + * @param maxSizeToMove maximum number of bytes + */ + public void setMaxSizeToMove(long maxSizeToMove) { + this.maxSizeToMove = maxSizeToMove; + } + + @Override + public String toString() { + return String.format("Container Balancer Configuration values:%n" + + "%-30s %s%n" + + "%-30s %f%n" + + "%-30s %d%n" + + "%-30s %dB%n", "Key", "Value", "Threshold", + threshold, "Max Datanodes to Balance", maxDatanodesToBalance, + "Max Size to Move", maxSizeToMove); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/package-info.java new file mode 100644 index 00000000000..c15f7c639ae --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** + * This package contains classes related to Container Balancer. + */ +package org.apache.hadoop.hdds.scm.container.balancer; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index bfd45d8ed36..37ecf77c4fd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -21,29 +21,12 @@ */ package org.apache.hadoop.hdds.scm.server; -import javax.management.ObjectName; -import java.io.IOException; -import java.math.BigInteger; -import java.net.InetAddress; -import java.net.InetSocketAddress; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.protobuf.BlockingService; - -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; - import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsUtils; @@ 
-54,28 +37,9 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; import org.apache.hadoop.hdds.scm.PlacementPolicy; -import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl; -import org.apache.hadoop.hdds.scm.container.ContainerManagerV2; -import org.apache.hadoop.hdds.scm.ha.HASecurityUtils; -import org.apache.hadoop.hdds.scm.ha.SCMContext; -import org.apache.hadoop.hdds.scm.ha.SCMHAManager; -import org.apache.hadoop.hdds.scm.ha.SCMHAManagerImpl; -import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails; -import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo; -import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; -import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails; -import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl; -import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; -import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator; -import org.apache.hadoop.hdds.scm.ScmInfo; -import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore; -import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultProfile; -import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient; -import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec; -import org.apache.hadoop.hdds.utils.HAUtils; -import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImplV2; @@ -85,9 +49,12 @@ import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl; 
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2; import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler; import org.apache.hadoop.hdds.scm.container.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer; import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory; import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics; import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat; @@ -95,31 +62,48 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; +import org.apache.hadoop.hdds.scm.ha.HASecurityUtils; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMHAManager; +import org.apache.hadoop.hdds.scm.ha.SCMHAManagerImpl; +import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails; +import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; +import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails; +import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo; +import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; +import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; import org.apache.hadoop.hdds.scm.node.DeadNodeHandler; import org.apache.hadoop.hdds.scm.node.NewNodeHandler; -import org.apache.hadoop.hdds.scm.node.StartDatanodeAdminHandler; -import org.apache.hadoop.hdds.scm.node.NonHealthyToHealthyNodeHandler; +import org.apache.hadoop.hdds.scm.node.NodeDecommissionManager; import 
org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeReportHandler; +import org.apache.hadoop.hdds.scm.node.NonHealthyToHealthyNodeHandler; import org.apache.hadoop.hdds.scm.node.SCMNodeManager; import org.apache.hadoop.hdds.scm.node.StaleNodeHandler; -import org.apache.hadoop.hdds.scm.node.NodeDecommissionManager; +import org.apache.hadoop.hdds.scm.node.StartDatanodeAdminHandler; import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.scm.pipeline.PipelineReportHandler; import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl; +import org.apache.hadoop.hdds.scm.pipeline.PipelineReportHandler; import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.PipelineChoosePolicyFactory; import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer; +import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore; import org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCAServer; +import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultProfile; +import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient; +import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec; import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.hdds.utils.HAUtils; +import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.hdds.utils.HddsVersionInfo; import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; import org.apache.hadoop.io.IOUtils; @@ -140,9 +124,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
javax.management.ObjectName; +import java.io.IOException; +import java.math.BigInteger; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT; -import static org.apache.hadoop.hdds.utils.HAUtils.checkSecurityAndSCMHAEnabled; import static org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore.CertType.VALID_CERTS; +import static org.apache.hadoop.hdds.utils.HAUtils.checkSecurityAndSCMHAEnabled; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD; import static org.apache.hadoop.ozone.OzoneConsts.CRL_SEQUENCE_ID_KEY; import static org.apache.hadoop.ozone.OzoneConsts.SCM_SUB_CA_PREFIX; @@ -237,6 +237,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl private final SCMHANodeDetails scmHANodeDetails; + private ContainerBalancer containerBalancer; + /** * Creates a new StorageContainerManager. Configuration will be * updated with information on the actual listening addresses used @@ -400,6 +402,10 @@ private StorageContainerManager(OzoneConfiguration conf, eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler); eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler); + containerBalancer = new ContainerBalancer(scmNodeManager, + containerManager, replicationManager, configuration); + LOG.info(containerBalancer.toString()); + // Emit initial safe mode status, as now handlers are registered. scmSafeModeManager.emitSafeModeStatus();