Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/140150.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 140150
summary: Allow allocation to replacement target node on vacate completion
area: Allocation
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,66 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
}

final SingleNodeShutdownMetadata shutdown = allocation.replacementTargetShutdowns().get(node.node().getName());
return allocation.decision(
Decision.NO,
NAME,
"node [%s] is replacing the vacating node [%s], only data currently allocated to the source node "
+ "may be allocated to it until the replacement is complete",
node.nodeId(),
shutdown == null ? null : shutdown.getNodeId(),
shardRouting.currentNodeId()
);
assert shutdown != null : node.node() + " is a replacement target but no shutdown metadata found";

final String sourceNodeId = shutdown.getNodeId();
final var sourceNode = allocation.routingNodes().node(sourceNodeId);
if (sourceNode != null) {
if (sourceNode.isEmpty() == false) {
return allocation.decision(
Decision.NO,
NAME,
"node [%s] is replacing the vacating node [%s], only data currently allocated to the source node "
+ "may be allocated to it until the replacement is complete",
node.nodeId(),
sourceNodeId
);
} else {
return allocation.decision(
Decision.YES,
NAME,
"node [%s] has completed replacing the vacating node [%s] and can receive shards from other sources",
node.nodeId(),
sourceNodeId
);
}
} else {
final boolean sourceNodeLeftAnyUnassignedShards = allocation.routingNodes()
.unassigned()
.stream()
.anyMatch(s -> sourceNodeId.equals(s.unassignedInfo().lastAllocatedNodeId()));

if (sourceNodeLeftAnyUnassignedShards) {
if (shardRouting.unassigned() && sourceNodeId.equals(shardRouting.unassignedInfo().lastAllocatedNodeId())) {
return allocation.decision(
Decision.YES,
NAME,
"the vacating node [%s] is no longer in the cluster and has left unassigned shards, "
+ "the replacing node [%s] can receive those shards",
sourceNodeId,
node.nodeId()
);
} else {
return allocation.decision(
Decision.NO,
NAME,
"the vacating node [%s] is no longer in the cluster and has left unassigned shards, "
+ "the replacing node [%s] can only receive those unassigned shards until the replacement is complete",
sourceNodeId,
node.nodeId()
);
}
} else {
return allocation.decision(
Decision.YES,
NAME,
"the vacating node [%s] is no longer in the cluster and has left no unassigned shards, "
+ "the replacing node [%s] can receive shards from other sources",
sourceNodeId,
node.nodeId()
);
}
}
} else {
return YES__NO_APPLICABLE_REPLACEMENTS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
Expand All @@ -34,6 +35,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Set;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
Expand Down Expand Up @@ -149,8 +151,22 @@ public void testCannotRemainOnReplacedNode() {

public void testCanAllocateToNeitherSourceNorTarget() {
ClusterState state = prepareState(NODE_A.getId(), NODE_B.getName());
// Source node still has a shard to vacate
state = ClusterState.builder(state)
.putRoutingTable(
ProjectId.DEFAULT,
RoutingTable.builder()
.add(
IndexRoutingTable.builder(indexMetadata.getIndex())
.addShard(newShardRouting(new ShardId(indexMetadata.getIndex(), 0), NODE_A.getId(), true, STARTED))
.build()
)
.build()
)
.build();

RoutingAllocation allocation = createRoutingAllocation(state);
RoutingNode routingNode = RoutingNodesHelper.routingNode(NODE_A.getId(), NODE_A, shard);
RoutingNode routingNode = allocation.routingNodes().node(NODE_A.getId());

ShardRouting testShard = this.shard;
if (randomBoolean()) {
Expand All @@ -163,7 +179,7 @@ public void testCanAllocateToNeitherSourceNorTarget() {
"node [" + NODE_A.getId() + "] is being replaced by [" + NODE_B.getName() + "], so no data may be allocated to it"
);

routingNode = RoutingNodesHelper.routingNode(NODE_B.getId(), NODE_B, testShard);
routingNode = allocation.routingNodes().node(NODE_B.getId());

assertThatDecision(
decider.canAllocate(testShard, routingNode, allocation),
Expand All @@ -184,6 +200,128 @@ public void testCanAllocateToNeitherSourceNorTarget() {
);
}

/**
 * Checks that the replacement target (NODE_B) is allowed to receive an arbitrary shard once the
 * vacating source (NODE_A) has nothing left to hand over, both while the source is still a member
 * of the cluster and after it has been removed. The source itself must keep rejecting allocations.
 */
public void testCanAllocateToTargetWhenSourceFinishesVacate() {
    ClusterState clusterState = prepareState(NODE_A.getId(), NODE_B.getName());

    // Randomly assign a started primary to the unrelated NODE_C; the routing table built here puts nothing on NODE_A
    if (randomBoolean()) {
        final var indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex());
        final var shardOnNodeC = newShardRouting(new ShardId(indexMetadata.getIndex(), 0), NODE_C.getId(), true, STARTED);
        final var routingTable = RoutingTable.builder().add(indexRoutingTableBuilder.addShard(shardOnNodeC).build()).build();
        clusterState = ClusterState.builder(clusterState).putRoutingTable(ProjectId.DEFAULT, routingTable).build();
    }

    RoutingAllocation allocation = createRoutingAllocation(clusterState);
    final RoutingNode sourceRoutingNode = allocation.routingNodes().node(NODE_A.getId());

    // Use either the fixture shard as-is or a copy that is started on NODE_C
    final ShardRouting testShard = randomBoolean()
        ? shard.initialize(NODE_C.getId(), null, 1).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)
        : this.shard;

    // The node being replaced never accepts new data
    final String sourceRejection = "node ["
        + NODE_A.getId()
        + "] is being replaced by ["
        + NODE_B.getName()
        + "], so no data may be allocated to it";
    assertThatDecision(decider.canAllocate(testShard, sourceRoutingNode, allocation), Decision.Type.NO, sourceRejection);

    // Randomly drop the source node from the cluster. The decision for the target is expected to stay YES
    // in both cases; only the explanation differs.
    final boolean sourceNodeLeft = randomBoolean();
    if (sourceNodeLeft) {
        final var nodesWithoutSource = DiscoveryNodes.builder(clusterState.nodes()).remove(NODE_A.getId()).build();
        clusterState = ClusterState.builder(clusterState).nodes(nodesWithoutSource).build();
        allocation = createRoutingAllocation(clusterState);
    }

    final RoutingNode targetRoutingNode = allocation.routingNodes().node(NODE_B.getId());

    final String expectedExplanation;
    if (sourceNodeLeft) {
        expectedExplanation = "the vacating node ["
            + NODE_A.getId()
            + "] is no longer in the cluster and has left no unassigned shards, the replacing node ["
            + NODE_B.getId()
            + "] can receive shards from other sources";
    } else {
        expectedExplanation = "node ["
            + NODE_B.getId()
            + "] has completed replacing the vacating node ["
            + NODE_A.getId()
            + "] and can receive shards from other sources";
    }

    assertThatDecision(decider.canAllocate(testShard, targetRoutingNode, allocation), Decision.Type.YES, expectedExplanation);
}

/**
 * Checks the case where the vacating source (NODE_A) has left the cluster while a shard it last held
 * is still unassigned: the replacement target (NODE_B) must accept that unassigned shard but reject
 * any shard that was not last allocated to the source.
 */
public void testCanAllocateToTargetWhenSourceLeftBeforeFinishVacate() {
    final ClusterState initialState = prepareState(NODE_A.getId(), NODE_B.getName());

    // An unassigned primary whose last allocated node is the source NODE_A
    final var shardId = new ShardId(indexMetadata.getIndex(), 0);
    final var nodeLeftInfo = new UnassignedInfo(
        UnassignedInfo.Reason.NODE_LEFT,
        "node left",
        null,
        0,
        System.nanoTime(),
        System.currentTimeMillis(),
        false,
        UnassignedInfo.AllocationStatus.NO_ATTEMPT,
        Set.of(),
        NODE_A.getId()
    );
    final var unassignedShardLeftBySourceNode = ShardRouting.newUnassigned(
        shardId,
        true,
        RecoverySource.EmptyStoreRecoverySource.INSTANCE,
        nodeLeftInfo,
        ShardRouting.Role.DEFAULT
    );

    // Remove NODE_A from the cluster and install a routing table that contains only the unassigned shard
    final var nodesWithoutSource = DiscoveryNodes.builder(initialState.nodes()).remove(NODE_A.getId()).build();
    final var indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).addShard(unassignedShardLeftBySourceNode).build();
    final var routingTable = RoutingTable.builder().add(indexRoutingTable).build();
    final ClusterState stateAfterSourceLeft = ClusterState.builder(initialState)
        .nodes(nodesWithoutSource)
        .putRoutingTable(ProjectId.DEFAULT, routingTable)
        .build();

    final RoutingAllocation allocation = createRoutingAllocation(stateAfterSourceLeft);
    final RoutingNode targetRoutingNode = allocation.routingNodes().node(NODE_B.getId());

    // Both explanations share the same prefix and differ only in the trailing clause
    final String explanationPrefix = "the vacating node ["
        + NODE_A.getId()
        + "] is no longer in the cluster and has left unassigned shards, the replacing node ["
        + NODE_B.getId();

    // The replacement NODE_B can receive unassigned shards left by the old source NODE_A
    assertThatDecision(
        decider.canAllocate(unassignedShardLeftBySourceNode, targetRoutingNode, allocation),
        Decision.Type.YES,
        explanationPrefix + "] can receive those shards"
    );

    // The replacement NODE_B cannot receive a shard that is not from the old source NODE_A:
    // use either the fixture shard as-is or a copy that is started on NODE_C
    final ShardRouting unrelatedShard = randomBoolean()
        ? shard.initialize(NODE_C.getId(), null, 1).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)
        : this.shard;

    assertThatDecision(
        decider.canAllocate(unrelatedShard, targetRoutingNode, allocation),
        Decision.Type.NO,
        explanationPrefix + "] can only receive those unassigned shards until the replacement is complete"
    );
}

public void testShouldNotAutoExpandReplicasDuringUnrelatedNodeReplacement() {

var indexMetadata = IndexMetadata.builder(idxName)
Expand Down
Loading