Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import java.io.IOException;
import java.util.Map;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;

/** Tests that endpoints in reindex-management module are project-aware and behave as expected in multi-project environments. */
public class ReindexManagementMultiProjectIT extends ESRestTestCase {
Expand Down Expand Up @@ -94,7 +96,7 @@ public void testCancellingReindexOnlyWorksForCorrectProject() throws Exception {
assertTrue(runningTaskExistsInProject(taskId, projectWithReindex));

final Map<String, Object> response = cancelReindexInProjectAndWaitForCompletion(taskId, projectWithReindex);
assertThat("reindex is cancelled", response, equalTo(Map.of("acknowledged", true)));
assertThat("reindex is cancelled", response, allOf(hasEntry("cancelled", true), hasEntry("completed", true)));
Copy link
Copy Markdown
Contributor Author

@szybia szybia Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incoming yaml rest tests (next PR) will assert response more definitively

for now, just asserting the basics that are relevant for these tests


assertFalse(runningTaskExistsInProject(taskId, projectWithReindex));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,19 @@
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.XContentTestUtils;
import org.junit.Before;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.elasticsearch.test.rest.ESRestTestCase.entityAsMap;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;

/** Integration tests for <code>POST _reindex/{taskId}/_cancel</code> endpoint. */
Expand Down Expand Up @@ -116,6 +120,12 @@ public void testCancelEndpointEndToEndSynchronously() throws Exception {
final CancelReindexResponse cancelResponse = cancelReindexSynchronously(parentTaskId);
assertThat(cancelResponse.getTaskFailures(), empty());
assertThat(cancelResponse.getNodeFailures(), empty());
final Map<String, Object> responseBody = XContentTestUtils.convertToMap(cancelResponse);
assertThat(
"reindex is cancelled and contains GET response",
responseBody,
allOf(hasEntry("cancelled", true), hasEntry("completed", true))
);

final var notFoundException = expectThrows(ResourceNotFoundException.class, () -> cancelReindexSynchronously(parentTaskId));
assertThat(notFoundException.getMessage(), is(Strings.format("reindex task [%s] either not found or completed", parentTaskId)));
Expand Down Expand Up @@ -161,6 +171,8 @@ public void testCancelEndpointEndToEndAsynchronously() throws Exception {
final CancelReindexResponse cancelResponse = cancelReindexAsynchronously(parentTaskId);
assertThat(cancelResponse.getTaskFailures(), empty());
assertThat(cancelResponse.getNodeFailures(), empty());
final Map<String, Object> responseBody = XContentTestUtils.convertToMap(cancelResponse);
assertThat("reindex is cancelled and contains acknowledged response", responseBody, equalTo(Map.of("acknowledged", true)));

assertBusy(() -> assertThat("there are no open scroll contexts", currentNumberOfScrollContexts(), equalTo(0L)));
assertBusy(() -> assertThat("parent group should be absent", findTaskGroup(parentTaskId).isEmpty(), is(true)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,54 @@
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;
import java.util.Optional;

/**
* Response returned from {@code POST /_reindex/{taskId}/_cancel}.
*/
public class CancelReindexResponse extends BaseTasksResponse implements ToXContentObject {

public CancelReindexResponse(List<TaskOperationFailure> taskFailures, List<FailedNodeException> failedNodes) {
@Nullable
private final GetReindexResponse completedReindexResponse;

public CancelReindexResponse(
final List<TaskOperationFailure> taskFailures,
final List<FailedNodeException> failedNodes,
@Nullable final GetReindexResponse completedReindexResponse
) {
super(taskFailures, failedNodes);
this.completedReindexResponse = completedReindexResponse;
}

public CancelReindexResponse(final StreamInput in) throws IOException {
super(in);
this.completedReindexResponse = in.readOptionalWriteable(GetReindexResponse::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalWriteable(completedReindexResponse);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
assert getNodeFailures().isEmpty() && getTaskFailures().isEmpty() : "should be thrown before being able to call serialization";
if (completedReindexResponse != null) {
return completedReindexResponse.toXContent(builder, params);
}
builder.startObject();
builder.field("acknowledged", true);
builder.endObject();
return builder;
return builder.endObject();
}

public Optional<GetReindexResponse> getCompletedReindexResponse() {
return Optional.ofNullable(completedReindexResponse);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,32 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.tasks.TaskResult;

import java.io.IOException;
import java.util.Optional;

/** Response to a single reindex task cancel action. */
public class CancelReindexTaskResponse implements Writeable {

public CancelReindexTaskResponse() {}
@Nullable
private final TaskResult completedTaskResult;

public CancelReindexTaskResponse(@Nullable final TaskResult completedTaskResult) {
this.completedTaskResult = completedTaskResult;
}

public CancelReindexTaskResponse(final StreamInput in) {}
public CancelReindexTaskResponse(final StreamInput in) throws IOException {
this.completedTaskResult = in.readOptionalWriteable(TaskResult::new);
}

@Override
public void writeTo(final StreamOutput out) {}
public void writeTo(final StreamOutput out) throws IOException {
out.writeOptionalWriteable(completedTaskResult);
}

public Optional<TaskResult> getCompletedTaskResult() {
return Optional.ofNullable(completedTaskResult);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -80,7 +81,12 @@ protected void taskOperation(
task,
CancelTasksRequest.DEFAULT_REASON,
request.waitForCompletion(),
ActionListener.wrap(ignored -> listener.onResponse(new CancelReindexTaskResponse()), listener::onFailure)
ActionListener.wrap(ignored -> {
final TaskResult completedTaskResult = request.waitForCompletion()
? new TaskResult(true, task.taskInfo(clusterService.localNode().getId(), true))
Copy link
Copy Markdown
Contributor Author

@szybia szybia Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's a choice to be made here that we discussed on the meeting, detailing so we can make a concrete decision.


firstly, context/existing art:

  • cancelled field is not recorded in tasks index, so a GET after cancellation seems to default to default value (false)
  • POST _tasks/:taskId/_cancel?wait_for_completion=true&filter_path=nodes.*.tasks
    • Note the cancelled: true and absence of response/full-featured response
      {
        "nodes": {
          "1RCnK_OnSamzfgmhcshBrA": {
            "tasks": {
              "1RCnK_OnSamzfgmhcshBrA:7281": {
                "node": "1RCnK_OnSamzfgmhcshBrA",
                "id": 7281,
                "type": "transport",
                "action": "indices:data/write/reindex",
                "start_time_in_millis": 1768412466161,
                "running_time_in_nanos": 6862322167,
                "cancellable": true,
                "cancelled": true,
                "headers": {}
              }
            }
          }
        }
      }
      
  • GET _tasks/:taskId
    • Returns fully-featured response (task, slices, response, response.failures) and notably cancelled: false

in terms of what we're doing here, I see two main choices to get what we want (a more fully-featured response on POST _reindex/{task_id}/_cancel?wait_for_completion=true)

  • what i currently have here: we generate the taskInfo here
    • pros:
      • cancelled: true (makes sense)
      • slightly simpler code
      • still get status.slices information, same as GET _reindex/{task_id}
    • cons
      • running_time_in_nanos is calculated right here System.nanoTime() - start, so this will be slightly different from what we will see in GET _reindex/{task_id} (taskInfo generated elsewhere)
      • don't get response, response.failures objects
  • do a GET _reindex/{task_id} transport call
    • pros:
      • get exact same response as GET _reindex/{task_id}, including response
      • building on top of GET
    • cons:
      • cancelled: false, confusing

other potential avenues:

  • we could get the full response using GET, and hackily flip cancelled to true
  • i could investigate further whether any way to get from cancelTaskAndDescendants the full TaskResult that will be stored in tasks index, but assuming async stuff will make it hard to hand off

stating preference:

  • i'm leaning towards saying what i have here is good enough, on first glance response field returned in GET doesn't seem to have extra useful info other than what is here already within slices, maybe response.failures from the BulkByScroll
  • in the unlikely case this is wanted, you can just do a folllow-up GET
  • in previous conversations we didn't seem too bothered about having response but we included it (from what i read, because why not)

attaching below two files so reviewers can diff what is returned in POST _reindex/{task_id}/_cancel?wait_for_completion=true, and GET _reindex/{task_id} after cancellation

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm happy with what you have here. As discussed, I'm okay with not including the full response. I think I'm also okay with the times not matching up perfectly, especially as they're in different units. Reporting the correct value for cancelled is probably a good thing. The only thing that's giving me pause is that we're not consistent with get tasks... but there's no real reason we should be, since we're trying to mostly obfuscate the fact that this is wrappers around tasks (and esp. since that is doing something confusing around the cancelled field.)

: null;
listener.onResponse(new CancelReindexTaskResponse(completedTaskResult));
}, listener::onFailure)
);
}

Expand All @@ -99,7 +105,10 @@ protected CancelReindexResponse newResponse(
}
}

final var response = new CancelReindexResponse(taskFailures, nodeExceptions);
final GetReindexResponse completedReindexResponse = tasks.isEmpty()
? null
: tasks.getFirst().getCompletedTaskResult().map(GetReindexResponse::new).orElse(null);
final var response = new CancelReindexResponse(taskFailures, nodeExceptions, completedReindexResponse);
response.rethrowFailures("cancel_reindex"); // if we haven't handled any exception already, throw here
if (tasks.isEmpty()) {
throw reindexWithTaskIdNotFoundException(request.getTargetTaskId());
Expand Down