diff --git a/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java index bbdbaead35731..2841599365710 100644 --- a/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java +++ b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java @@ -14,10 +14,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilterChain; import org.elasticsearch.client.Request; @@ -26,8 +28,10 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.CheckedBiFunction; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.http.HttpServerTransport; @@ -59,16 +63,20 @@ import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.XContentType; import org.hamcrest.Matcher; +import org.junit.Before; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; @@ -107,6 +115,11 @@ public class ReindexRelocationIT extends ESIntegTestCase { private final int requestsPerSecond = randomIntBetween(bulkSize, 20); private final int numberOfDocumentsThatTakes60SecondsToIngest = 60 * requestsPerSecond; + @Before + public void resetPlugin() { + BlockTasksWritePlugin.reset(); + } + @Override protected Collection> nodePlugins() { return Arrays.asList( @@ -248,69 +261,211 @@ private void testReindexRelocation( assertExpectedNumberOfDocumentsInDestinationIndex(); } + /** + * Forces the destination to write to .tasks first (by deferring the source's write), then releases the source. + * The source's CREATE actually executes but is a no-op since the document already exists. + */ + public void testTasksIndexDestinationWritesFirstThenSourceIsNoOp() throws Exception { + assumeTrue("reindex resilience is enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); + final int shards = randomIntBetween(1, 5); + final var expectedDescription = localReindexDescription(); + + final String destNodeName = internalCluster().startNode( + NodeRoles.onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)) + ); + final String destNodeId = nodeIdByName(destNodeName); + final String sourceNodeName = internalCluster().startNode( + NodeRoles.onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)) + ); + ensureStableCluster(2); + + createIndexPinnedToNodeName(SOURCE_INDEX, destNodeName, shards); + createIndexPinnedToNodeName(DEST_INDEX, destNodeName, shards); + indexRandom(true, SOURCE_INDEX, numberOfDocumentsThatTakes60SecondsToIngest); + ensureGreen(SOURCE_INDEX, DEST_INDEX); + + final TaskId originalTaskId = startAsyncThrottledLocalReindexOnNode(sourceNodeName, 1); + + // Defer source writes so the destination writes first. + // prepareForShutdown blocks until the task completes, but the source's deferred write keeps it waiting + // use a background thread so the test can continue + final CountDownLatch sourceWriteLatch = BlockTasksWritePlugin.deferWritesOnNode(sourceNodeName); + final Thread shutdownThread = new Thread(() -> { + try { + internalCluster().getInstance(ShutdownPrepareService.class, sourceNodeName).prepareForShutdown(); + } catch (Exception e) { + // expected — node may be stopped while waiting + } + }); + shutdownThread.setDaemon(true); + shutdownThread.start(); + + // Wait for the destination's write (not deferred) to create .tasks. + ensureGreen(TaskResultsService.TASK_INDEX); + assertThat("version is 1 after destination write", getTasksDocument(originalTaskId).getVersion(), is(1L)); + + // Release the source's deferred CREATE + sourceWriteLatch.countDown(); + + final TaskId relocatedTaskId = assertOriginalTaskEndStateInTasksIndexAndGetRelocatedTaskId( + originalTaskId, + destNodeId, + expectedDescription, + 1, + shards + ); + unthrottleReindex(relocatedTaskId); + assertRelocatedTaskExpectedEndState(relocatedTaskId, expectedDescription, 1, shards); + assertExpectedNumberOfDocumentsInDestinationIndex(); + + assertThat("version stays 1 — source CREATE is a no-op", getTasksDocument(originalTaskId).getVersion(), is(1L)); + } + + /** + * Forces the source to write first by deferring the destination's write. The destination's deferred INDEX then overwrites + * the source's document, bumping the version to 2. + */ + public void testTasksIndexSourceWritesFirstThenDestinationOverwrites() throws Exception { + assumeTrue("reindex resilience is enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); + final int shards = randomIntBetween(1, 5); + final var expectedDescription = localReindexDescription(); + + final String destNodeName = internalCluster().startNode( + NodeRoles.onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)) + ); + final String destNodeId = nodeIdByName(destNodeName); + final String sourceNodeName = internalCluster().startNode( + NodeRoles.onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)) + ); + ensureStableCluster(2); + + createIndexPinnedToNodeName(SOURCE_INDEX, destNodeName, shards); + createIndexPinnedToNodeName(DEST_INDEX, destNodeName, shards); + indexRandom(true, SOURCE_INDEX, numberOfDocumentsThatTakes60SecondsToIngest); + ensureGreen(SOURCE_INDEX, DEST_INDEX); + + final TaskId originalTaskId = startAsyncThrottledLocalReindexOnNode(sourceNodeName, 1); + + // Defer the destination's write so the source writes first. + final CountDownLatch destWriteLatch = BlockTasksWritePlugin.deferWritesOnNode(destNodeName); + shutdownNodeNameAndRelocate(sourceNodeName); + + final GetResponse afterSourceWrite = getTasksDocument(originalTaskId); + assertThat("version is 1 after source write", afterSourceWrite.getVersion(), is(1L)); + assertSourceAndDestinationStoreEquivalentResults(afterSourceWrite, BlockTasksWritePlugin.capturedDocumentSource(sourceNodeName)); + + // Release the destination's deferred INDEX — it overwrites the source's document. + destWriteLatch.countDown(); + assertBusy(() -> assertThat("version is 2 — destination overwrote source", getTasksDocument(originalTaskId).getVersion(), is(2L))); + + final TaskId relocatedTaskId = assertOriginalTaskEndStateInTasksIndexAndGetRelocatedTaskId( + originalTaskId, + destNodeId, + expectedDescription, + 1, + shards + ); + unthrottleReindex(relocatedTaskId); + assertRelocatedTaskExpectedEndState(relocatedTaskId, expectedDescription, 1, shards); + assertExpectedNumberOfDocumentsInDestinationIndex(); + } + /** * Verifies that the destination node writes the source task result to {@code .tasks} during relocation, so the chain link is preserved * even when the source node cannot write. The test uses {@link BlockTasksWritePlugin} to block all {@code .tasks} writes on the source * node, so only the destination's write (in {@code Reindexer.storeRelocationSourceTaskResult}) succeeds. */ - public void testDestinationWritesSourceTaskResultToTasksIndex() throws Exception { + public void testTasksIndexDestinationWrites() throws Exception { assumeTrue("reindex resilience is enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); final int shards = randomIntBetween(1, 5); + final var expectedDescription = localReindexDescription(); - final String nodeAName = internalCluster().startNode( + final String destNodeName = internalCluster().startNode( NodeRoles.onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)) ); - final String nodeAId = nodeIdByName(nodeAName); - final String nodeBName = internalCluster().startNode( + final String destNodeId = nodeIdByName(destNodeName); + final String sourceNodeName = internalCluster().startNode( NodeRoles.onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)) ); ensureStableCluster(2); - createIndexPinnedToNodeName(SOURCE_INDEX, nodeAName, shards); - createIndexPinnedToNodeName(DEST_INDEX, nodeAName, shards); + createIndexPinnedToNodeName(SOURCE_INDEX, destNodeName, shards); + createIndexPinnedToNodeName(DEST_INDEX, destNodeName, shards); indexRandom(true, SOURCE_INDEX, numberOfDocumentsThatTakes60SecondsToIngest); ensureGreen(SOURCE_INDEX, DEST_INDEX); - final TaskId originalTaskId = startAsyncThrottledLocalReindexOnNode(nodeBName, 1); - assertBusy(() -> { - final TaskResult running = getRunningReindex(originalTaskId); - assertThat(running.getTask().taskId().getNodeId(), equalTo(nodeIdByName(nodeBName))); - }); - - assertFalse(".tasks index should not exist before shutdown", indexExists(TaskResultsService.TASK_INDEX)); + final TaskId originalTaskId = startAsyncThrottledLocalReindexOnNode(sourceNodeName, 1); + final TaskResult running = getRunningReindex(originalTaskId); + assertThat(running.getTask().taskId().getNodeId(), equalTo(nodeIdByName(sourceNodeName))); // Block .tasks writes on the source node so only the destination's write can succeed. - BlockTasksWritePlugin.blockedNodeName = nodeBName; - try { - shutdownNodeNameAndRelocate(nodeBName); - - final TaskId relocatedTaskId = assertOriginalTaskEndStateInTasksIndexAndGetRelocatedTaskId( - originalTaskId, - nodeAId, - localReindexDescription(), - 1, - shards - ); - - unthrottleReindex(relocatedTaskId); - assertRelocatedTaskExpectedEndState(relocatedTaskId, localReindexDescription(), 1, shards); - assertExpectedNumberOfDocumentsInDestinationIndex(); - } finally { - BlockTasksWritePlugin.blockedNodeName = null; - } + BlockTasksWritePlugin.blockWritesOnNode(sourceNodeName); + + shutdownNodeNameAndRelocate(sourceNodeName); + + final TaskId relocatedTaskId = assertOriginalTaskEndStateInTasksIndexAndGetRelocatedTaskId( + originalTaskId, + destNodeId, + expectedDescription, + 1, + shards + ); + + unthrottleReindex(relocatedTaskId); + assertRelocatedTaskExpectedEndState(relocatedTaskId, expectedDescription, 1, shards); + assertExpectedNumberOfDocumentsInDestinationIndex(); + + // Verify the document was written exactly once (by the destination) with correct content. + final GetResponse doc = getTasksDocument(originalTaskId); + assertThat("document should be written exactly once", doc.getVersion(), is(1L)); + assertTasksDocumentIsRelocatedException(doc, destNodeId); + + // Verify source and destination would store equivalent results. + assertSourceAndDestinationStoreEquivalentResults(doc, BlockTasksWritePlugin.capturedDocumentSource(sourceNodeName)); } /** - * Test plugin that blocks {@code .tasks} index writes on a specific node. - * Used to verify the destination writes the source task result during relocation. + * Test plugin that can block or defer {@code .tasks} index writes on specific nodes and captures the document body. + *

