Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -705,12 +705,16 @@ private boolean balanceByWeights(NodeSorter sorter) {
highIdx = relevantNodes - 1;

if (routingNodes.getRelocatingShardCount() > 0) {
// ES-12955: Check routingNodes.getRelocatingShardCount() > 0 in case the first relocation is a THROTTLE.
// This should rarely happen since in most cases, we don't throttle unless there is an existing relocation.
// But it can happen in production for frozen indices when the cache is still being prepared. It can also
// happen in tests because we have decider like RandomAllocationDecider that can randomly return THROTTLE
// when there is no existing relocation.
// Check routingNodes.getRelocatingShardCount() > 0 in case the first relocation is a THROTTLE.
shardBalanced = true;
} else {
// A THROTTLE decision can happen when not simulating
assert allocation.isSimulating() == false
: "unexpected THROTTLE decision (simulation="
+ allocation.isSimulating()
+ ") when balancing index ["
+ index
+ "]";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seeing the assert inside a conditional branch makes it look as though THROTTLE is acceptable on certain paths. Suppose we were in a simulation and somehow ended up with more than one relocating shard at once: that would be unsafe, right? Is there any reason we can't keep the assert in place unconditionally? For example:

assert allocation.isSimulating() == false || routingNodes.getRelocatingShardCount() <= 1 : "....";
if (routingNodes.getRelocatingShardCount() > 0) {
    shardBalanced = true;
}
if (completeEarlyOnShardAssignmentChange && shardBalanced) {
....

}
if (completeEarlyOnShardAssignmentChange && shardBalanced) {
return true;
Expand Down Expand Up @@ -835,6 +839,18 @@ public boolean moveShards() {
} else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
}

// A THROTTLE allocation decision can happen when not simulating
assert moveDecision.getAllocationDecision() != AllocationDecision.THROTTLED || allocation.isSimulating() == false
Copy link
Contributor

@DiannaHohensee DiannaHohensee Oct 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be done right after the decideMove call? There's an early return path above on which we'd miss this assert.

: "unexpected allocation decision ["
+ moveDecision.getAllocationDecision()
+ "] (simulation="
+ allocation.isSimulating()
+ ") with "
+ (shardMoved ? "" : "no ")
+ "prior shard movements when moving shard ["
+ shardRouting
+ "]";
}

// If we get here, attempt to move one of the best not-preferred shards that we identified earlier
Expand Down Expand Up @@ -1268,9 +1284,15 @@ private boolean allocateUnassigned() {
assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED;
final long shardSize = getExpectedShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation);
minNode.addShard(projectIndex(shard), shard.initialize(minNode.getNodeId(), null, shardSize));
// If we see a throttle decision in simulation, there must be other shards that got assigned before it.
// If we see a THROTTLE decision, it's either:
// 1. Not simulating
// 2. Or, there is a shard assigned before this one
assert allocation.isSimulating() == false || shardAssignmentChanged
: "shard " + shard + " was throttled but no other shards were assigned";
: "unexpected THROTTLE decision (simulation="
+ allocation.isSimulating()
+ ") with no prior assignment when allocating unassigned shard ["
+ shard
+ "]";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than putting this in a conditional, can we assert `isSimulating == false || decision is not THROTTLE` up on line 1272? Or, better yet, right after the decideAllocateUnassigned call, outside the extra layer of if-else?

} else {
if (logger.isTraceEnabled()) {
logger.trace("No Node found to assign shard [{}]", shard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
public Decision canRebalance(RoutingAllocation allocation) {
int relocatingShards = allocation.routingNodes().getRelocatingShardCount();
if (allocation.isSimulating() && relocatingShards >= 2) {
// This branch should no longer run after https://github.com/elastic/elasticsearch/pull/134786
assert false : "allocation simulation should have returned earlier and not hit throttling";
// BalancedShardAllocator is prone to perform unnecessary moves when cluster_concurrent_rebalance is set to high values (>2).
// (See https://github.com/elastic/elasticsearch/issues/87279)
// Above allocator is used in DesiredBalanceComputer. Since we do not move actual shard data during calculation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,29 +206,33 @@ public RandomAllocationDecider(Random random) {

@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
return getRandomDecision();
return getRandomDecision(allocation.isSimulating() == false || allocation.routingNodes().getRelocatingShardCount() > 0);
}

private Decision getRandomDecision() {
private Decision getRandomDecision(boolean canThrottle) {
if (alwaysSayYes) {
return Decision.YES;
}
return switch (random.nextInt(10)) {
case 9, 8, 7, 6, 5 -> Decision.NO;
case 4 -> Decision.THROTTLE;
case 4 -> canThrottle ? Decision.THROTTLE : Decision.YES;
case 3, 2, 1 -> Decision.YES;
default -> Decision.ALWAYS;
};
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return getRandomDecision();
return getRandomDecision(
allocation.isSimulating() == false
|| allocation.routingNodes().getIncomingRecoveries(node.nodeId()) > 0
|| allocation.routingNodes().getOutgoingRecoveries(node.nodeId()) > 0
);
}

@Override
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return getRandomDecision();
return getRandomDecision(false); // throttle does not make sense for canRemain
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xpack.searchablesnapshots.cache.shared.FrozenCacheInfoService;

import static org.elasticsearch.blobcache.shared.SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING;
Expand All @@ -28,6 +29,12 @@ public class HasFrozenCacheAllocationDecider extends AllocationDecider {
"value of [" + SHARED_CACHE_SIZE_SETTING.getKey() + "] on this node is not known yet"
);

private static final Decision NO_STILL_FETCHING = Decision.single(
Decision.Type.NO,
NAME,
"shard movement is not allowed when value of [" + SHARED_CACHE_SIZE_SETTING.getKey() + "] on this node is not known yet"
);

private static final Decision HAS_FROZEN_CACHE = Decision.single(
Decision.Type.YES,
NAME,
Expand Down Expand Up @@ -56,25 +63,30 @@ public HasFrozenCacheAllocationDecider(FrozenCacheInfoService frozenCacheService

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return canAllocateToNode(allocation.metadata().indexMetadata(shardRouting.index()), node.node());
return canAllocateToNode(allocation.metadata().indexMetadata(shardRouting.index()), shardRouting, node.node(), allocation);
}

@Override
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return canAllocateToNode(indexMetadata, node.node());
return canAllocateToNode(indexMetadata, shardRouting, node.node(), allocation);
}

@Override
public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) {
return canAllocateToNode(indexMetadata, node.node());
return canAllocateToNode(indexMetadata, null, node.node(), allocation);
}

@Override
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
return canAllocateToNode(indexMetadata, node);
return canAllocateToNode(indexMetadata, null, node, allocation);
}

private Decision canAllocateToNode(IndexMetadata indexMetadata, DiscoveryNode discoveryNode) {
private Decision canAllocateToNode(
IndexMetadata indexMetadata,
@Nullable ShardRouting shardRouting,
DiscoveryNode discoveryNode,
RoutingAllocation allocation
) {
if (indexMetadata.isPartialSearchableSnapshot() == false) {
return Decision.ALWAYS;
}
Expand All @@ -83,7 +95,21 @@ private Decision canAllocateToNode(IndexMetadata indexMetadata, DiscoveryNode di
case HAS_CACHE -> HAS_FROZEN_CACHE;
case NO_CACHE -> NO_FROZEN_CACHE;
case FAILED -> UNKNOWN_FROZEN_CACHE;
default -> STILL_FETCHING;
// THROTTLE is an odd choice here. In all other places, it means YES but not right now. But here it can eventually
// turn into a NO if the node responds that it has no frozen cache. Since BalancedShardAllocator effectively handles
// THROTTLE as a rejection, it is simpler to just return a NO here for shard-level decisions. For BWC, we keep
// THROTTLE for unassigned shards (throttle can happen in other ways) and index level decisions (to not exclude nodes
// too early in the computation process).
// TODO: We can consider dropping them for all simulation cases in a future change.
default -> {
if (allocation.isSimulating() == false) { // not simulating, i.e. legacy allocator
yield STILL_FETCHING;
}
if (shardRouting == null || shardRouting.unassigned()) { // either index level decision or unassigned shard
yield STILL_FETCHING;
}
yield NO_STILL_FETCHING; // simply reject moving and balancing shards in simulation
}
};
}

Expand Down