Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Fix remote shards balance #16972

Open
wants to merge 2 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Ensure consistency of system flag on IndexMetadata after diff is applied ([#16644](https://github.com/opensearch-project/OpenSearch/pull/16644))
- Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763))
- Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606))
- Fix remote shards balance ([#15335](https://github.com/opensearch-project/OpenSearch/pull/15335))
- Always use `constant_score` query for `match_only_text` field ([#16964](https://github.com/opensearch-project/OpenSearch/pull/16964))

### Security
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,17 @@
final Map<String, Integer> nodePrimaryShardCount = calculateNodePrimaryShardCount(remoteRoutingNodes);
int totalPrimaryShardCount = nodePrimaryShardCount.values().stream().reduce(0, Integer::sum);

totalPrimaryShardCount += routingNodes.unassigned().getNumPrimaries();
int avgPrimaryPerNode = (totalPrimaryShardCount + routingNodes.size() - 1) / routingNodes.size();
int unassignedRemotePrimaryShardCount = 0;
for (ShardRouting shard : routingNodes.unassigned()) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) && shard.primary()) {
unassignedRemotePrimaryShardCount++;

Check warning on line 253 in server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java#L253

Added line #L253 was not covered by tests
}
}

Check warning on line 255 in server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java#L255

Added line #L255 was not covered by tests
totalPrimaryShardCount += unassignedRemotePrimaryShardCount;
final int avgPrimaryPerNode = (totalPrimaryShardCount + remoteRoutingNodes.size() - 1) / remoteRoutingNodes.size();

ArrayDeque<RoutingNode> sourceNodes = new ArrayDeque<>();
ArrayDeque<RoutingNode> targetNodes = new ArrayDeque<>();
final ArrayDeque<RoutingNode> sourceNodes = new ArrayDeque<>();
final ArrayDeque<RoutingNode> targetNodes = new ArrayDeque<>();
for (RoutingNode node : remoteRoutingNodes) {
if (nodePrimaryShardCount.get(node.nodeId()) > avgPrimaryPerNode) {
sourceNodes.add(node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public AllocationService createRemoteCapableAllocationService() {
}

public AllocationService createRemoteCapableAllocationService(String excludeNodes) {
Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.node_id", excludeNodes).build();
Settings settings = Settings.builder().put("cluster.routing.allocation.exclude._id", excludeNodes).build();
return new MockAllocationService(
randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()),
new TestGatewayAllocator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,51 @@ public class RemoteShardsRebalanceShardsTests extends RemoteShardsBalancerBaseTe
* Post rebalance primaries should be balanced across all the nodes.
*/
public void testShardAllocationAndRebalance() {
int localOnlyNodes = 20;
int remoteCapableNodes = 40;
int localIndices = 40;
int remoteIndices = 80;
final int localOnlyNodes = 20;
final int remoteCapableNodes = 40;
final int halfRemoteCapableNodes = remoteCapableNodes / 2;
final int localIndices = 40;
final int remoteIndices = 80;
ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices);
AllocationService service = this.createRemoteCapableAllocationService();
final StringBuilder excludeNodes = new StringBuilder();
for (int i = 0; i < halfRemoteCapableNodes; i++) {
excludeNodes.append(getNodeId(i, true));
if (i != (remoteCapableNodes / 2 - 1)) {
excludeNodes.append(", ");
}
}
AllocationService service = this.createRemoteCapableAllocationService(excludeNodes.toString());
clusterState = allocateShardsAndBalance(clusterState, service);
RoutingNodes routingNodes = clusterState.getRoutingNodes();
RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes);

final Map<String, Integer> nodePrimariesCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, true);
final Map<String, Integer> nodeReplicaCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, false);
Map<String, Integer> nodePrimariesCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, true);
Map<String, Integer> nodeReplicaCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, false);
int avgPrimariesPerNode = getTotalShardCountAcrossNodes(nodePrimariesCounter) / remoteCapableNodes;

// Primary and replica are balanced post first reroute
// Primary and replica are balanced after first allocating unassigned
for (RoutingNode node : routingNodes) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getNodePool(node))) {
if (Integer.parseInt(node.nodeId().split("-")[4]) < halfRemoteCapableNodes) {
assertEquals(0, (int) nodePrimariesCounter.getOrDefault(node.nodeId(), 0));
} else {
assertEquals(avgPrimariesPerNode * 2, (int) nodePrimariesCounter.get(node.nodeId()));
}
assertTrue(nodeReplicaCounter.getOrDefault(node.nodeId(), 0) >= 0);
}
}

// Remove exclude constraint and rebalance
service = this.createRemoteCapableAllocationService();
clusterState = allocateShardsAndBalance(clusterState, service);
routingNodes = clusterState.getRoutingNodes();
allocation = getRoutingAllocation(clusterState, routingNodes);
nodePrimariesCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, true);
nodeReplicaCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, false);
for (RoutingNode node : routingNodes) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getNodePool(node))) {
assertInRange(nodePrimariesCounter.get(node.nodeId()), avgPrimariesPerNode, remoteCapableNodes - 1);
assertTrue(nodeReplicaCounter.get(node.nodeId()) >= 0);
assertEquals(avgPrimariesPerNode, (int) nodePrimariesCounter.get(node.nodeId()));
assertTrue(nodeReplicaCounter.getOrDefault(node.nodeId(), 0) >= 0);
}
}
}
Expand Down
Loading