+ * {@code blockedNodeName}: writes are rejected (failed) on this node. + * {@code deferredNodeName} + {@code deferLatch}: writes are deferred on this node — the filter returns immediately (unblocking the + * calling thread) and a background thread waits for the latch before proceeding. These two mechanisms are independent. */ public static class BlockTasksWritePlugin extends Plugin implements ActionPlugin { - static volatile String blockedNodeName = null; - private volatile String myNodeName; + private static volatile String blockedNodeName = null; + private static volatile String deferredNodeName = null; + private static volatile CountDownLatch deferLatch = null; + private static final ConcurrentHashMap capturedDocumentsByNode = new ConcurrentHashMap<>(); + private String myNodeName; + + /** Reject all {@code .tasks} writes on the given node. */ + public static void blockWritesOnNode(String nodeName) { + blockedNodeName = nodeName; + } + + /** Defer {@code .tasks} writes on the given node until the returned latch is released. */ + public static CountDownLatch deferWritesOnNode(String nodeName) { + deferredNodeName = nodeName; + deferLatch = new CountDownLatch(1); + return deferLatch; + } + + /** Returns the document source captured from the given node's intercepted write, or null. */ + public static BytesReference capturedDocumentSource(String nodeName) { + return capturedDocumentsByNode.get(nodeName); + } + + public static void reset() { + blockedNodeName = null; + deferredNodeName = null; + deferLatch = null; + capturedDocumentsByNode.clear(); + } @Override public Collection createComponents(PluginServices services) { myNodeName = Node.NODE_NAME_SETTING.get(services.environment().settings()); + assertNotNull(myNodeName); return List.of(); } @@ -330,9 +485,23 @@ public void app ActionListener listener, ActionFilterChain chain ) { - if (myNodeName != null && myNodeName.equals(blockedNodeName) && isTasksIndexWrite(action, request)) { - listener.onFailure(new ElasticsearchException("blocked .tasks write on [" + myNodeName + "] for testing")); - return; + if (isTasksIndexWrite(action, request)) { + captureDocumentSource((BulkRequest) request); + if (myNodeName.equals(blockedNodeName)) { + listener.onFailure(new ElasticsearchException("blocked .tasks write on [" + myNodeName + "] for testing")); + return; + } + final CountDownLatch latch = deferLatch; + if (latch != null && myNodeName.equals(deferredNodeName)) { + // fork to unblock the calling thread, also simulates async index write + final Thread deferThread = new Thread(() -> { + safeAwait(latch); + chain.proceed(task, action, request, listener); + }, "deferred-.tasks-write"); + deferThread.setDaemon(true); + deferThread.start(); + return; + } } chain.proceed(task, action, request, listener); } @@ -343,10 +512,49 @@ private boolean isTasksIndexWrite(String action, ActionRequest request) { } return false; } + + private void captureDocumentSource(BulkRequest bulkRequest) { + for (DocWriteRequest item : bulkRequest.requests()) { + if (TaskResultsService.TASK_INDEX.equals(item.index()) && item instanceof IndexRequest indexRequest) { + capturedDocumentsByNode.put(myNodeName, indexRequest.source()); + } + } + } }); } } + private GetResponse getTasksDocument(TaskId taskId) { + ensureYellowAndNoInitializingShards(TaskResultsService.TASK_INDEX); + assertNoFailures(indicesAdmin().prepareRefresh(TaskResultsService.TASK_INDEX).get()); + final GetResponse response = client().prepareGet(TaskResultsService.TASK_INDEX, taskId.toString()).get(); + assertThat("task exists in .tasks index", response.isExists(), is(true)); + return response; + } + + private static void assertTasksDocumentIsRelocatedException(GetResponse doc, String expectedDestNodeId) { + final Map source = doc.getSourceAsMap(); + assertThat(source.get("completed"), is(true)); + @SuppressWarnings("unchecked") + final Map error = (Map) source.get("error"); + assertThat(error.get("type"), equalTo("task_relocated_exception")); + assertThat((String) error.get("relocated_task_id"), startsWith(expectedDestNodeId)); + } + + private static void assertSourceAndDestinationStoreEquivalentResults(GetResponse storedDoc, BytesReference capturedSource) { + assertNotNull("source's write should have been captured", capturedSource); + final Map storedByDestination = new HashMap<>(storedDoc.getSourceAsMap()); + final Map attemptedBySource = new HashMap<>(XContentHelper.convertToMap(capturedSource, false).v2()); + removeRunningTimeInNanos(storedByDestination); + removeRunningTimeInNanos(attemptedBySource); + assertThat("destination stores same result as source would have", storedByDestination, equalTo(attemptedBySource)); + } + + @SuppressWarnings("unchecked") + private static void removeRunningTimeInNanos(Map taskResultMap) { + ((Map) taskResultMap.get("task")).remove("running_time_in_nanos"); + } + private void shutdownNodeNameAndRelocate(final String nodeName) throws Exception { // testing assumption: .tasks should not exist yet — it's created when the task result is stored during relocation assertFalse(".tasks index should not exist before shutdown", indexExists(TaskResultsService.TASK_INDEX)); diff --git a/modules/reindex-management/src/test/java/org/elasticsearch/reindex/management/TransportGetReindexActionTests.java b/modules/reindex-management/src/test/java/org/elasticsearch/reindex/management/TransportGetReindexActionTests.java index aa71abf35982d..53f02ed0d7bb2 100644 --- a/modules/reindex-management/src/test/java/org/elasticsearch/reindex/management/TransportGetReindexActionTests.java +++ b/modules/reindex-management/src/test/java/org/elasticsearch/reindex/management/TransportGetReindexActionTests.java @@ -336,7 +336,7 @@ public void testFollowsTwoRelocations() throws IOException { final var firstRelocation = new TaskRelocatedException(originalTaskId, firstRelocatedTaskId); - final var secondRelocation = new TaskRelocatedException(firstRelocatedTaskId, secondRelocatedTaskId); + final var secondRelocation = new TaskRelocatedException(originalTaskId, secondRelocatedTaskId); final TaskInfo originalInfo = createTaskInfo(originalTaskId, ReindexAction.NAME); final TaskResult originalResult = new TaskResult(originalInfo, (Exception) firstRelocation); diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java index 1dc3b3f3ff4c4..35eaf142a1180 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java @@ -171,6 +171,7 @@ public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListen public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkClient, ActionListener listener) { final ResumeInfo resumeInfo = request.getResumeInfo().orElse(null); if (resumeInfo != null && resumeInfo.sourceTaskResult() != null) { + // source task result should be present for top-level tasks only (e.g. leader or non-sliced worker) storeRelocationSourceTaskResult( task, resumeInfo, @@ -187,6 +188,8 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl * to store its task result. For sliced reindex tasks, only the leader will store the source task result. */ private void storeRelocationSourceTaskResult(BulkByScrollTask task, ResumeInfo resumeInfo, ActionListener listener) { + assert task.isLeader() || (task.isWorker() && task.getParentTaskId().isSet() == false) + : "Only top level source task result should be stored, result for sliced workers should not be stored"; final var relocatedException = new TaskRelocatedException( resumeInfo.relocationOrigin().originalTaskId(), new TaskId(clusterService.localNode().getId(), task.getId()) @@ -726,6 +729,7 @@ ActionListener listenerWithRelocations( onRelocationResponseListener.onFailure(e); l.onFailure(e); }); + task.setRelocationHandoffInitiated(); transportService.sendRequest( nodeToRelocateToNode, ResumeReindexAction.NAME, diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java index fa5660d5d2e75..4f80fc7a151cc 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java @@ -513,7 +513,8 @@ public void testExecuteStoresSourceTaskResult() throws Exception { final TaskResultsService taskResultsService = mock(TaskResultsService.class); doAnswer(invocation -> { TaskResult stored = invocation.getArgument(0); - assertThat(stored.getTask().taskId().getNodeId(), equalTo("source-node")); + assertThat(stored.getTask().taskId(), equalTo(new TaskId("source-node", task.getId()))); + assertThat(stored.getTask().action(), equalTo("test_action")); final Map errorMap = stored.getErrorAsMap(); assertThat(errorMap.get("type"), equalTo("task_relocated_exception")); assertThat(errorMap.get("original_task_id"), equalTo(sourceTaskId.toString())); @@ -542,6 +543,38 @@ public void testExecuteStoresSourceTaskResult() throws Exception { verify(taskResultsService).storeResult(any(TaskResult.class), any()); } + public void testRelocationsSetsHandoffFlag() { + assumeTrue("reindex resilience enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); + final ClusterService clusterService = mock(ClusterService.class); + final ClusterState clusterState = mock(ClusterState.class); + final DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class); + final DiscoveryNode sourceNode = DiscoveryNodeUtils.builder("source-node").build(); + final DiscoveryNode targetNode = DiscoveryNodeUtils.builder("target-node").build(); + when(clusterService.state()).thenReturn(clusterState); + when(clusterService.localNode()).thenReturn(sourceNode); + when(clusterState.nodes()).thenReturn(discoveryNodes); + when(discoveryNodes.get("target-node")).thenReturn(targetNode); + + final Reindexer reindexer = reindexerWithRelocation(clusterService, mock(TransportService.class)); + final BulkByScrollTask task = createTaskWithParentIdAndRelocationEnabled(TaskId.EMPTY_TASK_ID); + task.setWorker(Float.POSITIVE_INFINITY, null); + task.getWorkerState().setNodeToRelocateToSupplier(() -> Optional.of("target-node")); + task.requestRelocation(); + + assertFalse("handoff flag should not be set before relocation", task.useCreateSemanticsForResultStorage()); + + final PlainActionFuture future = new PlainActionFuture<>(); + final ActionListener wrapped = reindexer.listenerWithRelocations( + task, + reindexRequest(), + ActionListener.noop(), + future + ); + wrapped.onResponse(reindexResponseWithResumeInfo()); + + assertTrue("handoff flag should be set after relocation", task.useCreateSemanticsForResultStorage()); + } + public void testExecuteFailsWhenSourceTaskResultStorageFails() throws Exception { assumeTrue("reindex resilience enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); final TaskResultsService taskResultsService = mock(TaskResultsService.class); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java index 30ca9341d0e74..c6bb91a9a5b3b 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java @@ -62,6 +62,7 @@ public class BulkByScrollTask extends CancellableTask { private volatile LeaderBulkByScrollTaskState leaderState; private volatile WorkerBulkByScrollTaskState workerState; private volatile boolean relocationRequested = false; + private volatile boolean relocationHandoffInitiated = false; public BulkByScrollTask( TaskId taskId, @@ -228,6 +229,22 @@ public boolean isRelocationRequested() { return relocationRequested; } + /** + * Marks that this task has initiated a relocation handoff (the resume request has been sent to the destination). + */ + public void setRelocationHandoffInitiated() { + this.relocationHandoffInitiated = true; + } + + /** + * If relocation handoff has started, the destination may have already stored the task result, so the source should + * use create-if-absent semantics to avoid overwriting it. + */ + @Override + public boolean useCreateSemanticsForResultStorage() { + return relocationHandoffInitiated; + } + /** Returns the relocation origin if this task is a relocated continuation. */ public ResumeInfo.RelocationOrigin relocationOrigin() { return relocationOrigin; diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ResumeInfo.java b/server/src/main/java/org/elasticsearch/index/reindex/ResumeInfo.java index fec60cb5a0703..afbfb0fc3737a 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ResumeInfo.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ResumeInfo.java @@ -29,12 +29,15 @@ * which keeps the state for a single worker task, or a map of SliceResumeInfo which keeps the state for each slice of a leader task. * It also has information about the original task that was relocated, so the user-facing taskID and start time are preserved in listings. * But the RelocationOrigin isn't accurate for sliced tasks, they have themselves as the origin, but for listing the leader is correct. + * SourceTaskResult contains the result of the source task, and it is passed through relocation. The destination task persists this result + * in .tasks to ensure the relocation chain is maintained even if the source task fails to store its result. This prevents the + * destination task from becoming orphaned and unreachable through the relocation chain. *

* Note: For sliced tasks, resume info must include all slices, including those that are already completed. This ensures that the final * task has a complete result from all slices. A task may be resumed multiple times, so information for completed slices must be carried - * forward to each subsequent resume until the task is fully completed. + * forward to each subsequent resume until the task is fully completed. SourceTaskResult should not be present for sliced workers. *

- * TODO: we can use List instead of Map for since the keys are required to be 0-based and contiguous. + * TODO: we can use List instead of Map for slices since the keys are required to be 0-based and contiguous. */ public record ResumeInfo( RelocationOrigin relocationOrigin, diff --git a/server/src/main/java/org/elasticsearch/tasks/Task.java b/server/src/main/java/org/elasticsearch/tasks/Task.java index 6e5e383d13256..a5e13b760327a 100644 --- a/server/src/main/java/org/elasticsearch/tasks/Task.java +++ b/server/src/main/java/org/elasticsearch/tasks/Task.java @@ -281,6 +281,14 @@ public Map headers() { return headers; } + /** + * Whether result storage should use create-if-absent semantics instead of unconditional overwrite. Subclasses override this to + * prevent overwriting a result that was already stored. + */ + public boolean useCreateSemanticsForResultStorage() { + return false; + } + public TaskResult result(DiscoveryNode node, Exception error) throws IOException { return new TaskResult(taskInfo(node.getId(), true), error); } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index e21cdc5843f6e..f70c03a5a07c2 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -416,7 +416,7 @@ public void storeResult(Task task, Exception e listener.onFailure(ex); return; } - taskResultsService.storeResult(taskResult, new ActionListener() { + storeTaskResult(task, taskResult, new ActionListener() { @Override public void onResponse(Void aVoid) { listener.onFailure(error); @@ -450,7 +450,7 @@ public void storeResult(Task task, Response re return; } - taskResultsService.storeResult(taskResult, new ActionListener() { + storeTaskResult(task, taskResult, new ActionListener() { @Override public void onResponse(Void aVoid) { listener.onResponse(response); @@ -464,6 +464,14 @@ public void onFailure(Exception e) { }); } + private void storeTaskResult(Task task, TaskResult taskResult, ActionListener listener) { + if (task.useCreateSemanticsForResultStorage()) { + taskResultsService.storeResultIfAbsent(taskResult, listener); + } else { + taskResultsService.storeResult(taskResult, listener); + } + } + /** * Returns the list of currently running tasks on the node */ diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResult.java b/server/src/main/java/org/elasticsearch/tasks/TaskResult.java index fd7fce4b80194..7c13f7c72f0a6 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResult.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResult.java @@ -70,6 +70,9 @@ public TaskResult(TaskInfo task, ToXContent response) throws IOException { } public TaskResult(boolean completed, TaskInfo task, @Nullable BytesReference error, @Nullable BytesReference result) { + if (error != null && result != null) { + throw new IllegalArgumentException("TaskResult cannot have both a non-null error and a non-null result"); + } this.completed = completed; this.task = requireNonNull(task, "task is required"); this.error = error; diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java index cafad960c168f..0a17560ecd8e8 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java @@ -11,8 +11,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.internal.Client; @@ -24,6 +26,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.threadpool.ThreadPool; @@ -80,6 +83,28 @@ public TaskResultsService(Client client, ThreadPool threadPool) { } public void storeResult(TaskResult taskResult, ActionListener listener) { + doStoreResult(STORE_BACKOFF_POLICY.iterator(), buildIndexRequest(taskResult), listener); + } + + /** + * Stores the task result only if a document with the same ID does not already exist. Uses {@code opType(CREATE)} so a concurrent or + * earlier write wins. A version conflict (document already exists) is treated as success. + */ + public void storeResultIfAbsent(TaskResult taskResult, ActionListener listener) { + doStoreResult( + STORE_BACKOFF_POLICY.iterator(), + buildIndexRequest(taskResult).setOpType(DocWriteRequest.OpType.CREATE), + listener.delegateResponse((l, e) -> { + if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) { + l.onResponse(null); + } else { + l.onFailure(e); + } + }) + ); + } + + private IndexRequestBuilder buildIndexRequest(TaskResult taskResult) { IndexRequestBuilder index = client.prepareIndex(TASK_INDEX).setId(taskResult.getTask().taskId().toString()); try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) { taskResult.toXContent(builder, new ToXContent.MapParams(Map.of(INCLUDE_CANCELLED_PARAM, "false"))); @@ -87,7 +112,7 @@ public void storeResult(TaskResult taskResult, ActionListener listener) { } catch (IOException e) { throw new ElasticsearchException("Couldn't convert task result to XContent for [{}]", e, taskResult.getTask()); } - doStoreResult(STORE_BACKOFF_POLICY.iterator(), index, listener); + return index; } private void doStoreResult(Iterator backoff, IndexRequestBuilder index, ActionListener listener) { diff --git a/server/src/test/java/org/elasticsearch/tasks/TaskResultTests.java b/server/src/test/java/org/elasticsearch/tasks/TaskResultTests.java index 8d8df372f7fa2..25c527350dddf 100644 --- a/server/src/test/java/org/elasticsearch/tasks/TaskResultTests.java +++ b/server/src/test/java/org/elasticsearch/tasks/TaskResultTests.java @@ -86,6 +86,16 @@ public void testTaskInfoIsForwardCompatible() throws IOException { assertEquals(taskInfo, read); } + public void testNonNullErrorAndResultShouldThrow() throws IOException { + TaskResult withError = new TaskResult(randomTaskInfo(), new RuntimeException("error")); + TaskResult withResponse = new TaskResult(randomTaskInfo(), randomTaskResponse()); + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> new TaskResult(randomBoolean(), randomTaskInfo(), withError.getError(), withResponse.getResponse()) + ); + assertEquals("TaskResult cannot have both a non-null error and a non-null result", e.getMessage()); + } + private XContentBuilder addRandomUnknownFields(XContentBuilder builder) throws IOException { try (XContentParser parser = createParser(builder)) { Map map = parser.mapOrdered();