Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
28bc7f9
Initial plumbing - need to address TODOs
joshua-adams-1 Feb 11, 2026
9427763
Creates Reindexer.lookupRemoteVersion
joshua-adams-1 Feb 13, 2026
3430f18
Refactor RemoteScrollableHitSource to use the utils class
joshua-adams-1 Feb 13, 2026
66f7deb
Clean up code
joshua-adams-1 Feb 13, 2026
a434f0d
Clean up TODOs
joshua-adams-1 Feb 13, 2026
7516540
[CI] Auto commit changes from spotless
Feb 13, 2026
1739e0f
Nits
joshua-adams-1 Feb 23, 2026
a805729
Move remote lookup into execute
joshua-adams-1 Feb 23, 2026
d5471f0
Add Tests
joshua-adams-1 Feb 23, 2026
b0c22a9
Merge branch 'main' into reindexing-plumb-pittable-hit-source
joshua-adams-1 Feb 23, 2026
6b86337
Merge conflicts
joshua-adams-1 Feb 23, 2026
61b187a
Merge branch 'reindexing-plumb-pittable-hit-source' of github.com:jos…
joshua-adams-1 Feb 23, 2026
13bdc0b
[CI] Auto commit changes from spotless
Feb 23, 2026
1dd0afc
Clean up
joshua-adams-1 Feb 23, 2026
9cf2f11
Merge branch 'reindexing-plumb-pittable-hit-source' of github.com:jos…
joshua-adams-1 Feb 23, 2026
460140c
[CI] Auto commit changes from spotless
Feb 23, 2026
5fd8ab0
Update RemoteScrollablePaginatedHitSourceTests to randomly set remote
joshua-adams-1 Feb 24, 2026
298f6fb
Merge branch 'reindexing-plumb-pittable-hit-source' of github.com:jos…
joshua-adams-1 Feb 24, 2026
6192133
Merge branch 'main' into reindexing-plumb-pittable-hit-source
joshua-adams-1 Feb 24, 2026
ec2517b
Close REST client
joshua-adams-1 Feb 24, 2026
3ba1670
[CI] Auto commit changes from spotless
Feb 24, 2026
363f2c2
Add retry logic to remote version lookup
joshua-adams-1 Feb 25, 2026
2a4e999
Add retry logic unit tests
joshua-adams-1 Feb 25, 2026
5899ba4
Merge branch 'reindexing-plumb-pittable-hit-source' of github.com:jos…
joshua-adams-1 Feb 25, 2026
04fe410
Merge branch 'main' into reindexing-plumb-pittable-hit-source
joshua-adams-1 Feb 25, 2026
8552f60
Merge main
joshua-adams-1 Feb 25, 2026
40ed11f
Fix fully qualified imports
joshua-adams-1 Feb 27, 2026
1700728
Remove retryCount from lookupRemoteVersionWithRetries
joshua-adams-1 Feb 27, 2026
6f6e96a
Add ticket to TODO comment
joshua-adams-1 Feb 27, 2026
5252944
Merge branch 'main' into reindexing-plumb-pittable-hit-source
joshua-adams-1 Feb 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.reindex;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
Expand All @@ -17,6 +18,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
Expand All @@ -35,6 +37,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.AUTO_SLICES;
Expand Down Expand Up @@ -73,19 +76,24 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void startSlicedAc
task,
request,
client,
listener.delegateFailure((l, v) -> executeSlicedAction(task, request, action, l, client, node, workerAction))
listener.delegateFailure(
(l, v) -> executeSlicedAction(task, request, action, l, client, node, null, version -> workerAction.run())
Copy link
Copy Markdown
Contributor Author

@joshua-adams-1 joshua-adams-1 Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is called exclusively by update-by-query (u-b-q) and delete-by-query (d-b-q), which have no concept of remote versions since they are executed on the local node. Therefore, setting the remote version parameter to null has no effect. Subsequent changes to introduce PIT will be behind the REINDEX_PIT_SEARCH_ENABLED feature flag anyway, but in the future, once that flag is lifted, this call will fail the 'use PIT' check and default to scroll.

)
);
}

