diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index e5c568034181..f569672c6f56 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Optional; /** * The interface to call into underlying container layer. @@ -305,6 +306,26 @@ Map> getSafeModeRuleStatuses() */ boolean getReplicationManagerStatus() throws IOException; + /** + * Start ContainerBalancer. + */ + boolean startContainerBalancer(Optional threshold, + Optional idleiterations, + Optional maxDatanodesToBalance, + Optional maxSizeToMoveInGB) throws IOException; + + /** + * Stop ContainerBalancer. + */ + void stopContainerBalancer() throws IOException; + + /** + * Returns ContainerBalancer status. + * + * @return True if ContainerBalancer is running, false otherwise. + */ + boolean getContainerBalancerStatus() throws IOException; + /** * returns the list of ratis peer roles. Currently only include peer address. 
*/ diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index a2adc1192ae4..31c44729b157 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; /** @@ -286,6 +287,26 @@ Map> getSafeModeRuleStatuses() */ boolean getReplicationManagerStatus() throws IOException; + /** + * Start ContainerBalancer. + */ + boolean startContainerBalancer(Optional threshold, + Optional idleiterations, + Optional maxDatanodesToBalance, + Optional maxSizeToMoveInGB) throws IOException; + + /** + * Stop ContainerBalancer. + */ + void stopContainerBalancer() throws IOException; + + /** + * Returns ContainerBalancer status. + * + * @return True if ContainerBalancer is running, false otherwise. + */ + boolean getContainerBalancerStatus() throws IOException; + /** * Get Datanode usage information by ip or uuid. 
* diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java index a4ae55e2c175..9b88c6a1d123 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java @@ -44,6 +44,9 @@ public enum SCMAction implements AuditAction { START_REPLICATION_MANAGER, STOP_REPLICATION_MANAGER, GET_REPLICATION_MANAGER_STATUS, + START_CONTAINER_BALANCER, + STOP_CONTAINER_BALANCER, + GET_CONTAINER_BALANCER_STATUS, GET_CONTAINER_WITH_PIPELINE_BATCH, ADD_SCM; diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 6aeb74da5302..b8105ca313cc 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -57,6 +57,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.RecommissionNodesResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusResponseProto; import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; @@ -69,6 +71,9 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartMaintenanceNodesResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopContainerBalancerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type; import org.apache.hadoop.hdds.scm.DatanodeAdminError; import org.apache.hadoop.hdds.scm.ScmInfo; @@ -91,6 +96,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Consumer; import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION; @@ -712,6 +718,73 @@ public boolean getReplicationManagerStatus() throws IOException { } + @Override + public boolean startContainerBalancer(Optional threshold, + Optional idleiterations, + Optional maxDatanodesToBalance, + Optional maxSizeToMoveInGB) throws IOException{ + StartContainerBalancerRequestProto.Builder builder = + StartContainerBalancerRequestProto.newBuilder(); + builder.setTraceID(TracingUtil.exportCurrentSpan()); + + //make balancer configuration optional + if (threshold.isPresent()) { + 
double tsd = threshold.get(); + Preconditions.checkState(tsd >= 0.0D && tsd < 1.0D, + "threshold should to be specified in range [0.0, 1.0)."); + builder.setThreshold(tsd); + } + if (maxSizeToMoveInGB.isPresent()) { + long mstm = maxSizeToMoveInGB.get(); + Preconditions.checkState(mstm > 0, + "maxSizeToMoveInGB must be positive."); + builder.setMaxSizeToMoveInGB(mstm); + } + if (maxDatanodesToBalance.isPresent()) { + int mdtb = maxDatanodesToBalance.get(); + Preconditions.checkState(mdtb > 0, + "maxDatanodesToBalance must be positive."); + builder.setMaxDatanodesToBalance(mdtb); + } + if (idleiterations.isPresent()) { + int idi = idleiterations.get(); + Preconditions.checkState(idi > 0 || idi == -1, + "idleiterations must be positive or" + + " -1(infinitly run container balancer)."); + builder.setIdleiterations(idi); + } + + StartContainerBalancerRequestProto request = builder.build(); + StartContainerBalancerResponseProto response = + submitRequest(Type.StartContainerBalancer, + builder1 -> builder1.setStartContainerBalancerRequest(request)) + .getStartContainerBalancerResponse(); + return response.getStart(); + } + + @Override + public void stopContainerBalancer() throws IOException { + + StopContainerBalancerRequestProto request = + StopContainerBalancerRequestProto.getDefaultInstance(); + submitRequest(Type.StopContainerBalancer, + builder -> builder.setStopContainerBalancerRequest(request)); + + } + + @Override + public boolean getContainerBalancerStatus() throws IOException { + + ContainerBalancerStatusRequestProto request = + ContainerBalancerStatusRequestProto.getDefaultInstance(); + ContainerBalancerStatusResponseProto response = + submitRequest(Type.GetContainerBalancerStatus, + builder -> builder.setContainerBalancerStatusRequest(request)) + .getContainerBalancerStatusResponse(); + return response.getIsRunning(); + + } + /** * Builds request for datanode usage information and receives response. 
* diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto index 5389a558ec80..d67dce9dda58 100644 --- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto +++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto @@ -69,6 +69,9 @@ message ScmContainerLocationRequest { optional DatanodeUsageInfoRequestProto DatanodeUsageInfoRequest = 30; optional GetExistContainerWithPipelinesInBatchRequestProto getExistContainerWithPipelinesInBatchRequest = 31; optional GetContainerTokenRequestProto containerTokenRequest = 32; + optional StartContainerBalancerRequestProto startContainerBalancerRequest = 33; + optional StopContainerBalancerRequestProto stopContainerBalancerRequest = 34; + optional ContainerBalancerStatusRequestProto containerBalancerStatusRequest = 35; } message ScmContainerLocationResponse { @@ -109,6 +112,9 @@ message ScmContainerLocationResponse { optional DatanodeUsageInfoResponseProto DatanodeUsageInfoResponse = 30; optional GetExistContainerWithPipelinesInBatchResponseProto getExistContainerWithPipelinesInBatchResponse = 31; optional GetContainerTokenResponseProto containerTokenResponse = 32; + optional StartContainerBalancerResponseProto startContainerBalancerResponse = 33; + optional StopContainerBalancerResponseProto stopContainerBalancerResponse = 34; + optional ContainerBalancerStatusResponseProto containerBalancerStatusResponse = 35; enum Status { OK = 1; @@ -147,6 +153,9 @@ enum Type { DatanodeUsageInfo = 25; GetExistContainerWithPipelinesInBatch = 26; GetContainerToken = 27; + StartContainerBalancer = 28; + StopContainerBalancer = 29; + GetContainerBalancerStatus = 30; } /** @@ -445,6 +454,33 @@ message GetContainerTokenResponseProto { required TokenProto token = 1; } +message StartContainerBalancerRequestProto { + optional string traceID = 1; + optional double threshold = 2; + optional int32 idleiterations = 3; + optional int32 
maxDatanodesToBalance = 4; + optional int64 maxSizeToMoveInGB = 5; +} + +message StartContainerBalancerResponseProto { + required bool start = 1; +} + +message StopContainerBalancerRequestProto { + optional string traceID = 1; +} + +message StopContainerBalancerResponseProto { +} + +message ContainerBalancerStatusRequestProto { + optional string traceID = 1; +} + +message ContainerBalancerStatusResponseProto { + required bool isRunning = 1; +} + /** * Protocol used from an HDFS node to StorageContainerManager. See the request * and response messages for details of the RPC calls. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java index 4b8501ce167e..9371bfb79c99 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java @@ -34,6 +34,8 @@ import java.util.Comparator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * Container balancer is a service in SCM to move containers between over- and @@ -51,7 +53,8 @@ public class ContainerBalancer { private final SCMContext scmContext; private double threshold; private int maxDatanodesToBalance; - private long maxSizeToMove; + private long maxSizeToMoveInGB; + private int idleIteration; private List unBalancedNodes; private List overUtilizedNodes; private List underUtilizedNodes; @@ -63,6 +66,8 @@ public class ContainerBalancer { private long clusterRemaining; private double clusterAvgUtilisation; private final AtomicBoolean balancerRunning = new AtomicBoolean(false); + private Thread currentBalancingThread; + private Lock lock; /** * Constructs ContainerBalancer 
with the specified arguments. Initializes @@ -96,28 +101,43 @@ public ContainerBalancer( this.underUtilizedNodes = new ArrayList<>(); this.unBalancedNodes = new ArrayList<>(); this.withinThresholdUtilizedNodes = new ArrayList<>(); + this.lock = new ReentrantLock(); } - /** * Starts ContainerBalancer. Current implementation is incomplete. * * @param balancerConfiguration Configuration values. */ - public boolean start(ContainerBalancerConfiguration balancerConfiguration) { - if (!balancerRunning.compareAndSet(false, true)) { - LOG.error("Container Balancer is already running."); - return false; + public boolean start( + ContainerBalancerConfiguration balancerConfiguration) { + lock.lock(); + try { + if (!balancerRunning.compareAndSet(false, true)) { + LOG.info("Container Balancer is already running."); + return false; + } + + this.config = balancerConfiguration; + this.idleIteration = config.getIdleIteration(); + this.threshold = config.getThreshold(); + this.maxDatanodesToBalance = config.getMaxDatanodesToBalance(); + this.maxSizeToMoveInGB = config.getMaxSizeToMove(); + this.unBalancedNodes = new ArrayList<>(); + LOG.info("Starting Container Balancer...{}", this); + //we should start a new balancer thread async + //and response to cli as soon as possible + + + //TODO: this is a temporary implementation + //modify this later + currentBalancingThread = new Thread(() -> balance()); + currentBalancingThread.start(); + //////////////////////// + } finally { + lock.unlock(); } - ozoneConfiguration = new OzoneConfiguration(); - this.config = balancerConfiguration; - this.threshold = config.getThreshold(); - this.maxDatanodesToBalance = config.getMaxDatanodesToBalance(); - this.maxSizeToMove = config.getMaxSizeToMove(); - this.unBalancedNodes = new ArrayList<>(); - LOG.info("Starting Container Balancer...{}", this); - balance(); return true; } @@ -125,13 +145,18 @@ public boolean start(ContainerBalancerConfiguration balancerConfiguration) { * Balances the cluster. 
*/ private void balance() { - initializeIteration(); - - // unBalancedNodes is not cleared since the next iteration uses this - // iteration's unBalancedNodes to find out how many nodes were balanced - overUtilizedNodes.clear(); - underUtilizedNodes.clear(); - withinThresholdUtilizedNodes.clear(); + for (int i = 0; i < idleIteration; i++) { + if (!initializeIteration()) { + //balancer should be stopped immediately + break; + } + // unBalancedNodes is not cleared since the next iteration uses this + // iteration's unBalancedNodes to find out how many nodes were balanced + overUtilizedNodes.clear(); + underUtilizedNodes.clear(); + withinThresholdUtilizedNodes.clear(); + } + balancerRunning.compareAndSet(true, false); } /** @@ -152,7 +177,6 @@ private boolean initializeIteration() { if (datanodeUsageInfos.isEmpty()) { LOG.info("Container Balancer could not retrieve nodes from Node " + "Manager."); - stop(); return false; } @@ -221,15 +245,21 @@ private boolean initializeIteration() { maxDatanodesToBalance) { LOG.info("Approaching Max Datanodes To Balance limit in Container " + "Balancer. Stopping Balancer."); - stop(); return false; } else { unBalancedNodes.addAll(overUtilizedNodes); unBalancedNodes.addAll(underUtilizedNodes); + //for now, we just sleep to simulate the execution of balancer + //this if for acceptance test now. modify this later when balancer + //if fully completed + try { + Thread.sleep(50); + } catch (InterruptedException e) {} + ///////////////////////////// + if (unBalancedNodes.isEmpty()) { LOG.info("Did not find any unbalanced Datanodes."); - stop(); return false; } else { LOG.info("Container Balancer has identified Datanodes that need to be" + @@ -323,8 +353,27 @@ public static double calculateUtilization( * Stops ContainerBalancer. 
*/ public void stop() { - balancerRunning.set(false); - LOG.info("Container Balancer stopped."); + lock.lock(); + try { + //we should stop the balancer thread gracefully + if(!balancerRunning.get()) { + LOG.info("Container Balancer is not running."); + return; + } + + + //TODO: this is a temporary implementation + //modify this later + if (currentBalancingThread.isAlive()) { + currentBalancingThread.stop(); + } + /////////////////////////// + + balancerRunning.compareAndSet(true, false); + } finally { + lock.unlock(); + } + LOG.info("Container Balancer stopped successfully."); } public void setNodeManager(NodeManager nodeManager) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java index 054968331f70..d9ae868f5d95 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java @@ -56,6 +56,11 @@ public final class ContainerBalancerConfiguration { "by Container Balancer.") private long maxSizeToMove = 10 * OzoneConsts.GB; + @Config(key = "idle.iterations", type = ConfigType.INT, + defaultValue = "10", tags = {ConfigTag.BALANCER}, + description = "The idle iteration count of Container Balancer") + private int idleIterations = 10; + /** * Gets the threshold value for Container Balancer. * @@ -78,6 +83,29 @@ public void setThreshold(double threshold) { this.threshold = String.valueOf(threshold); } + /** + * Gets the idle iteration value for Container Balancer. + * + * @return a idle iteration count larger than 0 + */ + public int getIdleIteration() { + return idleIterations; + } + + /** + * Sets the idle iteration value for Container Balancer. 
+ * + * @param count a idle iteration count larger than 0 + */ + public void setIdleIteration(int count) { + if (count < -1 || 0 == count) { + throw new IllegalArgumentException( + "Idle iteration count must be larger than 0 or " + + "-1(for infinitely running)."); + } + this.idleIterations = count; + } + /** * Gets the value of maximum number of datanodes that will be balanced by * Container Balancer. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index 07c1095597d9..2aaa3a4dc8f6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -62,6 +62,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.RecommissionNodesResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; @@ -78,6 +80,10 @@ 
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopContainerBalancerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopContainerBalancerResponseProto; import org.apache.hadoop.hdds.scm.DatanodeAdminError; import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -97,6 +103,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto.Error.errorPipelineAlreadyExists; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto.Error.success; @@ -306,6 +313,27 @@ public ScmContainerLocationResponse processRequest( .setReplicationManagerStatusResponse(getReplicationManagerStatus( request.getSeplicationManagerStatusRequest())) .build(); + case StartContainerBalancer: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setStartContainerBalancerResponse(startContainerBalancer( + request.getStartContainerBalancerRequest())) + .build(); + case StopContainerBalancer: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + 
.setStopContainerBalancerResponse(stopContainerBalancer( + request.getStopContainerBalancerRequest())) + .build(); + case GetContainerBalancerStatus: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setContainerBalancerStatusResponse(getContainerBalancerStatus( + request.getContainerBalancerStatusRequest())) + .build(); case GetPipeline: return ScmContainerLocationResponse.newBuilder() .setCmdType(request.getCmdType()) @@ -611,6 +639,48 @@ public ReplicationManagerStatusResponseProto getReplicationManagerStatus( .setIsRunning(impl.getReplicationManagerStatus()).build(); } + public StartContainerBalancerResponseProto startContainerBalancer( + StartContainerBalancerRequestProto request) + throws IOException { + Optional threshold = Optional.empty(); + Optional idleiterations = Optional.empty(); + Optional maxDatanodesToBalance = Optional.empty(); + Optional maxSizeToMoveInGB = Optional.empty(); + + if(request.hasThreshold()) { + threshold = Optional.of(request.getThreshold()); + } + if(request.hasIdleiterations()) { + idleiterations = Optional.of(request.getIdleiterations()); + } + if(request.hasMaxDatanodesToBalance()) { + maxDatanodesToBalance = Optional.of(request.getMaxDatanodesToBalance()); + } + if(request.hasMaxSizeToMoveInGB()) { + maxSizeToMoveInGB = Optional.of(request.getMaxSizeToMoveInGB()); + } + + return StartContainerBalancerResponseProto.newBuilder(). 
+ setStart(impl.startContainerBalancer(threshold, + idleiterations, maxDatanodesToBalance, + maxSizeToMoveInGB)).build(); + } + + public StopContainerBalancerResponseProto stopContainerBalancer( + StopContainerBalancerRequestProto request) + throws IOException { + impl.stopContainerBalancer(); + return StopContainerBalancerResponseProto.newBuilder().build(); + + } + + public ContainerBalancerStatusResponseProto getContainerBalancerStatus( + ContainerBalancerStatusRequestProto request) + throws IOException { + return ContainerBalancerStatusResponseProto.newBuilder() + .setIsRunning(impl.getContainerBalancerStatus()).build(); + } + public DecommissionNodesResponseProto decommissionNodes( DecommissionNodesRequestProto request) throws IOException { List errors = diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java index b0e1ae17ffbc..7948ffd4d05b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.container.ContainerManagerV2; import org.apache.hadoop.hdds.scm.container.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer; import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; @@ -50,6 +51,8 @@ public interface OzoneStorageContainerManager { ReplicationManager getReplicationManager(); + ContainerBalancer getContainerBalancer(); + InetSocketAddress getDatanodeRpcAddress(); SCMNodeDetails getScmNodeDetails(); diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index e8e6cce31269..a37476636f6a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hdds.scm.server; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.protobuf.BlockingService; import com.google.protobuf.ProtocolMessageEnum; @@ -35,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos; import org.apache.hadoop.hdds.scm.DatanodeAdminError; import org.apache.hadoop.hdds.scm.ScmInfo; +import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancerConfiguration; import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; @@ -60,6 +62,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.audit.AuditAction; import org.apache.hadoop.ozone.audit.AuditEventStatus; import org.apache.hadoop.ozone.audit.AuditLogger; @@ -80,6 +83,7 @@ import java.util.Map; import java.util.Collections; import java.util.ArrayList; +import java.util.Optional; import java.util.TreeSet; import java.util.Set; import java.util.stream.Collectors; @@ -670,6 +674,65 @@ public boolean getReplicationManagerStatus() { return scm.getReplicationManager().isRunning(); } + @Override + public boolean startContainerBalancer(Optional threshold, + Optional idleiterations, + Optional 
maxDatanodesToBalance, + Optional maxSizeToMoveInGB) throws IOException{ + getScm().checkAdminAccess(getRemoteUser()); + ContainerBalancerConfiguration cbc = new ContainerBalancerConfiguration(); + if (threshold.isPresent()) { + double tsd = threshold.get(); + Preconditions.checkState(tsd >= 0.0D && tsd < 1.0D, + "threshold should to be specified in range [0.0, 1.0)."); + cbc.setThreshold(tsd); + } + if (maxSizeToMoveInGB.isPresent()) { + long mstm = maxSizeToMoveInGB.get(); + Preconditions.checkState(mstm > 0, + "maxSizeToMoveInGB must be positive."); + cbc.setMaxSizeToMove(mstm * OzoneConsts.GB); + } + if (maxDatanodesToBalance.isPresent()) { + int mdtb = maxDatanodesToBalance.get(); + Preconditions.checkState(mdtb > 0, + "maxDatanodesToBalance must be positive."); + cbc.setMaxDatanodesToBalance(mdtb); + } + if (idleiterations.isPresent()) { + int idi = idleiterations.get(); + Preconditions.checkState(idi > 0 || idi == -1, + "idleiterations must be positive or" + + " -1(infinitly run container balancer)."); + cbc.setIdleIteration(idi); + } + + boolean isStartedSuccessfully = scm.getContainerBalancer().start(cbc); + if (isStartedSuccessfully) { + AUDIT.logWriteSuccess(buildAuditMessageForSuccess( + SCMAction.START_CONTAINER_BALANCER, null)); + } else { + AUDIT.logWriteFailure(buildAuditMessageForSuccess( + SCMAction.START_CONTAINER_BALANCER, null)); + } + return isStartedSuccessfully; + } + + @Override + public void stopContainerBalancer() throws IOException { + getScm().checkAdminAccess(getRemoteUser()); + AUDIT.logWriteSuccess(buildAuditMessageForSuccess( + SCMAction.STOP_CONTAINER_BALANCER, null)); + scm.getContainerBalancer().stop(); + } + + @Override + public boolean getContainerBalancerStatus() { + AUDIT.logWriteSuccess(buildAuditMessageForSuccess( + SCMAction.GET_CONTAINER_BALANCER_STATUS, null)); + return scm.getContainerBalancer().isBalancerRunning(); + } + /** * Get Datanode usage info such as capacity, SCMUsed, and remaining by ip * or uuid. 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 1afdb95ce173..8cd3357c3681 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -1512,6 +1512,12 @@ public ReplicationManager getReplicationManager() { return replicationManager; } + @VisibleForTesting + @Override + public ContainerBalancer getContainerBalancer() { + return containerBalancer; + } + /** * Check if the current scm is the leader and ready for accepting requests. * @return - if the current scm is the leader and is ready. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java index 4c76a2670ec6..d09f57993531 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java @@ -69,6 +69,7 @@ public void setup() { balancerConfiguration = new ContainerBalancerConfiguration(); balancerConfiguration.setThreshold(0.1); + balancerConfiguration.setIdleIteration(1); balancerConfiguration.setMaxDatanodesToBalance(10); balancerConfiguration.setMaxSizeToMove(500 * OzoneConsts.GB); conf.setFromObject(balancerConfiguration); @@ -96,11 +97,18 @@ public void setup() { balancerConfiguration.setThreshold(randomThreshold); containerBalancer.start(balancerConfiguration); + + // waiting for balance completed. 
+ // TODO: this is a temporary implementation for now + // modify this after balancer is fully completed + try { + Thread.sleep(100); + } catch (InterruptedException e) {} + expectedUnBalancedNodes = determineExpectedUnBalancedNodes(randomThreshold); unBalancedNodesAccordingToBalancer = containerBalancer.getUnBalancedNodes(); - Assert.assertEquals( expectedUnBalancedNodes.size(), unBalancedNodesAccordingToBalancer.size()); @@ -134,11 +142,17 @@ public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced() { @Test public void containerBalancerShouldStopWhenMaxDatanodesToBalanceIsReached() { balancerConfiguration.setMaxDatanodesToBalance(2); - balancerConfiguration.setThreshold(0); + balancerConfiguration.setThreshold(0.1); containerBalancer.start(balancerConfiguration); + // waiting for balance completed. + // TODO: this is a temporary implementation for now + // modify this after balancer is fully completed + try { + Thread.sleep(3000); + } catch (InterruptedException e) {} + Assert.assertFalse(containerBalancer.isBalancerRunning()); - containerBalancer.stop(); } /** diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerCommands.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerCommands.java new file mode 100644 index 000000000000..bc2544486df5 --- /dev/null +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerCommands.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.cli; + +import java.util.concurrent.Callable; + +import org.apache.hadoop.hdds.cli.GenericCli; +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.cli.OzoneAdmin; +import org.apache.hadoop.hdds.cli.SubcommandWithParent; + +import org.kohsuke.MetaInfServices; +import picocli.CommandLine.Command; +import picocli.CommandLine.Model.CommandSpec; +import picocli.CommandLine.Spec; + +/** + * Subcommand to group container balancer related operations. + * + *

The balancer is a tool that balances datanode space usage on an Ozone + * cluster when some datanodes become full or when new empty nodes join + * the cluster. The tool can be run by the cluster administrator + * from the command line while applications are adding and deleting blocks. + * + *

SYNOPSIS + *

+ * To start:
+ *      ozone admin containerbalancer start
+ *      [ -t/--threshold {@literal <threshold>}]
+ *      [ -i/--idleiterations {@literal <idleiterations>}]
+ *      [ -d/--maxDatanodesToBalance {@literal <maxDatanodesToBalance>}]
+ *      [ -s/--maxSizeToMoveInGB {@literal <maxSizeToMoveInGB>}]
+ *      Examples:
+ *      ozone admin containerbalancer start
+ *        start balancer with default values in the configuration
+ *      ozone admin containerbalancer start -t 0.05
+ *        start balancer with a threshold of 5%
+ *      ozone admin containerbalancer start -i 20
+ *        start balancer with maximum 20 consecutive idle iterations
+ *      ozone admin containerbalancer start -i -1
+ *        run balancer infinitely with default values in the configuration
+ *      ozone admin containerbalancer start -d 10
+ *        start balancer with maximum 10 datanodes to balance
+ *      ozone admin containerbalancer start -s 10
+ *        start balancer with maximum size of 10GB to move
+ * To stop:
+ *      ozone admin containerbalancer stop
+ * 
+ * + *

DESCRIPTION + *

The threshold parameter is a fraction in the range of [0%, 100%) with a + * default value of 10%. The threshold sets a target for whether the cluster + * is balanced. A cluster is balanced if for each datanode, the utilization + * of the node (ratio of used space at the node to total capacity of the node) + * differs from the utilization of the cluster (ratio of used space in the + * cluster to total capacity of the cluster) by no more than the threshold + * value. The smaller the threshold, the more balanced a cluster will become. + * It takes more time to run the balancer for small threshold values. + * Also for a very small threshold the cluster may not be able to reach the + * balanced state when applications write and delete files concurrently. + * + *

The administrator can interrupt the execution of the balancer at any + * time by running the command "ozone admin containerbalancer stop" + * through command line + */ +@Command( + name = "containerbalancer", + description = "ContainerBalancer specific operations", + mixinStandardHelpOptions = true, + versionProvider = HddsVersionProvider.class, + subcommands = { + ContainerBalancerStartSubcommand.class, + ContainerBalancerStopSubcommand.class, + ContainerBalancerStatusSubcommand.class + }) +@MetaInfServices(SubcommandWithParent.class) +public class ContainerBalancerCommands implements Callable, + SubcommandWithParent { + + @Spec + private CommandSpec spec; + + @Override + public Void call() throws Exception { + GenericCli.missingSubcommand(spec); + return null; + } + + @Override + public Class getParentType() { + return OzoneAdmin.class; + } +} diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStartSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStartSubcommand.java new file mode 100644 index 000000000000..94d9ef7dab63 --- /dev/null +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStartSubcommand.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.cli; + +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; + +import java.io.IOException; +import java.util.Optional; + +/** + * Handler to start container balancer. + */ +@Command( + name = "start", + description = "Start ContainerBalancer", + mixinStandardHelpOptions = true, + versionProvider = HddsVersionProvider.class) +public class ContainerBalancerStartSubcommand extends ScmSubcommand { + + @Option(names = {"-t", "--threshold"}, + description = "Threshold target whether the cluster is balanced") + private Optional threshold; + + @Option(names = {"-i", "--idleiterations"}, + description = "Maximum consecutive idle iterations") + private Optional idleiterations; + + @Option(names = {"-d", "--maxDatanodesToBalance"}, + description = "Maximum datanodes to move") + private Optional maxDatanodesToBalance; + + @Option(names = {"-s", "--maxSizeToMoveInGB"}, + description = "Maximum size to move in GB, " + + "for 10GB it should be set as 10") + private Optional maxSizeToMoveInGB; + + @Override + public void execute(ScmClient scmClient) throws IOException { + boolean result = scmClient.startContainerBalancer(threshold, idleiterations, + maxDatanodesToBalance, maxSizeToMoveInGB); + if (result) { + System.out.println("Starting ContainerBalancer Successfully."); + return; + } + System.out.println("ContainerBalancer is already running, " + + "Please stop it first."); + } +} \ No newline at end of file diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStatusSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStatusSubcommand.java new file mode 100644 index 000000000000..e0cd436bdf0a --- /dev/null +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStatusSubcommand.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.cli; + +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import picocli.CommandLine.Command; + +import java.io.IOException; + +/** + * Handler to query status of container balancer. + */ +@Command( + name = "status", + description = "Check if ContainerBalancer is running or not", + mixinStandardHelpOptions = true, + versionProvider = HddsVersionProvider.class) +public class ContainerBalancerStatusSubcommand extends ScmSubcommand { + + @Override + public void execute(ScmClient scmClient) throws IOException { + boolean execReturn = scmClient.getContainerBalancerStatus(); + if(execReturn){ + System.out.println("ContainerBalancer is Running."); + } else { + System.out.println("ContainerBalancer is Not Running."); + } + } +} diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStopSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStopSubcommand.java new file mode 100644 index 000000000000..89e7680f31c5 --- /dev/null +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStopSubcommand.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.cli; + +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import picocli.CommandLine.Command; + +import java.io.IOException; + +/** + * Handler to stop container balancer. + */ +@Command( + name = "stop", + description = "Stop ContainerBalancer", + mixinStandardHelpOptions = true, + versionProvider = HddsVersionProvider.class) +public class ContainerBalancerStopSubcommand extends ScmSubcommand { + @Override + public void execute(ScmClient scmClient) throws IOException { + scmClient.stopContainerBalancer(); + System.out.println("Stopping ContainerBalancer..."); + } +} diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java index e48a719184e7..aeed4ef2d034 100644 --- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java @@ -21,6 +21,7 @@ import java.security.cert.X509Certificate; import java.util.List; import java.util.Map; +import java.util.Optional; import com.google.common.base.Preconditions; import org.apache.commons.lang3.tuple.Pair; @@ -550,6 +551,25 @@ public boolean getReplicationManagerStatus() throws IOException { return storageContainerLocationClient.getReplicationManagerStatus(); } + @Override + public boolean startContainerBalancer(Optionalthreshold, + Optional idleiterations, + Optional maxDatanodesToBalance, + Optional maxSizeToMoveInGB) throws 
IOException { + return storageContainerLocationClient.startContainerBalancer(threshold, + idleiterations, maxDatanodesToBalance, maxSizeToMoveInGB); + } + + @Override + public void stopContainerBalancer() throws IOException { + storageContainerLocationClient.stopContainerBalancer(); + } + + @Override + public boolean getContainerBalancerStatus() throws IOException { + return storageContainerLocationClient.getContainerBalancerStatus(); + } + @Override public List getScmRatisRoles() throws IOException { return storageContainerLocationClient.getScmInfo().getRatisPeerRoles(); diff --git a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestContainerBalancerSubCommand.java b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestContainerBalancerSubCommand.java new file mode 100644 index 000000000000..b377895a4e12 --- /dev/null +++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestContainerBalancerSubCommand.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdds.scm.cli.datanode; + +import org.apache.hadoop.hdds.scm.cli.ContainerBalancerStopSubcommand; +import org.apache.hadoop.hdds.scm.cli.ContainerBalancerStartSubcommand; +import org.apache.hadoop.hdds.scm.cli.ContainerBalancerStatusSubcommand; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +/** + * Unit tests to validate the the ContainerBalancerSubCommand class includes the + * correct output when executed against a mock client. + */ +public class TestContainerBalancerSubCommand { + + private ContainerBalancerStopSubcommand stopCmd; + private ContainerBalancerStartSubcommand startCmd; + private ContainerBalancerStatusSubcommand statusCmd; + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + private final ByteArrayOutputStream errContent = new ByteArrayOutputStream(); + private final PrintStream originalOut = System.out; + private final PrintStream originalErr = System.err; + private static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name(); + + @Before + public void setup() throws UnsupportedEncodingException { + stopCmd = new ContainerBalancerStopSubcommand(); + startCmd = new ContainerBalancerStartSubcommand(); + statusCmd = new ContainerBalancerStatusSubcommand(); + System.setOut(new PrintStream(outContent, false, DEFAULT_ENCODING)); + System.setErr(new PrintStream(errContent, false, DEFAULT_ENCODING)); + } + + @After + public void tearDown() { + System.setOut(originalOut); + System.setErr(originalErr); + } + + @Test + public void 
testContainerBalancerStatusSubcommandRunning() + throws IOException { + ScmClient scmClient = mock(ScmClient.class); + + //test status is running + Mockito.when(scmClient.getContainerBalancerStatus()) + .thenAnswer(invocation -> true); + + statusCmd.execute(scmClient); + + Pattern p = Pattern.compile( + "^ContainerBalancer\\sis\\sRunning."); + Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING)); + assertTrue(m.find()); + } + + @Test + public void testContainerBalancerStatusSubcommandNotRunning() + throws IOException { + ScmClient scmClient = mock(ScmClient.class); + + Mockito.when(scmClient.getContainerBalancerStatus()) + .thenAnswer(invocation -> false); + + statusCmd.execute(scmClient); + + Pattern p = Pattern.compile( + "^ContainerBalancer\\sis\\sNot\\sRunning."); + Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING)); + assertTrue(m.find()); + } + + @Test + public void testContainerBalancerStopSubcommand() throws IOException { + ScmClient scmClient = mock(ScmClient.class); + stopCmd.execute(scmClient); + + Pattern p = Pattern.compile("^Stopping\\sContainerBalancer..."); + Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING)); + assertTrue(m.find()); + } + + @Test + public void testContainerBalancerStartSubcommandWhenBalancerIsNotRunning() + throws IOException { + ScmClient scmClient = mock(ScmClient.class); + Mockito.when(scmClient.startContainerBalancer(null, null, null, null)) + .thenAnswer(invocation -> true); + startCmd.execute(scmClient); + + Pattern p = Pattern.compile("^Starting\\sContainerBalancer" + + "\\sSuccessfully."); + Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING)); + assertTrue(m.find()); + } + + @Test + public void testContainerBalancerStartSubcommandWhenBalancerIsRunning() + throws IOException { + ScmClient scmClient = mock(ScmClient.class); + Mockito.when(scmClient.startContainerBalancer(null, null, null, null)) + .thenAnswer(invocation -> false); + startCmd.execute(scmClient); + + Pattern p = 
Pattern.compile("^ContainerBalancer\\sis\\salready\\srunning," + + "\\sPlease\\sstop\\sit\\sfirst."); + Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING)); + assertTrue(m.find()); + } + +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerBalancerOperations.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerBalancerOperations.java new file mode 100644 index 000000000000..7e525206d168 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerBalancerOperations.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test for the container balancer start, stop and status
+ * operations exposed through {@link ScmClient}.
+ */
+public class TestContainerBalancerOperations {
+
+  /** Set a timeout for each test. */
+  @Rule
+  public Timeout timeout = Timeout.seconds(300);
+
+  private static ScmClient containerBalancerClient;
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration ozoneConf;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ozoneConf = new OzoneConfiguration();
+    ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementCapacity.class, PlacementPolicy.class);
+    cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
+    containerBalancerClient = new ContainerOperationClient(ozoneConf);
+    cluster.waitForClusterToBeReady();
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    // NOTE(review): containerBalancerClient is never released here; confirm
+    // whether ScmClient is Closeable and close it before the shutdown.
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Exercises start, status and stop via {@link ContainerOperationClient}.
+   * @throws Exception if an SCM call fails or the wait is interrupted
+   */
+  @Test
+  public void testContainerBalancerCLIOperations() throws Exception {
+    // normal case: start the balancer and let it finish by itself
+    boolean running = containerBalancerClient.getContainerBalancerStatus();
+    assertFalse(running);
+    Optional<Double> threshold = Optional.of(0.1);
+    Optional<Integer> idleiterations = Optional.of(10000);
+    Optional<Integer> maxDatanodesToBalance = Optional.of(1);
+    Optional<Long> maxSizeToMoveInGB = Optional.of(1L);
+
+    containerBalancerClient.startContainerBalancer(threshold, idleiterations,
+        maxDatanodesToBalance, maxSizeToMoveInGB);
+    running = containerBalancerClient.getContainerBalancerStatus();
+    assertTrue(running);
+
+    // TODO: temporary polling wait; revisit once the balancer is complete.
+    long deadline = System.currentTimeMillis() + 10000L;
+    while (System.currentTimeMillis() < deadline
+        && containerBalancerClient.getContainerBalancerStatus()) {
+      Thread.sleep(100);
+    }
+
+    running = containerBalancerClient.getContainerBalancerStatus();
+    assertFalse(running);
+
+    // start again, then stop explicitly before balancing completes
+    containerBalancerClient.startContainerBalancer(threshold, idleiterations,
+        maxDatanodesToBalance, maxSizeToMoveInGB);
+    running = containerBalancerClient.getContainerBalancerStatus();
+    assertTrue(running);
+
+    containerBalancerClient.stopContainerBalancer();
+    running = containerBalancerClient.getContainerBalancerStatus();
+    assertFalse(running);
+  }
+
+  // TODO: add more acceptance tests once the container balancer is complete
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index ebef4d8f42a0..652ab6e5e1a6 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -322,6 +323,11 @@ public ReplicationManager getReplicationManager() {
     return null;
   }
 
+  @Override
+  public ContainerBalancer getContainerBalancer() {
+    return null;
+  }
+
   @Override
   public InetSocketAddress getDatanodeRpcAddress() {
     return getDatanodeProtocolServer().getDatanodeRpcAddress();