From f658f314157ebb4219d8d5e38b564ea27ed5f522 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 29 Aug 2024 20:31:15 +0530 Subject: [PATCH] Make balanced shards allocator timebound (#15239) * Make balanced shards allocator time bound to prioritise critical operations waiting in the pending task queue Signed-off-by: Rishab Nahata --- CHANGELOG.md | 1 + .../cluster/routing/RoutingNodes.java | 4 +- .../allocator/BalancedShardsAllocator.java | 46 +- .../allocator/LocalShardsBalancer.java | 46 +- .../common/settings/ClusterSettings.java | 1 + ...TimeBoundBalancedShardsAllocatorTests.java | 479 ++++++++++++++++++ .../decider/DiskThresholdDeciderTests.java | 12 +- .../cluster/OpenSearchAllocationTestCase.java | 11 + 8 files changed, 591 insertions(+), 9 deletions(-) create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index b7e4548100df3..f8b695205e789 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897)) - Support filtering on a large list encoded by bitmap ([#14774](https://github.com/opensearch-project/OpenSearch/pull/14774)) - Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153)) +- Make balanced shards allocator timebound ([#15239](https://github.com/opensearch-project/OpenSearch/pull/15239)) - Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325)) - Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895)) - Add support for index level max slice count setting for 
concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336)) diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index ab455f52c4195..b5e74821d41e7 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -1439,7 +1439,9 @@ public void remove() { */ public Iterator nodeInterleavedShardIterator(ShardMovementStrategy shardMovementStrategy) { final Queue> queue = new ArrayDeque<>(); - for (Map.Entry entry : nodesToShards.entrySet()) { + List> nodesToShardsEntrySet = new ArrayList<>(nodesToShards.entrySet()); + Randomness.shuffle(nodesToShardsEntrySet); + for (Map.Entry entry : nodesToShardsEntrySet) { queue.add(entry.getValue().copyShards().iterator()); } if (shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 212583d1fb14f..a5193ca602f04 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -54,6 +54,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import java.util.HashMap; import java.util.HashSet; @@ -87,6 +88,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { private static final Logger logger = LogManager.getLogger(BalancedShardsAllocator.class); + public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20); public static final 
Setting INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting( "cluster.routing.allocation.balance.index", @@ -169,6 +171,23 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + public static final Setting ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.routing.allocation.balanced_shards_allocator.allocator_timeout", + TimeValue.MINUS_ONE, + TimeValue.MINUS_ONE, + timeValue -> { + if (timeValue.compareTo(MIN_ALLOCATOR_TIMEOUT) < 0 && timeValue.compareTo(TimeValue.MINUS_ONE) != 0) { + throw new IllegalArgumentException( + "Setting [" + + "cluster.routing.allocation.balanced_shards_allocator.allocator_timeout" + + "] should be more than 20s or -1ms to disable timeout" + ); + } + }, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private volatile boolean movePrimaryFirst; private volatile ShardMovementStrategy shardMovementStrategy; @@ -181,6 +200,8 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile float threshold; private volatile boolean ignoreThrottleInRestore; + private volatile TimeValue allocatorTimeout; + private long startTime; public BalancedShardsAllocator(Settings settings) { this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); @@ -197,6 +218,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); + setAllocatorTimeout(ALLOCATOR_TIMEOUT_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, 
this::setShardMovementStrategy); @@ -206,6 +228,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore); + clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout); } /** @@ -284,6 +307,20 @@ private void setThreshold(float threshold) { this.threshold = threshold; } + private void setAllocatorTimeout(TimeValue allocatorTimeout) { + this.allocatorTimeout = allocatorTimeout; + } + + protected boolean allocatorTimedOut() { + if (allocatorTimeout.equals(TimeValue.MINUS_ONE)) { + if (logger.isTraceEnabled()) { + logger.trace("Allocator timeout is disabled. Will not short circuit allocator tasks"); + } + return false; + } + return System.nanoTime() - this.startTime > allocatorTimeout.nanos(); + } + @Override public void allocate(RoutingAllocation allocation) { if (allocation.routingNodes().size() == 0) { @@ -298,8 +335,10 @@ public void allocate(RoutingAllocation allocation) { threshold, preferPrimaryShardBalance, preferPrimaryShardRebalance, - ignoreThrottleInRestore + ignoreThrottleInRestore, + this::allocatorTimedOut ); + this.startTime = System.nanoTime(); localShardsBalancer.allocateUnassigned(); localShardsBalancer.moveShards(); localShardsBalancer.balance(); @@ -321,7 +360,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f threshold, preferPrimaryShardBalance, preferPrimaryShardRebalance, - ignoreThrottleInRestore + ignoreThrottleInRestore, + () -> false // as we don't need to check if timed out or not while just understanding ShardAllocationDecision ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision 
moveDecision = MoveDecision.NOT_TAKEN; @@ -585,7 +625,7 @@ public Balancer( float threshold, boolean preferPrimaryBalance ) { - super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false); + super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false, () -> false); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 7e4ae58548c55..adb8ee2cf7e85 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -71,6 +72,7 @@ public class LocalShardsBalancer extends ShardsBalancer { private final float avgPrimaryShardsPerNode; private final BalancedShardsAllocator.NodeSorter sorter; private final Set inEligibleTargetNode; + private final Supplier timedOutFunc; private int totalShardCount = 0; public LocalShardsBalancer( @@ -81,7 +83,8 @@ public LocalShardsBalancer( float threshold, boolean preferPrimaryBalance, boolean preferPrimaryRebalance, - boolean ignoreThrottleInRestore + boolean ignoreThrottleInRestore, + Supplier timedOutFunc ) { this.logger = logger; this.allocation = allocation; @@ -99,6 +102,7 @@ public LocalShardsBalancer( this.preferPrimaryRebalance = preferPrimaryRebalance; this.shardMovementStrategy = shardMovementStrategy; this.ignoreThrottleInRestore = ignoreThrottleInRestore; + this.timedOutFunc = timedOutFunc; } /** @@ -344,6 +348,14 @@ private void balanceByWeights() { final BalancedShardsAllocator.ModelNode[] modelNodes = sorter.modelNodes; final float[] 
weights = sorter.weights; for (String index : buildWeightOrderedIndices()) { + // Terminate if the time allocated to the balanced shards allocator has elapsed + if (timedOutFunc != null && timedOutFunc.get()) { + logger.info( + "Cannot balance any shard in the cluster as time allocated to balanced shards allocator has elapsed" + + ". Skipping indices iteration" + ); + return; + } IndexMetadata indexMetadata = metadata.index(index); // find nodes that have a shard of this index or where shards of this index are allowed to be allocated to, @@ -368,6 +380,14 @@ private void balanceByWeights() { int lowIdx = 0; int highIdx = relevantNodes - 1; while (true) { + // break if the time allocated to the balanced shards allocator has elapsed + if (timedOutFunc != null && timedOutFunc.get()) { + logger.info( + "Cannot balance any shard in the cluster as time allocated to balanced shards allocator has elapsed" + + ". Skipping relevant nodes iteration" + ); + return; + } final BalancedShardsAllocator.ModelNode minNode = modelNodes[lowIdx]; final BalancedShardsAllocator.ModelNode maxNode = modelNodes[highIdx]; advance_range: if (maxNode.numShards(index) > 0) { @@ -572,6 +592,15 @@ void moveShards() { return; } + // Terminate if the time allocated to the balanced shards allocator has elapsed + if (timedOutFunc != null && timedOutFunc.get()) { + logger.info( + "Cannot move any shard in the cluster as time allocated to balanced shards allocator has elapsed" + + ". 
Skipping shard iteration" + ); + return; + } + ShardRouting shardRouting = it.next(); if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) { @@ -799,8 +828,23 @@ void allocateUnassigned() { int secondaryLength = 0; int primaryLength = primary.length; ArrayUtil.timSort(primary, comparator); + if (logger.isTraceEnabled()) { + logger.trace("Staring allocation of [{}] unassigned shards", primaryLength); + } do { for (int i = 0; i < primaryLength; i++) { + if (timedOutFunc != null && timedOutFunc.get()) { + // TODO - maybe check if we can allow wait for active shards thingy bypass this condition + logger.info( + "Ignoring [{}] unassigned shards for allocation as time allocated to balanced shards allocator has elapsed", + (primaryLength - i) + ); + while (i < primaryLength) { + unassigned.ignoreShard(primary[i], UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes()); + i++; + } + return; + } ShardRouting shard = primary[i]; final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard); final String assignedNodeId = allocationDecision.getTargetNode() != null diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 8daf9125bb27e..9a6b3f1118709 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -270,6 +270,7 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE, + BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING, BreakerSettings.CIRCUIT_BREAKER_TYPE, diff --git 
a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java new file mode 100644 index 0000000000000..a10c305686638 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java @@ -0,0 +1,479 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation.allocator; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.cluster.routing.allocation.decider.AllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import 
java.util.concurrent.CountDownLatch; + +import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; +import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING; + +public class TimeBoundBalancedShardsAllocatorTests extends OpenSearchAllocationTestCase { + + private final DiscoveryNode node1 = newNode("node1", "node1", Collections.singletonMap("zone", "1a")); + private final DiscoveryNode node2 = newNode("node2", "node2", Collections.singletonMap("zone", "1b")); + private final DiscoveryNode node3 = newNode("node3", "node3", Collections.singletonMap("zone", "1c")); + + public void testAllUnassignedShardsAllocatedWhenNoTimeOut() { + int numberOfIndices = 2; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalPrimaryCount = numberOfIndices * numberOfShards; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Settings.Builder settings = Settings.builder(); + // passing total shard count for timed out latch such that no shard times out + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(totalShardCount)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + RoutingAllocation allocation = new RoutingAllocation( + yesAllocationDeciders(), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); + int 
node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); + int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId()); + int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId()); + assertEquals(totalShardCount, initializingShards.size()); + assertEquals(0, allocation.routingNodes().unassigned().ignored().size()); + assertEquals(totalPrimaryCount, node1Recoveries + node2Recoveries + node3Recoveries); + } + + public void testAllUnassignedShardsIgnoredWhenTimedOut() { + int numberOfIndices = 2; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Settings.Builder settings = Settings.builder(); + // passing 0 for timed out latch such that all shard times out + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(0)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + RoutingAllocation allocation = new RoutingAllocation( + yesAllocationDeciders(), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); + int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); + int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId()); + int node3Recoveries = 
allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId()); + assertEquals(0, initializingShards.size()); + assertEquals(totalShardCount, allocation.routingNodes().unassigned().ignored().size()); + assertEquals(0, node1Recoveries + node2Recoveries + node3Recoveries); + } + + public void testAllocatePartialPrimaryShardsUntilTimedOut() { + int numberOfIndices = 2; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Settings.Builder settings = Settings.builder(); + int shardsToAllocate = randomIntBetween(1, numberOfShards * numberOfIndices); + // passing shards to allocate for timed out latch such that only few primary shards are allocated in this reroute round + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(shardsToAllocate)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + RoutingAllocation allocation = new RoutingAllocation( + yesAllocationDeciders(), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); + int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); + int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId()); + int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId()); + assertEquals(shardsToAllocate, initializingShards.size()); + 
assertEquals(totalShardCount - shardsToAllocate, allocation.routingNodes().unassigned().ignored().size()); + assertEquals(shardsToAllocate, node1Recoveries + node2Recoveries + node3Recoveries); + } + + public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOut() { + int numberOfIndices = 2; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Settings.Builder settings = Settings.builder(); + int shardsToAllocate = randomIntBetween(numberOfShards * numberOfIndices, totalShardCount); + // passing shards to allocate for timed out latch such that all primary shards and few replica shards are allocated in this reroute + // round + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(shardsToAllocate)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + RoutingAllocation allocation = new RoutingAllocation( + yesAllocationDeciders(), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); + int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); + int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId()); + int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId()); + assertEquals(shardsToAllocate, initializingShards.size()); + assertEquals(totalShardCount - 
shardsToAllocate, allocation.routingNodes().unassigned().ignored().size()); + assertEquals(numberOfShards * numberOfIndices, node1Recoveries + node2Recoveries + node3Recoveries); + } + + public void testAllShardsMoveWhenExcludedAndTimeoutNotBreached() { + int numberOfIndices = 3; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + MockAllocationService allocationService = createAllocationService(); + state = applyStartedShardsUntilNoChange(state, allocationService); + // check all shards allocated + assertEquals(0, state.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); + int node1ShardCount = state.getRoutingNodes().node("node1").size(); + Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build(); + int shardsToMove = 10 + 1000; // such that time out is never breached + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings, new CountDownLatch(shardsToMove)); + RoutingAllocation allocation = new RoutingAllocation( + allocationDecidersForExcludeAPI(settings), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); + assertEquals(node1ShardCount, relocatingShards.size()); + } + + public void testNoShardsMoveWhenExcludedAndTimeoutBreached() { + int 
numberOfIndices = 3; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + MockAllocationService allocationService = createAllocationService(); + state = applyStartedShardsUntilNoChange(state, allocationService); + // check all shards allocated + assertEquals(0, state.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); + Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build(); + int shardsToMove = 0; // such that the timeout is breached before any shard can be moved + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings, new CountDownLatch(shardsToMove)); + RoutingAllocation allocation = new RoutingAllocation( + allocationDecidersForExcludeAPI(settings), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); + assertEquals(0, relocatingShards.size()); + } + + public void testPartialShardsMoveWhenExcludedAndTimeoutBreached() { + int numberOfIndices = 3; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state =
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + MockAllocationService allocationService = createAllocationService(); + state = applyStartedShardsUntilNoChange(state, allocationService); + // check all shards allocated + assertEquals(0, state.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); + Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build(); + // since for moves, it creates an iterator over shards which interleaves between nodes, hence + // for shardsToMove=6, it will have 2 shards from node1, node2, node3 each attempting to move, but only + // shards from node1 can actually move. Hence, total moves that will be executed is 2 (6/3). + int shardsToMove = 6; // such that the timeout is breached part-way through the shard move iteration + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings, new CountDownLatch(shardsToMove)); + RoutingAllocation allocation = new RoutingAllocation( + allocationDecidersForExcludeAPI(settings), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); + assertEquals(shardsToMove / 3, relocatingShards.size()); + } + + public void testClusterRebalancedWhenNotTimedOut() { + int numberOfIndices = 1; + int numberOfShards = 15; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state =
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + MockAllocationService allocationService = createAllocationService( + Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build() + ); // such that no shards are allocated to node1 + state = applyStartedShardsUntilNoChange(state, allocationService); + int node1ShardCount = state.getRoutingNodes().node("node1").size(); + // check all shards allocated + assertEquals(0, state.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); + assertEquals(0, node1ShardCount); + Settings newSettings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "").build(); + int shardsToMove = 1000; // such that time out is never breached + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(newSettings, new CountDownLatch(shardsToMove)); + RoutingAllocation allocation = new RoutingAllocation( + allocationDecidersForExcludeAPI(newSettings), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); + assertEquals(totalShardCount / 3, relocatingShards.size()); + } + + public void testClusterNotRebalancedWhenTimedOut() { + int numberOfIndices = 1; + int numberOfShards = 15; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + 
.metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + MockAllocationService allocationService = createAllocationService( + Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build() + ); // such that no shards are allocated to node1 + state = applyStartedShardsUntilNoChange(state, allocationService); + int node1ShardCount = state.getRoutingNodes().node("node1").size(); + // check all shards allocated + assertEquals(0, state.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); + assertEquals(0, node1ShardCount); + Settings newSettings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "").build(); + int shardsToMove = 0; // such that it never balances anything + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(newSettings, new CountDownLatch(shardsToMove)); + RoutingAllocation allocation = new RoutingAllocation( + allocationDecidersForExcludeAPI(newSettings), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); + assertEquals(0, relocatingShards.size()); + } + + public void testClusterPartialRebalancedWhenTimedOut() { + int numberOfIndices = 1; + int numberOfShards = 15; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + 
.build(); + MockAllocationService allocationService = createAllocationService( + Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build() + ); // such that no shards are allocated to node1 + state = applyStartedShardsUntilNoChange(state, allocationService); + int node1ShardCount = state.getRoutingNodes().node("node1").size(); + // check all shards allocated + assertEquals(0, state.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); + assertEquals(0, node1ShardCount); + Settings newSettings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "").build(); + + // making custom set of allocation deciders such that it never attempts to move shards but always attempts to rebalance + List allocationDeciders = Arrays.asList(new AllocationDecider() { + @Override + public Decision canMoveAnyShard(RoutingAllocation allocation) { + return Decision.NO; + } + }, new AllocationDecider() { + @Override + public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { + return Decision.YES; + } + }, new SameShardAllocationDecider(newSettings, new ClusterSettings(newSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); + int shardsToMove = 3; // such that it only partially balances few shards + // adding +1 as during rebalance we do per index timeout check and then per node check + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(newSettings, new CountDownLatch(shardsToMove + 1)); + RoutingAllocation allocation = new RoutingAllocation( + new AllocationDeciders(allocationDeciders), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); + assertEquals(3, relocatingShards.size()); + } + + public void 
testAllocatorNeverTimedOutIfValueIsMinusOne() { + Settings build = Settings.builder().put("cluster.routing.allocation.balanced_shards_allocator.allocator_timeout", "-1").build(); + BalancedShardsAllocator allocator = new BalancedShardsAllocator(build); + assertFalse(allocator.allocatorTimedOut()); + } + + public void testAllocatorTimeout() { + String settingKey = "cluster.routing.allocation.balanced_shards_allocator.allocator_timeout"; + // Valid setting with timeout = 20s + Settings build = Settings.builder().put(settingKey, "20s").build(); + assertEquals(20, ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds()); + + // Valid setting with timeout > 20s + build = Settings.builder().put(settingKey, "30000ms").build(); + assertEquals(30, ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds()); + + // Invalid setting with timeout < 20s + Settings lessThan20sSetting = Settings.builder().put(settingKey, "10s").build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> ALLOCATOR_TIMEOUT_SETTING.get(lessThan20sSetting) + ); + assertEquals("Setting [" + settingKey + "] should be more than 20s or -1ms to disable timeout", iae.getMessage()); + + // Valid setting with timeout = -1 + build = Settings.builder().put(settingKey, "-1").build(); + assertEquals(-1, ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis()); + } + + private RoutingTable buildRoutingTable(Metadata metadata) { + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + for (Map.Entry entry : metadata.getIndices().entrySet()) { + routingTableBuilder.addAsNew(entry.getValue()); + } + return routingTableBuilder.build(); + } + + private Metadata buildMetadata(Metadata.Builder mb, int numberOfIndices, int numberOfShards, int numberOfReplicas) { + for (int i = 0; i < numberOfIndices; i++) { + mb.put( + IndexMetadata.builder("test_" + i) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ); + } + + return 
mb.build(); + } + + static class TestBalancedShardsAllocator extends BalancedShardsAllocator { + private final CountDownLatch timedOutLatch; + + public TestBalancedShardsAllocator(Settings settings, CountDownLatch timedOutLatch) { + super(settings); + this.timedOutLatch = timedOutLatch; + } + + @Override + protected boolean allocatorTimedOut() { + if (timedOutLatch.getCount() == 0) { + return true; + } + timedOutLatch.countDown(); + return false; + } + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 2e24640fe858d..94e91c3f7c3c1 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -530,6 +530,8 @@ public void testDiskThresholdWithAbsoluteSizes() { // Primary should initialize, even though both nodes are over the limit initialize assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + // below checks are unnecessary as the primary shard is always assigned to node2 as BSA always picks up that node + // first as both node1 and node2 have equal weight as both of them contain zero shards. 
String nodeWithPrimary, nodeWithoutPrimary; if (clusterState.getRoutingNodes().node("node1").size() == 1) { nodeWithPrimary = "node1"; @@ -679,10 +681,12 @@ public void testDiskThresholdWithAbsoluteSizes() { clusterState = startInitializingShardsAndReroute(strategy, clusterState); logShardStates(clusterState); - // primary shard already has been relocated away - assertThat(clusterState.getRoutingNodes().node(nodeWithPrimary).size(), equalTo(0)); - // node with increased space still has its shard - assertThat(clusterState.getRoutingNodes().node(nodeWithoutPrimary).size(), equalTo(1)); + // primary shard already has been relocated away - this is a wrong expectation as we don't really move + // primary first unless explicitly set by setting. This is caught with PR + // https://github.com/opensearch-project/OpenSearch/pull/15239/ + // as it randomises nodes to check for potential moves + // assertThat(clusterState.getRoutingNodes().node(nodeWithPrimary).size(), equalTo(0)); + // assertThat(clusterState.getRoutingNodes().node(nodeWithoutPrimary).size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().node("node4").size(), equalTo(1)); diff --git a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java index 34b8c58a9c5b2..f54ba36203684 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDecider; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider; import 
org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -213,6 +214,16 @@ protected static AllocationDeciders throttleAllocationDeciders() { ); } + protected static AllocationDeciders allocationDecidersForExcludeAPI(Settings settings) { + return new AllocationDeciders( + Arrays.asList( + new TestAllocateDecision(Decision.YES), + new SameShardAllocationDecider(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + new FilterAllocationDecider(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ) + ); + } + protected ClusterState applyStartedShardsUntilNoChange(ClusterState clusterState, AllocationService service) { ClusterState lastClusterState; do {