Skip to content

Commit ca39e4e

Browse files
Make balanced shards allocator timebound (#15239)
* Make balanced shards allocator time bound to prioritise critical operations waiting in the pending task queue Signed-off-by: Rishab Nahata <[email protected]> (cherry picked from commit e982a16) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 4ec2c5d commit ca39e4e

File tree

8 files changed

+591
-9
lines changed

8 files changed

+591
-9
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2222
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
2323
- Support filtering on a large list encoded by bitmap ([#14774](https://github.com/opensearch-project/OpenSearch/pull/14774))
2424
- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153))
25+
- Make balanced shards allocator timebound ([#15239](https://github.com/opensearch-project/OpenSearch/pull/15239))
2526
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
2627
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
2728
- Star tree mapping changes ([#14605](https://github.com/opensearch-project/OpenSearch/pull/14605))

server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1439,7 +1439,9 @@ public void remove() {
14391439
*/
14401440
public Iterator<ShardRouting> nodeInterleavedShardIterator(ShardMovementStrategy shardMovementStrategy) {
14411441
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
1442-
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
1442+
List<Map.Entry<String, RoutingNode>> nodesToShardsEntrySet = new ArrayList<>(nodesToShards.entrySet());
1443+
Randomness.shuffle(nodesToShardsEntrySet);
1444+
for (Map.Entry<String, RoutingNode> entry : nodesToShardsEntrySet) {
14431445
queue.add(entry.getValue().copyShards().iterator());
14441446
}
14451447
if (shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST) {

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

+43-3
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.opensearch.common.settings.Setting;
5555
import org.opensearch.common.settings.Setting.Property;
5656
import org.opensearch.common.settings.Settings;
57+
import org.opensearch.common.unit.TimeValue;
5758

5859
import java.util.HashMap;
5960
import java.util.HashSet;
@@ -87,6 +88,7 @@
8788
public class BalancedShardsAllocator implements ShardsAllocator {
8889

8990
private static final Logger logger = LogManager.getLogger(BalancedShardsAllocator.class);
91+
public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20);
9092

9193
public static final Setting<Float> INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting(
9294
"cluster.routing.allocation.balance.index",
@@ -169,6 +171,23 @@ public class BalancedShardsAllocator implements ShardsAllocator {
169171
Property.NodeScope
170172
);
171173

174+
public static final Setting<TimeValue> ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
175+
"cluster.routing.allocation.balanced_shards_allocator.allocator_timeout",
176+
TimeValue.MINUS_ONE,
177+
TimeValue.MINUS_ONE,
178+
timeValue -> {
179+
if (timeValue.compareTo(MIN_ALLOCATOR_TIMEOUT) < 0 && timeValue.compareTo(TimeValue.MINUS_ONE) != 0) {
180+
throw new IllegalArgumentException(
181+
"Setting ["
182+
+ "cluster.routing.allocation.balanced_shards_allocator.allocator_timeout"
183+
+ "] should be more than 20s or -1ms to disable timeout"
184+
);
185+
}
186+
},
187+
Setting.Property.NodeScope,
188+
Setting.Property.Dynamic
189+
);
190+
172191
private volatile boolean movePrimaryFirst;
173192
private volatile ShardMovementStrategy shardMovementStrategy;
174193

@@ -181,6 +200,8 @@ public class BalancedShardsAllocator implements ShardsAllocator {
181200
private volatile float threshold;
182201

183202
private volatile boolean ignoreThrottleInRestore;
203+
private volatile TimeValue allocatorTimeout;
204+
private long startTime;
184205

185206
public BalancedShardsAllocator(Settings settings) {
186207
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
@@ -197,6 +218,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
197218
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
198219
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
199220
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
221+
setAllocatorTimeout(ALLOCATOR_TIMEOUT_SETTING.get(settings));
200222
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
201223
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
202224
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
@@ -206,6 +228,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
206228
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
207229
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
208230
clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore);
231+
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
209232
}
210233

211234
/**
@@ -284,6 +307,20 @@ private void setThreshold(float threshold) {
284307
this.threshold = threshold;
285308
}
286309

310+
private void setAllocatorTimeout(TimeValue allocatorTimeout) {
311+
this.allocatorTimeout = allocatorTimeout;
312+
}
313+
314+
protected boolean allocatorTimedOut() {
315+
if (allocatorTimeout.equals(TimeValue.MINUS_ONE)) {
316+
if (logger.isTraceEnabled()) {
317+
logger.trace("Allocator timeout is disabled. Will not short circuit allocator tasks");
318+
}
319+
return false;
320+
}
321+
return System.nanoTime() - this.startTime > allocatorTimeout.nanos();
322+
}
323+
287324
@Override
288325
public void allocate(RoutingAllocation allocation) {
289326
if (allocation.routingNodes().size() == 0) {
@@ -298,8 +335,10 @@ public void allocate(RoutingAllocation allocation) {
298335
threshold,
299336
preferPrimaryShardBalance,
300337
preferPrimaryShardRebalance,
301-
ignoreThrottleInRestore
338+
ignoreThrottleInRestore,
339+
this::allocatorTimedOut
302340
);
341+
this.startTime = System.nanoTime();
303342
localShardsBalancer.allocateUnassigned();
304343
localShardsBalancer.moveShards();
305344
localShardsBalancer.balance();
@@ -321,7 +360,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
321360
threshold,
322361
preferPrimaryShardBalance,
323362
preferPrimaryShardRebalance,
324-
ignoreThrottleInRestore
363+
ignoreThrottleInRestore,
364+
() -> false // the timeout check is not needed when we are only computing a ShardAllocationDecision
325365
);
326366
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
327367
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
@@ -585,7 +625,7 @@ public Balancer(
585625
float threshold,
586626
boolean preferPrimaryBalance
587627
) {
588-
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false);
628+
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false, () -> false);
589629
}
590630
}
591631

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java

+45-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.List;
4242
import java.util.Map;
4343
import java.util.Set;
44+
import java.util.function.Supplier;
4445
import java.util.stream.Stream;
4546
import java.util.stream.StreamSupport;
4647

@@ -71,6 +72,7 @@ public class LocalShardsBalancer extends ShardsBalancer {
7172
private final float avgPrimaryShardsPerNode;
7273
private final BalancedShardsAllocator.NodeSorter sorter;
7374
private final Set<RoutingNode> inEligibleTargetNode;
75+
private final Supplier<Boolean> timedOutFunc;
7476
private int totalShardCount = 0;
7577

7678
public LocalShardsBalancer(
@@ -81,7 +83,8 @@ public LocalShardsBalancer(
8183
float threshold,
8284
boolean preferPrimaryBalance,
8385
boolean preferPrimaryRebalance,
84-
boolean ignoreThrottleInRestore
86+
boolean ignoreThrottleInRestore,
87+
Supplier<Boolean> timedOutFunc
8588
) {
8689
this.logger = logger;
8790
this.allocation = allocation;
@@ -99,6 +102,7 @@ public LocalShardsBalancer(
99102
this.preferPrimaryRebalance = preferPrimaryRebalance;
100103
this.shardMovementStrategy = shardMovementStrategy;
101104
this.ignoreThrottleInRestore = ignoreThrottleInRestore;
105+
this.timedOutFunc = timedOutFunc;
102106
}
103107

104108
/**
@@ -344,6 +348,14 @@ private void balanceByWeights() {
344348
final BalancedShardsAllocator.ModelNode[] modelNodes = sorter.modelNodes;
345349
final float[] weights = sorter.weights;
346350
for (String index : buildWeightOrderedIndices()) {
351+
// Terminate if the time allocated to the balanced shards allocator has elapsed
352+
if (timedOutFunc != null && timedOutFunc.get()) {
353+
logger.info(
354+
"Cannot balance any shard in the cluster as time allocated to balanced shards allocator has elapsed"
355+
+ ". Skipping indices iteration"
356+
);
357+
return;
358+
}
347359
IndexMetadata indexMetadata = metadata.index(index);
348360

349361
// find nodes that have a shard of this index or where shards of this index are allowed to be allocated to,
@@ -368,6 +380,14 @@ private void balanceByWeights() {
368380
int lowIdx = 0;
369381
int highIdx = relevantNodes - 1;
370382
while (true) {
383+
// Terminate if the time allocated to the balanced shards allocator has elapsed
384+
if (timedOutFunc != null && timedOutFunc.get()) {
385+
logger.info(
386+
"Cannot balance any shard in the cluster as time allocated to balanced shards allocator has elapsed"
387+
+ ". Skipping relevant nodes iteration"
388+
);
389+
return;
390+
}
371391
final BalancedShardsAllocator.ModelNode minNode = modelNodes[lowIdx];
372392
final BalancedShardsAllocator.ModelNode maxNode = modelNodes[highIdx];
373393
advance_range: if (maxNode.numShards(index) > 0) {
@@ -572,6 +592,15 @@ void moveShards() {
572592
return;
573593
}
574594

595+
// Terminate if the time allocated to the balanced shards allocator has elapsed
596+
if (timedOutFunc != null && timedOutFunc.get()) {
597+
logger.info(
598+
"Cannot move any shard in the cluster as time allocated to balanced shards allocator has elapsed"
599+
+ ". Skipping shard iteration"
600+
);
601+
return;
602+
}
603+
575604
ShardRouting shardRouting = it.next();
576605

577606
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
@@ -799,8 +828,23 @@ void allocateUnassigned() {
799828
int secondaryLength = 0;
800829
int primaryLength = primary.length;
801830
ArrayUtil.timSort(primary, comparator);
831+
if (logger.isTraceEnabled()) {
832+
logger.trace("Staring allocation of [{}] unassigned shards", primaryLength);
833+
}
802834
do {
803835
for (int i = 0; i < primaryLength; i++) {
836+
if (timedOutFunc != null && timedOutFunc.get()) {
837+
// TODO - check whether shards needed by a wait-for-active-shards request can be allowed to bypass this condition
838+
logger.info(
839+
"Ignoring [{}] unassigned shards for allocation as time allocated to balanced shards allocator has elapsed",
840+
(primaryLength - i)
841+
);
842+
while (i < primaryLength) {
843+
unassigned.ignoreShard(primary[i], UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
844+
i++;
845+
}
846+
return;
847+
}
804848
ShardRouting shard = primary[i];
805849
final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard);
806850
final String assignedNodeId = allocationDecision.getTargetNode() != null

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ public void apply(Settings value, Settings current, Settings previous) {
271271
BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING,
272272
BalancedShardsAllocator.THRESHOLD_SETTING,
273273
BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE,
274+
BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING,
274275
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
275276
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
276277
BreakerSettings.CIRCUIT_BREAKER_TYPE,

0 commit comments

Comments
 (0)