diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index c99274da75ea..01597a7b0ece 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -431,3 +431,22 @@ message ReplicationManagerReportProto { repeated KeyIntValue stat = 2; repeated KeyContainerIDList statSample = 3; } + +message ContainerBalancerConfigurationProto { + optional string utilizationThreshold = 5; + optional int32 datanodesInvolvedMaxPercentagePerIteration = 6; + optional int64 sizeMovedMaxPerIteration = 7; + optional int64 sizeEnteringTargetMax = 8; + optional int64 sizeLeavingSourceMax = 9; + optional int32 iterations = 10; + optional string excludeContainers = 11; + optional int64 moveTimeout = 12; + optional int64 balancingIterationInterval = 13; + optional string includeDatanodes = 14; + optional string excludeDatanodes = 15; + optional bool moveNetworkTopologyEnable = 16; + optional bool triggerDuBeforeMoveEnable = 17; + + required bool shouldRun = 18; + optional int32 nextIterationIndex = 19; +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java index c2d96ffbd291..77d9b5cc61ee 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.fs.DUFactory; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerBalancerConfigurationProto; import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -33,7 +34,7 @@ import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.ha.SCMContext; -import org.apache.hadoop.hdds.scm.ha.SCMService; +import org.apache.hadoop.hdds.scm.ha.StatefulService; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; @@ -43,6 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -65,7 +67,7 @@ * Container balancer is a service in SCM to move containers between over- and * under-utilized datanodes. */ -public class ContainerBalancer implements SCMService { +public class ContainerBalancer extends StatefulService { public static final Logger LOG = LoggerFactory.getLogger(ContainerBalancer.class); @@ -98,7 +100,6 @@ public class ContainerBalancer implements SCMService { private NetworkTopology networkTopology; private double upperLimit; private double lowerLimit; - private volatile boolean balancerRunning; private volatile Thread currentBalancingThread; private Lock lock; private ContainerBalancerSelectionCriteria selectionCriteria; @@ -110,15 +111,17 @@ public class ContainerBalancer implements SCMService { CompletableFuture> moveSelectionToFutureMap; private IterationResult iterationResult; + private int nextIterationIndex; /** * Constructs ContainerBalancer with the specified arguments. Initializes - * new ContainerBalancerConfiguration and ContainerBalancerMetrics. - * Container Balancer does not start on construction. + * ContainerBalancerMetrics. Container Balancer does not start on + * construction. * * @param scm the storage container manager */ public ContainerBalancer(StorageContainerManager scm) { + super(scm.getStatefulServiceStateManager()); this.nodeManager = scm.getScmNodeManager(); this.containerManager = scm.getContainerManager(); this.replicationManager = scm.getReplicationManager(); @@ -134,13 +137,16 @@ public ContainerBalancer(StorageContainerManager scm) { this.unBalancedNodes = new ArrayList<>(); this.placementPolicy = scm.getContainerPlacementPolicy(); this.networkTopology = scm.getClusterMap(); + this.nextIterationIndex = 0; this.lock = new ReentrantLock(); findSourceStrategy = new FindSourceGreedy(nodeManager); + scm.getSCMServiceManager().register(this); } /** - * Balances the cluster. + * Balances the cluster in iterations. Regularly checks if balancing has + * been stopped. */ private void balance() { this.iterations = config.getIterations(); @@ -149,7 +155,11 @@ private void balance() { this.iterations = Integer.MAX_VALUE; } - for (int i = 0; i < iterations && balancerRunning; i++) { + // nextIterationIndex is the iteration that balancer should start from on + // leader change or restart + int i = nextIterationIndex; + resetState(); + for (; i < iterations && isBalancerRunning(); i++) { if (config.getTriggerDuEnable()) { // before starting a new iteration, we trigger all the datanode // to run `du`. this is an aggressive action, with which we can @@ -179,19 +189,37 @@ private void balance() { } } - // stop balancing if iteration is not initialized + // initialize this iteration. stop balancing on initialization failure if (!initializeIteration()) { - stopBalancer(); + // just return if the reason for initialization failure is that + // balancer has been stopped in another thread + if (!isBalancerRunning()) { + return; + } + // otherwise, try to stop balancer + tryStopBalancer("Could not initialize ContainerBalancer's " + + "iteration number " + i); return; } - //if no new move option is generated, it means the cluster can - //not be balanced any more , so just stop IterationResult iR = doIteration(); metrics.incrementNumIterations(1); LOG.info("Result of this iteration of Container Balancer: {}", iR); + + // persist next iteration index + if (iR == IterationResult.ITERATION_COMPLETED) { + try { + saveConfiguration(config, true, i + 1); + } catch (IOException e) { + LOG.warn("Could not persist next iteration index value for " + + "ContainerBalancer after completing an iteration", e); + } + } + + // if no new move option is generated, it means the cluster cannot be + // balanced anymore; so just stop balancer if (iR == IterationResult.CAN_NOT_BALANCE_ANY_MORE) { - stopBalancer(); + tryStopBalancer(iR.toString()); return; } @@ -215,7 +243,11 @@ private void balance() { } } } - stopBalancer(); + + // finally, stop balancer if it hasn't been stopped already + if (isBalancerRunning()) { + tryStopBalancer("Completed all iterations."); + } } /** @@ -238,12 +270,11 @@ private boolean initializeIteration() { List datanodeUsageInfos = nodeManager.getMostOrLeastUsedDatanodes(true); if (datanodeUsageInfos.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Container Balancer could not retrieve nodes from Node " + - "Manager."); - } + LOG.warn("Received an empty list of datanodes from Node Manager when " + + "trying to identify which nodes to balance"); return false; } + this.threshold = config.getThresholdAsRatio(); this.maxDatanodesRatioToInvolvePerIteration = config.getMaxDatanodesRatioToInvolvePerIteration(); @@ -801,24 +832,72 @@ private void resetState() { /** * Receives a notification for raft or safe mode related status changes. * Stops ContainerBalancer if it's running and the current SCM becomes a - * follower or enters safe mode. + * follower or enters safe mode. Starts ContainerBalancer if the current + * SCM becomes leader, is out of safe mode and balancer should run. */ @Override public void notifyStatusChanged() { - if (!scmContext.isLeader() || scmContext.isInSafeMode()) { - if (isBalancerRunning()) { - stopBalancingThread(); + lock.lock(); + try { + if (!scmContext.isLeader() || scmContext.isInSafeMode()) { + if (isBalancerRunning()) { + LOG.info("Stopping ContainerBalancer in this scm on status change"); + stop(); + } + } else { + if (shouldRun()) { + try { + LOG.info("Starting ContainerBalancer in this scm on status change"); + start(); + } catch (IllegalContainerBalancerStateException | + InvalidContainerBalancerConfigurationException e) { + LOG.warn("Could not start ContainerBalancer on raft/safe-mode " + + "status change.", e); + } + } } + } finally { + lock.unlock(); } } /** - * Checks if ContainerBalancer should run. - * @return false + * Checks if ContainerBalancer should start (after a leader change, restart + * etc.) by reading persisted state. + * @return true if the persisted state is true, otherwise false */ @Override public boolean shouldRun() { - return false; + try { + ContainerBalancerConfigurationProto proto = + readConfiguration(ContainerBalancerConfigurationProto.class); + if (proto == null) { + LOG.warn("Could not find persisted configuration for {} when checking" + + " if ContainerBalancer should run. ContainerBalancer should not " + + "run now.", this.getServiceName()); + return false; + } + return proto.getShouldRun(); + } catch (IOException e) { + LOG.warn("Could not read persisted configuration for checking if " + + "ContainerBalancer should start. ContainerBalancer should not start" + + " now.", e); + return false; + } + } + + /** + * Checks if ContainerBalancer is currently running in this SCM. + * + * @return true if the currentBalancingThread is not null, otherwise false + */ + public boolean isBalancerRunning() { + lock.lock(); + try { + return currentBalancingThread != null; + } finally { + lock.unlock(); + } } /** @@ -830,12 +909,47 @@ public String getServiceName() { } /** - * Starts ContainerBalancer as an SCMService. + * Starts ContainerBalancer as an SCMService. Validates state, reads and + * validates persisted configuration, and then starts the balancing + * thread. + * @throws IllegalContainerBalancerStateException if balancer should not + * run according to persisted configuration + * @throws InvalidContainerBalancerConfigurationException if failed to + * retrieve persisted configuration, or the configuration is null */ @Override - public void start() { - if (shouldRun()) { + public void start() throws IllegalContainerBalancerStateException, + InvalidContainerBalancerConfigurationException { + lock.lock(); + try { + // should be leader-ready, out of safe mode, and not running already + validateState(false); + ContainerBalancerConfigurationProto proto; + try { + proto = readConfiguration(ContainerBalancerConfigurationProto.class); + } catch (IOException e) { + throw new InvalidContainerBalancerConfigurationException("Could not " + + "retrieve persisted configuration while starting " + + "Container Balancer as an SCMService. Will not start now.", e); + } + if (proto == null) { + throw new InvalidContainerBalancerConfigurationException("Persisted " + + "configuration for ContainerBalancer is null during start. Will " + + "not start now."); + } + if (!proto.getShouldRun()) { + throw new IllegalContainerBalancerStateException("According to " + + "persisted configuration, ContainerBalancer should not run."); + } + ContainerBalancerConfiguration configuration = + ContainerBalancerConfiguration.fromProtobuf(proto, + ozoneConfiguration); + validateConfiguration(configuration); + this.config = configuration; + this.nextIterationIndex = proto.getNextIterationIndex(); startBalancingThread(); + } finally { + lock.unlock(); } } @@ -848,13 +962,16 @@ public void start() { * @throws InvalidContainerBalancerConfigurationException if * {@link ContainerBalancerConfiguration} config file is incorrectly * configured + * @throws IOException on failure to persist + * {@link ContainerBalancerConfiguration} */ - public void startBalancer() throws IllegalContainerBalancerStateException, - InvalidContainerBalancerConfigurationException { + public void startBalancer(ContainerBalancerConfiguration configuration) + throws IllegalContainerBalancerStateException, + InvalidContainerBalancerConfigurationException, IOException { lock.lock(); try { - validateState(); - validateConfiguration(this.config); + // validates state, config, and then saves config + setBalancerConfigOnStartBalancer(configuration); startBalancingThread(); } finally { lock.unlock(); @@ -867,7 +984,6 @@ public void startBalancer() throws IllegalContainerBalancerStateException, private void startBalancingThread() { lock.lock(); try { - balancerRunning = true; currentBalancingThread = new Thread(this::balance); currentBalancingThread.setName("ContainerBalancer"); currentBalancingThread.setDaemon(true); @@ -879,11 +995,17 @@ private void startBalancingThread() { } /** - * Checks if ContainerBalancer can start. - * @throws IllegalContainerBalancerStateException if ContainerBalancer is - * already running, SCM is in safe mode, or SCM is not leader ready + * Validates balancer's state based on the specified expectedRunning. + * Confirms SCM is leader-ready and out of safe mode. + * + * @param expectedRunning true if ContainerBalancer is expected to be + * running, else false + * @throws IllegalContainerBalancerStateException if SCM is not + * leader-ready, is in safe mode, or state does not match the specified + * expected state */ - private void validateState() throws IllegalContainerBalancerStateException { + private void validateState(boolean expectedRunning) + throws IllegalContainerBalancerStateException { if (!scmContext.isLeaderReady()) { LOG.warn("SCM is not leader ready"); throw new IllegalContainerBalancerStateException("SCM is not leader " + @@ -895,10 +1017,10 @@ private void validateState() throws IllegalContainerBalancerStateException { } lock.lock(); try { - if (isBalancerRunning() || currentBalancingThread != null) { - LOG.warn("Cannot start ContainerBalancer because it's already running"); + if (isBalancerRunning() != expectedRunning) { throw new IllegalContainerBalancerStateException( - "Cannot start ContainerBalancer because it's already running"); + "Expect ContainerBalancer running state to be " + expectedRunning + + ", but running state is actually " + isBalancerRunning()); } } finally { lock.unlock(); @@ -910,20 +1032,49 @@ private void validateState() throws IllegalContainerBalancerStateException { */ @Override public void stop() { - stopBalancer(); + lock.lock(); + try { + if (!isBalancerRunning()) { + LOG.warn("Cannot stop Container Balancer because it's not running"); + return; + } + stopBalancingThread(); + } finally { + lock.unlock(); + } } /** * Stops ContainerBalancer gracefully. */ - public void stopBalancer() { + public void stopBalancer() + throws IOException, IllegalContainerBalancerStateException { lock.lock(); try { - if (!isBalancerRunning()) { - LOG.info("Container Balancer is not running."); - return; - } - stopBalancingThread(); + // should be leader, out of safe mode, and currently running + validateState(true); + saveConfiguration(config, false, 0); + stop(); + } finally { + lock.unlock(); + } + } + + /** + * Tries to stop ContainerBalancer. Logs the reason for stopping. Calls + * {@link ContainerBalancer#stopBalancer()}. + * @param stopReason a string specifying the reason for stopping + * ContainerBalancer. + */ + private void tryStopBalancer(String stopReason) { + lock.lock(); + try { + LOG.info("Stopping ContainerBalancer. Reason for stopping: {}", + stopReason); + stopBalancer(); + } catch (IllegalContainerBalancerStateException | IOException e) { + LOG.warn("Tried to stop ContainerBalancer but failed. Reason for " + + "stopping: {}", stopReason, e); } finally { lock.unlock(); } @@ -933,7 +1084,6 @@ private void stopBalancingThread() { Thread balancingThread; lock.lock(); try { - balancerRunning = false; balancingThread = currentBalancingThread; currentBalancingThread = null; } finally { @@ -952,6 +1102,20 @@ private void stopBalancingThread() { LOG.info("Container Balancer stopped successfully."); } + private void saveConfiguration(ContainerBalancerConfiguration configuration, + boolean shouldRun, int index) + throws IOException { + lock.lock(); + try { + saveConfiguration(configuration.toProtobufBuilder() + .setShouldRun(shouldRun) + .setNextIterationIndex(index) + .build()); + } finally { + lock.unlock(); + } + } + private void validateConfiguration(ContainerBalancerConfiguration conf) throws InvalidContainerBalancerConfigurationException { // maxSizeEnteringTarget and maxSizeLeavingSource should by default be @@ -1007,12 +1171,24 @@ public void setOzoneConfiguration( } /** - * Sets the configuration that ContainerBalancer will use. This should be - * set before starting balancer. - * @param config ContainerBalancerConfiguration + * Persists the configuration that ContainerBalancer will use after + * validating state and the specified configuration. + * @param configuration ContainerBalancerConfiguration to persist + * @throws InvalidContainerBalancerConfigurationException on failure to + * validate the specified configuration + * @throws IllegalContainerBalancerStateException if this SCM is not leader + * or not out of safe mode or if ContainerBalancer is currently running in + * this SCM + * @throws IOException on failure to persist configuration */ - public void setConfig(ContainerBalancerConfiguration config) { - this.config = config; + private void setBalancerConfigOnStartBalancer( + ContainerBalancerConfiguration configuration) + throws InvalidContainerBalancerConfigurationException, + IllegalContainerBalancerStateException, IOException { + validateState(false); + validateConfiguration(configuration); + saveConfiguration(configuration, true, 0); + this.config = configuration; } /** @@ -1060,15 +1236,6 @@ public Map getSourceToTargetMap() { return sourceToTargetMap; } - /** - * Checks if ContainerBalancer is currently running. - * - * @return true if ContainerBalancer is running, false if not running. - */ - public boolean isBalancerRunning() { - return balancerRunning; - } - @VisibleForTesting int getCountDatanodesInvolvedPerIteration() { return countDatanodesInvolvedPerIteration; @@ -1096,7 +1263,7 @@ public static int ratioToPercent(double ratio) { public String toString() { String status = String.format("%nContainer Balancer status:%n" + "%-30s %s%n" + - "%-30s %b%n", "Key", "Value", "Running", balancerRunning); + "%-30s %b%n", "Key", "Value", "Running", isBalancerRunning()); return status + config.toString(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java index 4e994c8a3896..9d5083768cd1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java @@ -22,8 +22,11 @@ import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigTag; import org.apache.hadoop.hdds.conf.ConfigType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerBalancerConfigurationProto; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.ozone.OzoneConsts; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -209,6 +212,10 @@ public Boolean getTriggerDuEnable() { return triggerDuEnable; } + public void setTriggerDuEnable(boolean enable) { + triggerDuEnable = enable; + } + /** * Set the NetworkTopologyEnable value for Container Balancer. * @@ -315,6 +322,10 @@ public void setMoveTimeout(Duration duration) { this.moveTimeout = duration.toMillis(); } + public void setMoveTimeout(long millis) { + this.moveTimeout = millis; + } + public Duration getBalancingInterval() { return Duration.ofMillis(balancingInterval); } @@ -323,6 +334,10 @@ public void setBalancingInterval(Duration balancingInterval) { this.balancingInterval = balancingInterval.toMillis(); } + public void setBalancingInterval(long millis) { + this.balancingInterval = millis; + } + /** * Gets a set of datanode hostnames or ip addresses that will be the exclusive * participants in balancing. @@ -390,4 +405,73 @@ public String toString() { "Max Size Leaving Source per Iteration", maxSizeLeavingSource / OzoneConsts.GB); } + + ContainerBalancerConfigurationProto.Builder toProtobufBuilder() { + ContainerBalancerConfigurationProto.Builder builder = + ContainerBalancerConfigurationProto.newBuilder(); + + builder.setUtilizationThreshold(threshold) + .setDatanodesInvolvedMaxPercentagePerIteration( + maxDatanodesPercentageToInvolvePerIteration) + .setSizeMovedMaxPerIteration(maxSizeToMovePerIteration) + .setSizeEnteringTargetMax(maxSizeEnteringTarget) + .setSizeLeavingSourceMax(maxSizeLeavingSource) + .setIterations(iterations) + .setExcludeContainers(excludeContainers) + .setMoveTimeout(moveTimeout) + .setBalancingIterationInterval(balancingInterval) + .setIncludeDatanodes(includeNodes) + .setExcludeDatanodes(excludeNodes) + .setMoveNetworkTopologyEnable(networkTopologyEnable) + .setTriggerDuBeforeMoveEnable(triggerDuEnable); + return builder; + } + + static ContainerBalancerConfiguration fromProtobuf( + @NotNull ContainerBalancerConfigurationProto proto, + @NotNull OzoneConfiguration ozoneConfiguration) { + ContainerBalancerConfiguration config = + ozoneConfiguration.getObject(ContainerBalancerConfiguration.class); + if (proto.hasUtilizationThreshold()) { + config.setThreshold(Double.parseDouble(proto.getUtilizationThreshold())); + } + if (proto.hasDatanodesInvolvedMaxPercentagePerIteration()) { + config.setMaxDatanodesPercentageToInvolvePerIteration( + proto.getDatanodesInvolvedMaxPercentagePerIteration()); + } + if (proto.hasSizeMovedMaxPerIteration()) { + config.setMaxSizeToMovePerIteration(proto.getSizeMovedMaxPerIteration()); + } + if (proto.hasSizeEnteringTargetMax()) { + config.setMaxSizeEnteringTarget(proto.getSizeEnteringTargetMax()); + } + if (proto.hasSizeLeavingSourceMax()) { + config.setMaxSizeLeavingSource(proto.getSizeLeavingSourceMax()); + } + if (proto.hasIterations()) { + config.setIterations(proto.getIterations()); + } + if (proto.hasExcludeContainers()) { + config.setExcludeContainers(proto.getExcludeContainers()); + } + if (proto.hasMoveTimeout()) { + config.setMoveTimeout(proto.getMoveTimeout()); + } + if (proto.hasBalancingIterationInterval()) { + config.setBalancingInterval(proto.getBalancingIterationInterval()); + } + if (proto.hasIncludeDatanodes()) { + config.setIncludeNodes(proto.getIncludeDatanodes()); + } + if (proto.hasExcludeDatanodes()) { + config.setExcludeNodes(proto.getExcludeDatanodes()); + } + if (proto.hasMoveNetworkTopologyEnable()) { + config.setNetworkTopologyEnable(proto.getMoveNetworkTopologyEnable()); + } + if (proto.hasTriggerDuBeforeMoveEnable()) { + config.setTriggerDuEnable(proto.getTriggerDuBeforeMoveEnable()); + } + return config; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java index cc938e636f5b..b061ddf7c911 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java @@ -18,10 +18,13 @@ package org.apache.hadoop.hdds.scm.container.balancer; +import org.apache.hadoop.hdds.scm.ha.SCMServiceException; + /** * Signals that a state change cannot be performed on ContainerBalancer. */ -public class IllegalContainerBalancerStateException extends Exception { +public class IllegalContainerBalancerStateException extends + SCMServiceException { /** * Constructs an IllegalContainerBalancerStateException with no detail diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java index c6a6bf030b45..9a4cc86ca4d6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java @@ -18,11 +18,16 @@ package org.apache.hadoop.hdds.scm.container.balancer; +import org.apache.hadoop.hdds.scm.ha.SCMServiceException; + +import java.io.IOException; + /** * Signals that {@link ContainerBalancerConfiguration} contains invalid * configuration value(s). */ -public class InvalidContainerBalancerConfigurationException extends Exception { +public class InvalidContainerBalancerConfigurationException extends + SCMServiceException { /** * Constructs an InvalidContainerBalancerConfigurationException with no detail @@ -44,4 +49,8 @@ public InvalidContainerBalancerConfigurationException(String s) { super(s); } + public InvalidContainerBalancerConfigurationException(String s, + IOException e) { + super(s, e); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java index 4d7c435b1eff..2b185c9e4e4a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java @@ -66,7 +66,7 @@ enum Event { /** * starts the SCM service. */ - void start(); + void start() throws SCMServiceException; /** * stops the SCM service. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceException.java new file mode 100644 index 000000000000..72fb7d25d279 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.hadoop.hdds.scm.ha; + +/** + * Checked exceptions thrown by an {@link SCMService}. + */ +public class SCMServiceException extends Exception { + + /** + * Constructs a new exception with {@code null} as its detail message. + */ + public SCMServiceException() { + super(); + } + + public SCMServiceException(String s) { + super(s); + } + + public SCMServiceException(String message, Throwable cause) { + super(message, cause); + } + + public SCMServiceException(Throwable cause) { + super(cause); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java index 1b75c4feed01..2ab8f8ea2405 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java @@ -70,7 +70,11 @@ public synchronized void notifyEventTriggered(Event event) { public synchronized void start() { for (SCMService service : services) { LOG.debug("Stopping service:{}.", service.getServiceName()); - service.start(); + try { + service.start(); + } catch (SCMServiceException e) { + LOG.warn("Could not start " + service.getServiceName(), e); + } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java index 441e83ce13da..69df7c740fbc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java @@ -20,8 +20,6 @@ import com.google.protobuf.ByteString; import com.google.protobuf.GeneratedMessage; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import java.io.IOException; import java.lang.reflect.InvocationTargetException; @@ -35,10 +33,11 @@ public abstract class StatefulService implements SCMService { /** * Initialize a StatefulService from an extending class. - * @param scm {@link StorageContainerManager} + * @param stateManager a reference to the + * {@link StatefulServiceStateManager} from SCM. */ - protected StatefulService(StorageContainerManager scm) { - stateManager = scm.getStatefulServiceStateManager(); + protected StatefulService(StatefulServiceStateManager stateManager) { + this.stateManager = stateManager; } /** @@ -60,21 +59,27 @@ protected final void saveConfiguration(GeneratedMessage configurationMessage) * * @param configType the Class object of the protobuf message type * @param the Type of the protobuf message - * @return persisted protobuf message + * @return persisted protobuf message or null if the entry is not found * @throws IOException on failure to fetch the message from DB or when * parsing it. ensure the specified configType is correct */ protected final T readConfiguration( Class configType) throws IOException { + ByteString byteString = stateManager.readConfiguration(getServiceName()); + if (byteString == null) { + return null; + } try { return configType.cast(ReflectionUtil.getMethod(configType, "parseFrom", ByteString.class) - .invoke(null, stateManager.readConfiguration(getServiceName()))); + .invoke(null, byteString)); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { e.printStackTrace(); - throw new InvalidProtocolBufferException("GeneratedMessage cannot " + - "be parsed for type " + configType + ": " + e.getMessage()); + throw new IOException("GeneratedMessage cannot be parsed. Ensure that " + + configType + " is the correct expected message type for " + + this.getServiceName(), e); } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java index d470f1bf7f64..1e7a756f41b5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer; import org.apache.hadoop.hdds.utils.db.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.lang.reflect.Proxy; @@ -34,6 +36,9 @@ public final class StatefulServiceStateManagerImpl implements StatefulServiceStateManager { + public static final Logger LOG = + LoggerFactory.getLogger(StatefulServiceStateManagerImpl.class); + // this table maps the service name to the configuration (ByteString) private Table statefulServiceConfig; private final DBTransactionBuffer transactionBuffer; @@ -52,10 +57,19 @@ private StatefulServiceStateManagerImpl( public void saveConfiguration(String serviceName, ByteString bytes) throws IOException { transactionBuffer.addToBuffer(statefulServiceConfig, serviceName, bytes); + + if (LOG.isDebugEnabled()) { + LOG.debug("Added specified bytes to the transaction buffer for key " + + "{} to table {}", serviceName, statefulServiceConfig.getName()); + } + if (transactionBuffer instanceof SCMHADBTransactionBuffer) { SCMHADBTransactionBuffer buffer = (SCMHADBTransactionBuffer) transactionBuffer; buffer.flush(); + if (LOG.isDebugEnabled()) { + LOG.debug("Transaction buffer flushed"); + } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ByteStringCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ByteStringCodec.java new file mode 100644 index 000000000000..e0cf00c52a69 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ByteStringCodec.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.ha.io; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * A dummy codec that serializes a ByteString object to ByteString. + */ +public class ByteStringCodec implements Codec { + + @Override + public ByteString serialize(Object object) + throws InvalidProtocolBufferException { + return (ByteString) object; + } + + @Override + public Object deserialize(Class type, ByteString value) + throws InvalidProtocolBufferException { + return value; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java index 9fb771b7a70f..dae2b3ce6ad0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.ha.io; +import com.google.protobuf.ByteString; import com.google.protobuf.GeneratedMessage; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.ProtocolMessageEnum; @@ -45,6 +46,7 @@ public final class CodecFactory { codecs.put(Boolean.class, new BooleanCodec()); codecs.put(BigInteger.class, new BigIntegerCodec()); codecs.put(X509Certificate.class, new X509CertificateCodec()); + codecs.put(ByteString.class, new ByteStringCodec()); } private CodecFactory() { } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 2d38331cebba..fcee862029d4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -909,10 +909,9 @@ public StartContainerBalancerResponseProto startContainerBalancer( } ContainerBalancer containerBalancer = scm.getContainerBalancer(); - containerBalancer.setConfig(cbc); try { - containerBalancer.startBalancer(); - } catch (IllegalContainerBalancerStateException | + containerBalancer.startBalancer(cbc); + } catch (IllegalContainerBalancerStateException | IOException | InvalidContainerBalancerConfigurationException e) { AUDIT.logWriteFailure(buildAuditMessageForFailure( SCMAction.START_CONTAINER_BALANCER, null, e)); @@ -931,9 +930,14 @@ public StartContainerBalancerResponseProto startContainerBalancer( @Override public void stopContainerBalancer() throws IOException { getScm().checkAdminAccess(getRemoteUser()); - AUDIT.logWriteSuccess(buildAuditMessageForSuccess( - SCMAction.STOP_CONTAINER_BALANCER, null)); - scm.getContainerBalancer().stopBalancer(); + try { + scm.getContainerBalancer().stopBalancer(); + AUDIT.logWriteSuccess(buildAuditMessageForSuccess( + SCMAction.STOP_CONTAINER_BALANCER, null)); + } catch (IllegalContainerBalancerStateException e) { + AUDIT.logWriteFailure(buildAuditMessageForFailure( + SCMAction.STOP_CONTAINER_BALANCER, null, e)); + } } @Override diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 9e651c8dfdc0..4440d4a67da1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -1469,10 +1469,15 @@ private void persistSCMCertificates() throws IOException { @Override public void stop() { try { - LOG.info("Stopping Container Balancer service."); - containerBalancer.stopBalancer(); + if (containerBalancer.isBalancerRunning()) { + LOG.info("Stopping Container Balancer service."); + // stop ContainerBalancer thread in this scm + containerBalancer.stop(); + } else { + LOG.info("Container Balancer is not running."); + } } catch (Exception e) { - LOG.error("Failed to stop Container Balancer service."); + LOG.error("Failed to stop Container Balancer service.", e); } try { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java index 4901617f9f57..98c92b224239 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.container.balancer; +import com.google.protobuf.ByteString; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -28,7 +29,6 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.MockNodeManager; @@ -38,8 +38,11 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory; import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMService; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; +import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManager; +import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManagerImpl; import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; @@ -49,14 +52,13 @@ import org.apache.ozone.test.GenericTestUtils; import org.junit.Assert; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -96,21 +98,24 @@ public class TestContainerBalancer { private Map cidToInfoMap = new HashMap<>(); private Map> datanodeToContainersMap = new HashMap<>(); + private Map serviceToConfigMap = new HashMap<>(); private static final ThreadLocalRandom RANDOM = ThreadLocalRandom.current(); - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); + private StatefulServiceStateManager serviceStateManager; /** * Sets up configuration values and creates a mock cluster. */ @Before - public void setup() throws SCMException, NodeNotFoundException { + public void setup() throws IOException, NodeNotFoundException { conf = new OzoneConfiguration(); scm = Mockito.mock(StorageContainerManager.class); containerManager = Mockito.mock(ContainerManager.class); replicationManager = Mockito.mock(ReplicationManager.class); + serviceStateManager = Mockito.mock(StatefulServiceStateManagerImpl.class); + SCMServiceManager scmServiceManager = Mockito.mock(SCMServiceManager.class); + // these configs will usually be specified in each test balancerConfiguration = conf.getObject(ContainerBalancerConfiguration.class); balancerConfiguration.setThreshold(10); @@ -161,7 +166,30 @@ public void setup() throws SCMException, NodeNotFoundException { when(scm.getClusterMap()).thenReturn(null); when(scm.getEventQueue()).thenReturn(mock(EventPublisher.class)); when(scm.getConfiguration()).thenReturn(conf); + when(scm.getStatefulServiceStateManager()).thenReturn(serviceStateManager); + when(scm.getSCMServiceManager()).thenReturn(scmServiceManager); + /* + When StatefulServiceStateManager#saveConfiguration is called, save to + in-memory serviceToConfigMap instead. + */ + Mockito.doAnswer(i -> { + serviceToConfigMap.put(i.getArgument(0, String.class), i.getArgument(1, + ByteString.class)); + return null; + }).when(serviceStateManager).saveConfiguration( + Mockito.any(String.class), + Mockito.any(ByteString.class)); + + /* + When StatefulServiceStateManager#readConfiguration is called, read from + serviceToConfigMap instead. + */ + when(serviceStateManager.readConfiguration(Mockito.anyString())).thenAnswer( + i -> serviceToConfigMap.get(i.getArgument(0, String.class))); + + Mockito.doNothing().when(scmServiceManager) + .register(Mockito.any(SCMService.class)); containerBalancer = new ContainerBalancer(scm); } @@ -184,7 +212,9 @@ public void testCalculationOfUtilization() { */ @Test public void - initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges() { + initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges() + throws IllegalContainerBalancerStateException, IOException, + InvalidContainerBalancerConfigurationException { List expectedUnBalancedNodes; List unBalancedNodesAccordingToBalancer; @@ -207,7 +237,7 @@ public void testCalculationOfUtilization() { unBalancedNodesAccordingToBalancer = containerBalancer.getUnBalancedNodes(); - containerBalancer.stopBalancer(); + stopBalancer(); Assert.assertEquals( expectedUnBalancedNodes.size(), unBalancedNodesAccordingToBalancer.size()); @@ -224,13 +254,15 @@ public void testCalculationOfUtilization() { * balanced. */ @Test - public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced() { + public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced() + throws IllegalContainerBalancerStateException, IOException, + InvalidContainerBalancerConfigurationException { balancerConfiguration.setThreshold(99.99); startBalancer(balancerConfiguration); sleepWhileBalancing(100); - containerBalancer.stopBalancer(); + stopBalancer(); ContainerBalancerMetrics metrics = containerBalancer.getMetrics(); Assert.assertEquals(0, containerBalancer.getUnBalancedNodes().size()); Assert.assertEquals(0, metrics.getNumDatanodesUnbalanced()); @@ -241,7 +273,9 @@ public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced() { * maxDatanodesRatioToInvolvePerIteration limit. */ @Test - public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit() { + public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit() + throws IllegalContainerBalancerStateException, IOException, + InvalidContainerBalancerConfigurationException { int percent = 20; balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration( percent); @@ -259,11 +293,13 @@ public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit() { Assert.assertTrue(metrics.getNumDatanodesInvolvedInLatestIteration() > 0); Assert.assertFalse( metrics.getNumDatanodesInvolvedInLatestIteration() > number); - containerBalancer.stopBalancer(); + stopBalancer(); } @Test - public void containerBalancerShouldSelectOnlyClosedContainers() { + public void containerBalancerShouldSelectOnlyClosedContainers() + throws IllegalContainerBalancerStateException, IOException, + InvalidContainerBalancerConfigurationException { // make all containers open, balancer should not select any of them for (ContainerInfo containerInfo : cidToInfoMap.values()) { containerInfo.setState(HddsProtos.LifeCycleState.OPEN); @@ -271,7 +307,7 @@ public void containerBalancerShouldSelectOnlyClosedContainers() { balancerConfiguration.setThreshold(10); startBalancer(balancerConfiguration); sleepWhileBalancing(500); - containerBalancer.stopBalancer(); + stopBalancer(); // balancer should have identified unbalanced nodes Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty()); @@ -291,7 +327,7 @@ public void containerBalancerShouldSelectOnlyClosedContainers() { } startBalancer(balancerConfiguration); sleepWhileBalancing(500); - containerBalancer.stopBalancer(); + stopBalancer(); // check whether all selected containers are closed for (ContainerMoveSelection moveSelection: @@ -303,7 +339,9 @@ public void containerBalancerShouldSelectOnlyClosedContainers() { } @Test - public void containerBalancerShouldObeyMaxSizeToMoveLimit() { + public void containerBalancerShouldObeyMaxSizeToMoveLimit() + throws IllegalContainerBalancerStateException, IOException, + InvalidContainerBalancerConfigurationException { balancerConfiguration.setThreshold(1); balancerConfiguration.setMaxSizeToMovePerIteration(10 * OzoneConsts.GB); balancerConfiguration.setIterations(1); @@ -319,11 +357,13 @@ public void containerBalancerShouldObeyMaxSizeToMoveLimit() { .getDataSizeMovedGBInLatestIteration(); Assert.assertTrue(size > 0); Assert.assertFalse(size > 10); - containerBalancer.stopBalancer(); + stopBalancer(); } @Test - public void targetDatanodeShouldNotAlreadyContainSelectedContainer() { + public void targetDatanodeShouldNotAlreadyContainSelectedContainer() + throws IllegalContainerBalancerStateException, IOException, + InvalidContainerBalancerConfigurationException { balancerConfiguration.setThreshold(10); balancerConfiguration.setMaxSizeToMovePerIteration(100 * OzoneConsts.GB); balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); @@ -336,7 +376,7 @@ public void targetDatanodeShouldNotAlreadyContainSelectedContainer() { Thread.sleep(1000); } catch (InterruptedException e) { } - containerBalancer.stopBalancer(); + stopBalancer(); Map sourceToTargetMap = containerBalancer.getSourceToTargetMap(); for (ContainerMoveSelection moveSelection : sourceToTargetMap.values()) { @@ -350,7 +390,9 @@ public void targetDatanodeShouldNotAlreadyContainSelectedContainer() { } @Test - public void containerMoveSelectionShouldFollowPlacementPolicy() { + public void containerMoveSelectionShouldFollowPlacementPolicy() + throws IllegalContainerBalancerStateException, IOException, + InvalidContainerBalancerConfigurationException { balancerConfiguration.setThreshold(10); balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB); balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); @@ -363,7 +405,7 @@ public void containerMoveSelectionShouldFollowPlacementPolicy() { Thread.sleep(1000); } catch (InterruptedException e) { } - containerBalancer.stopBalancer(); + stopBalancer(); Map sourceToTargetMap = containerBalancer.getSourceToTargetMap(); @@ -392,7 +434,8 @@ public void containerMoveSelectionShouldFollowPlacementPolicy() { @Test public void targetDatanodeShouldBeInServiceHealthy() - throws NodeNotFoundException { + throws NodeNotFoundException, IllegalContainerBalancerStateException, + IOException, InvalidContainerBalancerConfigurationException { balancerConfiguration.setThreshold(10); balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB); @@ -407,7 +450,7 @@ public void targetDatanodeShouldBeInServiceHealthy() } catch (InterruptedException e) { } - containerBalancer.stopBalancer(); + stopBalancer(); for (ContainerMoveSelection moveSelection : containerBalancer.getSourceToTargetMap().values()) { DatanodeDetails target = moveSelection.getTargetNode(); @@ -419,7 +462,9 @@ public void targetDatanodeShouldBeInServiceHealthy() } @Test - public void selectedContainerShouldNotAlreadyHaveBeenSelected() { + public void selectedContainerShouldNotAlreadyHaveBeenSelected() + throws IllegalContainerBalancerStateException, IOException, + InvalidContainerBalancerConfigurationException { balancerConfiguration.setThreshold(10); balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB); @@ -434,7 +479,7 @@ public void selectedContainerShouldNotAlreadyHaveBeenSelected() { Thread.sleep(1000); } catch (InterruptedException e) { } - containerBalancer.stopBalancer(); + stopBalancer(); Set containers = new HashSet<>(); for (ContainerMoveSelection moveSelection : containerBalancer.getSourceToTargetMap().values()) { @@ -445,7 +490,9 @@ public void selectedContainerShouldNotAlreadyHaveBeenSelected() { } @Test - public void balancerShouldNotSelectConfiguredExcludeContainers() { + public void balancerShouldNotSelectConfiguredExcludeContainers() + throws IllegalContainerBalancerStateException, IOException, + InvalidContainerBalancerConfigurationException { balancerConfiguration.setThreshold(10); balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB); @@ -461,7 +508,7 @@ public void balancerShouldNotSelectConfiguredExcludeContainers() { Thread.sleep(1000); } catch (InterruptedException e) { } - containerBalancer.stopBalancer(); + stopBalancer(); Set excludeContainers = balancerConfiguration.getExcludeContainers(); for (ContainerMoveSelection moveSelection : @@ -472,7 +519,9 @@ public void balancerShouldNotSelectConfiguredExcludeContainers() { } @Test - public void balancerShouldObeyMaxSizeEnteringTargetLimit() { + public void balancerShouldObeyMaxSizeEnteringTargetLimit() + throws IllegalContainerBalancerStateException, IOException, + InvalidContainerBalancerConfigurationException { conf.set("ozone.scm.container.size", "1MB"); balancerConfiguration = conf.getObject(ContainerBalancerConfiguration.class); @@ -487,7 +536,7 @@ public void balancerShouldObeyMaxSizeEnteringTargetLimit() { Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty()); Assert.assertTrue(containerBalancer.getSourceToTargetMap().isEmpty()); - containerBalancer.stopBalancer(); + stopBalancer(); // some containers should be selected when using default values OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); @@ -497,26 +546,28 @@ public void balancerShouldObeyMaxSizeEnteringTargetLimit() { sleepWhileBalancing(500); - containerBalancer.stopBalancer(); + stopBalancer(); // balancer should have identified unbalanced nodes Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty()); Assert.assertFalse(containerBalancer.getSourceToTargetMap().isEmpty()); } @Test - public void testMetrics() { + public void testMetrics() + throws IllegalContainerBalancerStateException, IOException, + InvalidContainerBalancerConfigurationException { conf.set("hdds.datanode.du.refresh.period", "1ms"); balancerConfiguration.setBalancingInterval(Duration.ofMillis(2)); balancerConfiguration.setThreshold(10); balancerConfiguration.setIterations(1); balancerConfiguration.setMaxSizeEnteringTarget(6 * OzoneConsts.GB); - // deliberately set max size per iteration to a low value, 6GB + // deliberately set max size per iteration to a low value, 6 GB balancerConfiguration.setMaxSizeToMovePerIteration(6 * OzoneConsts.GB); balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); startBalancer(balancerConfiguration); sleepWhileBalancing(500); - containerBalancer.stopBalancer(); + stopBalancer(); ContainerBalancerMetrics metrics = containerBalancer.getMetrics(); Assert.assertEquals(determineExpectedUnBalancedNodes( @@ -535,7 +586,9 @@ public void testMetrics() { * exclude configurations, then it should be excluded. */ @Test - public void balancerShouldFollowExcludeAndIncludeDatanodesConfigurations() { + public void balancerShouldFollowExcludeAndIncludeDatanodesConfigurations() + throws IllegalContainerBalancerStateException, IOException, + InvalidContainerBalancerConfigurationException { balancerConfiguration.setThreshold(10); balancerConfiguration.setIterations(1); balancerConfiguration.setMaxSizeEnteringTarget(10 * OzoneConsts.GB); @@ -569,7 +622,7 @@ public void balancerShouldFollowExcludeAndIncludeDatanodesConfigurations() { balancerConfiguration.setIncludeNodes(includeNodes); startBalancer(balancerConfiguration); sleepWhileBalancing(500); - containerBalancer.stopBalancer(); + stopBalancer(); // finally, these should be the only nodes included in balancing // (included - excluded) @@ -606,7 +659,9 @@ public void testContainerBalancerConfiguration() { @Test public void checkIterationResult() - throws NodeNotFoundException, ContainerNotFoundException { + throws NodeNotFoundException, IOException, + IllegalContainerBalancerStateException, + InvalidContainerBalancerConfigurationException { balancerConfiguration.setThreshold(10); balancerConfiguration.setIterations(1); balancerConfiguration.setMaxSizeEnteringTarget(10 * OzoneConsts.GB); @@ -622,7 +677,7 @@ public void checkIterationResult() */ Assert.assertEquals(ContainerBalancer.IterationResult.ITERATION_COMPLETED, containerBalancer.getIterationResult()); - containerBalancer.stop(); + stopBalancer(); /* Now, limit maxSizeToMovePerIteration but fail all container moves. The @@ -640,12 +695,14 @@ public void checkIterationResult() Assert.assertEquals(ContainerBalancer.IterationResult.ITERATION_COMPLETED, containerBalancer.getIterationResult()); - containerBalancer.stop(); + stopBalancer(); } @Test public void checkIterationResultTimeout() - throws NodeNotFoundException, ContainerNotFoundException { + throws NodeNotFoundException, IOException, + IllegalContainerBalancerStateException, + InvalidContainerBalancerConfigurationException { Mockito.when(replicationManager.move(Mockito.any(ContainerID.class), Mockito.any(DatanodeDetails.class), @@ -673,7 +730,7 @@ public void checkIterationResultTimeout() .getNumContainerMovesCompletedInLatestIteration()); Assert.assertTrue(containerBalancer.getMetrics() .getNumContainerMovesTimeoutInLatestIteration() > 1); - containerBalancer.stop(); + stopBalancer(); } @@ -859,13 +916,19 @@ private void sleepWhileBalancing(long millis) { } } - private void startBalancer(ContainerBalancerConfiguration config) { - containerBalancer.setConfig(config); + private void startBalancer(ContainerBalancerConfiguration config) + throws IllegalContainerBalancerStateException, IOException, + InvalidContainerBalancerConfigurationException { + containerBalancer.startBalancer(config); + } + + private void stopBalancer() { try { - containerBalancer.startBalancer(); - } catch (IllegalContainerBalancerStateException | - InvalidContainerBalancerConfigurationException e) { - LOG.info("Could not start ContainerBalancer while testing", e); + if (containerBalancer.isBalancerRunning()) { + containerBalancer.stopBalancer(); + } + } catch (IOException | IllegalContainerBalancerStateException e) { + LOG.warn("Failed to stop balancer", e); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java index b4882ac9805a..906b2aaf702e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java @@ -16,10 +16,17 @@ */ package org.apache.hadoop.ozone.scm; +import com.google.protobuf.ByteString; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient; +import org.apache.hadoop.hdds.scm.client.ScmClient; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer; +import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancerConfiguration; +import org.apache.hadoop.hdds.scm.container.balancer.IllegalContainerBalancerStateException; +import org.apache.hadoop.hdds.scm.container.balancer.InvalidContainerBalancerConfigurationException; import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; @@ -36,6 +43,7 @@ import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; import org.apache.ozone.test.GenericTestUtils; import org.junit.Assert; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -44,7 +52,9 @@ import java.io.IOException; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerBalancerConfigurationProto; import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer; import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; @@ -226,6 +236,89 @@ public void testMoveFailover() throws Exception { Assert.assertFalse(inflightMove.containsKey(id)); } + /** + * Starts ContainerBalancer when the cluster is already balanced. + * ContainerBalancer will identify that no unbalanced nodes are present and + * exit and stop in the first iteration. We test that ContainerBalancer + * persists ContainerBalancerConfigurationProto#shouldRun as false in all + * the 3 SCMs when it stops. + * @throws IOException + * @throws IllegalContainerBalancerStateException + * @throws InvalidContainerBalancerConfigurationException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testContainerBalancerPersistsConfigurationInAllSCMs() + throws IOException, IllegalContainerBalancerStateException, + InvalidContainerBalancerConfigurationException, InterruptedException, + TimeoutException { + SCMClientConfig scmClientConfig = + conf.getObject(SCMClientConfig.class); + scmClientConfig.setRetryInterval(100); + scmClientConfig.setMaxRetryTimeout(1500); + Assertions.assertEquals(15, scmClientConfig.getRetryCount()); + conf.setFromObject(scmClientConfig); + StorageContainerManager leader = getLeader(cluster); + Assertions.assertNotNull(leader); + + ScmClient scmClient = new ContainerOperationClient(conf); + // assert that container balancer is not running right now + Assertions.assertFalse(scmClient.getContainerBalancerStatus()); + ContainerBalancerConfiguration balancerConf = + conf.getObject(ContainerBalancerConfiguration.class); + ContainerBalancer containerBalancer = leader.getContainerBalancer(); + + /* + Start container balancer. Since this cluster is already balanced, + container balancer should exit early, stop, and persist configuration to DB. + */ + containerBalancer.startBalancer(balancerConf); + + // assert that balancer has stopped since the cluster is already balanced + GenericTestUtils.waitFor(() -> !containerBalancer.isBalancerRunning(), + 10, 500); + Assertions.assertFalse(containerBalancer.isBalancerRunning()); + + ByteString byteString = + leader.getScmMetadataStore().getStatefulServiceConfigTable().get( + containerBalancer.getServiceName()); + ContainerBalancerConfigurationProto proto = + ContainerBalancerConfigurationProto.parseFrom(byteString); + GenericTestUtils.waitFor(() -> !proto.getShouldRun(), 5, 50); + + long leaderTermIndex = + leader.getScmHAManager().getRatisServer().getSCMStateMachine() + .getLastAppliedTermIndex().getIndex(); + + /* + Fetch persisted configuration to verify that `shouldRun` is set to false. + */ + for (StorageContainerManager scm : cluster.getStorageContainerManagers()) { + if (!scm.checkLeader()) { + // Wait and retry for follower to update transactions to leader + // snapshot index. + // Timeout error if follower does not load update within 3s + GenericTestUtils.waitFor(() -> scm.getScmHAManager().getRatisServer() + .getSCMStateMachine().getLastAppliedTermIndex() + .getIndex() >= leaderTermIndex, 100, 3000); + ContainerBalancer followerBalancer = scm.getContainerBalancer(); + GenericTestUtils.waitFor( + () -> !followerBalancer.isBalancerRunning(), 50, 5000); + GenericTestUtils.waitFor(() -> !followerBalancer.shouldRun(), 100, + 5000); + } + scm.getStatefulServiceStateManager().readConfiguration( + containerBalancer.getServiceName()); + byteString = + scm.getScmMetadataStore().getStatefulServiceConfigTable().get( + containerBalancer.getServiceName()); + ContainerBalancerConfigurationProto protobuf = + ContainerBalancerConfigurationProto.parseFrom(byteString); + Assertions.assertFalse(protobuf.getShouldRun()); + } + } + static StorageContainerManager getLeader(MiniOzoneHAClusterImpl impl) { for (StorageContainerManager scm : impl.getStorageContainerManagers()) { if (scm.checkLeader()) {