diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/MockedSCM.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/MockedSCM.java
new file mode 100644
index 000000000000..b290b950442b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/MockedSCM.java
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import com.google.protobuf.ByteString;
+import jakarta.annotation.Nonnull;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManager;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManagerImpl;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Class for test used for setting up testable StorageContainerManager.
+ * Provides an access to {@link TestableCluster} and to necessary mocked instances
+ */
+public final class MockedSCM {
+ private final StorageContainerManager scm;
+ private final TestableCluster cluster;
+ private final MockNodeManager mockNodeManager;
+ private MockedReplicationManager mockedReplicaManager;
+ private MoveManager moveManager;
+ private ContainerManager containerManager;
+
+ private MockedPlacementPolicies mockedPlacementPolicies;
+
+ public MockedSCM(@Nonnull TestableCluster testableCluster) {
+ scm = mock(StorageContainerManager.class);
+ cluster = testableCluster;
+ mockNodeManager = new MockNodeManager(cluster.getDatanodeToContainersMap());
+ }
+
+ public void init(@Nonnull ContainerBalancerConfiguration balancerConfig) {
+ init(balancerConfig, new OzoneConfiguration());
+ }
+
+ public void init(@Nonnull ContainerBalancerConfiguration balancerConfig, @Nonnull OzoneConfiguration ozoneCfg) {
+ ozoneCfg.setFromObject(balancerConfig);
+ try {
+ doMock(balancerConfig, ozoneCfg);
+ } catch (IOException | NodeNotFoundException | TimeoutException e) {
+ throw new RuntimeException("Can't initialize TestOzoneHDDS: ", e);
+ }
+ }
+
+ /**
+ * Mock some instances that will be used for MockedStorageContainerManager.
+ */
+ private void doMock(@Nonnull ContainerBalancerConfiguration cfg, @Nonnull OzoneConfiguration ozoneCfg)
+ throws IOException, NodeNotFoundException, TimeoutException {
+ containerManager = mockContainerManager(cluster);
+ mockedReplicaManager = MockedReplicationManager.doMock();
+ moveManager = mockMoveManager();
+ StatefulServiceStateManager stateManager = MockedServiceStateManager.doMock();
+ SCMServiceManager scmServiceManager = mockSCMServiceManger();
+
+ mockedPlacementPolicies = MockedPlacementPolicies.doMock(ozoneCfg, mockNodeManager);
+
+ when(scm.getConfiguration()).then(invocationOnMock -> {
+ ozoneCfg.setFromObject(cfg);
+ return ozoneCfg;
+ });
+ when(scm.getMoveManager()).thenReturn(moveManager);
+ when(scm.getScmNodeManager()).thenReturn(mockNodeManager);
+ when(scm.getContainerManager()).thenReturn(containerManager);
+ when(scm.getReplicationManager()).thenReturn(mockedReplicaManager.manager);
+ when(scm.getContainerPlacementPolicy()).thenReturn(mockedPlacementPolicies.placementPolicy);
+ when(scm.getPlacementPolicyValidateProxy()).thenReturn(mockedPlacementPolicies.validateProxyPolicy);
+ when(scm.getSCMServiceManager()).thenReturn(scmServiceManager);
+ when(scm.getScmContext()).thenReturn(SCMContext.emptyContext());
+ when(scm.getClusterMap()).thenReturn(null);
+ when(scm.getEventQueue()).thenReturn(mock(EventPublisher.class));
+ when(scm.getStatefulServiceStateManager()).thenReturn(stateManager);
+ }
+
+ @Override
+ public String toString() {
+ return cluster.toString();
+ }
+
+ public @Nonnull ContainerBalancerTask startBalancerTask(
+ @Nonnull ContainerBalancer containerBalancer,
+ @Nonnull ContainerBalancerConfiguration config
+ ) {
+ ContainerBalancerTask task = new ContainerBalancerTask(scm, 0, containerBalancer,
+ containerBalancer.getMetrics(), config, false);
+ task.run();
+ return task;
+ }
+
+ public @Nonnull ContainerBalancerTask startBalancerTask(@Nonnull ContainerBalancerConfiguration config) {
+ init(config);
+ return startBalancerTask(new ContainerBalancer(scm), config);
+ }
+
+ public void enableLegacyReplicationManager() {
+ mockedReplicaManager.conf.setEnableLegacy(true);
+ }
+
+ public void disableLegacyReplicationManager() {
+ mockedReplicaManager.conf.setEnableLegacy(false);
+ }
+
+ public @Nonnull MoveManager getMoveManager() {
+ return moveManager;
+ }
+
+ public @Nonnull ReplicationManager getReplicationManager() {
+ return mockedReplicaManager.manager;
+ }
+
+ public @Nonnull MockNodeManager getNodeManager() {
+ return mockNodeManager;
+ }
+
+ public @Nonnull StorageContainerManager getStorageContainerManager() {
+ return scm;
+ }
+
+ public @Nonnull TestableCluster getCluster() {
+ return cluster;
+ }
+
+ public @Nonnull ContainerManager getContainerManager() {
+ return containerManager;
+ }
+
+ public @Nonnull PlacementPolicy getPlacementPolicy() {
+ return mockedPlacementPolicies.placementPolicy;
+ }
+
+ public @Nonnull PlacementPolicy getEcPlacementPolicy() {
+ return mockedPlacementPolicies.ecPlacementPolicy;
+ }
+
+ private static @Nonnull ContainerManager mockContainerManager(@Nonnull TestableCluster cluster)
+ throws ContainerNotFoundException {
+ ContainerManager containerManager = mock(ContainerManager.class);
+ Mockito
+ .when(containerManager.getContainerReplicas(any(ContainerID.class)))
+ .thenAnswer(invocationOnMock -> {
+ ContainerID cid = (ContainerID) invocationOnMock.getArguments()[0];
+ return cluster.getCidToReplicasMap().get(cid);
+ });
+
+ Mockito
+ .when(containerManager.getContainer(any(ContainerID.class)))
+ .thenAnswer(invocationOnMock -> {
+ ContainerID cid = (ContainerID) invocationOnMock.getArguments()[0];
+ return cluster.getCidToInfoMap().get(cid);
+ });
+
+ Mockito
+ .when(containerManager.getContainers())
+ .thenReturn(new ArrayList<>(cluster.getCidToInfoMap().values()));
+ return containerManager;
+ }
+
+ private static @Nonnull SCMServiceManager mockSCMServiceManger() {
+ SCMServiceManager scmServiceManager = mock(SCMServiceManager.class);
+
+ Mockito
+ .doNothing()
+ .when(scmServiceManager)
+ .register(Mockito.any(SCMService.class));
+
+ return scmServiceManager;
+ }
+
+ private static @Nonnull MoveManager mockMoveManager()
+ throws NodeNotFoundException, ContainerReplicaNotFoundException, ContainerNotFoundException {
+ MoveManager moveManager = mock(MoveManager.class);
+ Mockito
+ .when(moveManager.move(
+ any(ContainerID.class),
+ any(DatanodeDetails.class),
+ any(DatanodeDetails.class)))
+ .thenReturn(CompletableFuture.completedFuture(MoveManager.MoveResult.COMPLETED));
+ return moveManager;
+ }
+
+ private static final class MockedReplicationManager {
+ private final ReplicationManager manager;
+ private final ReplicationManager.ReplicationManagerConfiguration conf;
+
+ private MockedReplicationManager() {
+ manager = mock(ReplicationManager.class);
+ conf = new ReplicationManager.ReplicationManagerConfiguration();
+ // Disable LegacyReplicationManager. This means balancer should select RATIS as well as
+ // EC containers for balancing. Also, MoveManager will be used.
+ conf.setEnableLegacy(false);
+ }
+
+ private static @Nonnull MockedReplicationManager doMock()
+ throws NodeNotFoundException, ContainerNotFoundException, TimeoutException {
+ MockedReplicationManager mockedManager = new MockedReplicationManager();
+
+ Mockito
+ .when(mockedManager.manager.getConfig())
+ .thenReturn(mockedManager.conf);
+
+ Mockito
+ .when(mockedManager.manager.isContainerReplicatingOrDeleting(Mockito.any(ContainerID.class)))
+ .thenReturn(false);
+
+ Mockito
+ .when(mockedManager.manager.move(
+ Mockito.any(ContainerID.class),
+ Mockito.any(DatanodeDetails.class),
+ Mockito.any(DatanodeDetails.class)))
+ .thenReturn(CompletableFuture.completedFuture(MoveManager.MoveResult.COMPLETED));
+
+ Mockito
+ .when(mockedManager.manager.getClock())
+ .thenReturn(Clock.system(ZoneId.systemDefault()));
+
+ return mockedManager;
+ }
+ }
+
+ private static final class MockedServiceStateManager {
+ private final Map serviceToConfigMap = new HashMap<>();
+ private final StatefulServiceStateManager serviceStateManager = Mockito.mock(StatefulServiceStateManagerImpl.class);
+
+ private static @Nonnull StatefulServiceStateManager doMock() throws IOException {
+ MockedServiceStateManager manager = new MockedServiceStateManager();
+
+ // When StatefulServiceStateManager#saveConfiguration is called, save to in-memory serviceToConfigMap instead.
+ Map map = manager.serviceToConfigMap;
+ StatefulServiceStateManager stateManager = manager.serviceStateManager;
+ Mockito
+ .doAnswer(i -> {
+ map.put(i.getArgument(0, String.class), i.getArgument(1, ByteString.class));
+ return null;
+ })
+ .when(stateManager)
+ .saveConfiguration(Mockito.any(String.class), Mockito.any(ByteString.class));
+
+ // When StatefulServiceStateManager#readConfiguration is called, read from serviceToConfigMap instead.
+ Mockito
+ .when(stateManager.readConfiguration(Mockito.anyString()))
+ .thenAnswer(i -> map.get(i.getArgument(0, String.class)));
+ return stateManager;
+ }
+ }
+
+ private static final class MockedPlacementPolicies {
+ private final PlacementPolicy placementPolicy;
+ private final PlacementPolicy ecPlacementPolicy;
+ private final PlacementPolicyValidateProxy validateProxyPolicy;
+
+ private MockedPlacementPolicies(@Nonnull PlacementPolicy placementPolicy, @Nonnull PlacementPolicy ecPolicy) {
+ this.placementPolicy = placementPolicy;
+ ecPlacementPolicy = ecPolicy;
+ validateProxyPolicy = new PlacementPolicyValidateProxy(this.placementPolicy, ecPlacementPolicy);
+ }
+
+ private static @Nonnull MockedPlacementPolicies doMock(
+ @Nonnull OzoneConfiguration ozoneConfig,
+ @Nonnull NodeManager nodeManager
+ ) throws SCMException {
+ NetworkTopology clusterMap = nodeManager.getClusterNetworkTopologyMap();
+ PlacementPolicy policy = ContainerPlacementPolicyFactory.getPolicy(
+ ozoneConfig, nodeManager, clusterMap, true, SCMContainerPlacementMetrics.create());
+ PlacementPolicy ecPolicy = ContainerPlacementPolicyFactory.getECPolicy(
+ ozoneConfig, nodeManager, clusterMap, true, SCMContainerPlacementMetrics.create());
+ return new MockedPlacementPolicies(policy, ecPolicy);
+ }
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerDatanodeNodeLimit.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerDatanodeNodeLimit.java
new file mode 100644
index 000000000000..8f1db615dfe3
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerDatanodeNodeLimit.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import jakarta.annotation.Nonnull;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.event.Level;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link ContainerBalancerTask} moved from {@link TestContainerBalancerTask} to run them on clusters
+ * with different datanode count.
+ */
+public class TestContainerBalancerDatanodeNodeLimit {
+ private static final long STORAGE_UNIT = OzoneConsts.GB;
+ private static final int DATANODE_COUNT_LIMIT_FOR_SMALL_CLUSTER = 15;
+
+ @BeforeAll
+ public static void setup() {
+ GenericTestUtils.setLogLevel(ContainerBalancerTask.LOG, Level.DEBUG);
+ }
+
+ private static Stream createMockedSCMs() {
+ return Stream.of(
+ Arguments.of(getMockedSCM(4)),
+ Arguments.of(getMockedSCM(5)),
+ Arguments.of(getMockedSCM(6)),
+ Arguments.of(getMockedSCM(7)),
+ Arguments.of(getMockedSCM(8)),
+ Arguments.of(getMockedSCM(9)),
+ Arguments.of(getMockedSCM(10)),
+ Arguments.of(getMockedSCM(11)),
+ Arguments.of(getMockedSCM(12)),
+ Arguments.of(getMockedSCM(13)),
+ Arguments.of(getMockedSCM(14)),
+ Arguments.of(getMockedSCM(15)),
+ Arguments.of(getMockedSCM(17)),
+ Arguments.of(getMockedSCM(19)),
+ Arguments.of(getMockedSCM(20)),
+ Arguments.of(getMockedSCM(30)));
+ }
+
+ @ParameterizedTest(name = "MockedSCM #{index}: {0}")
+ @MethodSource("createMockedSCMs")
+ public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit(@Nonnull MockedSCM mockedSCM) {
+ ContainerBalancerConfiguration config = new OzoneConfiguration().getObject(ContainerBalancerConfiguration.class);
+ int nodeCount = mockedSCM.getCluster().getNodeCount();
+ if (nodeCount < DATANODE_COUNT_LIMIT_FOR_SMALL_CLUSTER) {
+ config.setMaxDatanodesPercentageToInvolvePerIteration(100);
+ }
+ config.setIterations(1);
+ config.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
+
+ ContainerBalancerTask task = mockedSCM.startBalancerTask(config);
+ ContainerBalancerMetrics metrics = task.getMetrics();
+
+ int maxDatanodePercentage = config.getMaxDatanodesPercentageToInvolvePerIteration();
+ int number = maxDatanodePercentage * nodeCount / 100;
+ int datanodesInvolvedPerIteration = task.getCountDatanodesInvolvedPerIteration();
+ assertThat(datanodesInvolvedPerIteration).isGreaterThan(0);
+ assertThat(datanodesInvolvedPerIteration).isLessThanOrEqualTo(number);
+ long numDatanodesInvolvedInLatestIteration = metrics.getNumDatanodesInvolvedInLatestIteration();
+ assertThat(numDatanodesInvolvedInLatestIteration).isGreaterThan(0);
+ assertThat(numDatanodesInvolvedInLatestIteration).isLessThanOrEqualTo(number);
+ }
+
+ @ParameterizedTest(name = "MockedSCM #{index}: {0}")
+ @MethodSource("createMockedSCMs")
+ public void balancerShouldObeyMaxSizeEnteringTargetLimit(@Nonnull MockedSCM mockedSCM) {
+ OzoneConfiguration ozoneConfig = new OzoneConfiguration();
+ ozoneConfig.set("ozone.scm.container.size", "1MB");
+ ContainerBalancerConfiguration config = balancerConfigByOzoneConfig(ozoneConfig);
+ if (mockedSCM.getCluster().getNodeCount() < DATANODE_COUNT_LIMIT_FOR_SMALL_CLUSTER) {
+ config.setMaxDatanodesPercentageToInvolvePerIteration(100);
+ }
+ config.setIterations(1);
+ config.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
+ // No containers should be selected when the limit is just 2 MB.
+ config.setMaxSizeEnteringTarget(2 * OzoneConsts.MB);
+
+ ContainerBalancerTask task = mockedSCM.startBalancerTask(config);
+ // Container balancer still has unbalanced nodes due to MaxSizeEnteringTarget limit
+ assertTrue(stillHaveUnbalancedNodes(task));
+ // ContainerToSourceMap is empty due to MaxSizeEnteringTarget limit
+ assertTrue(task.getContainerToSourceMap().isEmpty());
+ // SizeScheduledForMoveInLatestIteration equals to 0 because there are no containers was selected
+ assertEquals(0, task.getSizeScheduledForMoveInLatestIteration());
+
+ // Some containers should be selected when using default values.
+ ContainerBalancerConfiguration balancerConfig = balancerConfigByOzoneConfig(new OzoneConfiguration());
+ if (mockedSCM.getCluster().getNodeCount() < DATANODE_COUNT_LIMIT_FOR_SMALL_CLUSTER) {
+ balancerConfig.setMaxDatanodesPercentageToInvolvePerIteration(100);
+ }
+ balancerConfig.setIterations(1);
+
+ task = mockedSCM.startBalancerTask(balancerConfig);
+ // Balancer should have identified unbalanced nodes.
+ assertTrue(stillHaveUnbalancedNodes(task));
+ // ContainerToSourceMap is not empty due to some containers should be selected
+ assertFalse(task.getContainerToSourceMap().isEmpty());
+ // SizeScheduledForMoveInLatestIteration doesn't equal to 0 because some containers should be selected
+ assertNotEquals(0, task.getSizeScheduledForMoveInLatestIteration());
+ }
+
+ @ParameterizedTest(name = "MockedSCM #{index}: {0}")
+ @MethodSource("createMockedSCMs")
+ public void balancerShouldObeyMaxSizeLeavingSourceLimit(@Nonnull MockedSCM mockedSCM) {
+ OzoneConfiguration ozoneConfig = new OzoneConfiguration();
+ ozoneConfig.set("ozone.scm.container.size", "1MB");
+ ContainerBalancerConfiguration config = balancerConfigByOzoneConfig(ozoneConfig);
+ if (mockedSCM.getCluster().getNodeCount() < DATANODE_COUNT_LIMIT_FOR_SMALL_CLUSTER) {
+ config.setMaxDatanodesPercentageToInvolvePerIteration(100);
+ }
+ config.setIterations(1);
+ config.setMaxSizeEnteringTarget(50 * STORAGE_UNIT);
+ config.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
+ // No source containers should be selected when the limit is just 2 MB.
+ config.setMaxSizeLeavingSource(2 * OzoneConsts.MB);
+
+ ContainerBalancerTask task = mockedSCM.startBalancerTask(config);
+ // Container balancer still has unbalanced nodes due to MaxSizeLeavingSource limit
+ assertTrue(stillHaveUnbalancedNodes(task));
+ // ContainerToSourceMap is empty due to MaxSizeLeavingSource limit
+ assertTrue(task.getContainerToSourceMap().isEmpty());
+ // SizeScheduledForMoveInLatestIteration equals to 0 because there are no containers was selected
+ assertEquals(0, task.getSizeScheduledForMoveInLatestIteration());
+
+ // Some containers should be selected when using default values.
+ ContainerBalancerConfiguration newBalancerConfig = balancerConfigByOzoneConfig(new OzoneConfiguration());
+ if (mockedSCM.getCluster().getNodeCount() < DATANODE_COUNT_LIMIT_FOR_SMALL_CLUSTER) {
+ newBalancerConfig.setMaxDatanodesPercentageToInvolvePerIteration(100);
+ }
+ newBalancerConfig.setIterations(1);
+
+ task = mockedSCM.startBalancerTask(newBalancerConfig);
+ // Balancer should have identified unbalanced nodes.
+ assertTrue(stillHaveUnbalancedNodes(task));
+ // ContainerToSourceMap is not empty due to some containers should be selected
+ assertFalse(task.getContainerToSourceMap().isEmpty());
+ // SizeScheduledForMoveInLatestIteration doesn't equal to 0 because some containers should be selected
+ assertNotEquals(0, task.getSizeScheduledForMoveInLatestIteration());
+ }
+
+ private static boolean stillHaveUnbalancedNodes(@Nonnull ContainerBalancerTask task) {
+ return !task.getUnBalancedNodes().isEmpty();
+ }
+
+ public static @Nonnull MockedSCM getMockedSCM(int datanodeCount) {
+ return new MockedSCM(new TestableCluster(datanodeCount, STORAGE_UNIT));
+ }
+
+ private static @Nonnull ContainerBalancerConfiguration balancerConfigByOzoneConfig(
+ @Nonnull OzoneConfiguration ozoneConfiguration
+ ) {
+ return ozoneConfiguration.getObject(ContainerBalancerConfiguration.class);
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java
index 3bed3878123d..6d471366b10b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java
@@ -76,7 +76,6 @@
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -328,32 +327,6 @@ public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced()
assertEquals(0, metrics.getNumDatanodesUnbalanced());
}
- /**
- * ContainerBalancer should not involve more datanodes than the
- * maxDatanodesRatioToInvolvePerIteration limit.
- */
- @Test
- public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit()
- throws IllegalContainerBalancerStateException, IOException,
- InvalidContainerBalancerConfigurationException, TimeoutException {
- int percent = 40;
- balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(
- percent);
- balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT);
- balancerConfiguration.setThreshold(1);
- balancerConfiguration.setIterations(1);
- startBalancer(balancerConfiguration);
-
- int number = percent * numberOfNodes / 100;
- ContainerBalancerMetrics metrics = containerBalancerTask.getMetrics();
- assertThat(containerBalancerTask.getCountDatanodesInvolvedPerIteration())
- .isLessThanOrEqualTo(number);
- assertThat(metrics.getNumDatanodesInvolvedInLatestIteration()).isGreaterThan(0);
- assertThat(metrics.getNumDatanodesInvolvedInLatestIteration())
- .isLessThanOrEqualTo(number);
- stopBalancer();
- }
-
@Test
public void containerBalancerShouldSelectOnlyClosedContainers()
throws IllegalContainerBalancerStateException, IOException,
@@ -609,86 +582,6 @@ public void balancerShouldNotSelectConfiguredExcludeContainers()
}
}
- @Test
- public void balancerShouldObeyMaxSizeEnteringTargetLimit()
- throws IllegalContainerBalancerStateException, IOException,
- InvalidContainerBalancerConfigurationException, TimeoutException {
- conf.set("ozone.scm.container.size", "1MB");
- balancerConfiguration =
- conf.getObject(ContainerBalancerConfiguration.class);
- balancerConfiguration.setThreshold(10);
- balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
- balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
-
- // no containers should be selected when the limit is just 2 MB
- balancerConfiguration.setMaxSizeEnteringTarget(2 * OzoneConsts.MB);
- startBalancer(balancerConfiguration);
-
- assertFalse(containerBalancerTask.getUnBalancedNodes()
- .isEmpty());
- assertTrue(containerBalancerTask.getContainerToSourceMap()
- .isEmpty());
- stopBalancer();
-
- // some containers should be selected when using default values
- OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
- ContainerBalancerConfiguration cbc = ozoneConfiguration.
- getObject(ContainerBalancerConfiguration.class);
- cbc.setBalancingInterval(1);
- ContainerBalancer sb = new ContainerBalancer(scm);
- containerBalancerTask = new ContainerBalancerTask(scm, 0, sb,
- sb.getMetrics(), cbc, false);
- containerBalancerTask.run();
-
- stopBalancer();
- // balancer should have identified unbalanced nodes
- assertFalse(containerBalancerTask.getUnBalancedNodes()
- .isEmpty());
- assertFalse(containerBalancerTask.getContainerToSourceMap()
- .isEmpty());
- }
-
- @Test
- public void balancerShouldObeyMaxSizeLeavingSourceLimit()
- throws IllegalContainerBalancerStateException, IOException,
- InvalidContainerBalancerConfigurationException, TimeoutException {
- conf.set("ozone.scm.container.size", "1MB");
- balancerConfiguration =
- conf.getObject(ContainerBalancerConfiguration.class);
- balancerConfiguration.setThreshold(10);
- balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
- balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
-
- // no source containers should be selected when the limit is just 2 MB
- balancerConfiguration.setMaxSizeLeavingSource(2 * OzoneConsts.MB);
- startBalancer(balancerConfiguration);
-
- assertFalse(containerBalancerTask.getUnBalancedNodes()
- .isEmpty());
- assertTrue(containerBalancerTask.getContainerToSourceMap()
- .isEmpty());
- stopBalancer();
-
- // some containers should be selected when using default values
- OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
- ContainerBalancerConfiguration cbc = ozoneConfiguration.
- getObject(ContainerBalancerConfiguration.class);
- cbc.setBalancingInterval(1);
- ContainerBalancer sb = new ContainerBalancer(scm);
- containerBalancerTask = new ContainerBalancerTask(scm, 0, sb,
- sb.getMetrics(), cbc, false);
- containerBalancerTask.run();
-
- stopBalancer();
- // balancer should have identified unbalanced nodes
- assertFalse(containerBalancerTask.getUnBalancedNodes()
- .isEmpty());
- assertFalse(containerBalancerTask.getContainerToSourceMap()
- .isEmpty());
- assertNotEquals(0,
- containerBalancerTask.getSizeScheduledForMoveInLatestIteration());
- }
-
@Test
public void testMetrics()
throws IllegalContainerBalancerStateException, IOException,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestableCluster.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestableCluster.java
new file mode 100644
index 000000000000..60ee45535f0b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestableCluster.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import jakarta.annotation.Nonnull;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+
+/**
+ * Class is used for creating test cluster with a required number of datanodes.
+ * 1. Fill the cluster by generating some data.
+ * 2. Nodes in the cluster have utilization values determined by generateUtilization method.
+ */
+public final class TestableCluster {
+ static final ThreadLocalRandom RANDOM = ThreadLocalRandom.current();
+ private static final Logger LOG = LoggerFactory.getLogger(TestableCluster.class);
+ private final int nodeCount;
+ private final double[] nodeUtilizationList;
+ private final DatanodeUsageInfo[] nodesInCluster;
+ private final Map cidToInfoMap = new HashMap<>();
+ private final Map> cidToReplicasMap = new HashMap<>();
+ private final Map> dnUsageToContainersMap = new HashMap<>();
+ private final double averageUtilization;
+
+ TestableCluster(int numberOfNodes, long storageUnit) {
+ nodeCount = numberOfNodes;
+ nodeUtilizationList = createUtilizationList(nodeCount);
+ nodesInCluster = new DatanodeUsageInfo[nodeCount];
+
+ generateData(storageUnit);
+ createReplicasForContainers();
+ long clusterCapacity = 0, clusterUsedSpace = 0;
+
+ // For each node utilization, calculate that datanode's used space and capacity.
+ for (int i = 0; i < nodeUtilizationList.length; i++) {
+ Set containerIDSet = dnUsageToContainersMap.get(nodesInCluster[i]);
+ long datanodeUsedSpace = 0;
+ for (ContainerID containerID : containerIDSet) {
+ datanodeUsedSpace += cidToInfoMap.get(containerID).getUsedBytes();
+ }
+ // Use node utilization and used space to determine node capacity.
+ long datanodeCapacity = (nodeUtilizationList[i] == 0)
+ ? storageUnit * RANDOM.nextInt(10, 60)
+ : (long) (datanodeUsedSpace / nodeUtilizationList[i]);
+
+ SCMNodeStat stat = new SCMNodeStat(datanodeCapacity, datanodeUsedSpace,
+ datanodeCapacity - datanodeUsedSpace, 0,
+ datanodeCapacity - datanodeUsedSpace - 1);
+ nodesInCluster[i].setScmNodeStat(stat);
+ clusterUsedSpace += datanodeUsedSpace;
+ clusterCapacity += datanodeCapacity;
+ }
+
+ averageUtilization = (double) clusterUsedSpace / clusterCapacity;
+ }
+
+ @Override
+ public String toString() {
+ return "cluster of " + nodeCount + " nodes";
+ }
+
+ @Nonnull Map> getDatanodeToContainersMap() {
+ return dnUsageToContainersMap;
+ }
+
+ @Nonnull Map getCidToInfoMap() {
+ return cidToInfoMap;
+ }
+
+ int getNodeCount() {
+ return nodeCount;
+ }
+
+ double getAverageUtilization() {
+ return averageUtilization;
+ }
+
+ @Nonnull DatanodeUsageInfo[] getNodesInCluster() {
+ return nodesInCluster;
+ }
+
+ double[] getNodeUtilizationList() {
+ return nodeUtilizationList;
+ }
+
+ @Nonnull Map> getCidToReplicasMap() {
+ return cidToReplicasMap;
+ }
+
+ /**
+ * Determines unBalanced nodes, that is, over and under utilized nodes,
+ * according to the generated utilization values for nodes and the threshold.
+ *
+ * @param threshold a percentage in the range 0 to 100
+ * @return list of DatanodeUsageInfo containing the expected(correct) unBalanced nodes.
+ */
+ @Nonnull List getUnBalancedNodes(double threshold) {
+ threshold /= 100;
+ double lowerLimit = averageUtilization - threshold;
+ double upperLimit = averageUtilization + threshold;
+
+ // Use node utilization to determine over and under utilized nodes.
+ List expectedUnBalancedNodes = new ArrayList<>();
+ for (int i = 0; i < nodeCount; i++) {
+ double nodeUtilization = nodeUtilizationList[i];
+ if (nodeUtilization < lowerLimit || nodeUtilization > upperLimit) {
+ expectedUnBalancedNodes.add(nodesInCluster[i]);
+ }
+ }
+ return expectedUnBalancedNodes;
+ }
+
+ /**
+ * Create some datanodes and containers for each node.
+ */
+ private void generateData(long storageUnit) {
+ // Create datanodes and add containers to them.
+ for (int i = 0; i < nodeCount; i++) {
+ DatanodeUsageInfo usageInfo =
+ new DatanodeUsageInfo(MockDatanodeDetails.randomDatanodeDetails(), new SCMNodeStat());
+ nodesInCluster[i] = usageInfo;
+
+ // Create containers with varying used space.
+ Set containerIDSet = new HashSet<>();
+ int sizeMultiple = 0;
+ for (int j = 0; j < i; j++) {
+ sizeMultiple %= 5;
+ sizeMultiple++;
+ ContainerInfo container = createContainer((long) i * i + j, storageUnit * sizeMultiple);
+
+ cidToInfoMap.put(container.containerID(), container);
+ containerIDSet.add(container.containerID());
+
+ // Create initial replica for this container and add it.
+ Set containerReplicaSet = new HashSet<>();
+ containerReplicaSet.add(
+ createReplica(container.containerID(), usageInfo.getDatanodeDetails(), container.getUsedBytes()));
+ cidToReplicasMap.put(container.containerID(), containerReplicaSet);
+ }
+ dnUsageToContainersMap.put(usageInfo, containerIDSet);
+ }
+ }
+
+ private @Nonnull ContainerInfo createContainer(long id, long usedBytes) {
+ ContainerInfo.Builder builder = new ContainerInfo.Builder()
+ .setContainerID(id)
+ .setState(HddsProtos.LifeCycleState.CLOSED)
+ .setOwner("TestContainerBalancer")
+ .setUsedBytes(usedBytes);
+
+ // Make it a RATIS container if id is even, else make it an EC container.
+ ReplicationConfig config = (id % 2 == 0)
+ ? RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE)
+ : new ECReplicationConfig(3, 2);
+
+ builder.setReplicationConfig(config);
+ return builder.build();
+ }
+
+ /**
+ * Create the required number of replicas for each container. Note that one replica already exists and
+ * nodes with utilization value 0 should not have any replicas.
+ */
+ private void createReplicasForContainers() {
+ for (ContainerInfo container : cidToInfoMap.values()) {
+ // One replica already exists; create the remaining ones.
+ ReplicationConfig replicationConfig = container.getReplicationConfig();
+ ContainerID key = container.containerID();
+ for (int i = 0; i < replicationConfig.getRequiredNodes() - 1; i++) {
+ // Randomly pick a datanode for this replica.
+ int dnIndex = RANDOM.nextInt(0, nodeCount);
+ // Don't put replicas in DNs that are supposed to have 0 utilization.
+ if (Math.abs(nodeUtilizationList[dnIndex] - 0.0d) > 0.00001) {
+ DatanodeDetails node = nodesInCluster[dnIndex].getDatanodeDetails();
+ Set replicas = cidToReplicasMap.get(key);
+ replicas.add(createReplica(key, node, container.getUsedBytes()));
+ cidToReplicasMap.put(key, replicas);
+ dnUsageToContainersMap.get(nodesInCluster[dnIndex]).add(key);
+ }
+ }
+ }
+ }
+
+ /**
+ * Generates a range of equally spaced utilization(that is, used / capacity) values from 0 to 1.
+ *
+ * @param count Number of values to generate. Count must be greater than or equal to 1.
+ * @return double array of node utilization values
+ * @throws IllegalArgumentException If the value of the parameter count is less than 1.
+ */
+ private static double[] createUtilizationList(int count) throws IllegalArgumentException {
+ if (count < 1) {
+ LOG.warn("The value of argument count is {}. However, count must be greater than 0.", count);
+ throw new IllegalArgumentException();
+ }
+ double[] result = new double[count];
+ for (int i = 0; i < count; i++) {
+ result[i] = (i / (double) count);
+ }
+ return result;
+ }
+
+ private @Nonnull ContainerReplica createReplica(
+ @Nonnull ContainerID containerID,
+ @Nonnull DatanodeDetails datanodeDetails,
+ long usedBytes
+ ) {
+ return ContainerReplica.newBuilder()
+ .setContainerID(containerID)
+ .setContainerState(ContainerReplicaProto.State.CLOSED)
+ .setDatanodeDetails(datanodeDetails)
+ .setOriginNodeId(datanodeDetails.getUuid())
+ .setSequenceId(1000L)
+ .setBytesUsed(usedBytes)
+ .build();
+ }
+}