From bf6b036c21d421fba2f4bacb7fced0df5cf1152c Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Wed, 1 Apr 2026 13:27:26 -0400 Subject: [PATCH 1/6] Reindex relocation: store source task result at destination --- .../TransportGetReindexActionTests.java | 15 ++-- .../org/elasticsearch/reindex/Reindexer.java | 69 ++++++++++++++++--- .../reindex/TaskRelocatedException.java | 5 ++ .../reindex/TransportReindexAction.java | 13 ++-- .../elasticsearch/reindex/ReindexerTests.java | 25 ++++--- .../index/reindex/ResumeInfo.java | 24 +++++-- .../org/elasticsearch/tasks/TaskResult.java | 4 ++ .../action/TransportEnrichReindexAction.java | 7 +- 8 files changed, 126 insertions(+), 36 deletions(-) diff --git a/modules/reindex-management/src/test/java/org/elasticsearch/reindex/management/TransportGetReindexActionTests.java b/modules/reindex-management/src/test/java/org/elasticsearch/reindex/management/TransportGetReindexActionTests.java index 5e1f4eee915a5..0b63b85a1d200 100644 --- a/modules/reindex-management/src/test/java/org/elasticsearch/reindex/management/TransportGetReindexActionTests.java +++ b/modules/reindex-management/src/test/java/org/elasticsearch/reindex/management/TransportGetReindexActionTests.java @@ -289,8 +289,7 @@ public void testFollowsSingleRelocation() throws IOException { final TaskId originalTaskId = taskId; final TaskId relocatedTaskId = randomValueOtherThan(taskId, () -> new TaskId(randomAlphaOfLength(10), randomIntBetween(1, 1000))); - final TaskRelocatedException relocatedException = new TaskRelocatedException(); - relocatedException.setOriginalAndRelocatedTaskIdMetadata(originalTaskId, relocatedTaskId); + final var relocatedException = new TaskRelocatedException(originalTaskId, relocatedTaskId); final TaskInfo originalInfo = createTaskInfo(originalTaskId, ReindexAction.NAME); final TaskResult originalResult = new TaskResult(originalInfo, (Exception) relocatedException); @@ -335,11 +334,9 @@ public void testFollowsTwoRelocations() throws IOException { () -> 
new TaskId(randomAlphaOfLength(10), randomIntBetween(1, 1000)) ); - final TaskRelocatedException firstRelocation = new TaskRelocatedException(); - firstRelocation.setOriginalAndRelocatedTaskIdMetadata(originalTaskId, firstRelocatedTaskId); + final var firstRelocation = new TaskRelocatedException(originalTaskId, firstRelocatedTaskId); - final TaskRelocatedException secondRelocation = new TaskRelocatedException(); - secondRelocation.setOriginalAndRelocatedTaskIdMetadata(firstRelocatedTaskId, secondRelocatedTaskId); + final var secondRelocation = new TaskRelocatedException(firstRelocatedTaskId, secondRelocatedTaskId); final TaskInfo originalInfo = createTaskInfo(originalTaskId, ReindexAction.NAME); final TaskResult originalResult = new TaskResult(originalInfo, (Exception) firstRelocation); @@ -385,8 +382,7 @@ public void testWaitForCompletionHandlesRelocationWhileWaiting() throws IOExcept final TaskId originalTaskId = taskId; final TaskId relocatedTaskId = randomValueOtherThan(taskId, () -> new TaskId(randomAlphaOfLength(10), randomIntBetween(1, 1000))); - final TaskRelocatedException relocatedException = new TaskRelocatedException(); - relocatedException.setOriginalAndRelocatedTaskIdMetadata(originalTaskId, relocatedTaskId); + final var relocatedException = new TaskRelocatedException(originalTaskId, relocatedTaskId); final TaskInfo originalInfo = createTaskInfo(originalTaskId, ReindexAction.NAME); final TaskResult originalIncomplete = new TaskResult(false, originalInfo); @@ -450,8 +446,7 @@ public void testRelocatedTaskNotFound() throws IOException { final TaskId originalTaskId = taskId; final TaskId relocatedTaskId = randomValueOtherThan(taskId, () -> new TaskId(randomAlphaOfLength(10), randomIntBetween(1, 1000))); - final TaskRelocatedException relocatedException = new TaskRelocatedException(); - relocatedException.setOriginalAndRelocatedTaskIdMetadata(originalTaskId, relocatedTaskId); + final var relocatedException = new TaskRelocatedException(originalTaskId, 
relocatedTaskId); final TaskInfo originalInfo = createTaskInfo(originalTaskId, ReindexAction.NAME); final TaskResult originalResult = new TaskResult(originalInfo, (Exception) relocatedException); diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java index 99fa60b0ab88d..102ca1ef53b34 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java @@ -64,6 +64,7 @@ import org.elasticsearch.index.reindex.RemoteInfo; import org.elasticsearch.index.reindex.ResumeBulkByScrollRequest; import org.elasticsearch.index.reindex.ResumeBulkByScrollResponse; +import org.elasticsearch.index.reindex.ResumeInfo; import org.elasticsearch.index.reindex.ResumeReindexAction; import org.elasticsearch.index.reindex.WorkerBulkByScrollTaskState; import org.elasticsearch.reindex.remote.RemoteReindexingUtils; @@ -76,6 +77,8 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.tasks.TaskResult; +import org.elasticsearch.tasks.TaskResultsService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentBuilder; @@ -121,6 +124,7 @@ public class Reindexer { private final TransportService transportService; private final ReindexRelocationNodePicker relocationNodePicker; private final FeatureService featureService; + private final TaskResultsService taskResultsService; Reindexer( ClusterService clusterService, @@ -132,7 +136,8 @@ public class Reindexer { @Nullable ReindexMetrics reindexMetrics, TransportService transportService, ReindexRelocationNodePicker relocationNodePicker, - FeatureService featureService + FeatureService featureService, + TaskResultsService taskResultsService ) { this.clusterService = 
clusterService; this.projectResolver = projectResolver; @@ -145,6 +150,7 @@ public class Reindexer { this.transportService = transportService; this.relocationNodePicker = Objects.requireNonNull(relocationNodePicker); this.featureService = featureService; + this.taskResultsService = Objects.requireNonNull(taskResultsService); } public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListener listener) { @@ -156,6 +162,44 @@ public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListen } public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkClient, ActionListener listener) { + final ResumeInfo resumeInfo = request.getResumeInfo().orElse(null); + if (resumeInfo != null && resumeInfo.sourceTaskResult() != null) { + storeRelocationSourceTaskResult( + task, + resumeInfo, + ActionListener.wrap(v -> doExecute(task, request, bulkClient, listener), listener::onFailure) + ); + } else { + doExecute(task, request, bulkClient, listener); + } + } + + /** + * Stores the source task's result in the {@code .tasks} index, patching the error with a {@link TaskRelocatedException} that contains + * the new task ID on the destination node. This preserves the relocation chain for the management APIs even if the source node fails + * to store its task result. For sliced reindex tasks, only the leader will store the source task result. 
+ */ + private void storeRelocationSourceTaskResult(BulkByScrollTask task, ResumeInfo resumeInfo, ActionListener listener) { + final var relocatedException = new TaskRelocatedException( + resumeInfo.relocationOrigin().originalTaskId(), + new TaskId(clusterService.localNode().getId(), task.getId()) + ); + final TaskResult patched; + try { + patched = resumeInfo.sourceTaskResult().withError(relocatedException); + } catch (IOException e) { + listener.onFailure(e); + return; + } + taskResultsService.storeResult(patched, listener); + } + + private void doExecute( + BulkByScrollTask task, + ReindexRequest request, + Client bulkClient, + ActionListener listener + ) { // todo: move relocations to BulkByPaginatedSearchParallelizationHelper rather than having it in Reindexer, makes it generic // for update-by-query and delete-by-query final ActionListener responseListener = wrapWithMetrics( @@ -501,15 +545,24 @@ ActionListener listenerWithRelocations( ); return; } - request.setResumeInfo(response.getTaskResumeInfo().get()); + final ResumeInfo resumeInfo = response.getTaskResumeInfo().get(); + // Source task result is needed to preserve the relocation chain in the .tasks index on destination node. 
+ // This guards against the source node failing before it stores its own task result containing the relocated task ID, + // which would break the relocation chain and prevent the management APIs from following the chain to + // the relocated task. + final TaskResult sourceTaskResult; + try { + sourceTaskResult = task.result(clusterService.localNode(), new TaskRelocatedException()); + } catch (IOException e) { + l.onFailure(e); + return; + } + request.setResumeInfo( + new ResumeInfo(resumeInfo.relocationOrigin(), resumeInfo.worker(), resumeInfo.slices(), sourceTaskResult) + ); final ResumeBulkByScrollRequest resumeRequest = new ResumeBulkByScrollRequest(request); final ActionListener relocationListener = ActionListener.wrap(resp -> { - final var relocatedException = new TaskRelocatedException(); - relocatedException.setOriginalAndRelocatedTaskIdMetadata( - new TaskId(clusterService.localNode().getId(), task.getId()), - resp.getTaskId() - ); - l.onFailure(relocatedException); + l.onFailure(new TaskRelocatedException(new TaskId(clusterService.localNode().getId(), task.getId()), resp.getTaskId())); }, l::onFailure); transportService.sendRequest( nodeToRelocateToNode, diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/TaskRelocatedException.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/TaskRelocatedException.java index 2bd536e686734..166119a9005a3 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/TaskRelocatedException.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/TaskRelocatedException.java @@ -28,6 +28,11 @@ public TaskRelocatedException() { super("Task was relocated"); } + public TaskRelocatedException(TaskId originalTaskId, TaskId relocatedTaskId) { + super("Task was relocated"); + setOriginalAndRelocatedTaskIdMetadata(originalTaskId, relocatedTaskId); + } + /** Returns the relocated task ID if the map is a serialized {@link TaskRelocatedException}. 
*/ public static Optional relocatedTaskIdFromErrorMap(final Map errorMap) { if ("task_relocated_exception".equals(errorMap.get("type")) diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/TransportReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/TransportReindexAction.java index 51db712af6aaa..9e3ba23800d32 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/TransportReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/TransportReindexAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.script.ScriptService; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskResultsService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -61,7 +62,8 @@ public TransportReindexAction( ReindexSslConfig sslConfig, @Nullable ReindexMetrics reindexMetrics, ReindexRelocationNodePicker relocationNodePicker, - FeatureService featureService + FeatureService featureService, + TaskResultsService taskResultsService ) { this( ReindexAction.NAME, @@ -78,7 +80,8 @@ public TransportReindexAction( sslConfig, reindexMetrics, relocationNodePicker, - featureService + featureService, + taskResultsService ); } @@ -97,7 +100,8 @@ protected TransportReindexAction( ReindexSslConfig sslConfig, @Nullable ReindexMetrics reindexMetrics, ReindexRelocationNodePicker relocationNodePicker, - FeatureService featureService + FeatureService featureService, + TaskResultsService taskResultsService ) { super(name, transportService, actionFilters, ReindexRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); this.client = client; @@ -118,7 +122,8 @@ protected TransportReindexAction( reindexMetrics, transportService, relocationNodePicker, - featureService + featureService, + taskResultsService ); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java 
b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java index 5c199474ba9e5..4a79af6cf1f4b 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java @@ -62,6 +62,7 @@ import org.elasticsearch.search.SearchResponseUtils; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.tasks.TaskResultsService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLog; import org.elasticsearch.test.client.NoOpClient; @@ -564,7 +565,8 @@ public void testLocalReindexingRequestFailsToOpenPit() { null, mock(TransportService.class), mock(ReindexRelocationNodePicker.class), - featureService + featureService, + mock(TaskResultsService.class) ); final ReindexRequest request = new ReindexRequest(); @@ -633,7 +635,8 @@ public ExecutorService executor(String name) { null, mock(TransportService.class), mock(ReindexRelocationNodePicker.class), - featureService + featureService, + mock(TaskResultsService.class) ); final ReindexRequest request = new ReindexRequest(); @@ -711,7 +714,8 @@ public ExecutorService executor(String name) { null, mock(TransportService.class), mock(ReindexRelocationNodePicker.class), - featureService + featureService, + mock(TaskResultsService.class) ); final ReindexRequest request = new ReindexRequest(); @@ -783,7 +787,8 @@ public ExecutorService executor(String name) { null, mock(TransportService.class), mock(ReindexRelocationNodePicker.class), - featureService + featureService, + mock(TaskResultsService.class) ); final ReindexRequest request = new ReindexRequest(); @@ -857,7 +862,8 @@ public ExecutorService executor(String name) { null, mock(TransportService.class), mock(ReindexRelocationNodePicker.class), - featureService + featureService, + mock(TaskResultsService.class) ); final ReindexRequest request = new ReindexRequest(); @@ -931,7 +937,8 @@ public 
ExecutorService executor(String name) { null, mock(TransportService.class), mock(ReindexRelocationNodePicker.class), - featureService + featureService, + mock(TaskResultsService.class) ); final ReindexRequest request = new ReindexRequest(); @@ -1216,7 +1223,8 @@ public ExecutorService executor(String name) { null, mock(TransportService.class), mock(ReindexRelocationNodePicker.class), - featureService + featureService, + mock(TaskResultsService.class) ); BulkByScrollTask task = new BulkByScrollTask( @@ -1355,7 +1363,8 @@ private static Reindexer reindexerWithRelocation(ClusterService clusterService, transportService, mock(ReindexRelocationNodePicker.class), // Will default REINDEX_PIT_SEARCH_FEATURE to false - mock(FeatureService.class) + mock(FeatureService.class), + mock(TaskResultsService.class) ); } diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ResumeInfo.java b/server/src/main/java/org/elasticsearch/index/reindex/ResumeInfo.java index bdb598c16517e..48af7d572840c 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ResumeInfo.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ResumeInfo.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.Nullable; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskResult; import java.io.IOException; import java.util.Map; @@ -34,11 +35,23 @@ *

* TODO: we can use List instead of Map for since the keys are required to be 0-based and contiguous. */ -public record ResumeInfo(RelocationOrigin relocationOrigin, @Nullable WorkerResumeInfo worker, @Nullable Map slices) - implements - Writeable { +public record ResumeInfo( + RelocationOrigin relocationOrigin, + @Nullable WorkerResumeInfo worker, + @Nullable Map slices, + @Nullable TaskResult sourceTaskResult +) implements Writeable { public ResumeInfo(RelocationOrigin relocationOrigin, @Nullable WorkerResumeInfo worker, @Nullable Map slices) { + this(relocationOrigin, worker, slices, null); + } + + public ResumeInfo( + RelocationOrigin relocationOrigin, + @Nullable WorkerResumeInfo worker, + @Nullable Map slices, + @Nullable TaskResult sourceTaskResult + ) { this.relocationOrigin = Objects.requireNonNull(relocationOrigin, "relocation origin cannot be null"); if (worker == null && (slices == null || slices.size() < 2)) { throw new IllegalArgumentException("resume info requires a worker resume info or at minimum two slices"); @@ -48,13 +61,15 @@ public ResumeInfo(RelocationOrigin relocationOrigin, @Nullable WorkerResumeInfo } this.worker = worker; this.slices = slices != null ? 
Map.copyOf(slices) : null; + this.sourceTaskResult = sourceTaskResult; } public ResumeInfo(StreamInput in) throws IOException { this( new RelocationOrigin(in), // if serialized, always present in.readOptionalNamedWriteable(WorkerResumeInfo.class), - in.readOptionalImmutableMap(StreamInput::readVInt, SliceStatus::new) + in.readOptionalImmutableMap(StreamInput::readVInt, SliceStatus::new), + in.readOptionalWriteable(TaskResult::new) ); } @@ -63,6 +78,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeWriteable(relocationOrigin); out.writeOptionalNamedWriteable(worker); out.writeOptionalMap(slices, StreamOutput::writeVInt, (o, v) -> v.writeTo(o)); + out.writeOptionalWriteable(sourceTaskResult); } public int getTotalSlices() { diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResult.java b/server/src/main/java/org/elasticsearch/tasks/TaskResult.java index 185f8dea224b9..fd7fce4b80194 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResult.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResult.java @@ -143,6 +143,10 @@ public boolean isCompleted() { return completed; } + public TaskResult withError(Exception newError) throws IOException { + return new TaskResult(completed, task, toXContent(newError), response); + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichReindexAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichReindexAction.java index 742915f7f653d..28484dd536468 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichReindexAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichReindexAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.reindex.ReindexSslConfig; import 
org.elasticsearch.reindex.TransportReindexAction; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.tasks.TaskResultsService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.watcher.ResourceWatcherService; @@ -53,7 +54,8 @@ public TransportEnrichReindexAction( TransportService transportService, Environment environment, ResourceWatcherService watcherService, - FeatureService featureService + FeatureService featureService, + TaskResultsService taskResultsService ) { super( EnrichReindexAction.NAME, @@ -71,7 +73,8 @@ public TransportEnrichReindexAction( null, // can't be injected due to different classloaders between enrich and reindex (enrich doesn't extend reindex). ReindexPlugin.getReindexRelocationNodePicker(environment), - featureService + featureService, + taskResultsService ); this.bulkClient = new OriginSettingClient(client, ENRICH_ORIGIN); } From 6e79ba8ef0fd6edb6d9ed603cc12d075937daa99 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Wed, 1 Apr 2026 16:16:37 -0400 Subject: [PATCH 2/6] unit tests --- .../elasticsearch/reindex/ReindexerTests.java | 126 +++++++++++++++++- 1 file changed, 125 insertions(+), 1 deletion(-) diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java index 4a79af6cf1f4b..52421c38f3af7 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java @@ -62,6 +62,7 @@ import org.elasticsearch.search.SearchResponseUtils; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.tasks.TaskResult; import org.elasticsearch.tasks.TaskResultsService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLog; @@ -83,6 +84,7 @@ import java.util.Map; import 
java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -373,6 +375,120 @@ public void testListenerWithRelocationsTriggersRelocationWhenResumeInfoPresent() assertThat(exception.getMetadata("es.relocated_task_id"), equalTo(List.of("target-node:123"))); } + public void testListenerWithRelocationsSendsSourceTaskResultInResumeRequest() { + assumeTrue("reindex resilience enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); + final ClusterService clusterService = mock(ClusterService.class); + final ClusterState clusterState = mock(ClusterState.class); + final DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class); + final DiscoveryNode sourceNode = DiscoveryNodeUtils.builder("source-node").build(); + final DiscoveryNode targetNode = DiscoveryNodeUtils.builder("target-node").build(); + when(clusterService.state()).thenReturn(clusterState); + when(clusterService.localNode()).thenReturn(sourceNode); + when(clusterState.nodes()).thenReturn(discoveryNodes); + when(discoveryNodes.get("target-node")).thenReturn(targetNode); + + final TransportService transportService = mock(TransportService.class); + doAnswer(invocation -> { + ResumeBulkByScrollRequest resumeRequest = invocation.getArgument(2); + TaskResult sourceTaskResult = resumeRequest.getDelegate().getResumeInfo().get().sourceTaskResult(); + assertNotNull("source task result should be set on the resume request", sourceTaskResult); + assertThat(sourceTaskResult.getTask().taskId(), equalTo(new TaskId("source-node", 987))); + assertTrue("source task result should be completed", sourceTaskResult.isCompleted()); + + TransportResponseHandler handler = invocation.getArgument(3); + handler.handleResponse(new ResumeBulkByScrollResponse(new TaskId("target-node:123"))); + return null; + }).when(transportService).sendRequest(eq(targetNode), 
eq(ResumeReindexAction.NAME), any(ResumeBulkByScrollRequest.class), any()); + + final Reindexer reindexer = reindexerWithRelocation(clusterService, transportService); + final BulkByScrollTask task = createTaskWithParentIdAndRelocationEnabled(TaskId.EMPTY_TASK_ID); + task.setWorker(Float.POSITIVE_INFINITY, null); + task.getWorkerState().setNodeToRelocateToSupplier(() -> Optional.of("target-node")); + task.requestRelocation(); + + final PlainActionFuture future = new PlainActionFuture<>(); + final ActionListener wrapped = reindexer.listenerWithRelocations(task, reindexRequest(), future); + wrapped.onResponse(reindexResponseWithResumeInfo()); + + assertTrue(future.isDone()); + verify(transportService).sendRequest(eq(targetNode), eq(ResumeReindexAction.NAME), any(ResumeBulkByScrollRequest.class), any()); + } + + public void testExecuteStoresSourceTaskResult() throws Exception { + assumeTrue("reindex resilience enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); + final TaskId sourceTaskId = new TaskId("source-node", 42); + final ResumeInfo.RelocationOrigin origin = new ResumeInfo.RelocationOrigin(sourceTaskId, System.currentTimeMillis()); + final BulkByScrollTask task = createTaskWithParentIdAndRelocationEnabled(TaskId.EMPTY_TASK_ID); + task.setWorker(Float.POSITIVE_INFINITY, null); + + final TaskResultsService taskResultsService = mock(TaskResultsService.class); + doAnswer(invocation -> { + TaskResult stored = invocation.getArgument(0); + assertThat(stored.getTask().taskId().getNodeId(), equalTo("source-node")); + final Map errorMap = stored.getErrorAsMap(); + assertThat(errorMap.get("type"), equalTo("task_relocated_exception")); + assertThat(errorMap.get("original_task_id"), equalTo(sourceTaskId.toString())); + assertThat(errorMap.get("relocated_task_id"), equalTo("dest-node:" + task.getId())); + invocation.>getArgument(1).onResponse(null); + return null; + }).when(taskResultsService).storeResult(any(TaskResult.class), any()); + + final ClusterService clusterService 
= mock(ClusterService.class); + when(clusterService.localNode()).thenReturn(DiscoveryNodeUtils.builder("dest-node").build()); + final Reindexer reindexer = reindexerWithRelocation(clusterService, mock(TransportService.class), taskResultsService); + + final TaskResult sourceTaskResult = task.result(DiscoveryNodeUtils.builder("source-node").build(), new TaskRelocatedException()); + final var workerResumeInfo = new ResumeInfo.ScrollWorkerResumeInfo( + "test-scroll-id", + System.currentTimeMillis(), + new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, timeValueMillis(0), 0f, null, timeValueMillis(0)), + null + ); + final ReindexRequest request = reindexRequest(); + request.setResumeInfo(new ResumeInfo(origin, workerResumeInfo, null, sourceTaskResult)); + + final PlainActionFuture future = new PlainActionFuture<>(); + reindexer.execute(task, request, mock(Client.class), future); + + verify(taskResultsService).storeResult(any(TaskResult.class), any()); + } + + public void testExecuteFailsWhenSourceTaskResultStorageFails() throws Exception { + assumeTrue("reindex resilience enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); + final TaskResultsService taskResultsService = mock(TaskResultsService.class); + final Exception storageFailure = new RuntimeException("simulated .tasks write failure"); + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(1); + listener.onFailure(storageFailure); + return null; + }).when(taskResultsService).storeResult(any(TaskResult.class), any()); + + final TaskId sourceTaskId = new TaskId("source-node", 42); + final ResumeInfo.RelocationOrigin origin = new ResumeInfo.RelocationOrigin(sourceTaskId, System.currentTimeMillis()); + final ClusterService clusterService = mock(ClusterService.class); + when(clusterService.localNode()).thenReturn(DiscoveryNodeUtils.builder("dest-node").build()); + final Reindexer reindexer = reindexerWithRelocation(clusterService, mock(TransportService.class), taskResultsService); + final 
BulkByScrollTask task = createTaskWithParentIdAndRelocationEnabled(TaskId.EMPTY_TASK_ID); + task.setWorker(Float.POSITIVE_INFINITY, null); + + final TaskResult sourceTaskResult = task.result(DiscoveryNodeUtils.builder("source-node").build(), new TaskRelocatedException()); + final var workerResumeInfo = new ResumeInfo.ScrollWorkerResumeInfo( + "test-scroll-id", + System.currentTimeMillis(), + new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, timeValueMillis(0), 0f, null, timeValueMillis(0)), + null + ); + final ReindexRequest request = reindexRequest(); + request.setResumeInfo(new ResumeInfo(origin, workerResumeInfo, null, sourceTaskResult)); + + final PlainActionFuture future = new PlainActionFuture<>(); + reindexer.execute(task, request, mock(Client.class), future); + + assertTrue(future.isDone()); + ExecutionException e = expectThrows(ExecutionException.class, future::get); + assertSame(storageFailure, e.getCause()); + } + /** * When the remote version lookup fails in lookupRemoteVersionAndExecute * (e.g. server returns 500), the failure propagates to the listener. 
@@ -1350,6 +1466,14 @@ private static Reindexer reindexerWithRelocation() { } private static Reindexer reindexerWithRelocation(ClusterService clusterService, TransportService transportService) { + return reindexerWithRelocation(clusterService, transportService, mock(TaskResultsService.class)); + } + + private static Reindexer reindexerWithRelocation( + ClusterService clusterService, + TransportService transportService, + TaskResultsService taskResultsService + ) { final ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.generic()).thenReturn(mock(ExecutorService.class)); return new Reindexer( @@ -1364,7 +1488,7 @@ private static Reindexer reindexerWithRelocation(ClusterService clusterService, mock(ReindexRelocationNodePicker.class), // Will default REINDEX_PIT_SEARCH_FEATURE to false mock(FeatureService.class), - mock(TaskResultsService.class) + taskResultsService ); } From 744bb52e72fc284feae1ae50cc6c3625afbc3fbf Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Wed, 1 Apr 2026 18:05:40 -0400 Subject: [PATCH 3/6] Add IT --- .../management/ReindexRelocationIT.java | 118 +++++++++++++++++- 1 file changed, 117 insertions(+), 1 deletion(-) diff --git a/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java index a642c74dafe05..c82eb5243546d 100644 --- a/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java +++ b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java @@ -10,8 +10,16 @@ package org.elasticsearch.reindex.management; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import 
org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilterChain; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; @@ -25,7 +33,9 @@ import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.index.reindex.BulkByScrollTask; import org.elasticsearch.index.reindex.ReindexAction; +import org.elasticsearch.node.Node; import org.elasticsearch.node.ShutdownPrepareService; +import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.reindex.ReindexMetrics; @@ -34,6 +44,7 @@ import org.elasticsearch.reindex.TransportReindexAction; import org.elasticsearch.rest.root.MainRestPlugin; import org.elasticsearch.tasks.RawTaskStatus; +import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.tasks.TaskResult; @@ -97,7 +108,13 @@ public class ReindexRelocationIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(ReindexPlugin.class, ReindexManagementPlugin.class, MainRestPlugin.class, TestTelemetryPlugin.class); + return Arrays.asList( + ReindexPlugin.class, + ReindexManagementPlugin.class, + MainRestPlugin.class, + TestTelemetryPlugin.class, + BlockTasksWritePlugin.class + ); } @Override @@ -230,6 +247,105 @@ private void testReindexRelocation( assertExpectedNumberOfDocumentsInDestinationIndex(); } + /** + * Verifies that the destination node writes the source task result to {@code .tasks} during relocation, so the chain link is preserved 
+ * even when the source node cannot write. The test uses {@link BlockTasksWritePlugin} to block all {@code .tasks} writes on the source + * node, so only the destination's write (in {@code Reindexer.storeRelocationSourceTaskResult}) succeeds. + */ + public void testDestinationWritesSourceTaskResultToTasksIndex() throws Exception { + assumeTrue("reindex resilience is enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); + final int shards = randomIntBetween(1, 5); + + final String nodeAName = internalCluster().startNode( + NodeRoles.onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)) + ); + final String nodeAId = nodeIdByName(nodeAName); + final String nodeBName = internalCluster().startNode( + NodeRoles.onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)) + ); + ensureStableCluster(2); + + createIndexPinnedToNodeName(SOURCE_INDEX, nodeAName, shards); + createIndexPinnedToNodeName(DEST_INDEX, nodeAName, shards); + indexRandom(true, SOURCE_INDEX, numberOfDocumentsThatTakes60SecondsToIngest); + ensureGreen(SOURCE_INDEX, DEST_INDEX); + + final TaskId originalTaskId = startAsyncThrottledLocalReindexOnNode(nodeBName, 1); + assertBusy(() -> { + final TaskResult running = getRunningReindex(originalTaskId); + assertThat(running.getTask().taskId().getNodeId(), equalTo(nodeIdByName(nodeBName))); + }); + + assertFalse(".tasks index should not exist before shutdown", indexExists(TaskResultsService.TASK_INDEX)); + + // Block .tasks writes on the source node so only the destination's write can succeed. 
+ BlockTasksWritePlugin.blockedNodeName = nodeBName; + try { + shutdownNodeNameAndRelocate(nodeBName); + + final TaskId relocatedTaskId = assertOriginalTaskEndStateInTasksIndexAndGetRelocatedTaskId( + originalTaskId, + nodeAId, + localReindexDescription(), + 1, + shards + ); + + unthrottleReindex(relocatedTaskId); + assertRelocatedTaskExpectedEndState(relocatedTaskId, localReindexDescription(), 1, shards); + assertExpectedNumberOfDocumentsInDestinationIndex(); + } finally { + BlockTasksWritePlugin.blockedNodeName = null; + } + } + + /** + * Test plugin that blocks {@code .tasks} index writes on a specific node. + * Used to verify the destination writes the source task result during relocation. + */ + public static class BlockTasksWritePlugin extends Plugin implements ActionPlugin { + static volatile String blockedNodeName = null; + private volatile String myNodeName; + + @Override + public Collection createComponents(PluginServices services) { + myNodeName = Node.NODE_NAME_SETTING.get(services.environment().settings()); + return List.of(); + } + + @Override + public List getActionFilters() { + return List.of(new ActionFilter() { + @Override + public int order() { + return Integer.MIN_VALUE; + } + + @Override + public void apply( + Task task, + String action, + Request request, + ActionListener listener, + ActionFilterChain chain + ) { + if (myNodeName != null && myNodeName.equals(blockedNodeName) && isTasksIndexWrite(action, request)) { + listener.onFailure(new ElasticsearchException("blocked .tasks write on [" + myNodeName + "] for testing")); + return; + } + chain.proceed(task, action, request, listener); + } + + private boolean isTasksIndexWrite(String action, ActionRequest request) { + if (action.equals(TransportBulkAction.NAME) && request instanceof BulkRequest bulkRequest) { + return bulkRequest.requests().stream().anyMatch(r -> TaskResultsService.TASK_INDEX.equals(r.index())); + } + return false; + } + }); + } + } + private void 
shutdownNodeNameAndRelocate(final String nodeName) throws Exception { // testing assumption: .tasks should not exist yet — it's created when the task result is stored during relocation assertFalse(".tasks index should not exist before shutdown", indexExists(TaskResultsService.TASK_INDEX)); From c30c6b05e8f58509d65bb53718e66f65b25c5da1 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Tue, 7 Apr 2026 12:27:55 -0400 Subject: [PATCH 4/6] address comment --- .../java/org/elasticsearch/reindex/Reindexer.java | 2 +- .../reindex/TaskRelocatedException.java | 14 +++++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java index 09e9010f855b2..611f4d44265c6 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java @@ -712,7 +712,7 @@ ActionListener listenerWithRelocations( ); final ResumeBulkByScrollRequest resumeRequest = new ResumeBulkByScrollRequest(request); final ActionListener relocationListener = ActionListener.wrap(resp -> { - l.onFailure(new TaskRelocatedException(new TaskId(clusterService.localNode().getId(), task.getId()), resp.getTaskId())); + l.onFailure(new TaskRelocatedException(resumeInfo.relocationOrigin().originalTaskId(), resp.getTaskId())); }, l::onFailure); transportService.sendRequest( nodeToRelocateToNode, diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/TaskRelocatedException.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/TaskRelocatedException.java index 166119a9005a3..ead932b8f0281 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/TaskRelocatedException.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/TaskRelocatedException.java @@ -30,7 +30,10 @@ public TaskRelocatedException() { public TaskRelocatedException(TaskId 
originalTaskId, TaskId relocatedTaskId) { super("Task was relocated"); - setOriginalAndRelocatedTaskIdMetadata(originalTaskId, relocatedTaskId); + assert originalTaskId.isSet() : "original task ID is not set"; + assert relocatedTaskId.isSet() : "relocated task ID is not set"; + this.addMetadata(ORIGINAL_TASK_ID_METADATA_KEY, originalTaskId.toString()); + this.addMetadata(RELOCATED_TASK_ID_METADATA_KEY, relocatedTaskId.toString()); } /** Returns the relocated task ID if the map is a serialized {@link TaskRelocatedException}. */ @@ -38,19 +41,12 @@ public static Optional relocatedTaskIdFromErrorMap(final Map getRelocatedTaskId() { final List relocatedTaskIds = this.getMetadata(RELOCATED_TASK_ID_METADATA_KEY); assert relocatedTaskIds == null || relocatedTaskIds.size() == 1 : "either not present or one value"; From ef73b03eb1165f81b81aea5772a5c3dfbefb6fa4 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Tue, 7 Apr 2026 12:34:02 -0400 Subject: [PATCH 5/6] merge conflict --- .../java/org/elasticsearch/reindex/ReindexerTests.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java index e55673c6f2959..be87846461238 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java @@ -719,7 +719,8 @@ public ExecutorService executor(String name) { null, mock(TransportService.class), mock(ReindexRelocationNodePicker.class), - featureService + featureService, + mock(TaskResultsService.class) ); // Simulate a worker request: PIT already set by leader, slice info from leader @@ -1175,7 +1176,8 @@ public ExecutorService executor(String name) { null, mock(TransportService.class), mock(ReindexRelocationNodePicker.class), - featureService + featureService, + mock(TaskResultsService.class) ); final String 
projectRouting = "_alias:linked"; @@ -1250,7 +1252,8 @@ public ExecutorService executor(String name) { null, mock(TransportService.class), mock(ReindexRelocationNodePicker.class), - featureService + featureService, + mock(TaskResultsService.class) ); // Simulate a worker request: PIT already set by leader, slice info from leader From 982522624f95218086574f96d8b39f02c23b2360 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Tue, 7 Apr 2026 13:25:01 -0400 Subject: [PATCH 6/6] fix tests --- .../elasticsearch/reindex/ReindexerTests.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java index be87846461238..32023b5d0dbe7 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java @@ -368,11 +368,11 @@ public void testListenerWithRelocationsTriggersRelocationWhenResumeInfoPresent() task.getWorkerState().setNodeToRelocateToSupplier(() -> Optional.of("target-node")); task.requestRelocation(); + final ResumeInfo.RelocationOrigin origin = new ResumeInfo.RelocationOrigin(new TaskId("source-node", 987), randomNonNegativeLong()); final PlainActionFuture future = new PlainActionFuture<>(); final ActionListener wrapped = reindexer.listenerWithRelocations(task, reindexRequest(), future); - final BulkByScrollResponse response = reindexResponseWithResumeInfo(); - wrapped.onResponse(response); + wrapped.onResponse(reindexResponseWithResumeInfo(origin)); assertTrue(future.isDone()); TaskRelocatedException exception = expectThrows(TaskRelocatedException.class, future::actionGet); @@ -1824,6 +1824,10 @@ private BulkByScrollResponse reindexResponseWithPitId(BytesReference pitId) { } private BulkByScrollResponse reindexResponseWithResumeInfo() { + return 
reindexResponseWithResumeInfo(randomOrigin()); + } + + private BulkByScrollResponse reindexResponseWithResumeInfo(ResumeInfo.RelocationOrigin origin) { final var workerResumeInfo = new ResumeInfo.ScrollWorkerResumeInfo( "test-scroll-id", System.nanoTime(), @@ -1836,7 +1840,7 @@ private BulkByScrollResponse reindexResponseWithResumeInfo() { List.of(), List.of(), false, - new ResumeInfo(randomOrigin(), workerResumeInfo, null) + new ResumeInfo(origin, workerResumeInfo, null) ); } @@ -1941,10 +1945,14 @@ private static ReindexRequest reindexRequest() { } private static ResumeInfo.RelocationOrigin randomOrigin() { - return new ResumeInfo.RelocationOrigin(randomTaskId(), randomNonNegativeLong()); + return new ResumeInfo.RelocationOrigin(randomRealTaskId(), randomNonNegativeLong()); } private static TaskId randomTaskId() { - return randomBoolean() ? TaskId.EMPTY_TASK_ID : new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()); + return randomBoolean() ? TaskId.EMPTY_TASK_ID : randomRealTaskId(); + } + + private static TaskId randomRealTaskId() { + return new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()); } }