docs/changelog/125353.yaml (5 additions & 0 deletions)
@@ -0,0 +1,5 @@
pr: 125353
summary: Throttle reindex requests per node from the reindex data stream API
area: Data streams
type: enhancement
issues: []
ReindexDataStreamIndexTransportAction.java
@@ -37,6 +37,8 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -46,6 +48,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexAction;
@@ -61,18 +64,24 @@
import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.METADATA;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;

public class ReindexDataStreamIndexTransportAction extends HandledTransportAction<
ReindexDataStreamIndexAction.Request,
ReindexDataStreamIndexAction.Response> {
ReindexDataStreamIndexAction.Response> implements ClusterStateListener {

public static final String REINDEX_MAX_REQUESTS_PER_SECOND_KEY = "migrate.data_stream_reindex_max_request_per_second";

@@ -102,6 +111,11 @@ public class ReindexDataStreamIndexTransportAction
Setting.Property.NodeScope
);

/*
* We don't want to overburden nodes with reindex requests. This is the number of concurrent reindex requests we will send to any one
* ingest node in the cluster.
*/
private static final int MAX_REINDEXES_PER_NODE = 10;
private static final Logger logger = LogManager.getLogger(ReindexDataStreamIndexTransportAction.class);
private static final IndicesOptions IGNORE_MISSING_OPTIONS = IndicesOptions.fromOptions(true, true, false, false);

@@ -114,8 +128,17 @@ public class ReindexDataStreamIndexTransportAction
* the unit test doesn't fail if it rolls over Integer.MAX_VALUE (since the node selected is the same for Integer.MAX_VALUE and
* Integer.MAX_VALUE + 1).
*/
private final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt(2 ^ 30));
private final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt((int) Math.pow(2, 30)));
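The replaced seed is a classic Java pitfall: `^` is bitwise XOR, not exponentiation, so `2 ^ 30` evaluates to 28 rather than 2^30. A quick standalone demonstration of the difference:

```java
public class XorVsPow {
    public static void main(String[] args) {
        // '^' is bitwise XOR in Java: 0b00010 ^ 0b11110 == 0b11100
        System.out.println(2 ^ 30);                // 28
        System.out.println((int) Math.pow(2, 30)); // 1073741824
        System.out.println(1 << 30);               // 1073741824, the equivalent bit-shift form
    }
}
```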
/*
* This maps nodeId to a semaphore for that node, controlling the number of concurrent reindex requests we send to that node.
*/
private final Map<String, Semaphore> nodeToInFlightCountMap = new ConcurrentHashMap<>();
/*
* This is the deque of reindex requests that we do not currently have the capacity to execute. They are sent as in-flight requests complete and free up permits.
*/
private final Deque<Tuple<ReindexRequest, ActionListener<BulkByScrollResponse>>> pendingReindexQueue = new ConcurrentLinkedDeque<>();

@SuppressWarnings("this-escape")
@Inject
public ReindexDataStreamIndexTransportAction(
TransportService transportService,
@@ -134,6 +157,7 @@ public ReindexDataStreamIndexTransportAction(
this.clusterService = clusterService;
this.client = client;
this.transportService = transportService;
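// Registering as a listener here publishes 'this' before the constructor finishes, which is why the constructor is annotated @SuppressWarnings("this-escape").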
clusterService.addListener(this);
}

@Override
@@ -320,6 +344,10 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkByScrollResponse>
listener.onResponse(bulkByScrollResponse);
}
}, listener::onFailure);
runReindexAction(reindexRequest, checkForFailuresListener);
}

private void runReindexAction(ReindexRequest reindexRequest, ActionListener<BulkByScrollResponse> listener) {
/*
* Reindex will potentially run a pipeline for each document. If we run all reindex requests on the same node (locally), that
* becomes a bottleneck. This code round-robins reindex requests to all ingest nodes to spread out the pipeline workload. When a
@@ -329,21 +357,46 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkByScrollResponse>
if (ingestNodes.length == 0) {
listener.onFailure(new NoNodeAvailableException("No ingest nodes in cluster"));
} else {
DiscoveryNode ingestNode = ingestNodes[Math.floorMod(ingestNodeOffsetGenerator.incrementAndGet(), ingestNodes.length)];
logger.debug("Sending reindex request to {}", ingestNode.getName());
transportService.sendRequest(
ingestNode,
ReindexAction.NAME,
reindexRequest,
new ActionListenerResponseHandler<>(
checkForFailuresListener,
BulkByScrollResponse::new,
TransportResponseHandler.TRANSPORT_WORKER
)
);
DiscoveryNode ingestNode = findAvailableNode(ingestNodes);
if (ingestNode == null) {
pendingReindexQueue.add(Tuple.tuple(reindexRequest, listener));
} else {
runReindexOnNode(reindexRequest, listener, ingestNode);
}
}
}

private void runReindexOnNode(ReindexRequest reindexRequest, ActionListener<BulkByScrollResponse> listener, DiscoveryNode ingestNode) {
logger.debug("Sending reindex request to {}", ingestNode.getName());
transportService.sendRequest(
ingestNode,
ReindexAction.NAME,
reindexRequest,
new ActionListenerResponseHandler<>(ActionListener.runAfter(listener, () -> {
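// When this reindex completes (success or failure), either hand this node's permit to a queued request or release it.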
Tuple<ReindexRequest, ActionListener<BulkByScrollResponse>> pendingItem = pendingReindexQueue.poll();
if (pendingItem == null) {
Semaphore semaphore = nodeToInFlightCountMap.get(ingestNode.getId());
if (semaphore != null) {
semaphore.release();
}
} else {
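// Reuse the permit we already hold for this node to run the queued request, rather than releasing it.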
runReindexOnNode(pendingItem.v1(), pendingItem.v2(), ingestNode);
}
}), BulkByScrollResponse::new, TransportResponseHandler.TRANSPORT_WORKER)
);
}

private DiscoveryNode findAvailableNode(DiscoveryNode[] discoveryNodes) {
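// Try each node at most once, starting from a rotating offset so load spreads round-robin across ingest nodes.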
for (int i = 0; i < discoveryNodes.length; i++) {
DiscoveryNode discoveryNode = discoveryNodes[Math.floorMod(ingestNodeOffsetGenerator.incrementAndGet(), discoveryNodes.length)];
Semaphore semaphore = nodeToInFlightCountMap.computeIfAbsent(discoveryNode.getId(), k -> new Semaphore(MAX_REINDEXES_PER_NODE));
if (semaphore.tryAcquire()) {
return discoveryNode;
}
}
return null;
}
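Taken together, findAvailableNode, runReindexOnNode, and pendingReindexQueue form a small work-conserving throttle: tryAcquire a per-node permit round-robin, queue the task when every node is saturated, and on completion hand the permit to a queued task or release it. Below is a standalone sketch of the same pattern, with generic type parameters standing in for the Elasticsearch node and request types; NodeThrottle and its method names are illustrative, not part of this PR:

```java
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/** Minimal work-conserving per-node throttle: at most maxPerNode tasks in flight per node. */
final class NodeThrottle<N, T> {
    private final int maxPerNode;
    private final Map<N, Semaphore> inFlight = new ConcurrentHashMap<>();
    private final Queue<T> pending = new ConcurrentLinkedDeque<>();
    private final AtomicInteger offset = new AtomicInteger();

    NodeThrottle(int maxPerNode) {
        this.maxPerNode = maxPerNode;
    }

    /** Round-robin over nodes; returns the first one with a free permit, or null if all are saturated. */
    N acquireNode(List<N> nodes) {
        for (int i = 0; i < nodes.size(); i++) {
            N node = nodes.get(Math.floorMod(offset.incrementAndGet(), nodes.size()));
            Semaphore semaphore = inFlight.computeIfAbsent(node, k -> new Semaphore(maxPerNode));
            if (semaphore.tryAcquire()) {
                return node;
            }
        }
        return null; // caller should enqueue the task instead
    }

    void enqueue(T task) {
        pending.add(task);
    }

    /**
     * Called when a task finishes on a node. Returns a queued task to run on the same node
     * under the permit we already hold, or null after releasing the permit.
     */
    T onComplete(N node) {
        T next = pending.poll();
        if (next == null) {
            Semaphore semaphore = inFlight.get(node);
            if (semaphore != null) { // entry may have been removed if the node left the cluster
                semaphore.release();
            }
        }
        return next;
    }
}
```

A caller would invoke acquireNode before dispatch, enqueue the task when it returns null, and on completion run any non-null result of onComplete on the same node under the inherited permit, mirroring the runAfter callback above.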

private void updateSettings(
String index,
Settings.Builder settings,
@@ -497,4 +550,32 @@ private void sanityCheck(
listener.onResponse(null);
}
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
/*
* If a node is added, we want to see if we can send some of the pending requests to it. This also ensures that if all ingest
* nodes are shut down and new ones are brought up, requests are not stuck in pendingReindexQueue forever: other than this
* listener, queued requests only run as in-flight tasks complete and free up capacity on existing nodes.
*/
if (event.nodesAdded() && event.nodesDelta().addedNodes().stream().anyMatch(DiscoveryNode::isIngestNode)) {
List<Tuple<ReindexRequest, ActionListener<BulkByScrollResponse>>> currentPendingRequests = new ArrayList<>();
Tuple<ReindexRequest, ActionListener<BulkByScrollResponse>> request = pendingReindexQueue.poll();
while (request != null) {
currentPendingRequests.add(request);
request = pendingReindexQueue.poll();
}
for (Tuple<ReindexRequest, ActionListener<BulkByScrollResponse>> pendingRequest : currentPendingRequests) {
runReindexAction(pendingRequest.v1(), pendingRequest.v2());
}
}
if (event.nodesRemoved()) {
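// Removing the entry while requests may still be in flight is safe: the null check in runReindexOnNode turns the eventual release into a no-op.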
event.nodesDelta()
.removedNodes()
.stream()
.filter(DiscoveryNode::isIngestNode)
.map(DiscoveryNode::getId)
.forEach(nodeToInFlightCountMap::remove);
}
}
}