Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
package org.elasticsearch.cluster.routing.allocation.decider;

import org.apache.logging.log4j.Level;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest;
import org.elasticsearch.action.admin.cluster.allocation.DesiredBalanceRequest;
import org.elasticsearch.action.admin.cluster.allocation.DesiredBalanceResponse;
import org.elasticsearch.action.admin.cluster.allocation.TransportClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.allocation.TransportGetDesiredBalanceAction;
import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction;
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
Expand All @@ -29,14 +27,11 @@
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
import org.elasticsearch.cluster.routing.allocation.Explanations;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -269,55 +264,13 @@ public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() {
}
}

@TestLogging(
reason = "track when reconciliation has completed",
value = "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator:DEBUG"
)
public void testAllocationExplainMoveShardNotPreferred() {
TestHarness harness = setUpThreeTestNodesAndAllIndexShardsOnFirstNode();

// Running the {@link #runCanRemainNotPreferredIsIgnoredWhenAllOtherNodesReturnNotPreferred} logic should set up a cluster where
// shards are all allocated to a {@link AllocationDeciders#canRemain} {@link Decision#NOT_PREFERRED} node, while the other nodes
// return {@link AllocationDeciders#canAllocate} {@link Decision#NOT_PREFERRED} responses. This should exercise NOT_PREFERRED in
// the allocation/explain paths for remaining on a node AND assignment to other nodes.
runCanRemainNotPreferredIsIgnoredWhenAllOtherNodesReturnNotPreferred(harness);
int numDataNodes = internalCluster().numDataNodes();
assertThat(
"test requires at least two nodes, one node for canRemain explanation, one for canAllocation explanation",
numDataNodes,
greaterThanOrEqualTo(2)
);

ClusterAllocationExplainRequest allocationExplainRequest = new ClusterAllocationExplainRequest(TEST_REQUEST_TIMEOUT).setIndex(
harness.indexName
).setShard(0).setPrimary(true);
var allocationExplainResponse = safeGet(client().execute(TransportClusterAllocationExplainAction.TYPE, allocationExplainRequest));
logger.info("---> Allocation explain response: " + Strings.toString(allocationExplainResponse.getExplanation(), true, true));

var decision = allocationExplainResponse.getExplanation().getShardAllocationDecision().getMoveDecision();
assertThat("Rebalancing should be disabled", decision.canRebalanceCluster(), equalTo(false));
assertThat(decision.getCanRemainDecision().type(), equalTo(Decision.NOT_PREFERRED.type()));
assertNull(decision.getTargetNode());
assertThat(decision.getAllocationDecision(), equalTo(AllocationDecision.NOT_PREFERRED));

var canAllocateDecisions = allocationExplainResponse.getExplanation()
.getShardAllocationDecision()
.getMoveDecision()
.getNodeDecisions();
assertThat(canAllocateDecisions.size(), equalTo(/* number of nodes to which the shard can be relocated = */ numDataNodes - 1));
canAllocateDecisions.forEach(nodeDecision -> assertThat(nodeDecision.getNodeDecision(), equalTo(AllocationDecision.NOT_PREFERRED)));
}

@TestLogging(
reason = "track when reconciliation has completed",
value = "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator:DEBUG"
)
public void testCanRemainNotPreferredIsIgnoredWhenAllOtherNodesReturnNotPreferred() {
TestHarness harness = setUpThreeTestNodesAndAllIndexShardsOnFirstNode();
runCanRemainNotPreferredIsIgnoredWhenAllOtherNodesReturnNotPreferred(harness);
}

