Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public void testFollowsTwoRelocations() throws IOException {

final var firstRelocation = new TaskRelocatedException(originalTaskId, firstRelocatedTaskId);

final var secondRelocation = new TaskRelocatedException(firstRelocatedTaskId, secondRelocatedTaskId);
final var secondRelocation = new TaskRelocatedException(originalTaskId, secondRelocatedTaskId);

final TaskInfo originalInfo = createTaskInfo(originalTaskId, ReindexAction.NAME);
final TaskResult originalResult = new TaskResult(originalInfo, (Exception) firstRelocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListen
public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkClient, ActionListener<BulkByScrollResponse> listener) {
final ResumeInfo resumeInfo = request.getResumeInfo().orElse(null);
if (resumeInfo != null && resumeInfo.sourceTaskResult() != null) {
// source task result should be present for top-level tasks only (e.g. leader or non-sliced worker)
storeRelocationSourceTaskResult(
task,
resumeInfo,
Expand All @@ -187,6 +188,8 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl
* to store its task result. For sliced reindex tasks, only the leader will store the source task result.
*/
private void storeRelocationSourceTaskResult(BulkByScrollTask task, ResumeInfo resumeInfo, ActionListener<Void> listener) {
assert task.isLeader() || (task.isWorker() && task.getParentTaskId().isSet() == false)
: "Only top level source task result should be stored, result for sliced workers should not be stored";
final var relocatedException = new TaskRelocatedException(
resumeInfo.relocationOrigin().originalTaskId(),
new TaskId(clusterService.localNode().getId(), task.getId())
Expand Down Expand Up @@ -726,6 +729,7 @@ ActionListener<BulkByScrollResponse> listenerWithRelocations(
onRelocationResponseListener.onFailure(e);
l.onFailure(e);
});
task.setRelocationHandoffInitiated();
transportService.sendRequest(
nodeToRelocateToNode,
ResumeReindexAction.NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,8 @@ public void testExecuteStoresSourceTaskResult() throws Exception {
final TaskResultsService taskResultsService = mock(TaskResultsService.class);
doAnswer(invocation -> {
TaskResult stored = invocation.getArgument(0);
assertThat(stored.getTask().taskId().getNodeId(), equalTo("source-node"));
assertThat(stored.getTask().taskId(), equalTo(new TaskId("source-node", task.getId())));
assertThat(stored.getTask().action(), equalTo("test_action"));
final Map<String, Object> errorMap = stored.getErrorAsMap();
assertThat(errorMap.get("type"), equalTo("task_relocated_exception"));
assertThat(errorMap.get("original_task_id"), equalTo(sourceTaskId.toString()));
Expand Down Expand Up @@ -542,6 +543,38 @@ public void testExecuteStoresSourceTaskResult() throws Exception {
verify(taskResultsService).storeResult(any(TaskResult.class), any());
}

public void testRelocationsSetsHandoffFlag() {
assumeTrue("reindex resilience enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED);
final ClusterService clusterService = mock(ClusterService.class);
final ClusterState clusterState = mock(ClusterState.class);
final DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
final DiscoveryNode sourceNode = DiscoveryNodeUtils.builder("source-node").build();
final DiscoveryNode targetNode = DiscoveryNodeUtils.builder("target-node").build();
when(clusterService.state()).thenReturn(clusterState);
when(clusterService.localNode()).thenReturn(sourceNode);
when(clusterState.nodes()).thenReturn(discoveryNodes);
when(discoveryNodes.get("target-node")).thenReturn(targetNode);

final Reindexer reindexer = reindexerWithRelocation(clusterService, mock(TransportService.class));
final BulkByScrollTask task = createTaskWithParentIdAndRelocationEnabled(TaskId.EMPTY_TASK_ID);
task.setWorker(Float.POSITIVE_INFINITY, null);
task.getWorkerState().setNodeToRelocateToSupplier(() -> Optional.of("target-node"));
task.requestRelocation();

assertFalse("handoff flag should not be set before relocation", task.useCreateSemanticsForResultStorage());

final PlainActionFuture<BulkByScrollResponse> future = new PlainActionFuture<>();
final ActionListener<BulkByScrollResponse> wrapped = reindexer.listenerWithRelocations(
task,
reindexRequest(),
ActionListener.noop(),
future
);
wrapped.onResponse(reindexResponseWithResumeInfo());

assertTrue("handoff flag should be set after relocation", task.useCreateSemanticsForResultStorage());
}

public void testExecuteFailsWhenSourceTaskResultStorageFails() throws Exception {
assumeTrue("reindex resilience enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED);
final TaskResultsService taskResultsService = mock(TaskResultsService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class BulkByScrollTask extends CancellableTask {
private volatile LeaderBulkByScrollTaskState leaderState;
private volatile WorkerBulkByScrollTaskState workerState;
private volatile boolean relocationRequested = false;
private volatile boolean relocationHandoffInitiated = false;

public BulkByScrollTask(
TaskId taskId,
Expand Down Expand Up @@ -228,6 +229,22 @@ public boolean isRelocationRequested() {
return relocationRequested;
}

/**
* Marks that this task has initiated a relocation handoff (the resume request has been sent to the destination).
*/
public void setRelocationHandoffInitiated() {
this.relocationHandoffInitiated = true;
}

/**
* If relocation handoff has started, the destination may have already stored the task result, so the source should
* use create-if-absent semantics to avoid overwriting it.
*/
@Override
public boolean useCreateSemanticsForResultStorage() {
return relocationHandoffInitiated;
}

/** Returns the relocation origin if this task is a relocated continuation. */
public ResumeInfo.RelocationOrigin relocationOrigin() {
return relocationOrigin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
* which keeps the state for a single worker task, or a map of SliceResumeInfo which keeps the state for each slice of a leader task.
* It also has information about the original task that was relocated, so the user-facing taskID and start time are preserved in listings.
* But the RelocationOrigin isn't accurate for sliced tasks, they have themselves as the origin, but for listing the leader is correct.
* SourceTaskResult contains the result of the source task, and it is passed through relocation. The destination task persists this result
* in .tasks to ensure the relocation chain is maintained even if the source task fails to store its result. This prevents the
* destination task from becoming orphaned and unreachable through the relocation chain.
* <p>
* Note: For sliced tasks, resume info must include all slices, including those that are already completed. This ensures that the final
* task has a complete result from all slices. A task may be resumed multiple times, so information for completed slices must be carried
* forward to each subsequent resume until the task is fully completed.
* forward to each subsequent resume until the task is fully completed. SourceTaskResult should not be present for sliced workers.
* <p>
* TODO: we can use List instead of Map for since the keys are required to be 0-based and contiguous.
* TODO: we can use List instead of Map for slices since the keys are required to be 0-based and contiguous.
*/
public record ResumeInfo(
RelocationOrigin relocationOrigin,
Expand Down
8 changes: 8 additions & 0 deletions server/src/main/java/org/elasticsearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,14 @@ public Map<String, String> headers() {
return headers;
}

/**
* Whether result storage should use create-if-absent semantics instead of unconditional overwrite. Subclasses override this to
* prevent overwriting a result that was already stored.
*/
public boolean useCreateSemanticsForResultStorage() {
return false;
}

public TaskResult result(DiscoveryNode node, Exception error) throws IOException {
return new TaskResult(taskInfo(node.getId(), true), error);
}
Expand Down
12 changes: 10 additions & 2 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ public <Response extends ActionResponse> void storeResult(Task task, Exception e
listener.onFailure(ex);
return;
}
taskResultsService.storeResult(taskResult, new ActionListener<Void>() {
storeTaskResult(task, taskResult, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
listener.onFailure(error);
Expand Down Expand Up @@ -450,7 +450,7 @@ public <Response extends ActionResponse> void storeResult(Task task, Response re
return;
}

taskResultsService.storeResult(taskResult, new ActionListener<Void>() {
storeTaskResult(task, taskResult, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
listener.onResponse(response);
Expand All @@ -464,6 +464,14 @@ public void onFailure(Exception e) {
});
}

private void storeTaskResult(Task task, TaskResult taskResult, ActionListener<Void> listener) {
if (task.useCreateSemanticsForResultStorage()) {
taskResultsService.storeResultIfAbsent(taskResult, listener);
} else {
taskResultsService.storeResult(taskResult, listener);
}
}

/**
* Returns the list of currently running tasks on the node
*/
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/java/org/elasticsearch/tasks/TaskResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public TaskResult(TaskInfo task, ToXContent response) throws IOException {
}

public TaskResult(boolean completed, TaskInfo task, @Nullable BytesReference error, @Nullable BytesReference result) {
if (error != null && result != null) {
throw new IllegalArgumentException("TaskResult cannot have both a non-null error and a non-null result");
}
this.completed = completed;
this.task = requireNonNull(task, "task is required");
this.error = error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.internal.Client;
Expand All @@ -24,6 +26,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -80,14 +83,36 @@ public TaskResultsService(Client client, ThreadPool threadPool) {
}

public void storeResult(TaskResult taskResult, ActionListener<Void> listener) {
doStoreResult(STORE_BACKOFF_POLICY.iterator(), buildIndexRequest(taskResult), listener);
}

/**
* Stores the task result only if a document with the same ID does not already exist. Uses {@code opType(CREATE)} so a concurrent or
* earlier write wins. A version conflict (document already exists) is treated as success.
*/
public void storeResultIfAbsent(TaskResult taskResult, ActionListener<Void> listener) {
doStoreResult(
STORE_BACKOFF_POLICY.iterator(),
buildIndexRequest(taskResult).setOpType(DocWriteRequest.OpType.CREATE),
listener.delegateResponse((l, e) -> {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
l.onResponse(null);
} else {
l.onFailure(e);
}
})
);
}

private IndexRequestBuilder buildIndexRequest(TaskResult taskResult) {
IndexRequestBuilder index = client.prepareIndex(TASK_INDEX).setId(taskResult.getTask().taskId().toString());
try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) {
taskResult.toXContent(builder, new ToXContent.MapParams(Map.of(INCLUDE_CANCELLED_PARAM, "false")));
index.setSource(builder);
} catch (IOException e) {
throw new ElasticsearchException("Couldn't convert task result to XContent for [{}]", e, taskResult.getTask());
}
doStoreResult(STORE_BACKOFF_POLICY.iterator(), index, listener);
return index;
}

private void doStoreResult(Iterator<TimeValue> backoff, IndexRequestBuilder index, ActionListener<Void> listener) {
Expand Down
10 changes: 10 additions & 0 deletions server/src/test/java/org/elasticsearch/tasks/TaskResultTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ public void testTaskInfoIsForwardCompatible() throws IOException {
assertEquals(taskInfo, read);
}

public void testNonNullErrorAndResultShouldThrow() throws IOException {
TaskResult withError = new TaskResult(randomTaskInfo(), new RuntimeException("error"));
TaskResult withResponse = new TaskResult(randomTaskInfo(), randomTaskResponse());
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> new TaskResult(randomBoolean(), randomTaskInfo(), withError.getError(), withResponse.getResponse())
);
assertEquals("TaskResult cannot have both a non-null error and a non-null result", e.getMessage());
}

private XContentBuilder addRandomUnknownFields(XContentBuilder builder) throws IOException {
try (XContentParser parser = createParser(builder)) {
Map<String, Object> map = parser.mapOrdered();
Expand Down
Loading