diff --git a/docs/changelog/140150.yaml b/docs/changelog/140150.yaml new file mode 100644 index 0000000000000..c30e79842d892 --- /dev/null +++ b/docs/changelog/140150.yaml @@ -0,0 +1,5 @@ +pr: 140150 +summary: Allow allocation to replacement target node on vacate completion +area: Allocation +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java index 949ea5c3fc87b..c5d3e925de6ba 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java @@ -79,15 +79,66 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing } final SingleNodeShutdownMetadata shutdown = allocation.replacementTargetShutdowns().get(node.node().getName()); - return allocation.decision( - Decision.NO, - NAME, - "node [%s] is replacing the vacating node [%s], only data currently allocated to the source node " - + "may be allocated to it until the replacement is complete", - node.nodeId(), - shutdown == null ? 
null : shutdown.getNodeId(), - shardRouting.currentNodeId() - ); + assert shutdown != null : node.node() + " is a replacement target but no shutdown metadata found"; + + final String sourceNodeId = shutdown.getNodeId(); + final var sourceNode = allocation.routingNodes().node(sourceNodeId); + if (sourceNode != null) { + if (sourceNode.isEmpty() == false) { + return allocation.decision( + Decision.NO, + NAME, + "node [%s] is replacing the vacating node [%s], only data currently allocated to the source node " + + "may be allocated to it until the replacement is complete", + node.nodeId(), + sourceNodeId + ); + } else { + return allocation.decision( + Decision.YES, + NAME, + "node [%s] has completed replacing the vacating node [%s] and can receive shards from other sources", + node.nodeId(), + sourceNodeId + ); + } + } else { + final boolean sourceNodeLeftAnyUnassignedShards = allocation.routingNodes() + .unassigned() + .stream() + .anyMatch(s -> sourceNodeId.equals(s.unassignedInfo().lastAllocatedNodeId())); + + if (sourceNodeLeftAnyUnassignedShards) { + if (shardRouting.unassigned() && sourceNodeId.equals(shardRouting.unassignedInfo().lastAllocatedNodeId())) { + return allocation.decision( + Decision.YES, + NAME, + "the vacating node [%s] is no longer in the cluster and has left unassigned shards, " + + "the replacing node [%s] can receive those shards", + sourceNodeId, + node.nodeId() + ); + } else { + return allocation.decision( + Decision.NO, + NAME, + "the vacating node [%s] is no longer in the cluster and has left unassigned shards, " + + "the replacing node [%s] can only receive those unassigned shards until the replacement is complete", + sourceNodeId, + node.nodeId() + ); + } + } else { + return allocation.decision( + Decision.YES, + NAME, + "the vacating node [%s] is no longer in the cluster and has left no unassigned shards, " + + "the replacing node [%s] can receive shards from other sources", + sourceNodeId, + node.nodeId() + ); + } + } } else { return 
YES__NO_APPLICABLE_REPLACEMENTS; } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDeciderTests.java index 7750a01a13a84..8bd80e885ffd6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDeciderTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; @@ -34,6 +35,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Set; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -149,8 +151,22 @@ public void testCannotRemainOnReplacedNode() { public void testCanAllocateToNeitherSourceNorTarget() { ClusterState state = prepareState(NODE_A.getId(), NODE_B.getName()); + // Source node still has a shard to vacate + state = ClusterState.builder(state) + .putRoutingTable( + ProjectId.DEFAULT, + RoutingTable.builder() + .add( + IndexRoutingTable.builder(indexMetadata.getIndex()) + .addShard(newShardRouting(new ShardId(indexMetadata.getIndex(), 0), NODE_A.getId(), true, STARTED)) + .build() + ) + .build() + ) + .build(); + RoutingAllocation allocation = createRoutingAllocation(state); - RoutingNode routingNode = RoutingNodesHelper.routingNode(NODE_A.getId(), NODE_A, shard); + 
RoutingNode routingNode = allocation.routingNodes().node(NODE_A.getId()); ShardRouting testShard = this.shard; if (randomBoolean()) { @@ -163,7 +179,7 @@ public void testCanAllocateToNeitherSourceNorTarget() { "node [" + NODE_A.getId() + "] is being replaced by [" + NODE_B.getName() + "], so no data may be allocated to it" ); - routingNode = RoutingNodesHelper.routingNode(NODE_B.getId(), NODE_B, testShard); + routingNode = allocation.routingNodes().node(NODE_B.getId()); assertThatDecision( decider.canAllocate(testShard, routingNode, allocation), @@ -184,6 +200,128 @@ ); } + public void testCanAllocateToTargetWhenSourceFinishesVacate() { + ClusterState state = prepareState(NODE_A.getId(), NODE_B.getName()); + // Randomly assign a shard on NODE_C + if (randomBoolean()) { + state = ClusterState.builder(state) + .putRoutingTable( + ProjectId.DEFAULT, + RoutingTable.builder() + .add( + IndexRoutingTable.builder(indexMetadata.getIndex()) + .addShard(newShardRouting(new ShardId(indexMetadata.getIndex(), 0), NODE_C.getId(), true, STARTED)) + .build() + ) + .build() + ) + .build(); + } + + RoutingAllocation allocation = createRoutingAllocation(state); + RoutingNode routingNode = allocation.routingNodes().node(NODE_A.getId()); + + ShardRouting testShard = this.shard; + if (randomBoolean()) { + testShard = shard.initialize(NODE_C.getId(), null, 1); + testShard = testShard.moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + } + assertThatDecision( + decider.canAllocate(testShard, routingNode, allocation), + Decision.Type.NO, + "node [" + NODE_A.getId() + "] is being replaced by [" + NODE_B.getName() + "], so no data may be allocated to it" + ); + + // Randomly remove the source node from the cluster, it should not affect allocation since it has no unassigned shard left + final boolean sourceNodeLeft = randomBoolean(); + if (sourceNodeLeft) { + state = 
ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).remove(NODE_A.getId()).build()).build(); + allocation = createRoutingAllocation(state); + } + + routingNode = allocation.routingNodes().node(NODE_B.getId()); + + assertThatDecision( + decider.canAllocate(testShard, routingNode, allocation), + Decision.Type.YES, + sourceNodeLeft + ? ("the vacating node [" + + NODE_A.getId() + + "] is no longer in the cluster and has left no unassigned shards, the replacing node [" + + NODE_B.getId() + + "] can receive shards from other sources") + : ("node [" + + NODE_B.getId() + + "] has completed replacing the vacating node [" + + NODE_A.getId() + + "] and can receive shards from other sources") + ); + } + + public void testCanAllocateToTargetWhenSourceLeftBeforeFinishVacate() { + ClusterState state = prepareState(NODE_A.getId(), NODE_B.getName()); + // Source NODE_A left with unassigned shard + final var unassignedShardLeftBySourceNode = ShardRouting.newUnassigned( + new ShardId(indexMetadata.getIndex(), 0), + true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo( + UnassignedInfo.Reason.NODE_LEFT, + "node left", + null, + 0, + System.nanoTime(), + System.currentTimeMillis(), + false, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Set.of(), + NODE_A.getId() + ), + ShardRouting.Role.DEFAULT + ); + state = ClusterState.builder(state) + .nodes(DiscoveryNodes.builder(state.nodes()).remove(NODE_A.getId()).build()) + .putRoutingTable( + ProjectId.DEFAULT, + RoutingTable.builder() + .add(IndexRoutingTable.builder(indexMetadata.getIndex()).addShard(unassignedShardLeftBySourceNode).build()) + .build() + ) + .build(); + + RoutingAllocation allocation = createRoutingAllocation(state); + + var routingNode = allocation.routingNodes().node(NODE_B.getId()); + + // The replacement NODE_B can receive unassigned shards left by the old source NODE_A + assertThatDecision( + decider.canAllocate(unassignedShardLeftBySourceNode, routingNode, allocation), + 
Decision.Type.YES, + "the vacating node [" + + NODE_A.getId() + + "] is no longer in the cluster and has left unassigned shards, the replacing node [" + + NODE_B.getId() + + "] can receive those shards" + ); + + // The replacement NODE_B cannot receive a shard that is not from the old source NODE_A + ShardRouting testShard = this.shard; + if (randomBoolean()) { + testShard = shard.initialize(NODE_C.getId(), null, 1); + testShard = testShard.moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + } + + assertThatDecision( + decider.canAllocate(testShard, routingNode, allocation), + Decision.Type.NO, + "the vacating node [" + + NODE_A.getId() + + "] is no longer in the cluster and has left unassigned shards, the replacing node [" + + NODE_B.getId() + + "] can only receive those unassigned shards until the replacement is complete" + ); + } + public void testShouldNotAutoExpandReplicasDuringUnrelatedNodeReplacement() { var indexMetadata = IndexMetadata.builder(idxName) diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java index f8091a1d2ebdf..ee9de0d1fbd14 100644 --- a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java @@ -9,24 +9,32 @@ import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanationUtils; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; 
+import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodesHelper; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Strings; +import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Status.COMPLETE; @@ -40,7 +48,43 @@ public class NodeShutdownShardsIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return List.of(ShutdownPlugin.class); + return List.of(ShutdownPlugin.class, TestPlugin.class); + } + + private static final String UNMOVABLE_INDEX_NAME = "unmoveable_index"; + + private static class UnmovableIndexAllocationDecider extends AllocationDecider { + + private final AtomicBoolean enabled; + + private UnmovableIndexAllocationDecider(AtomicBoolean enabled) { + super(); + this.enabled = enabled; + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + // Allow initial allocation but not relocation for UNMOVABLE_INDEX_NAME + if (enabled.get() && shardRouting.unassigned() == false && 
shardRouting.getIndexName().equals(UNMOVABLE_INDEX_NAME)) { + return Decision.NO; + } else { + return Decision.YES; + } + } + + @Override + public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return (enabled.get() && shardRouting.getIndexName().equals(UNMOVABLE_INDEX_NAME)) ? Decision.NO : Decision.YES; + } + } + + public static class TestPlugin extends Plugin implements ClusterPlugin { + private final AtomicBoolean enabled = new AtomicBoolean(true); + + @Override + public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { + return List.of(new UnmovableIndexAllocationDecider(enabled)); + } } /** @@ -125,6 +169,7 @@ public void testNotStalledIfAllShardsHaveACopyOnAnotherNode() throws Exception { public void testNodeReplacementOnlyAllowsShardsFromReplacedNode() throws Exception { String nodeA = internalCluster().startNode(Settings.builder().put("node.name", "node-a")); createIndex("myindex", 3, 1); + createIndex(UNMOVABLE_INDEX_NAME, 1, 0); final String nodeAId = getNodeId(nodeA); final String nodeB = "node_t1"; // TODO: fix this to so it's actually overrideable @@ -140,7 +185,7 @@ public void testNodeReplacementOnlyAllowsShardsFromReplacedNode() throws Excepti assertIndexPrimaryShardsAreAllocatedOnNode("myindex", nodeBId); assertIndexReplicaShardsAreNotAllocated("myindex"); }); - assertBusy(() -> assertNodeShutdownStatus(nodeAId, COMPLETE)); + assertBusy(() -> assertNodeShutdownStatus(nodeAId, STALLED)); final String nodeC = internalCluster().startNode(); @@ -174,6 +219,20 @@ public void testNodeReplacementOnlyAllowsShardsFromReplacedNode() throws Excepti ) ); }, () -> fail("expected a 'NO' decision for nodeB but there was no explanation for that node")); + + if (randomBoolean()) { + // Let the unmovable index move, it should complete the vacates and also allow replica to be assigned to the replacement target + 
internalCluster().getCurrentMasterNodeInstance(PluginsService.class) + .filterPlugins(TestPlugin.class) + .findFirst() + .orElseThrow().enabled.set(false); + ClusterRerouteUtils.reroute(client()); + assertBusy(() -> assertNodeShutdownStatus(nodeAId, COMPLETE)); + } else { + // Stop the source node which also allows allocation to complete + internalCluster().stopNode(nodeA); + } + ensureGreen("other"); } public void testNodeReplacementOverridesFilters() throws Exception { @@ -181,6 +240,7 @@ public void testNodeReplacementOverridesFilters() throws Exception { // Create an index and pin it to nodeA, when we replace it with nodeB, // it'll move the data, overridding the `_name` allocation filter createIndex("myindex", indexSettings(3, 0).put("index.routing.allocation.require._name", nodeA).build()); + createIndex(UNMOVABLE_INDEX_NAME, 1, 0); final String nodeAId = getNodeId(nodeA); final String nodeB = "node_t2"; // TODO: fix this to so it's actually overrideable @@ -195,7 +255,7 @@ public void testNodeReplacementOverridesFilters() throws Exception { logger.info("--> NodeB: {} -- {}", nodeB, nodeBId); assertBusy(() -> assertIndexPrimaryShardsAreAllocatedOnNode("myindex", nodeBId)); - assertBusy(() -> assertNodeShutdownStatus(nodeAId, COMPLETE)); + assertBusy(() -> assertNodeShutdownStatus(nodeAId, STALLED)); assertIndexSetting("myindex", "index.routing.allocation.require._name", nodeA); createIndex("other", 1, 1); @@ -228,6 +288,20 @@ public void testNodeReplacementOverridesFilters() throws Exception { ) ); }, () -> fail("expected a 'NO' decision for nodeB but there was no explanation for that node")); + + if (randomBoolean()) { + // Let the unmovable index move, it should complete the vacates and also allow replica to be assigned to the replacement target + internalCluster().getCurrentMasterNodeInstance(PluginsService.class) + .filterPlugins(TestPlugin.class) + .findFirst() + .orElseThrow().enabled.set(false); + ClusterRerouteUtils.reroute(client()); + assertBusy(() 
-> assertNodeShutdownStatus(nodeAId, COMPLETE)); + } else { + // Stop the source node which also allows allocation to complete + internalCluster().stopNode(nodeA); + } + ensureGreen("other"); } public void testNodeReplacementAcceptIndexThatCouldNotBeAllocatedAnywhere() throws Exception {