private void runCanRemainNotPreferredIsIgnoredWhenAllOtherNodesReturnNotPreferred(TestHarness harness) {
/**
* Override the {@link TransportNodeUsageStatsForThreadPoolsAction} action on the data nodes to supply artificial thread pool write
* load stats. The stats will show all the nodes above the high utilization threshold, so they do not accept new shards, while the
Expand Down Expand Up @@ -502,80 +455,6 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() {
}
}

public void testAllocationExplainRebalancingNotPreferred() {
var harness = setUpThreeTestNodesAndAllIndexShardsOnFirstNode();

// Set up all data nodes to report write thread pool usage above the utilization threshold, to stop canAllocate, but no write thread
// pool queuing to force a shard relocation. This will leave all the data nodes unable to accept new shards when rebalancing
// attempts to redistribute shards more evenly than all shards on a single node.

final NodeUsageStatsForThreadPools firstNodeAboveUtilizationThresholdNodeStats = createNodeUsageStatsForThreadPools(
harness.firstDiscoveryNode,
harness.randomNumberOfWritePoolThreads,
randomIntBetween(harness.randomUtilizationThresholdPercent, 100) / 100f,
0
);
final NodeUsageStatsForThreadPools secondNodeAboveUtilizationThresholdNodeStats = createNodeUsageStatsForThreadPools(
harness.secondDiscoveryNode,
harness.randomNumberOfWritePoolThreads,
randomIntBetween(harness.randomUtilizationThresholdPercent, 100) / 100f,
0
);
final NodeUsageStatsForThreadPools thirdNodeAboveUtilizationThresholdNodeStats = createNodeUsageStatsForThreadPools(
harness.thirdDiscoveryNode,
harness.randomNumberOfWritePoolThreads,
randomIntBetween(harness.randomUtilizationThresholdPercent, 100) / 100f,
0
);
setUpMockTransportNodeUsageStatsResponse(harness.firstDiscoveryNode, firstNodeAboveUtilizationThresholdNodeStats);
setUpMockTransportNodeUsageStatsResponse(harness.secondDiscoveryNode, secondNodeAboveUtilizationThresholdNodeStats);
setUpMockTransportNodeUsageStatsResponse(harness.thirdDiscoveryNode, thirdNodeAboveUtilizationThresholdNodeStats);

// Override the {@link TransportIndicesStatsAction} action on the data nodes to supply artificial shard write load stats. The stats
// will show that all shards have non-empty write load stats (so that the WriteLoadDecider will evaluate assigning them to a node).
final ClusterState originalClusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
final IndexMetadata indexMetadata = originalClusterState.getMetadata().getProject().index(harness.indexName);
setUpMockTransportIndicesStatsResponse(
harness.firstDiscoveryNode,
indexMetadata.getNumberOfShards(),
createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId)
);
setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of());
setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of());

logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats from the data nodes");
refreshClusterInfo();

// Allow rebalancing and clear the exclusion setting that holds the shards on a single node.
updateClusterSettings(
Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.ALL)
.putNull("cluster.routing.allocation.exclude._name")
);

logger.info("---> Rebalancing is now allowed");

ClusterAllocationExplainRequest allocationExplainRequest = new ClusterAllocationExplainRequest(TEST_REQUEST_TIMEOUT).setIndex(
harness.indexName
).setShard(0).setPrimary(true);
var allocationExplainResponse = safeGet(client().execute(TransportClusterAllocationExplainAction.TYPE, allocationExplainRequest));
logger.info("---> Allocation explain response: " + Strings.toString(allocationExplainResponse.getExplanation(), true, true));

var decision = allocationExplainResponse.getExplanation().getShardAllocationDecision().getMoveDecision();
assertThat("Rebalancing should be enabled", decision.canRebalanceCluster(), equalTo(true));
assertThat(decision.getCanRemainDecision().type(), equalTo(Decision.YES.type()));
assertNull(decision.getTargetNode());
assertThat(decision.getAllocationDecision(), equalTo(AllocationDecision.NOT_PREFERRED));
assertThat(decision.getExplanation(), equalTo(Explanations.Rebalance.NOT_PREFERRED));

var canAllocateDecisions = allocationExplainResponse.getExplanation()
.getShardAllocationDecision()
.getMoveDecision()
.getNodeDecisions();
assertThat(canAllocateDecisions.size(), equalTo(2));
canAllocateDecisions.forEach(nodeDecision -> assertThat(nodeDecision.getNodeDecision(), equalTo(AllocationDecision.NOT_PREFERRED)));
}

/**
* Determine which shard was moved and check that it's the "best" according to
* {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ public static AllocationDecision fromAllocationStatus(AllocationStatus allocatio
*/
public static AllocationDecision fromDecisionType(Decision.Type type) {
return switch (type) {
case YES -> YES;
case NOT_PREFERRED -> NOT_PREFERRED;
// TODO: this
case YES, NOT_PREFERRED -> YES;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From here.

The TODO comment isn't right, but presumably this is temporary.

case THROTTLE -> THROTTLED;
case NO -> NO;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,34 +118,15 @@ public static MoveDecision move(
) {
assert canRemainDecision != null;
assert canRemainDecision.type() != Type.YES : "create decision with MoveDecision#createRemainYesDecision instead";
assert decisionAndTargetAreConsistent(canRemainDecision, moveDecision, targetNode)
: "targetNode: " + targetNode + ", move decision: " + moveDecision;
if (nodeDecisions == null && moveDecision == AllocationDecision.NO) {
// the final decision is NO (no node to move the shard to) and we are not in explain mode, return a cached version
return CACHED_CANNOT_MOVE_DECISION;
} else {
assert ((targetNode == null) == (moveDecision != AllocationDecision.YES));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From here

return new MoveDecision(targetNode, nodeDecisions, moveDecision, canRemainDecision, null, 0);
}
}

private static boolean decisionAndTargetAreConsistent(
Decision canRemainDecision,
AllocationDecision moveDecision,
DiscoveryNode targetNode
) {
return switch (moveDecision) {
// YES must always have a target
case AllocationDecision.YES -> (targetNode != null);
case AllocationDecision.NOT_PREFERRED -> {
// If canRemain is not-preferred, then there should be no shard move, and thus no target node.
assert (canRemainDecision.type() == Type.NOT_PREFERRED) == (targetNode == null)
: "remain decision: " + canRemainDecision + ", target node: " + targetNode;
yield true;
}
default -> targetNode == null;
};
}

/**
* Creates a decision for whether to move the shard to a different node to form a better cluster balance.
*/
Expand Down Expand Up @@ -282,29 +263,23 @@ public String getExplanation() {
? Explanations.Rebalance.CANNOT_REBALANCE_CAN_ALLOCATE
: Explanations.Rebalance.CANNOT_REBALANCE_CANNOT_ALLOCATE;
case THROTTLE -> Explanations.Rebalance.CLUSTER_THROTTLE;
case YES -> {
case YES, NOT_PREFERRED -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From here

if (getTargetNode() != null) {
yield canMoveDecision == AllocationDecision.THROTTLED
? Explanations.Rebalance.NODE_THROTTLE
: Explanations.Rebalance.YES;
} else {
yield canMoveDecision == AllocationDecision.NOT_PREFERRED
? Explanations.Rebalance.NOT_PREFERRED
: Explanations.Rebalance.ALREADY_BALANCED;
yield Explanations.Rebalance.ALREADY_BALANCED;
}
}
case NOT_PREFERRED -> Explanations.Rebalance.NOT_PREFERRED;
};
} else {
// it was a decision by an allocation decider to move the shard
assert cannotRemain();
return switch (canMoveDecision) {
case YES -> canRemainNotPreferred() ? Explanations.Move.NOT_PREFERRED_TO_YES : Explanations.Move.YES;
case NOT_PREFERRED -> canRemainNotPreferred()
? Explanations.Move.NOT_PREFERRED_TO_NOT_PREFERRED
: Explanations.Move.NOT_PREFERRED;
case THROTTLED -> canRemainNotPreferred() ? Explanations.Move.NOT_PREFERRED_TO_THROTTLED : Explanations.Move.THROTTLED;
case NO -> canRemainNotPreferred() ? Explanations.Move.NOT_PREFERRED_TO_NO : Explanations.Move.NO;
case YES, NOT_PREFERRED -> Explanations.Move.YES;
case THROTTLED -> Explanations.Move.THROTTLED;
case NO -> Explanations.Move.NO;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From here

case WORSE_BALANCE, AWAITING_INFO, ALLOCATION_DELAYED, NO_VALID_SHARD_COPY, NO_ATTEMPT -> {
assert false : canMoveDecision;
yield canMoveDecision.toString();
Expand Down Expand Up @@ -341,13 +316,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
builder.field("can_rebalance_to_other_node", canMoveDecision);
builder.field("rebalance_explanation", getExplanation());
} else {
if (cannotRemainAndCanMove()) {
builder.field("can_move_to_other_node", "yes");
} else if (cannotRemainAndNotPreferredMove()) {
builder.field("can_move_to_other_node", "not-preferred");
} else {
builder.field("can_move_to_other_node", "no");
}
builder.field("can_move_to_other_node", cannotRemainAndCanMove() ? "yes" : "no");
builder.field("move_explanation", getExplanation());
}
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,15 +531,10 @@ private MoveDecision explainRebalanceDecision(final ProjectIndex index, final Sh
// with the shard remaining on the current node, and we are allowed to allocate to the
// node in question, then allow the rebalance
if (rebalanceConditionsMet && canAllocate.type().higherThan(bestRebalanceCanAllocateDecisionType)) {
// Overwrite the best decision since it is better than the last. This means that YES/THROTTLE decisions will replace
// NOT_PREFERRED/NO decisions, and a YES decision will replace a THROTTLE decision. NOT_PREFERRED will also replace
// NO, even if neither are acted upon for rebalancing, for allocation explain purposes.
// rebalance to the node, only will get overwritten if the decision here is to
// THROTTLE and we get a decision with YES on another node
bestRebalanceCanAllocateDecisionType = canAllocate.type();
if (canAllocate.type().higherThan(Type.NOT_PREFERRED)) {
// Movement is only allowed to THROTTLE/YES nodes. NOT_PREFERRED is the same as no for rebalancing, since
// rebalancing aims to distribute resource usage and NOT_PREFERRED means the move could cause hot-spots.
targetNode = node;
}
targetNode = node;
Comment on lines +534 to +537
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From here

}
}
Tuple<ModelNode, Decision> nodeResult = Tuple.tuple(node, canAllocate);
Expand Down Expand Up @@ -1047,12 +1042,10 @@ private MoveDecision decideMove(
nodeResults.add(new NodeAllocationResult(currentNode.getRoutingNode().node(), allocationDecision, ++weightRanking));
}
if (allocationDecision.type() == Type.NOT_PREFERRED && remainDecision.type() == Type.NOT_PREFERRED) {
// Whether or not a relocation target node can be found, it's important to explain the canAllocate response as
// NOT_PREFERRED, as opposed to NO.
bestDecision = Type.NOT_PREFERRED;
// Relocating a shard from one NOT_PREFERRED node to another NOT_PREFERRED node would not improve the situation.
// Relocating a shard from one NOT_PREFERRED node to another would not improve the situation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From here

continue;
}

if (allocationDecision.type().higherThan(bestDecision)) {
bestDecision = allocationDecision.type();
if (bestDecision == Type.YES) {
Expand All @@ -1064,12 +1057,8 @@ private MoveDecision decideMove(
}
} else if (bestDecision == Type.NOT_PREFERRED) {
assert remainDecision.type() != Type.NOT_PREFERRED;
// If we don't ever find a YES/THROTTLE decision, we'll settle for NOT_PREFERRED as preferable to NO.
// If we don't ever find a YES decision, we'll settle for NOT_PREFERRED as preferable to NO.
targetNode = target;
} else if (bestDecision == Type.THROTTLE) {
assert allocation.isSimulating() == false;
// THROTTLE is better than NOT_PREFERRED, we just need to wait for a YES.
targetNode = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From here

}
}
}
Expand Down
Loading