Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksProjectAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -40,12 +42,15 @@ public class TransportCancelReindexAction extends TransportTasksProjectAction<

public static final ActionType<CancelReindexResponse> TYPE = new ActionType<>("cluster:admin/reindex/cancel");

private final Client client;

@Inject
public TransportCancelReindexAction(
final ClusterService clusterService,
final TransportService transportService,
final ActionFilters actionFilters,
final ProjectResolver projectResolver
final ProjectResolver projectResolver,
final Client client
) {
super(
TYPE.name(),
Expand All @@ -57,6 +62,7 @@ public TransportCancelReindexAction(
transportService.getThreadPool().executor(ThreadPool.Names.GENERIC),
projectResolver
);
this.client = client;
}

@Override
Expand All @@ -77,16 +83,29 @@ protected void taskOperation(
) {
assert task instanceof BulkByScrollTask : "Task should be a BulkByScrollTask";

// cancel the task asynchronously, and if waitForCompletion=true, then wait for it to finish to have the full correct response.
taskManager.cancelTaskAndDescendants(
task,
CancelTasksRequest.DEFAULT_REASON,
request.waitForCompletion(),
ActionListener.wrap(ignored -> {
final TaskResult completedTaskResult = request.waitForCompletion()
? new TaskResult(true, task.taskInfo(clusterService.localNode().getId(), true))
: null;
listener.onResponse(new CancelReindexTaskResponse(completedTaskResult));
}, listener::onFailure)
false,
listener.delegateFailureAndWrap((cancelListener, r) -> {
if (request.waitForCompletion()) {
final TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
final GetReindexRequest getRequest = new GetReindexRequest(taskId, true, null);
client.execute(
TransportGetReindexAction.TYPE,
getRequest,
cancelListener.delegateFailureAndWrap(
(l, getResp) -> l.onResponse(
// return cancelled=true. GET will return false since it's not *currently* cancelled.
new CancelReindexTaskResponse(taskResultWithCancelledTrue(getResp.getTaskResult()))
)
)
);
} else {
cancelListener.onResponse(new CancelReindexTaskResponse((TaskResult) null));
}
})
);
}

Expand Down Expand Up @@ -119,4 +138,23 @@ protected CancelReindexResponse newResponse(
private static ResourceNotFoundException reindexWithTaskIdNotFoundException(final TaskId requestedTaskId) {
return new ResourceNotFoundException("reindex task [{}] either not found or completed", requestedTaskId);
}

private TaskResult taskResultWithCancelledTrue(final TaskResult r) {
final TaskInfo taskInfo = r.getTask();
final TaskInfo newTaskInfo = new TaskInfo(
taskInfo.taskId(),
taskInfo.type(),
taskInfo.node(),
taskInfo.action(),
taskInfo.description(),
taskInfo.status(),
taskInfo.startTime(),
taskInfo.runningTimeNanos(),
taskInfo.cancellable(),
true,
taskInfo.parentTaskId(),
taskInfo.headers()
);
return new TaskResult(r.isCompleted(), newTaskInfo, r.getError(), r.getResponse());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ setup:
- match: { cancelled: true }
- match: { status.canceled: "by user request" }
- gte: { status.total: 0 }
# different from `GET _reindex`
- not_exists: response
# depending on how quickly we cancel, we might either get an error (shard failure) or response, so not asserting

- do:
reindex_get:
Expand Down
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,6 @@ tests:
- class: org.elasticsearch.xpack.gpu.GPUPluginInitializationWithGPUIT
method: testAutoModeSupportedVectorType
issue: https://github.com/elastic/elasticsearch/issues/142072
- class: org.elasticsearch.reindex.management.ReindexManagementClientYamlTestSuiteIT
method: test {yaml=reindex/30_cancel_reindex/Cancel running reindex returns response and GET confirms completed}
issue: https://github.com/elastic/elasticsearch/issues/142079
- class: org.elasticsearch.search.aggregations.metrics.TopHitsIT
method: testMixedSortFieldTypes
issue: https://github.com/elastic/elasticsearch/issues/142077
Expand Down