/**
* Takes an action and a {@link BulkByScrollTask} and runs it with regard to whether this task is a
* leader or worker.
*
* If this task is a worker, the worker action in the given {@link Runnable} will be started on the local
* node. If the task is a leader (i.e. the number of slices is more than 1), then a subrequest will be
* created for each slice and sent.
* If this task is a worker, the worker action is invoked with the given {@code remoteVersion} (may be null
* for local reindex). If the task is a leader (i.e. the number of slices is more than 1), then a subrequest
* will be created for each slice and sent.
*
* This method can only be called after the task state is initialized {@link #initTaskState}.
*
* @param remoteVersion the version of the remote cluster when reindexing from remote, or null for local reindex
* @param workerAction invoked when this task is a worker, with the remote version (or null)
*/
static <Request extends AbstractBulkByScrollRequest<Request>> void executeSlicedAction(
BulkByScrollTask task,
Expand All @@ -94,12 +102,13 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void executeSliced
ActionListener<BulkByScrollResponse> listener,
Client client,
DiscoveryNode node,
Runnable workerAction
@Nullable Version remoteVersion,
Consumer<Version> workerAction
) {
if (task.isLeader()) {
sendSubRequests(client, action, node.getId(), task, request, listener);
} else if (task.isWorker()) {
workerAction.run();
workerAction.accept(remoteVersion);
} else {
throw new AssertionError("Task should have been initialized at this point.");
}
Expand Down
142 changes: 118 additions & 24 deletions modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.http.message.BasicHeader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.DocWriteRequest;
Expand Down Expand Up @@ -53,11 +54,13 @@
import org.elasticsearch.index.reindex.PaginatedHitSource;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.RejectAwareActionListener;
import org.elasticsearch.index.reindex.RemoteInfo;
import org.elasticsearch.index.reindex.ResumeBulkByScrollRequest;
import org.elasticsearch.index.reindex.ResumeBulkByScrollResponse;
import org.elasticsearch.index.reindex.ResumeReindexAction;
import org.elasticsearch.index.reindex.WorkerBulkByScrollTaskState;
import org.elasticsearch.reindex.remote.RemoteReindexingUtils;
import org.elasticsearch.reindex.remote.RemoteScrollablePaginatedHitSource;
import org.elasticsearch.script.CtxMap;
import org.elasticsearch.script.ReindexMetadata;
Expand All @@ -83,12 +86,15 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

import static java.util.Collections.emptyList;
import static java.util.Collections.synchronizedList;
import static org.elasticsearch.common.BackoffPolicy.exponentialBackoff;
import static org.elasticsearch.index.VersionType.INTERNAL;
import static org.elasticsearch.reindex.ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED;

public class Reindexer {

Expand Down Expand Up @@ -143,33 +149,113 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl
// for update-by-query and delete-by-query
final ActionListener<BulkByScrollResponse> listenerWithRelocations = listenerWithRelocations(task, request, listener);

BulkByPaginatedSearchParallelizationHelper.executeSlicedAction(
task,
request,
ReindexAction.INSTANCE,
listenerWithRelocations,
client,
clusterService.localNode(),
() -> {
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
ParentTaskAssigningClient assigningBulkClient = new ParentTaskAssigningClient(bulkClient, clusterService.localNode(), task);
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(
task,
logger,
assigningClient,
assigningBulkClient,
threadPool,
scriptService,
projectResolver.getProjectState(clusterService.state()),
reindexSslConfig,
request,
workerListenerWithRelocationAndMetrics(listenerWithRelocations, startTime, request.getRemoteInfo() != null)
Consumer<Version> workerAction = remoteVersion -> {
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
ParentTaskAssigningClient assigningBulkClient = new ParentTaskAssigningClient(bulkClient, clusterService.localNode(), task);
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(
task,
logger,
assigningClient,
assigningBulkClient,
threadPool,
scriptService,
projectResolver.getProjectState(clusterService.state()),
reindexSslConfig,
request,
workerListenerWithRelocationAndMetrics(listenerWithRelocations, startTime, request.getRemoteInfo() != null),
remoteVersion
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unchanged, except I pass the remoteVersion downstream

);
searchAction.start();
};

/**
* If this is a request to reindex from remote, then we need to determine the remote version prior to execution.
* NB: {@link ReindexRequest} forbids combining a remote source with slices > 1, so we're guaranteed to be running on the only slice.
*/
if (REINDEX_PIT_SEARCH_ENABLED && request.getRemoteInfo() != null) {
lookupRemoteVersionAndExecute(task, request, listenerWithRelocations, workerAction);
} else {
BulkByPaginatedSearchParallelizationHelper.executeSlicedAction(
task,
request,
ReindexAction.INSTANCE,
listenerWithRelocations,
client,
clusterService.localNode(),
null,
workerAction
);
}
}

/**
* Looks up the remote cluster version when reindexing from a remote source, then runs the sliced action with that version.
* The RestClient used for the lookup is closed after the callback; closing must happen on a thread other than the
* RestClient's own thread pool to avoid shutdown failures.
*/
private void lookupRemoteVersionAndExecute(
BulkByScrollTask task,
ReindexRequest request,
ActionListener<BulkByScrollResponse> listenerWithRelocations,
Consumer<Version> workerAction
) {
RemoteInfo remoteInfo = request.getRemoteInfo();
assert reindexSslConfig != null : "Reindex ssl config must be set";
RestClient restClient = buildRestClient(remoteInfo, reindexSslConfig, task.getId(), synchronizedList(new ArrayList<>()));
RejectAwareActionListener<Version> rejectAwareListener = new RejectAwareActionListener<>() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently the remote version lookup has retries as part of starting the search. It'd be nice to add retries here too?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch - will add now

@Override
public void onResponse(Version version) {
closeRestClientAndRun(
restClient,
() -> BulkByPaginatedSearchParallelizationHelper.executeSlicedAction(
task,
request,
ReindexAction.INSTANCE,
listenerWithRelocations,
client,
clusterService.localNode(),
version,
workerAction
)
);
searchAction.start();
}

@Override
public void onFailure(Exception e) {
    // Release the lookup client first; only then is the failure reported to the wrapped listener.
    final Runnable reportFailure = () -> listenerWithRelocations.onFailure(e);
    closeRestClientAndRun(restClient, reportFailure);
}

@Override
public void onRejection(Exception e) {
    // A rejection is surfaced exactly like any other failure: close the client, then fail the listener.
    onFailure(e);
}
};
RemoteReindexingUtils.lookupRemoteVersionWithRetries(
logger,
exponentialBackoff(request.getRetryBackoffInitialTime(), request.getMaxRetries()),
threadPool,
restClient,
// TODO - Do we want to pass in a countRetry runnable here to count the number of times we retry?
// https://github.com/elastic/elasticsearch-team/issues/2382
rejectAwareListener
);
}

/**
 * Closes the given RestClient on the generic thread pool (so the close is not performed from the client's own
 * thread) and then invokes {@code onCompletion}. The completion action runs whether or not the close succeeded.
 *
 * @param restClient   the client to close
 * @param onCompletion the action to run once the close attempt has finished
 */
private void closeRestClientAndRun(RestClient restClient, Runnable onCompletion) {
    final Runnable closeThenContinue = () -> {
        try {
            restClient.close();
        } catch (IOException closeFailure) {
            // Best effort: a failed close is logged and does not stop the follow-up action.
            logger.warn("Failed to close RestClient after version lookup", closeFailure);
        } finally {
            onCompletion.run();
        }
    };
    threadPool.generic().submit(closeThenContinue);
}

/** Wraps the listener with metrics tracking and relocation handling (if applicable). Visible for testing. */
ActionListener<BulkByScrollResponse> workerListenerWithRelocationAndMetrics(
ActionListener<BulkByScrollResponse> potentiallyWrappedRelocationListener,
Expand Down Expand Up @@ -413,6 +499,11 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Re
*/
private List<Thread> createdThreads = emptyList();

/**
* Version of the remote cluster when reindexing from remote, or null when reindexing locally.
*/
private final Version remoteVersion;

AsyncIndexBySearchAction(
BulkByScrollTask task,
Logger logger,
Expand All @@ -423,7 +514,8 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Re
ProjectState state,
ReindexSslConfig sslConfig,
ReindexRequest request,
ActionListener<BulkByScrollResponse> listener
ActionListener<BulkByScrollResponse> listener,
@Nullable Version remoteVersion
) {
super(
task,
Expand All @@ -444,6 +536,7 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Re
sslConfig
);
this.destinationIndexIdMapper = destinationIndexMode(state).idFieldMapperWithoutFieldData();
this.remoteVersion = remoteVersion;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I store it as a state variable so it can be referenced by buildScrollableResultSource below.

}

private IndexMode destinationIndexMode(ProjectState state) {
Expand Down Expand Up @@ -477,7 +570,8 @@ protected PaginatedHitSource buildScrollableResultSource(BackoffPolicy backoffPo
this::finishHim,
restClient,
remoteInfo,
searchRequest
searchRequest,
remoteVersion
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the PIT feature flag is enabled but we are reindexing from a remote cluster whose version predates the introduction of PIT, we need to fall back to using scroll. However, since we've already done the remote version lookup, we may as well pass this value in.

If this is a scrollable workflow, then remoteVersion is null.

);
}
return super.buildScrollableResultSource(backoffPolicy, searchRequest);
Expand Down
Loading