Merged
5 changes: 5 additions & 0 deletions docs/changelog/140237.yaml
@@ -0,0 +1,5 @@
pr: 140237
summary: Overall Decision for Deciders prioritizes THROTTLE
area: Allocation
type: bug
issues: []
@@ -0,0 +1,150 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.apache.logging.log4j.Level;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.After;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class NonDesiredBalanceIT extends ESIntegTestCase {

private static final Set<String> NOT_PREFERRED_AND_THROTTLED_NODES = Collections.newSetFromMap(new ConcurrentHashMap<>());

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(NotPreferredAndThrottledPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), ClusterModule.BALANCED_ALLOCATOR)
.build();
}
Comment on lines +51 to +56
Member

Nit: since we are starting nodes manually for each test, I think we can merge the two test classes and start nodes in each test method with their own node settings. That seems like less clutter to me.

Contributor

Yeah, definitely some de-duplication is due here. I'll add it to the follow-up.

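As a rough illustration of the de-duplication suggested above (not code from this PR), a merged test class could start nodes per test method with whichever allocator that test needs, instead of fixing the allocator in `nodeSettings()`. The method names below are hypothetical, the shared assertions are elided, and the sketch assumes it lives in an `ESIntegTestCase` subclass like the ones in this change.

```java
// Hypothetical merged test class: each test starts its own nodes with the allocator it needs.
public void testThrottleVsNotPreferredWithBalancedAllocator() {
    // Pin the legacy balanced allocator for this test only, rather than via nodeSettings().
    final Settings balancedAllocator = Settings.builder()
        .put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), ClusterModule.BALANCED_ALLOCATOR)
        .build();
    final var sourceNode = internalCluster().startNode(balancedAllocator);
    // ... create the index, start the target node with the same settings, and
    // await the BalancedShardsAllocator TRACE message as in the test below.
}

public void testThrottleVsNotPreferredWithDesiredBalanceAllocator() {
    // The desired-balance allocator is the default, so no allocator setting is needed here.
    final var sourceNode = internalCluster().startNode();
    // ... same flow, but await the DesiredBalanceReconciler TRACE message instead.
}
```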

@After
public void clearThrottleAndNotPreferredNodes() {
NOT_PREFERRED_AND_THROTTLED_NODES.clear();
}

/**
* Tests that the non-desired balance allocator obeys the THROTTLE decision of a node.
*/
@TestLogging(
reason = "watch for can't move message",
value = "org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator:TRACE"
)
public void testThrottleVsNotPreferredPriorityInDecisions() {
final Settings settings = Settings.builder().build();
final var sourceNode = internalCluster().startNode(settings);
final var indexName = randomIdentifier();
createIndex(indexName, 1, 0);
ensureGreen(indexName);
final var targetNode = internalCluster().startNode(settings);
final var sourceNodeID = getNodeId(sourceNode);
final var targetNodeID = getNodeId(targetNode);

NOT_PREFERRED_AND_THROTTLED_NODES.add(targetNodeID);

var mapNodeIdsToNames = nodeIdsToNames();
final var sourceNodeName = mapNodeIdsToNames.get(sourceNodeID);
final var targetNodeName = mapNodeIdsToNames.get(targetNodeID);
assertNotNull(sourceNodeName);
assertNotNull(targetNodeName);

logger.info("--> Verifying the shard did not move because allocation to the target node (ID: " + targetNodeID + ") is throttled.");
MockLog.awaitLogger(() -> {
logger.info(
"--> Excluding shard assignment to node "
+ sourceNodeName
+ "(ID: "
+ sourceNodeID
+ ") to force its shard to move to node "
+ targetNodeName
+ "(ID: "
+ targetNodeID
+ ")"
);
updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", sourceNodeName));
},
BalancedShardsAllocator.class,
new MockLog.SeenEventExpectation(
"unable to relocate shard due to throttling",
BalancedShardsAllocator.class.getCanonicalName(),
Level.TRACE,
"[[*]][0] can't move: [MoveDecision{canMoveDecision=throttled, canRemainDecision=NO(), *}]"
)
);

logger.info("--> Clearing THROTTLE decider response and prodding the Reconciler (with a reroute request) to try again");
clearThrottleAndNotPreferredNodes();

safeGet(
client().execute(TransportClusterRerouteAction.TYPE, new ClusterRerouteRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))
);

safeAwait(
ClusterServiceUtils.addMasterTemporaryStateListener(
state -> state.routingTable(ProjectId.DEFAULT)
.index(indexName)
.allShards()
.flatMap(IndexShardRoutingTable::allShards)
.allMatch(shardRouting -> shardRouting.currentNodeId().equals(targetNodeID) && shardRouting.started())
)
);
}

public static class NotPreferredAndThrottledPlugin extends Plugin implements ClusterPlugin {
@Override
public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {

return List.of(new AllocationDecider() {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return NOT_PREFERRED_AND_THROTTLED_NODES.contains(node.nodeId()) ? Decision.NOT_PREFERRED : Decision.YES;
}
}, new AllocationDecider() {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
// THROTTLING is not returned in simulation, so only THROTTLE for real moves in the Reconciler.
return (NOT_PREFERRED_AND_THROTTLED_NODES.contains(node.nodeId()) && allocation.isSimulating() == false)
? Decision.THROTTLE
: Decision.YES;
}
});
}
}
}
@@ -0,0 +1,141 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.apache.logging.log4j.Level;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.After;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class ReconcilerIT extends ESIntegTestCase {

private static final Set<String> NOT_PREFERRED_AND_THROTTLED_NODES = Collections.newSetFromMap(new ConcurrentHashMap<>());

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(NotPreferredAndThrottledPlugin.class);
}

@After
public void clearThrottleAndNotPreferredNodes() {
NOT_PREFERRED_AND_THROTTLED_NODES.clear();
}

/**
* Tests that the Reconciler obeys the THROTTLE decision of a node.
*/
@TestLogging(
reason = "track when Reconciler decisions",
value = "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler:TRACE"
)
public void testThrottleVsNotPreferredPriorityInDecisions() {
final Settings settings = Settings.builder().build();
final var sourceNode = internalCluster().startNode(settings);
final var indexName = randomIdentifier();
createIndex(indexName, 1, 0);
ensureGreen(indexName);
final var targetNode = internalCluster().startNode(settings);
final var sourceNodeID = getNodeId(sourceNode);
final var targetNodeID = getNodeId(targetNode);

NOT_PREFERRED_AND_THROTTLED_NODES.add(targetNodeID);

var mapNodeIdsToNames = nodeIdsToNames();
final var sourceNodeName = mapNodeIdsToNames.get(sourceNodeID);
final var targetNodeName = mapNodeIdsToNames.get(targetNodeID);
assertNotNull(sourceNodeName);
assertNotNull(targetNodeName);

logger.info("--> Verifying the shard did not move because allocation to the target node (ID: " + targetNodeID + ") is throttled.");
MockLog.awaitLogger(() -> {
logger.info(
"--> Excluding shard assignment to node "
+ sourceNodeName
+ "(ID: "
+ sourceNodeID
+ ") to force its shard to move to node "
+ targetNodeName
+ "(ID: "
+ targetNodeID
+ ")"
);
updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", sourceNodeName));
},
DesiredBalanceReconciler.class,
new MockLog.SeenEventExpectation(
"unable to relocate shard that can no longer remain",
DesiredBalanceReconciler.class.getCanonicalName(),
Level.TRACE,
"Cannot move shard * and cannot remain because of [NO()]"
)
);

logger.info("--> Clearing THROTTLE decider response and prodding the Reconciler (with a reroute request) to try again");
clearThrottleAndNotPreferredNodes();

safeGet(
client().execute(TransportClusterRerouteAction.TYPE, new ClusterRerouteRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))
);

safeAwait(
ClusterServiceUtils.addMasterTemporaryStateListener(
state -> state.routingTable(ProjectId.DEFAULT)
.index(indexName)
.allShards()
.flatMap(IndexShardRoutingTable::allShards)
.allMatch(shardRouting -> shardRouting.currentNodeId().equals(targetNodeID) && shardRouting.started())
)
);
}

public static class NotPreferredAndThrottledPlugin extends Plugin implements ClusterPlugin {
@Override
public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {

return List.of(new AllocationDecider() {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return NOT_PREFERRED_AND_THROTTLED_NODES.contains(node.nodeId()) ? Decision.NOT_PREFERRED : Decision.YES;
}
}, new AllocationDecider() {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
// THROTTLING is not returned in simulation, so only THROTTLE for real moves in the Reconciler.
return (NOT_PREFERRED_AND_THROTTLED_NODES.contains(node.nodeId()) && allocation.isSimulating() == false)
? Decision.THROTTLE
: Decision.YES;
}
});
}
}
}
@@ -530,12 +530,12 @@ private MoveDecision explainRebalanceDecision(final ProjectIndex index, final Sh
// if the simulated weight delta with the shard moved away is better than the weight delta
// with the shard remaining on the current node, and we are allowed to allocate to the
// node in question, then allow the rebalance
if (rebalanceConditionsMet && canAllocate.type().higherThan(bestRebalanceCanAllocateDecisionType)) {
if (rebalanceConditionsMet && canAllocate.type().compareToBetweenNodes(bestRebalanceCanAllocateDecisionType) > 0) {
// Overwrite the best decision since it is better than the last. This means that YES/THROTTLE decisions will replace
// NOT_PREFERRED/NO decisions, and a YES decision will replace a THROTTLE decision. NOT_PREFERRED will also replace
// NO, even if neither are acted upon for rebalancing, for allocation explain purposes.
bestRebalanceCanAllocateDecisionType = canAllocate.type();
if (canAllocate.type().higherThan(Type.NOT_PREFERRED)) {
if (canAllocate.type().compareToBetweenNodes(Type.NOT_PREFERRED) > 0) {
// Movement is only allowed to THROTTLE/YES nodes. NOT_PREFERRED is the same as no for rebalancing, since
// rebalancing aims to distribute resource usage and NOT_PREFERRED means the move could cause hot-spots.
targetNode = node;
@@ -1053,7 +1053,7 @@ private MoveDecision decideMove(
// Relocating a shard from one NOT_PREFERRED node to another NOT_PREFERRED node would not improve the situation.
continue;
}
if (allocationDecision.type().higherThan(bestDecision)) {
if (allocationDecision.type().compareToBetweenNodes(bestDecision) > 0) {
bestDecision = allocationDecision.type();
if (bestDecision == Type.YES) {
targetNode = target;
@@ -1570,12 +1570,16 @@ private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, ProjectIn
continue;
}

final Decision.Type canAllocateOrRebalance = Decision.Type.min(allocationDecision.type(), rebalanceDecision.type());
assert rebalanceDecision.type() == Type.YES || rebalanceDecision.type() == Type.THROTTLE
: "We should only see YES/THROTTLE decisions here";
assert allocationDecision.type() == Type.YES || allocationDecision.type() == Type.THROTTLE
: "We should only see YES/THROTTLE decisions here";
final Decision.Type canAllocateOrRebalance = allocationDecision.type() == Type.THROTTLE
|| rebalanceDecision.type() == Type.THROTTLE ? Type.THROTTLE : Type.YES;
Comment on lines +1577 to +1578
Member

Can we capture this as a method on Type? Or should this use compareToBetweenDecisions, since it's technically still decision aggregation for a single node?

Contributor

I'd prefer not to use compareToBetweenDecisions because it sort-of implies that the order used by that method is significant; here we are dealing only with THROTTLE and YES, so I think it's good to make it clear that it only cares about those.

I think we could add it to Type, but perhaps as a subsequent PR to prevent holding things up any further.

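As a rough sketch of that follow-up idea (not part of this PR), the YES/THROTTLE aggregation could live on a `Decision.Type`-like enum. The constants below mirror the ones referenced in this diff, but the method name and its placement on Type are hypothetical.

```java
// Hypothetical follow-up helper: aggregate two decisions that are already known
// to be YES or THROTTLE, letting THROTTLE dominate.
enum Type {
    YES, THROTTLE, NOT_PREFERRED, NO;

    static Type throttleOrYes(Type allocationDecision, Type rebalanceDecision) {
        assert allocationDecision == YES || allocationDecision == THROTTLE : allocationDecision;
        assert rebalanceDecision == YES || rebalanceDecision == THROTTLE : rebalanceDecision;
        // The relocation may only proceed when both decisions are YES.
        return (allocationDecision == THROTTLE || rebalanceDecision == THROTTLE) ? THROTTLE : YES;
    }
}
```

With such a helper, the two-line ternary on lines +1577 to +1578 would collapse into a single call.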

maxNode.removeShard(projectIndex(shard), shard);
long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);

assert canAllocateOrRebalance == Type.YES || canAllocateOrRebalance == Type.THROTTLE : canAllocateOrRebalance;
logger.debug(
"decision [{}]: relocate [{}] from [{}] to [{}]",
canAllocateOrRebalance,
@@ -530,6 +530,14 @@ private void moveShards() {
iterator.dePrioritizeNode(shardRouting.currentNodeId());
moveOrdering.recordAllocation(shardRouting.currentNodeId());
movedUndesiredShard = true;
} else {
logger.trace(
"Cannot move shard [{}][{}] away from {}, and cannot remain because of [{}]",
shardRouting.index(),
shardRouting.shardId(),
shardRouting.currentNodeId(),
canRemainDecision
);
}
} finally {
if (movedUndesiredShard) {