Skip to content

Commit 836a9c4

Browse files
authored
Add flat_skew setting to node overload decider (#3563)
* Add flat_skew setting to node overload decider. Signed-off-by: Rishab Nahata <[email protected]>
1 parent e5ad240 commit 836a9c4

File tree

4 files changed

+338
-50
lines changed

4 files changed

+338
-50
lines changed

server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,17 @@
4545
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
4646
import org.opensearch.common.Priority;
4747
import org.opensearch.common.settings.Settings;
48+
import org.opensearch.test.InternalTestCluster;
4849
import org.opensearch.test.OpenSearchIntegTestCase;
4950
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
5051

52+
import java.util.ArrayList;
5153
import java.util.Arrays;
5254
import java.util.List;
5355
import java.util.concurrent.TimeUnit;
5456
import java.util.stream.Collectors;
5557

58+
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
5659
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
5760
import static org.hamcrest.Matchers.anyOf;
5861
import static org.hamcrest.Matchers.empty;
@@ -351,4 +354,140 @@ public void testAwarenessZonesIncrementalNodes() {
351354
assertThat(counts.get(B_1), equalTo(2));
352355
assertThat(counts.get(noZoneNode), equalTo(2));
353356
}
357+
358+
public void testThreeZoneOneReplicaWithForceZoneValueAndLoadAwareness() throws Exception {
359+
int nodeCountPerAZ = 5;
360+
int numOfShards = 30;
361+
int numOfReplica = 1;
362+
Settings commonSettings = Settings.builder()
363+
.put("cluster.routing.allocation.awareness.attributes", "zone")
364+
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
365+
.put("cluster.routing.allocation.load_awareness.skew_factor", "0.0")
366+
.put("cluster.routing.allocation.load_awareness.provisioned_capacity", Integer.toString(nodeCountPerAZ * 3))
367+
.build();
368+
369+
logger.info("--> starting 15 nodes on zones 'a' & 'b' & 'c'");
370+
List<String> nodes_in_zone_a = internalCluster().startNodes(
371+
nodeCountPerAZ,
372+
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
373+
);
374+
List<String> nodes_in_zone_b = internalCluster().startNodes(
375+
nodeCountPerAZ,
376+
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
377+
);
378+
List<String> nodes_in_zone_c = internalCluster().startNodes(
379+
nodeCountPerAZ,
380+
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
381+
);
382+
383+
// Creating index with 30 primary and 1 replica
384+
createIndex(
385+
"test-1",
386+
Settings.builder()
387+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards)
388+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplica)
389+
.build()
390+
);
391+
392+
ClusterHealthResponse health = client().admin()
393+
.cluster()
394+
.prepareHealth()
395+
.setIndices("test-1")
396+
.setWaitForEvents(Priority.LANGUID)
397+
.setWaitForGreenStatus()
398+
.setWaitForNodes(Integer.toString(nodeCountPerAZ * 3))
399+
.setWaitForNoRelocatingShards(true)
400+
.setWaitForNoInitializingShards(true)
401+
.execute()
402+
.actionGet();
403+
assertFalse(health.isTimedOut());
404+
405+
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
406+
ObjectIntHashMap<String> counts = new ObjectIntHashMap<>();
407+
408+
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
409+
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
410+
for (ShardRouting shardRouting : indexShardRoutingTable) {
411+
counts.addTo(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1);
412+
}
413+
}
414+
}
415+
416+
assertThat(counts.size(), equalTo(nodeCountPerAZ * 3));
417+
// All shards should be started
418+
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(numOfShards * (numOfReplica + 1)));
419+
420+
// stopping half nodes in zone a
421+
int nodesToStop = nodeCountPerAZ / 2;
422+
List<Settings> nodeDataPathSettings = new ArrayList<>();
423+
for (int i = 0; i < nodesToStop; i++) {
424+
nodeDataPathSettings.add(internalCluster().dataPathSettings(nodes_in_zone_a.get(i)));
425+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes_in_zone_a.get(i)));
426+
}
427+
428+
client().admin().cluster().prepareReroute().setRetryFailed(true).get();
429+
health = client().admin()
430+
.cluster()
431+
.prepareHealth()
432+
.setIndices("test-1")
433+
.setWaitForEvents(Priority.LANGUID)
434+
.setWaitForNodes(Integer.toString(nodeCountPerAZ * 3 - nodesToStop))
435+
.setWaitForNoRelocatingShards(true)
436+
.setWaitForNoInitializingShards(true)
437+
.execute()
438+
.actionGet();
439+
assertFalse(health.isTimedOut());
440+
441+
// Creating another index with 30 primary and 1 replica
442+
createIndex(
443+
"test-2",
444+
Settings.builder()
445+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards)
446+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplica)
447+
.build()
448+
);
449+
450+
health = client().admin()
451+
.cluster()
452+
.prepareHealth()
453+
.setIndices("test-1", "test-2")
454+
.setWaitForEvents(Priority.LANGUID)
455+
.setWaitForNodes(Integer.toString(nodeCountPerAZ * 3 - nodesToStop))
456+
.setWaitForNoRelocatingShards(true)
457+
.setWaitForNoInitializingShards(true)
458+
.execute()
459+
.actionGet();
460+
assertFalse(health.isTimedOut());
461+
462+
// Restarting the nodes back
463+
for (int i = 0; i < nodesToStop; i++) {
464+
internalCluster().startNode(
465+
Settings.builder()
466+
.put("node.name", nodes_in_zone_a.get(i))
467+
.put(nodeDataPathSettings.get(i))
468+
.put(commonSettings)
469+
.put("node.attr.zone", "a")
470+
.build()
471+
);
472+
}
473+
client().admin().cluster().prepareReroute().setRetryFailed(true).get();
474+
475+
health = client().admin()
476+
.cluster()
477+
.prepareHealth()
478+
.setIndices("test-1", "test-2")
479+
.setWaitForEvents(Priority.LANGUID)
480+
.setWaitForNodes(Integer.toString(nodeCountPerAZ * 3))
481+
.setWaitForGreenStatus()
482+
.setWaitForActiveShards(2 * numOfShards * (numOfReplica + 1))
483+
.setWaitForNoRelocatingShards(true)
484+
.setWaitForNoInitializingShards(true)
485+
.execute()
486+
.actionGet();
487+
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
488+
489+
// All shards should be started now and cluster health should be green
490+
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2 * numOfShards * (numOfReplica + 1)));
491+
assertThat(health.isTimedOut(), equalTo(false));
492+
}
354493
}

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,13 @@
3333
* </pre>
3434
* <p>
3535
* and prevent allocation on the surviving nodes of the under capacity cluster
36-
* based on overload factor defined as a percentage by
36+
* based on overload factor defined as a percentage and flat skew as absolute allowed skewness by
37+
* </p>
3738
* <pre>
3839
* cluster.routing.allocation.load_awareness.skew_factor: X
40+
* cluster.routing.allocation.load_awareness.flat_skew: N
3941
* </pre>
40-
* The total limit per node based on skew_factor doesn't limit primaries that previously
42+
* The total limit per node based on skew_factor and flat_skew doesn't limit primaries that previously
4143
* existed on the disk as those shards are force allocated by
4244
* {@link AllocationDeciders#canForceAllocatePrimary(ShardRouting, RoutingNode, RoutingAllocation)}
4345
* however new primaries due to index creation, snapshot restore etc can be controlled via the below settings.
@@ -74,19 +76,29 @@ public class NodeLoadAwareAllocationDecider extends AllocationDecider {
7476
Setting.Property.Dynamic,
7577
Property.NodeScope
7678
);
79+
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING = Setting.intSetting(
80+
"cluster.routing.allocation.load_awareness.flat_skew",
81+
2,
82+
2,
83+
Property.Dynamic,
84+
Property.NodeScope
85+
);
7786

7887
private volatile int provisionedCapacity;
7988

8089
private volatile double skewFactor;
8190

8291
private volatile boolean allowUnassignedPrimaries;
8392

93+
private volatile int flatSkew;
94+
8495
private static final Logger logger = LogManager.getLogger(NodeLoadAwareAllocationDecider.class);
8596

8697
public NodeLoadAwareAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
8798
this.skewFactor = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.get(settings);
8899
this.provisionedCapacity = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.get(settings);
89100
this.allowUnassignedPrimaries = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.get(settings);
101+
this.flatSkew = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING.get(settings);
90102
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING, this::setSkewFactor);
91103
clusterSettings.addSettingsUpdateConsumer(
92104
CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING,
@@ -96,6 +108,7 @@ public NodeLoadAwareAllocationDecider(Settings settings, ClusterSettings cluster
96108
CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING,
97109
this::setAllowUnassignedPrimaries
98110
);
111+
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING, this::setFlatSkew);
99112
}
100113

