Add a pluggable ReindexRelocationNodePicker to the reindex module.

ReindexRelocationNodePicker chooses the node that a running reindex task is relocated to when the
current node is about to shut down. DefaultReindexRelocationNodePicker picks, at random, a node
with no roles (a dedicated coordinating node) other than the local node; failing that, a random
data node other than the local node. It returns null when neither exists or when the local node ID
is unknown. ReindexPlugin loads at most one implementation through loadExtensions (falling back to
the default, and throwing IllegalStateException if several are found) and exposes it from
createComponents as a PluginComponentBinding; createComponents asserts that loadExtensions ran first.

NOTE(review): DefaultReindexRelocationNodePicker stores the result of Randomness.get() in a field
at construction time. If that generator is tied to the constructing thread or test context (TODO
confirm against Randomness), fetching it inside randomNodeId on each call would be safer.

diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/DefaultReindexRelocationNodePicker.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/DefaultReindexRelocationNodePicker.java
new file mode 100644
index 0000000000000..1b38fc7bf0e3f
--- /dev/null
+++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/DefaultReindexRelocationNodePicker.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.reindex;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.Randomness;
+
+import java.util.List;
+import java.util.Random;
+
+class DefaultReindexRelocationNodePicker implements ReindexRelocationNodePicker {
+
+    private static final Logger logger = LogManager.getLogger(DefaultReindexRelocationNodePicker.class);
+
+    private final Random random;
+
+    DefaultReindexRelocationNodePicker() {
+        this.random = Randomness.get();
+    }
+
+    @Override
+    public String pickNode(DiscoveryNodes nodes) {
+        String currentNodeId = nodes.getLocalNodeId();
+        if (currentNodeId == null) {
+            logger.debug(
+                "Trying to pick a node to relocate a reindex task to, but the current node ID is unexpectedly unknown:"
+                    + " the relocation attempt will be aborted"
+            );
+            return null;
+        }
+        List<String> eligibleDedicatedCoordinatingNodes = nodes.getNodes()
+            .values()
+            .stream()
+            .filter(node -> node.getRoles().isEmpty())
+            .map(DiscoveryNode::getId)
+            .filter(id -> id.equals(currentNodeId) == false)
+            .toList();
+        if (eligibleDedicatedCoordinatingNodes.isEmpty() == false) {
+            String newNodeId = randomNodeId(eligibleDedicatedCoordinatingNodes);
+            logger.debug("Chose dedicated coordinating node ID {} for relocating a reindex task from node {}", newNodeId, currentNodeId);
+            return newNodeId;
+        }
+        List<String> eligibleDataNodes = nodes.getDataNodes().keySet().stream().filter(id -> id.equals(currentNodeId) == false).toList();
+        if (eligibleDataNodes.isEmpty() == false) {
+            String newNodeId = randomNodeId(eligibleDataNodes);
+            logger.debug(
+                "Chose data node ID {} for relocating a reindex task from node {}"
+                    + " (there are no dedicated coordinating nodes, perhaps excluding the current node)",
+                newNodeId,
+                currentNodeId
+            );
+            return newNodeId;
+        }
+        logger.debug(
+            "Trying to pick a node to relocate a reindex task to, but there are no dedicated coordinating or data nodes"
+                + " (perhaps excluding the current node): the relocation attempt will be aborted"
+        );
+        return null;
+    }
+
+    private String randomNodeId(List<String> nodeIds) {
+        return nodeIds.get(random.nextInt(nodeIds.size()));
+    }
+}
diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexPlugin.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexPlugin.java
index 9490446bb946f..259d245e093bf 100644
--- a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexPlugin.java
+++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexPlugin.java
@@ -9,6 +9,7 @@
 
 package org.elasticsearch.reindex;
 
+import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -25,6 +26,7 @@
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 import org.elasticsearch.index.reindex.ReindexAction;
 import org.elasticsearch.index.reindex.UpdateByQueryAction;
+import org.elasticsearch.node.PluginComponentBinding;
 import org.elasticsearch.plugins.ActionPlugin;
 import org.elasticsearch.plugins.ExtensiblePlugin;
 import org.elasticsearch.plugins.Plugin;
