diff --git a/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/TransportCancelReindexAction.java b/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/TransportCancelReindexAction.java index de2f40343595c..ed9ac35554e7b 100644 --- a/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/TransportCancelReindexAction.java +++ b/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/TransportCancelReindexAction.java @@ -19,12 +19,14 @@ import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksProjectAction; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.index.reindex.BulkByScrollTask; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.tasks.TaskResult; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -40,12 +42,15 @@ public class TransportCancelReindexAction extends TransportTasksProjectAction< public static final ActionType TYPE = new ActionType<>("cluster:admin/reindex/cancel"); + private final Client client; + @Inject public TransportCancelReindexAction( final ClusterService clusterService, final TransportService transportService, final ActionFilters actionFilters, - final ProjectResolver projectResolver + final ProjectResolver projectResolver, + final Client client ) { super( TYPE.name(), @@ -57,6 +62,7 @@ public TransportCancelReindexAction( transportService.getThreadPool().executor(ThreadPool.Names.GENERIC), projectResolver ); + this.client = client; } @Override @@ -77,16 +83,29 @@ protected void taskOperation( ) { assert task instanceof BulkByScrollTask : "Task should be a BulkByScrollTask"; + // cancel the task asynchronously, and if waitForCompletion=true, then wait for it to finish to have the full correct response. taskManager.cancelTaskAndDescendants( task, CancelTasksRequest.DEFAULT_REASON, - request.waitForCompletion(), - ActionListener.wrap(ignored -> { - final TaskResult completedTaskResult = request.waitForCompletion() - ? new TaskResult(true, task.taskInfo(clusterService.localNode().getId(), true)) - : null; - listener.onResponse(new CancelReindexTaskResponse(completedTaskResult)); - }, listener::onFailure) + false, + listener.delegateFailureAndWrap((cancelListener, r) -> { + if (request.waitForCompletion()) { + final TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId()); + final GetReindexRequest getRequest = new GetReindexRequest(taskId, true, null); + client.execute( + TransportGetReindexAction.TYPE, + getRequest, + cancelListener.delegateFailureAndWrap( + (l, getResp) -> l.onResponse( + // return cancelled=true. GET will return false since it's not *currently* cancelled. + new CancelReindexTaskResponse(taskResultWithCancelledTrue(getResp.getTaskResult())) + ) + ) + ); + } else { + cancelListener.onResponse(new CancelReindexTaskResponse((TaskResult) null)); + } + }) ); } @@ -119,4 +138,23 @@ protected CancelReindexResponse newResponse( private static ResourceNotFoundException reindexWithTaskIdNotFoundException(final TaskId requestedTaskId) { return new ResourceNotFoundException("reindex task [{}] either not found or completed", requestedTaskId); } + + private TaskResult taskResultWithCancelledTrue(final TaskResult r) { + final TaskInfo taskInfo = r.getTask(); + final TaskInfo newTaskInfo = new TaskInfo( + taskInfo.taskId(), + taskInfo.type(), + taskInfo.node(), + taskInfo.action(), + taskInfo.description(), + taskInfo.status(), + taskInfo.startTime(), + taskInfo.runningTimeNanos(), + taskInfo.cancellable(), + true, + taskInfo.parentTaskId(), + taskInfo.headers() + ); + return new TaskResult(r.isCompleted(), newTaskInfo, r.getError(), r.getResponse()); + } } diff --git a/modules/reindex-management/src/yamlRestTest/resources/rest-api-spec/test/reindex/30_cancel_reindex.yml b/modules/reindex-management/src/yamlRestTest/resources/rest-api-spec/test/reindex/30_cancel_reindex.yml index 2664e8cfe4008..fe02bdd3ae190 100644 --- a/modules/reindex-management/src/yamlRestTest/resources/rest-api-spec/test/reindex/30_cancel_reindex.yml +++ b/modules/reindex-management/src/yamlRestTest/resources/rest-api-spec/test/reindex/30_cancel_reindex.yml @@ -53,8 +53,7 @@ setup: - match: { cancelled: true } - match: { status.canceled: "by user request" } - gte: { status.total: 0 } - # different from `GET _reindex` - - not_exists: response + # depending on how quickly we cancel, we might either get an error (shard failure) or response, so not asserting - do: reindex_get: diff --git a/muted-tests.yml b/muted-tests.yml index a2c8387300cb6..23a0f12fb7d87 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -335,9 +335,6 @@ tests: - class: org.elasticsearch.xpack.gpu.GPUPluginInitializationWithGPUIT method: testAutoModeSupportedVectorType issue: https://github.com/elastic/elasticsearch/issues/142072 -- class: org.elasticsearch.reindex.management.ReindexManagementClientYamlTestSuiteIT - method: test {yaml=reindex/30_cancel_reindex/Cancel running reindex returns response and GET confirms completed} - issue: https://github.com/elastic/elasticsearch/issues/142079 - class: org.elasticsearch.search.aggregations.metrics.TopHitsIT method: testMixedSortFieldTypes issue: https://github.com/elastic/elasticsearch/issues/142077