101114
private void setAllowUnassignedPrimaries(boolean allowUnassignedPrimaries) {
@@ -110,6 +123,10 @@ private void setProvisionedCapacity(int provisionedCapacity) {
110123
this.provisionedCapacity = provisionedCapacity;
111124
}
112125

126+
private void setFlatSkew(int flatSkew) {
127+
this.flatSkew = flatSkew;
128+
}
129+
113130
@Override
114131
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
115132
return underCapacity(shardRouting, node, allocation, (count, limit) -> count >= limit);
@@ -146,7 +163,7 @@ private Decision underCapacity(
146163
Metadata metadata = allocation.metadata();
147164
float expectedAvgShardsPerNode = (float) metadata.getTotalNumberOfShards() / provisionedCapacity;
148165
int nodeShardCount = node.numberOfOwningShards();
149-
int limit = (int) Math.ceil(expectedAvgShardsPerNode * (1 + skewFactor / 100.0));
166+
int limit = flatSkew + (int) Math.ceil(expectedAvgShardsPerNode * (1 + skewFactor / 100.0));
150167
if (decider.test(nodeShardCount, limit)) {
151168
logger.debug(
152169
() -> new ParameterizedMessage(
@@ -163,10 +180,11 @@ private Decision underCapacity(
163180
Decision.NO,
164181
NAME,
165182
"too many shards [%d] allocated to this node, limit per node [%d] considering"
166-
+ " overload factor [%.2f] based on capacity [%d]",
183+
+ " overload factor [%.2f] and flat skew [%d] based on capacity [%d]",
167184
nodeShardCount,
168185
limit,
169186
skewFactor,
187+
flatSkew,
170188
provisionedCapacity
171189
);
172190
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,7 @@ public void apply(Settings value, Settings current, Settings previous) {
556556
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING,
557557
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING,
558558
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING,
559+
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING,
559560
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED,
560561
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED,
561562
ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW,

0 commit comments

Comments
 (0)