@@ -51,6 +53,8 @@ public class ReindexPlugin extends Plugin implements ActionPlugin, ExtensiblePlu
      */
     public static final boolean REINDEX_RESILIENCE_ENABLED = new FeatureFlag("reindex_resilience").isEnabled();
 
+    private final SetOnce<ReindexRelocationNodePicker> relocationNodePicker = new SetOnce<>();
+
     @Override
     public List<ActionHandler> getActions() {
         return Arrays.asList(
@@ -90,11 +94,13 @@ public List<RestHandler> getRestHandlers(
 
     @Override
     public Collection<?> createComponents(PluginServices services) {
+        assert relocationNodePicker.get() != null : "ReindexPlugin.relocationNodePicker was not set";
         return List.of(
             new ReindexSslConfig(services.environment().settings(), services.environment(), services.resourceWatcherService()),
             new ReindexMetrics(services.telemetryProvider().getMeterRegistry()),
             new UpdateByQueryMetrics(services.telemetryProvider().getMeterRegistry()),
-            new DeleteByQueryMetrics(services.telemetryProvider().getMeterRegistry())
+            new DeleteByQueryMetrics(services.telemetryProvider().getMeterRegistry()),
+            new PluginComponentBinding<>(ReindexRelocationNodePicker.class, relocationNodePicker.get())
         );
     }
 
@@ -105,4 +111,18 @@ public List<Setting<?>> getSettings() {
         settings.addAll(ReindexSslConfig.getSettings());
         return settings;
     }
+
+    @Override
+    public void loadExtensions(ExtensionLoader loader) {
+        relocationNodePicker.set(loadRelocationNodePicker(loader));
+    }
+
+    private ReindexRelocationNodePicker loadRelocationNodePicker(ExtensionLoader loader) {
+        List<ReindexRelocationNodePicker> relocationNodePickersFromExtensions = loader.loadExtensions(ReindexRelocationNodePicker.class);
+        return switch (relocationNodePickersFromExtensions.size()) {
+            case 0 -> new DefaultReindexRelocationNodePicker();
+            case 1 -> relocationNodePickersFromExtensions.getFirst();
+            default -> throw new IllegalStateException(ReindexRelocationNodePicker.class + " may not have multiple implementations");
+        };
+    }
 }
diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexRelocationNodePicker.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexRelocationNodePicker.java
new file mode 100644
index 0000000000000..0d521fa641fe5
--- /dev/null
+++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexRelocationNodePicker.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.reindex;
+
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.core.Nullable;
+
+/**
+ * Interface for helper to pick a node to relocate a running reindex task to when the current node is about to shut down.
+ */
+public interface ReindexRelocationNodePicker {
+
+    /**
+     * Picks a suitable node.
+     *
+     * @param nodes The set of nodes to pick from
+     * @return The ID of the selected node, or null if there are no suitable nodes
+     */
+    @Nullable
+    String pickNode(DiscoveryNodes nodes);
+}
diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/DefaultReindexRelocationNodePickerTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/DefaultReindexRelocationNodePickerTests.java
new file mode 100644
index 0000000000000..50a96f879f22f
--- /dev/null
+++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/DefaultReindexRelocationNodePickerTests.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.reindex;
+
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.oneOf;
+
+public class DefaultReindexRelocationNodePickerTests extends ESTestCase {
+
+    private final DefaultReindexRelocationNodePicker picker = new DefaultReindexRelocationNodePicker();
+
+    public void testPickNode_prefersDedicatedCoordinatingNode() {
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+            .add(createNode("dataMaster1", DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE))
+            .add(createNode("dataMaster2", DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE))
+            .add(createNode("dataMaster3", DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE))
+            .add(createNode("data4", DiscoveryNodeRole.DATA_ROLE))
+            .add(createNode("data5", DiscoveryNodeRole.DATA_ROLE))
+            .add(createNode("data6", DiscoveryNodeRole.DATA_ROLE))
+            .add(createNode("ingest1", DiscoveryNodeRole.INGEST_ROLE))
+            .add(createNode("coordinating1"))
+            .add(createNode("coordinating2Local"))
+            .add(createNode("coordinating3"))
+            .masterNodeId("dataMaster" + randomIntBetween(1, 3))
+            .localNodeId("coordinating2Local")
+            .build();
+        String id = picker.pickNode(nodes);
+        assertThat(id, oneOf("coordinating1", "coordinating3"));
+    }
+
+    public void testPickNode_noDedicatedCoordinatingNodes_fallsBackToDataNode() {
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+            .add(createNode("dataMaster1", DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE))
+            .add(createNode("dataMaster2", DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE))
+            .add(createNode("dataMaster3", DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE))
+            .add(createNode("data4", DiscoveryNodeRole.DATA_ROLE))
+            .add(createNode("data5Local", DiscoveryNodeRole.DATA_ROLE))
+            .add(createNode("data6", DiscoveryNodeRole.DATA_ROLE))
+            .add(createNode("ingest1", DiscoveryNodeRole.INGEST_ROLE))
+            .add(createNode("ml1", DiscoveryNodeRole.ML_ROLE))
+            .add(createNode("voting1", DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE))
+            .masterNodeId("dataMaster" + randomIntBetween(1, 3))
+            .localNodeId("data5Local")
+            .build();
+        String id = picker.pickNode(nodes);
+        assertThat(id, oneOf("dataMaster1", "dataMaster2", "dataMaster3", "data4", "data6"));
+    }
+
+    public void testPickNode_onlyDedicatedCoordinatingNodeIsLocal_fallsBackToDataNode() {
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+            .add(createNode("dataMaster1", DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE))
+            .add(createNode("dataMaster2", DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE))
+            .add(createNode("dataMaster3", DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE))
+            .add(createNode("data4", DiscoveryNodeRole.DATA_ROLE))
+            .add(createNode("data5", DiscoveryNodeRole.DATA_ROLE))
+            .add(createNode("data6", DiscoveryNodeRole.DATA_ROLE))
+            .add(createNode("ingest1", DiscoveryNodeRole.INGEST_ROLE))
+            .add(createNode("coordinating1Local"))
+            .masterNodeId("dataMaster" + randomIntBetween(1, 3))
+            .localNodeId("coordinating1Local")
+            .build();
+        String id = picker.pickNode(nodes);
+        assertThat(id, oneOf("dataMaster1", "dataMaster2", "dataMaster3", "data4", "data5", "data6"));
+    }
+
+    public void testPickNode_noDedicatedCoordinatingNodes_onlyDataNodeIsLocal_returnsNull() {
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+            .add(createNode("dataMaster1Local", DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE))
+            .add(createNode("ingestMaster1", DiscoveryNodeRole.INGEST_ROLE, DiscoveryNodeRole.MASTER_ROLE))
+            .masterNodeId(randomFrom("dataMaster1Local", "ingestMaster1"))
+            .localNodeId("dataMaster1Local")
+            .build();
+        String id = picker.pickNode(nodes);
+        assertThat(id, nullValue());
+    }
+
+    public void testPickNode_localNodeUnknown_returnsNull() {
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+            .add(createNode("dataMaster1", DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE))
+            .add(createNode("dataMaster2", DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE))
+            .add(createNode("dataMaster3", DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE))
+            .add(createNode("data4", DiscoveryNodeRole.DATA_ROLE))
+            .add(createNode("data5", DiscoveryNodeRole.DATA_ROLE))
+            .add(createNode("data6", DiscoveryNodeRole.DATA_ROLE))
+            .add(createNode("ingest1", DiscoveryNodeRole.INGEST_ROLE))
+            .add(createNode("coordinating1"))
+            .add(createNode("coordinating2Local"))
+            .add(createNode("coordinating3"))
+            .masterNodeId("dataMaster" + randomIntBetween(1, 3))
+            // We never set the local node
+            .build();
+        String id = picker.pickNode(nodes);
+        assertThat(id, nullValue());
+    }
+
+    private static DiscoveryNode createNode(String id, DiscoveryNodeRole... roles) {
+        return DiscoveryNodeUtils.create(id, buildNewFakeTransportAddress(), Map.of(), Set.of(roles));
+    }
+}