diff --git a/docs/changelog/125353.yaml b/docs/changelog/125353.yaml
new file mode 100644
index 0000000000000..03a8e1919f95f
--- /dev/null
+++ b/docs/changelog/125353.yaml
@@ -0,0 +1,5 @@
+pr: 125353
+summary: Throttling reindex requests per node from reindex data stream api
+area: Data streams
+type: enhancement
+issues: []
diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java
index 189bdc1012790..0b5de0174bf74 100644
--- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java
+++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java
@@ -37,6 +37,8 @@
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.client.internal.transport.NoNodeAvailableException;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -46,6 +48,7 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.ReindexAction;
@@ -61,10 +64,16 @@
 import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
 import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Deque;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.METADATA;
@@ -72,7 +81,7 @@
 
 public class ReindexDataStreamIndexTransportAction extends HandledTransportAction<
     ReindexDataStreamIndexAction.Request,
-    ReindexDataStreamIndexAction.Response> {
+    ReindexDataStreamIndexAction.Response> implements ClusterStateListener {
 
     public static final String REINDEX_MAX_REQUESTS_PER_SECOND_KEY = "migrate.data_stream_reindex_max_request_per_second";
 
@@ -102,6 +111,11 @@ public class ReindexDataStreamIndexTransportActio
         Setting.Property.NodeScope
     );
 
+    /*
+     * We don't want to overburden nodes with reindex requests. This is the number of concurrent reindex requests we will send to any one
+     * ingest node in the cluster.
+     */
+    private static final int MAX_REINDEXES_PER_NODE = 10;
     private static final Logger logger = LogManager.getLogger(ReindexDataStreamIndexTransportAction.class);
     private static final IndicesOptions IGNORE_MISSING_OPTIONS = IndicesOptions.fromOptions(true, true, false, false);
@@ -114,8 +128,17 @@ public class ReindexDataStreamIndexTransportActio
      * the unit test doesn't fail if it rolls over Integer.MAX_VALUE (since the node selected is the same for Integer.MAX_VALUE and
      * Integer.MAX_VALUE + 1).
      */
-    private final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt(2 ^ 30));
+    private final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt((int) Math.pow(2, 30)));
+    /*
+     * This maps nodeId to a semaphore for that node, controlling the number of concurrent reindex requests we send to that node.
+     */
+    private final Map<String, Semaphore> nodeToInFlightCountMap = new ConcurrentHashMap<>();
+    /*
+     * This is the Deque of reindex requests that we don't have the capacity to execute right now, but that will be sent as others free up.
+     */
+    private final Deque<Tuple<ReindexRequest, ActionListener<BulkByScrollResponse>>> pendingReindexQueue = new ConcurrentLinkedDeque<>();
 
+    @SuppressWarnings("this-escape")
     @Inject
     public ReindexDataStreamIndexTransportAction(
         TransportService transportService,
@@ -134,6 +157,7 @@ public ReindexDataStreamIndexTransportAction(
         this.clusterService = clusterService;
         this.client = client;
         this.transportService = transportService;
+        clusterService.addListener(this);
     }
 
     @Override
@@ -320,6 +344,10 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkB
+        runReindexAction(reindexRequest, checkForFailuresListener);
+    }
+
+    private void runReindexAction(ReindexRequest reindexRequest, ActionListener<BulkByScrollResponse> listener) {
         /*
          * Reindex will potentially run a pipeline for each document. If we run all reindex requests on the same node (locally), that
          * becomes a bottleneck. This code round-robins reindex requests to all ingest nodes to spread out the pipeline workload. When a
@@ -329,21 +357,46 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkB
         if (ingestNodes.length == 0) {
             listener.onFailure(new NoNodeAvailableException("No ingest nodes in cluster"));
         } else {
-            DiscoveryNode ingestNode = ingestNodes[Math.floorMod(ingestNodeOffsetGenerator.incrementAndGet(), ingestNodes.length)];
-            logger.debug("Sending reindex request to {}", ingestNode.getName());
-            transportService.sendRequest(
-                ingestNode,
-                ReindexAction.NAME,
-                reindexRequest,
-                new ActionListenerResponseHandler<>(
-                    checkForFailuresListener,
-                    BulkByScrollResponse::new,
-                    TransportResponseHandler.TRANSPORT_WORKER
-                )
-            );
+            DiscoveryNode ingestNode = findAvailableNode(ingestNodes);
+            if (ingestNode == null) {
+                pendingReindexQueue.add(Tuple.tuple(reindexRequest, listener));
+            } else {
+                runReindexOnNode(reindexRequest, listener, ingestNode);
+            }
         }
     }
 
+    private void runReindexOnNode(ReindexRequest reindexRequest, ActionListener<BulkByScrollResponse> listener, DiscoveryNode ingestNode) {
+        logger.debug("Sending reindex request to {}", ingestNode.getName());
+        transportService.sendRequest(
+            ingestNode,
+            ReindexAction.NAME,
+            reindexRequest,
+            new ActionListenerResponseHandler<>(ActionListener.runAfter(listener, () -> {
+                Tuple<ReindexRequest, ActionListener<BulkByScrollResponse>> pendingItem = pendingReindexQueue.poll();
+                if (pendingItem == null) {
+                    Semaphore semaphore = nodeToInFlightCountMap.get(ingestNode.getId());
+                    if (semaphore != null) {
+                        semaphore.release();
+                    }
+                } else {
+                    runReindexOnNode(pendingItem.v1(), pendingItem.v2(), ingestNode);
+                }
+            }), BulkByScrollResponse::new, TransportResponseHandler.TRANSPORT_WORKER)
+        );
+    }
+
+    private DiscoveryNode findAvailableNode(DiscoveryNode[] discoveryNodes) {
+        for (int i = 0; i < discoveryNodes.length; i++) {
+            DiscoveryNode discoveryNode = discoveryNodes[Math.floorMod(ingestNodeOffsetGenerator.incrementAndGet(), discoveryNodes.length)];
+            Semaphore semaphore = nodeToInFlightCountMap.computeIfAbsent(discoveryNode.getId(), k -> new Semaphore(MAX_REINDEXES_PER_NODE));
+            if (semaphore.tryAcquire()) {
+                return discoveryNode;
+            }
+        }
+        return null;
+    }
+
     private void updateSettings(
         String index,
         Settings.Builder settings,
@@ -497,4 +550,32 @@ private void sanityCheck(
             listener.onResponse(null);
         }
     }
+
+    @Override
+    public void clusterChanged(ClusterChangedEvent event) {
+        /*
+         * If a node is added, we want to see if we can send some of the pending requests to it. We also want to make sure that if all
+         * ingest nodes are shut down and new ones are brought up that requests are not stuck in pendingReindexQueue forever (since other
+         * than this, they are only executed as other tasks complete and free up space on existing nodes).
+         */
+        if (event.nodesAdded() && event.nodesDelta().addedNodes().stream().anyMatch(DiscoveryNode::isIngestNode)) {
+            List<Tuple<ReindexRequest, ActionListener<BulkByScrollResponse>>> currentPendingRequests = new ArrayList<>();
+            Tuple<ReindexRequest, ActionListener<BulkByScrollResponse>> request = pendingReindexQueue.poll();
+            while (request != null) {
+                currentPendingRequests.add(request);
+                request = pendingReindexQueue.poll();
+            }
+            for (Tuple<ReindexRequest, ActionListener<BulkByScrollResponse>> pendingRequest : currentPendingRequests) {
+                runReindexAction(pendingRequest.v1(), pendingRequest.v2());
+            }
+        }
+        if (event.nodesRemoved()) {
+            event.nodesDelta()
+                .removedNodes()
+                .stream()
+                .filter(DiscoveryNode::isIngestNode)
+                .map(DiscoveryNode::getId)
+                .forEach(nodeToInFlightCountMap::remove);
+        }
+    }
 }
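The throttling mechanism above is compact but easy to misread in unified-diff form, so here is a standalone sketch of the same pattern: pick a node round-robin, cap in-flight work per node with a Semaphore, park overflow in a concurrent deque, and hand a freed slot to a queued item whenever work completes. This is an illustrative reduction only; the names used here (NodeThrottle, Work, submit) are hypothetical and are not part of the Elasticsearch code in this change.

import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public final class NodeThrottle {

    /** A unit of work; implementations must invoke onDone exactly once when the node finishes it. */
    public interface Work {
        void run(String nodeId, Runnable onDone);
    }

    private static final int MAX_PER_NODE = 10;

    private final Map<String, Semaphore> inFlightPerNode = new ConcurrentHashMap<>();
    private final Deque<Work> pending = new ConcurrentLinkedDeque<>();
    private final AtomicInteger offset = new AtomicInteger();
    private final List<String> nodeIds;

    public NodeThrottle(List<String> nodeIds) {
        this.nodeIds = List.copyOf(nodeIds);
    }

    /** Runs the work on a node with spare capacity, or queues it until capacity frees up. */
    public void submit(Work work) {
        String nodeId = findAvailableNode();
        if (nodeId == null) {
            pending.add(work);
        } else {
            runOnNode(work, nodeId);
        }
    }

    private void runOnNode(Work work, String nodeId) {
        work.run(nodeId, () -> {
            // This node just freed a slot: give it to a queued item if there is one,
            // otherwise return the permit to the node's semaphore.
            Work next = pending.poll();
            if (next == null) {
                Semaphore semaphore = inFlightPerNode.get(nodeId);
                if (semaphore != null) {
                    semaphore.release();
                }
            } else {
                runOnNode(next, nodeId);
            }
        });
    }

    private String findAvailableNode() {
        // Round-robin over the nodes; the first one with a free permit wins.
        for (int i = 0; i < nodeIds.size(); i++) {
            String nodeId = nodeIds.get(Math.floorMod(offset.incrementAndGet(), nodeIds.size()));
            Semaphore semaphore = inFlightPerNode.computeIfAbsent(nodeId, k -> new Semaphore(MAX_PER_NODE));
            if (semaphore.tryAcquire()) {
                return nodeId;
            }
        }
        return null; // every node is already at MAX_PER_NODE in-flight requests
    }
}

A caller would construct the sketch with the current ingest node ids and call submit once per reindex request. The cluster-state handling in the actual change (re-dispatching queued requests when an ingest node joins, and dropping a node's semaphore when it leaves) is intentionally omitted here to keep the example focused on the permit-and-queue mechanics.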