Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canRebalance(shardRouting, allocation);
// short circuit if a NO is returned.
if (decision == Decision.NO) {
if (decision.type() == Decision.Type.NO) {
if (!allocation.debugDecision()) {
return decision;
return Decision.NO;
} else {
ret.add(decision);
}
Expand All @@ -72,14 +72,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canAllocate(shardRouting, node, allocation);
// short circuit if a NO is returned.
if (decision == Decision.NO) {
if (decision.type() == Decision.Type.NO) {
if (logger.isTraceEnabled()) {
logger.trace("Can not allocate [{}] on node [{}] due to [{}]",
shardRouting, node.node(), allocationDecider.getClass().getSimpleName());
}
// short circuit only if debugging is not enabled
if (!allocation.debugDecision()) {
return decision;
return Decision.NO;
} else {
ret.add(decision);
}
Expand All @@ -102,13 +102,13 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canRemain(shardRouting, node, allocation);
// short circuit if a NO is returned.
if (decision == Decision.NO) {
if (decision.type() == Decision.Type.NO) {
if (logger.isTraceEnabled()) {
logger.trace("Shard [{}] can not remain on node [{}] due to [{}]",
shardRouting, node.nodeId(), allocationDecider.getClass().getSimpleName());
}
if (!allocation.debugDecision()) {
return decision;
return Decision.NO;
} else {
ret.add(decision);
}
Expand All @@ -125,9 +125,9 @@ public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, Routi
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canAllocate(indexMetadata, node, allocation);
// short circuit if a NO is returned.
if (decision == Decision.NO) {
if (decision.type() == Decision.Type.NO) {
if (!allocation.debugDecision()) {
return decision;
return Decision.NO;
} else {
ret.add(decision);
}
Expand All @@ -144,9 +144,9 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.shouldAutoExpandToNode(indexMetadata, node, allocation);
// short circuit if a NO is returned.
if (decision == Decision.NO) {
if (decision.type() == Decision.Type.NO) {
if (!allocation.debugDecision()) {
return decision;
return Decision.NO;
} else {
ret.add(decision);
}
Expand All @@ -163,9 +163,9 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canAllocate(shardRouting, allocation);
// short circuit if a NO is returned.
if (decision == Decision.NO) {
if (decision.type() == Decision.Type.NO) {
if (!allocation.debugDecision()) {
return decision;
return Decision.NO;
} else {
ret.add(decision);
}
Expand All @@ -182,9 +182,9 @@ public Decision canRebalance(RoutingAllocation allocation) {
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canRebalance(allocation);
// short circuit if a NO is returned.
if (decision == Decision.NO) {
if (decision.type() == Decision.Type.NO) {
if (!allocation.debugDecision()) {
return decision;
return Decision.NO;
} else {
ret.add(decision);
}
Expand All @@ -206,13 +206,13 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
for (AllocationDecider decider : allocations) {
Decision decision = decider.canForceAllocatePrimary(shardRouting, node, allocation);
// short circuit if a NO is returned.
if (decision == Decision.NO) {
if (decision.type() == Decision.Type.NO) {
if (logger.isTraceEnabled()) {
logger.trace("Shard [{}] can not be forcefully allocated to node [{}] due to [{}].",
shardRouting.shardId(), node.nodeId(), decider.getClass().getSimpleName());
}
if (!allocation.debugDecision()) {
return decision;
return Decision.NO;
} else {
ret.add(decision);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,22 +125,25 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
return underCapacity(shardRouting, node, allocation, false);
}

// Shared YES decision whose explanation states that allocation awareness is not enabled and names the
// cluster setting (by its key) that turns it on. Built once as a static constant; the explanation is
// assembled by plain string concatenation rather than format parameters.
private static final Decision YES_NOT_ENABLED = Decision.single(Decision.Type.YES, NAME,
"allocation awareness is not enabled, set cluster setting ["
+ CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey() + "] to enable it");

// Shared YES decision whose explanation states that the node meets every awareness attribute requirement.
private static final Decision YES_ALL_MET =
Decision.single(Decision.Type.YES, NAME, "node meets all awareness attribute requirements");

private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
if (awarenessAttributes.isEmpty()) {
return allocation.decision(Decision.YES, NAME,
"allocation awareness is not enabled, set cluster setting [%s] to enable it",
CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey());
return YES_NOT_ENABLED;
}

final boolean debug = allocation.debugDecision();
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute
if (node.node().getAttributes().containsKey(awarenessAttribute) == false) {
return allocation.decision(Decision.NO, NAME,
"node does not contain the awareness attribute [%s]; required attributes cluster setting [%s=%s]",
awarenessAttribute, CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(),
allocation.debugDecision() ? Strings.collectionToCommaDelimitedString(awarenessAttributes) : null);
return debug ? debugNoMissingAttribute(awarenessAttribute, awarenessAttributes) : Decision.NO;
}

// build attr_value -> nodes map
Expand Down Expand Up @@ -185,18 +188,31 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
final int currentNodeCount = shardPerAttribute.get(node.node().getAttributes().get(awarenessAttribute));
final int maximumNodeCount = (shardCount + numberOfAttributes - 1) / numberOfAttributes; // ceil(shardCount/numberOfAttributes)
if (currentNodeCount > maximumNodeCount) {
return allocation.decision(Decision.NO, NAME,
"there are too many copies of the shard allocated to nodes with attribute [%s], there are [%d] total configured " +
"shard copies for this shard id and [%d] total attribute values, expected the allocated shard count per " +
"attribute [%d] to be less than or equal to the upper bound of the required number of shards per attribute [%d]",
awarenessAttribute,
shardCount,
numberOfAttributes,
currentNodeCount,
maximumNodeCount);
return debug ? debugNoTooManyCopies(shardCount, awarenessAttribute, numberOfAttributes, currentNodeCount, maximumNodeCount)
: Decision.NO;
}
}

return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements");
return YES_ALL_MET;
}

/**
 * Builds a {@code NO} decision explaining that too many copies of the shard are allocated to nodes
 * sharing one value of an awareness attribute.
 *
 * @param shardCount         total configured shard copies, reported in the explanation
 * @param awarenessAttribute name of the awareness attribute, reported in the explanation
 * @param numberOfAttributes total number of attribute values, reported in the explanation
 * @param currentNodeCount   allocated shard count for the attribute, reported in the explanation
 * @param maximumNodeCount   upper bound of shards per attribute, reported in the explanation
 * @return a single {@code NO} decision labelled with {@code NAME} and carrying the formatted explanation
 */
private static Decision debugNoTooManyCopies(int shardCount, String awarenessAttribute, int numberOfAttributes, int currentNodeCount,
                                             int maximumNodeCount) {
    // Placeholder order: attribute name, shard copies, attribute values, current count, maximum count.
    final String explanationFormat =
        "there are too many copies of the shard allocated to nodes with attribute [%s], there are [%d] total configured " +
        "shard copies for this shard id and [%d] total attribute values, expected the allocated shard count per " +
        "attribute [%d] to be less than or equal to the upper bound of the required number of shards per attribute [%d]";
    return Decision.single(Decision.Type.NO, NAME, explanationFormat,
        awarenessAttribute, shardCount, numberOfAttributes, currentNodeCount, maximumNodeCount);
}

/**
 * Builds a {@code NO} decision explaining that the node lacks a required awareness attribute.
 *
 * @param awarenessAttribute  the attribute the node does not contain
 * @param awarenessAttributes all configured awareness attributes, rendered comma-delimited in the explanation
 * @return a single {@code NO} decision labelled with {@code NAME} and carrying the formatted explanation
 */
private static Decision debugNoMissingAttribute(String awarenessAttribute, List<String> awarenessAttributes) {
    final String settingKey = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey();
    final String requiredAttributes = Strings.collectionToCommaDelimitedString(awarenessAttributes);
    return Decision.single(
        Decision.Type.NO,
        NAME,
        "node does not contain the awareness attribute [%s]; required attributes cluster setting [%s=%s]",
        awarenessAttribute,
        settingKey,
        requiredAttributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -109,40 +110,55 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
return canRebalance(allocation);
}

// Shared YES decision whose explanation states that all primary shards are active.
private static final Decision YES_ALL_PRIMARIES_ACTIVE = Decision.single(Decision.Type.YES, NAME, "all primary shards are active");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be nice to add Decision.constant that still uses Decision.Single but avoids the trap of being able to specify parameters (or eagerly resolves the string if anyone does specify them).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I'm starting to wonder how much point there even is in making the String creation in the existing Decision.single lazy. The memory savings probably aren't that massive, and they only affect debug anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++, it seems that just resolving this early is not a big deal. It will be resolved anyway in equals, hashCode and the streaming write.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect :) Made it resolve the string eagerly now, which also makes the Decision object immutable in general :)


// Shared YES decision whose explanation states that all shards are active.
private static final Decision YES_ALL_SHARDS_ACTIVE = Decision.single(Decision.Type.YES, NAME, "all shards are active");

// Shared NO decision: unassigned primary shards exist; the explanation reports the rebalance setting
// as set to INDICES_PRIMARIES_ACTIVE.
private static final Decision NO_UNASSIGNED_PRIMARIES = Decision.single(Decision.Type.NO, NAME,
"the cluster has unassigned primary shards and cluster setting ["
+ CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE + "] is set to [" + ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE + "]");

// Shared NO decision: inactive primary shards exist; the explanation reports the rebalance setting
// as set to INDICES_PRIMARIES_ACTIVE.
private static final Decision NO_INACTIVE_PRIMARIES = Decision.single(Decision.Type.NO, NAME,
"the cluster has inactive primary shards and cluster setting [" + CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE +
"] is set to [" + ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE + "]");

// Shared NO decision: unassigned shards exist; the explanation reports the rebalance setting
// as set to INDICES_ALL_ACTIVE.
private static final Decision NO_UNASSIGNED_SHARDS = Decision.single(Decision.Type.NO, NAME,
"the cluster has unassigned shards and cluster setting [" + CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE +
"] is set to [" + ClusterRebalanceType.INDICES_ALL_ACTIVE + "]");

// Shared NO decision: inactive shards exist; the explanation reports the rebalance setting
// as set to INDICES_ALL_ACTIVE.
private static final Decision NO_INACTIVE_SHARDS = Decision.single(Decision.Type.NO, NAME,
"the cluster has inactive shards and cluster setting [" + CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE +
"] is set to [" + ClusterRebalanceType.INDICES_ALL_ACTIVE + "]");

@SuppressWarnings("fallthrough")
@Override
public Decision canRebalance(RoutingAllocation allocation) {
if (type == ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE) {
// check if there are unassigned primaries.
if ( allocation.routingNodes().hasUnassignedPrimaries() ) {
return allocation.decision(Decision.NO, NAME,
"the cluster has unassigned primary shards and cluster setting [%s] is set to [%s]",
CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, type);
}
// check if there are initializing primaries that don't have a relocatingNodeId entry.
if ( allocation.routingNodes().hasInactivePrimaries() ) {
return allocation.decision(Decision.NO, NAME,
"the cluster has inactive primary shards and cluster setting [%s] is set to [%s]",
CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, type);
}

return allocation.decision(Decision.YES, NAME, "all primary shards are active");
}
if (type == ClusterRebalanceType.INDICES_ALL_ACTIVE) {
// check if there are unassigned shards.
if (allocation.routingNodes().hasUnassignedShards() ) {
return allocation.decision(Decision.NO, NAME,
"the cluster has unassigned shards and cluster setting [%s] is set to [%s]",
CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, type);
}
// in case all indices are assigned, are there initializing shards which
// are not relocating?
if ( allocation.routingNodes().hasInactiveShards() ) {
return allocation.decision(Decision.NO, NAME,
"the cluster has inactive shards and cluster setting [%s] is set to [%s]",
CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, type);
}
final RoutingNodes routingNodes = allocation.routingNodes();
switch (type) {
case INDICES_PRIMARIES_ACTIVE:
// check if there are unassigned primaries.
if (routingNodes.hasUnassignedPrimaries()) {
return NO_UNASSIGNED_PRIMARIES;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In AllocationDeciders we choose to terminate early on NO decisions, but only if the object is Decision.NO. I think we need to change that to check the underlying type if we return constants here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed by checking decision type :) thanks for spotting this!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can add a test that the early termination works in AllocationDeciders? At least one specific test with one specific example if making something that randomly exercises all decider NO decisions is too complicated.

}
// check if there are initializing primaries that don't have a relocatingNodeId entry.
if (routingNodes.hasInactivePrimaries()) {
return NO_INACTIVE_PRIMARIES;
}
return YES_ALL_PRIMARIES_ACTIVE;
case INDICES_ALL_ACTIVE:
// check if there are unassigned shards.
if (routingNodes.hasUnassignedShards()) {
return NO_UNASSIGNED_SHARDS;
}
// in case all indices are assigned, are there initializing shards which
// are not relocating?
if (routingNodes.hasInactiveShards()) {
return NO_INACTIVE_SHARDS;
}
// fall-through
default:
// all shards active from above or type == Type.ALWAYS
return YES_ALL_SHARDS_ACTIVE;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add

// fall-through

to signal that fall through is intended.

// type == Type.ALWAYS
return allocation.decision(Decision.YES, NAME, "all shards are active");
}
}
Loading