diff --git a/modules/reindex-management/build.gradle b/modules/reindex-management/build.gradle index 01d961580ec92..040df16bcd30d 100644 --- a/modules/reindex-management/build.gradle +++ b/modules/reindex-management/build.gradle @@ -7,6 +7,7 @@ * License v 3.0 only", or the "Server Side Public License, v 1". */ +apply plugin: 'elasticsearch.internal-cluster-test' apply plugin: 'elasticsearch.internal-yaml-rest-test' apply plugin: 'elasticsearch.yaml-rest-compat-test' @@ -19,6 +20,7 @@ esplugin { dependencies { compileOnly project(path: ':modules:reindex') + internalClusterTestImplementation project(path: ':modules:reindex') testImplementation project(':modules:rest-root') diff --git a/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexCancelIT.java b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexCancelIT.java new file mode 100644 index 0000000000000..1cdeb9cc581c2 --- /dev/null +++ b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexCancelIT.java @@ -0,0 +1,317 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.reindex.management; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Strings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.reindex.ReindexAction; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.tasks.RawTaskStatus; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.tasks.TaskResult; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.Before; + +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +import static org.elasticsearch.test.rest.ESRestTestCase.entityAsMap; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** Integration tests for POST _reindex/{taskId}/_cancel endpoint. */ +public class ReindexCancelIT extends ESIntegTestCase { + + private static final String SOURCE_INDEX = "reindex_src"; + private static final String DEST_INDEX = "reindex_dst"; + private static final int BULK_SIZE = 1; + private static final int REQUESTS_PER_SECOND = 1; + private static final int NUM_OF_SLICES = 2; + private static final int NUMBER_OF_DOCUMENTS_THAT_TAKES_30_SECS_TO_INGEST = 30 * REQUESTS_PER_SECOND * BULK_SIZE; + + @Override + protected Collection> nodePlugins() { + return List.of(ReindexPlugin.class, ReindexManagementPlugin.class); + } + + @Override + protected boolean addMockHttpTransport() { + return false; + } + + @Override + protected Settings nodeSettings(int ordinal, Settings otherSettings) { + return Settings.builder().put(super.nodeSettings(ordinal, otherSettings)).build(); + } + + @Before + public void setup() { + assumeTrue("reindex resilience is enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); + + createIndex(SOURCE_INDEX, DEST_INDEX); + indexRandom(true, SOURCE_INDEX, NUMBER_OF_DOCUMENTS_THAT_TAKES_30_SECS_TO_INGEST); + ensureGreen(SOURCE_INDEX, DEST_INDEX); + } + + /** + * Test POST _reindex/{taskId}/_cancel endpoint, and its intended side effects, end-to-end, by doing the following: + * 1. Create throttled reindex task that takes a while to complete + * 2. Ensure task has expected number of sub-tasks + * 3. Ensure there's an expected number of search scroll contexts open for the reindexing + * 4. Cancel reindex + * 5. Ensure there's no failures, and all scroll contexts and sub-tasks are closed/cancelled + * 6. Ensure reindex task and sub-tasks have correct cancelled reason + * 7. Subsequent calls to cancel already-cancelled reindex task fail + *

+ * We test synchronous (?wait_for_completion=true) invocation of the _cancel endpoint in this test. + */ + public void testCancelEndpointEndToEndSynchronously() throws Exception { + final TaskId parentTaskId = startAsyncThrottledReindex(); + + final TaskInfo running = getRunningTask(parentTaskId); + assertThat(running.description(), is("reindex from [" + SOURCE_INDEX + "] to [" + DEST_INDEX + "]")); + assertThat(running.cancellable(), is(true)); + assertThat(running.cancelled(), is(false)); + + final TaskGroup parent = findTaskGroup(parentTaskId).orElse(null); + assertNotNull("parent group should exist", parent); + assertThat(parent.childTasks().size(), equalTo(2)); + + final TaskId firstSubTask = parent.childTasks().getFirst().task().taskId(); + final var cancelSubTaskException = expectThrows(ResourceNotFoundException.class, () -> cancelReindexSynchronously(firstSubTask)); + assertThat( + cancelSubTaskException.getMessage(), + is(Strings.format("reindex task [%s] either not found or completed", firstSubTask)) + ); + + final int sourceIndexNumOfPrimaryShards = primaryShards(SOURCE_INDEX); + assertBusy(() -> { + final long currentScrollContexts = currentNumberOfScrollContexts(); + final long expectedScrollContexts = (long) sourceIndexNumOfPrimaryShards * NUM_OF_SLICES; + assertThat("expected number of scroll contexts are open", currentScrollContexts, equalTo(expectedScrollContexts)); + }); + + final CancelReindexResponse cancelResponse = cancelReindexSynchronously(parentTaskId); + assertThat(cancelResponse.getTaskFailures(), empty()); + assertThat(cancelResponse.getNodeFailures(), empty()); + + final var notFoundException = expectThrows(ResourceNotFoundException.class, () -> cancelReindexSynchronously(parentTaskId)); + assertThat(notFoundException.getMessage(), is(Strings.format("reindex task [%s] either not found or completed", parentTaskId))); + assertThat("parent group should be absent", findTaskGroup(parentTaskId).isEmpty(), is(true)); + + assertThat("there are no open scroll contexts", currentNumberOfScrollContexts(), equalTo(0L)); + + final RawTaskStatus parentTaskStatus = (RawTaskStatus) getCompletedTaskResult(parentTaskId).getTask().status(); + final String cancelledReason = (String) parentTaskStatus.toMap().get("canceled"); + assertThat(cancelledReason, equalTo("by user request")); + } + + /** Same test as above but calling _cancel asynchronously and wrapping assertions after cancellation in assertBusy. */ + public void testCancelEndpointEndToEndAsynchronously() throws Exception { + final TaskId parentTaskId = startAsyncThrottledReindex(); + + final TaskInfo running = getRunningTask(parentTaskId); + assertThat(running.description(), is("reindex from [" + SOURCE_INDEX + "] to [" + DEST_INDEX + "]")); + assertThat(running.cancellable(), is(true)); + assertThat(running.cancelled(), is(false)); + + final TaskGroup parent = findTaskGroup(parentTaskId).orElse(null); + assertNotNull("parent group should exist", parent); + assertThat(parent.childTasks().size(), equalTo(2)); + + final TaskId firstSubTask = parent.childTasks().getFirst().task().taskId(); + final var cancellingSubTaskException = expectThrows( + ResourceNotFoundException.class, + () -> cancelReindexSynchronously(firstSubTask) + ); + assertThat( + cancellingSubTaskException.getMessage(), + is(Strings.format("reindex task [%s] either not found or completed", firstSubTask)) + ); + + final int sourceIndexNumOfPrimaryShards = primaryShards(SOURCE_INDEX); + assertBusy(() -> { + final long currentScrollContexts = currentNumberOfScrollContexts(); + final long expectedScrollContexts = (long) sourceIndexNumOfPrimaryShards * NUM_OF_SLICES; + assertThat("expected number of scroll contexts are open", currentScrollContexts, equalTo(expectedScrollContexts)); + }); + + final CancelReindexResponse cancelResponse = cancelReindexAsynchronously(parentTaskId); + assertThat(cancelResponse.getTaskFailures(), empty()); + assertThat(cancelResponse.getNodeFailures(), empty()); + + assertBusy(() -> assertThat("there are no open scroll contexts", currentNumberOfScrollContexts(), equalTo(0L))); + assertBusy(() -> assertThat("parent group should be absent", findTaskGroup(parentTaskId).isEmpty(), is(true))); + assertBusy(() -> { + final RawTaskStatus parentTaskStatus = (RawTaskStatus) getCompletedTaskResult(parentTaskId).getTask().status(); + final String cancelledReason = (String) parentTaskStatus.toMap().get("canceled"); + assertThat(cancelledReason, equalTo("by user request")); + }); + + final var notFoundException = expectThrows(ResourceNotFoundException.class, () -> cancelReindexAsynchronously(parentTaskId)); + assertThat(notFoundException.getMessage(), is(Strings.format("reindex task [%s] either not found or completed", parentTaskId))); + } + + public void testCancellingNonexistingTaskOnExistingNode() { + final TaskId nonExistingTaskOnExistingNode = new TaskId(clusterService().localNode().getId(), Long.MAX_VALUE); + + final String expectedExceptionMessage = Strings.format( + "reindex task [%s] either not found or completed", + nonExistingTaskOnExistingNode + ); + final var synchronousException = expectThrows( + ResourceNotFoundException.class, + () -> cancelReindexSynchronously(nonExistingTaskOnExistingNode) + ); + assertThat(synchronousException.getMessage(), is(expectedExceptionMessage)); + + final var asynchronousException = expectThrows( + ResourceNotFoundException.class, + () -> cancelReindexAsynchronously(nonExistingTaskOnExistingNode) + ); + assertThat(asynchronousException.getMessage(), is(expectedExceptionMessage)); + } + + public void testCancellingTaskOnNonexistingNode() { + final TaskId taskId = new TaskId("non-existing-node-" + randomAlphaOfLength(8), randomLongBetween(1, 1_000_000L)); + + final String expectedExceptionMessage = Strings.format("reindex task [%s] either not found or completed", taskId); + + final var synchronousException = expectThrows(ResourceNotFoundException.class, () -> cancelReindexSynchronously(taskId)); + assertThat(synchronousException.getMessage(), is(expectedExceptionMessage)); + + final var asynchronousException = expectThrows(ResourceNotFoundException.class, () -> cancelReindexAsynchronously(taskId)); + assertThat(asynchronousException.getMessage(), is(expectedExceptionMessage)); + } + + public void testCancellingExistingNonReindexTaskReturns404() throws Exception { + final TaskId deleteByQueryTaskId = startAsyncThrottledDeleteByQuery(); + try { + final TaskInfo running = getRunningTask(deleteByQueryTaskId); + assertThat(running.description(), equalTo("delete-by-query [reindex_src]")); + assertThat(running.cancellable(), is(true)); + assertThat(running.cancelled(), is(false)); + + final String expectedExceptionMessage = Strings.format("reindex task [%s] either not found or completed", deleteByQueryTaskId); + final var exception = expectThrows(ResourceNotFoundException.class, () -> cancelReindexSynchronously(deleteByQueryTaskId)); + assertThat(exception.getMessage(), is(expectedExceptionMessage)); + } finally { // cleanup by killing deleteByQuery, gracefully handles if task is dead (in case of *very slow* CI) + final CancelTasksRequest cancelRequest = new CancelTasksRequest(); + cancelRequest.setWaitForCompletion(true); + cancelRequest.setTargetTaskId(deleteByQueryTaskId); + clusterAdmin().cancelTasks(cancelRequest).get(); + } + } + + private TaskId startAsyncThrottledReindex() throws Exception { + final RestClient restClient = getRestClient(); + final Request request = new Request("POST", "/_reindex"); + request.addParameter("wait_for_completion", "false"); + request.addParameter("slices", Integer.toString(NUM_OF_SLICES)); + request.addParameter("requests_per_second", Integer.toString(REQUESTS_PER_SECOND)); + request.setJsonEntity(Strings.format(""" + { + "source": { + "index": "%s", + "size": %d + }, + "dest": { + "index": "%s" + } + }""", SOURCE_INDEX, BULK_SIZE, DEST_INDEX)); + + final Response response = restClient.performRequest(request); + final String task = (String) entityAsMap(response).get("task"); + assertNotNull("reindex did not return a task id", task); + return new TaskId(task); + } + + private TaskId startAsyncThrottledDeleteByQuery() throws Exception { + final RestClient restClient = getRestClient(); + final Request request = new Request("POST", "/" + SOURCE_INDEX + "/_delete_by_query"); + request.addParameter("wait_for_completion", "false"); + request.addParameter("slices", Integer.toString(NUM_OF_SLICES)); + request.addParameter("requests_per_second", Integer.toString(REQUESTS_PER_SECOND)); + request.setJsonEntity(""" + { + "query": { + "match_all": {} + } + }"""); + + final Response response = restClient.performRequest(request); + final String task = (String) entityAsMap(response).get("task"); + assertNotNull("delete by query did not return a task id", task); + return new TaskId(task); + } + + private TaskInfo getRunningTask(final TaskId taskId) { + final GetTaskResponse response = clusterAdmin().prepareGetTask(taskId).get(); + final TaskResult task = response.getTask(); + assertNotNull(task); + assertThat(task.isCompleted(), is(false)); + return task.getTask(); + } + + private CancelReindexResponse cancelReindexSynchronously(final TaskId taskId) { + final CancelReindexRequest request = new CancelReindexRequest(true); + request.setTargetTaskId(taskId); + return client().execute(TransportCancelReindexAction.TYPE, request).actionGet(); + } + + private CancelReindexResponse cancelReindexAsynchronously(final TaskId taskId) { + final CancelReindexRequest request = new CancelReindexRequest(false); + request.setTargetTaskId(taskId); + return client().execute(TransportCancelReindexAction.TYPE, request).actionGet(); + } + + private Optional findTaskGroup(final TaskId taskId) { + final ListTasksResponse response = clusterAdmin().prepareListTasks().setActions(ReindexAction.NAME).setDetailed(true).get(); + return response.getTaskGroups().stream().filter(group -> group.taskInfo().taskId().equals(taskId)).findFirst(); + } + + private TaskResult getCompletedTaskResult(final TaskId taskId) { + final GetTaskResponse response = clusterAdmin().prepareGetTask(taskId).setWaitForCompletion(true).get(); + final TaskResult task = response.getTask(); + assertNotNull(task); + assertThat(task.isCompleted(), is(true)); + return task; + } + + private long currentNumberOfScrollContexts() { + final NodesStatsResponse stats = clusterAdmin().prepareNodesStats().clear().setIndices(true).get(); + long total = 0; + for (var nodeStats : stats.getNodes()) { + total += nodeStats.getIndices().getSearch().getTotal().getScrollCurrent(); + } + return total; + } + + private int primaryShards(final String index) { + final TimeValue timeout = TimeValue.THIRTY_SECONDS; + final var response = client().admin().indices().prepareGetIndex(timeout).addIndices(index).get(); + return Integer.parseInt(response.getSetting(index, IndexMetadata.SETTING_NUMBER_OF_SHARDS)); + } +} diff --git a/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/CancelReindexRequest.java b/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/CancelReindexRequest.java new file mode 100644 index 0000000000000..5dffca03513a0 --- /dev/null +++ b/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/CancelReindexRequest.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.reindex.management; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.tasks.BaseTasksRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.reindex.ReindexAction; +import org.elasticsearch.tasks.Task; + +import java.io.IOException; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** A request to cancel an ongoing reindex task. */ +public class CancelReindexRequest extends BaseTasksRequest { + + private final boolean waitForCompletion; + + public CancelReindexRequest(StreamInput in) throws IOException { + super(in); + waitForCompletion = in.readBoolean(); + } + + public CancelReindexRequest(boolean waitForCompletion) { + this.waitForCompletion = waitForCompletion; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(waitForCompletion); + } + + @Override + public String getDescription() { + return "waitForCompletion[" + waitForCompletion + "], targetTaskId[" + getTargetTaskId() + "]"; + } + + @Override + public ActionRequestValidationException validate() { + var validationException = super.validate(); + if (getTargetTaskId().isSet() == false) { + validationException = addValidationError("task id must be provided", validationException); + } + return validationException; + } + + public boolean waitForCompletion() { + return waitForCompletion; + } + + @Override + public boolean match(Task task) { + return ReindexAction.NAME.equals(task.getAction()) && task.getParentTaskId().isSet() == false; + } +} diff --git a/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/CancelReindexResponse.java b/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/CancelReindexResponse.java new file mode 100644 index 0000000000000..48eb399ce199a --- /dev/null +++ b/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/CancelReindexResponse.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.reindex.management; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +/** + * Response returned from {@code POST /_reindex/{taskId}/_cancel}. + */ +public class CancelReindexResponse extends BaseTasksResponse implements ToXContentObject { + + public CancelReindexResponse(List taskFailures, List failedNodes) { + super(taskFailures, failedNodes); + } + + public CancelReindexResponse(final StreamInput in) throws IOException { + super(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("acknowledged", true); + builder.endObject(); + return builder; + } +} diff --git a/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/CancelReindexTaskResponse.java b/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/CancelReindexTaskResponse.java new file mode 100644 index 0000000000000..82efc5239d118 --- /dev/null +++ b/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/CancelReindexTaskResponse.java @@ -0,0 +1,25 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.reindex.management; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +/** Response to a single reindex task cancel action. */ +public class CancelReindexTaskResponse implements Writeable { + + public CancelReindexTaskResponse() {} + + public CancelReindexTaskResponse(final StreamInput in) {} + + @Override + public void writeTo(final StreamOutput out) {} +} diff --git a/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/ReindexManagementPlugin.java b/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/ReindexManagementPlugin.java index 674b3b1b3db7f..7e9feb79b4bbf 100644 --- a/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/ReindexManagementPlugin.java +++ b/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/ReindexManagementPlugin.java @@ -37,7 +37,8 @@ public List getActions() { if (REINDEX_RESILIENCE_ENABLED) { return List.of( new ActionHandler(TransportGetReindexAction.TYPE, TransportGetReindexAction.class), - new ActionHandler(TransportListReindexAction.TYPE, TransportListReindexAction.class) + new ActionHandler(TransportListReindexAction.TYPE, TransportListReindexAction.class), + new ActionHandler(TransportCancelReindexAction.TYPE, TransportCancelReindexAction.class) ); } else { return List.of(); @@ -57,7 +58,11 @@ public List getRestHandlers( Predicate clusterSupportsFeature ) { if (REINDEX_RESILIENCE_ENABLED) { - return List.of(new RestGetReindexAction(clusterSupportsFeature), new RestListReindexAction(clusterSupportsFeature)); + return List.of( + new RestGetReindexAction(clusterSupportsFeature), + new RestListReindexAction(clusterSupportsFeature), + new RestCancelReindexAction(clusterSupportsFeature) + ); } else { return List.of(); } diff --git a/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/RestCancelReindexAction.java b/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/RestCancelReindexAction.java new file mode 100644 index 0000000000000..b99071dd7135f --- /dev/null +++ b/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/RestCancelReindexAction.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.reindex.management; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.features.NodeFeature; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.tasks.TaskId; + +import java.util.List; +import java.util.Objects; +import java.util.function.Predicate; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +/** REST handler for cancelling an ongoing reindex task. */ +@ServerlessScope(Scope.PUBLIC) +public class RestCancelReindexAction extends BaseRestHandler { + + private final Predicate clusterSupportsFeature; + + public RestCancelReindexAction(final Predicate clusterSupportsFeature) { + this.clusterSupportsFeature = Objects.requireNonNull(clusterSupportsFeature); + } + + @Override + public List routes() { + return List.of(new Route(POST, "/_reindex/{task_id}/_cancel")); + } + + @Override + public String getName() { + return "cancel_reindex_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) { + if (clusterSupportsFeature.test(ReindexManagementFeatures.NEW_ENDPOINTS) == false) { + throw new IllegalArgumentException("endpoint not supported on all nodes in the cluster"); + } + final String taskIdParam = request.param("task_id"); + final TaskId taskId = new TaskId(taskIdParam); + if (taskId.isSet() == false) { + throw new IllegalArgumentException("invalid taskId provided: " + taskIdParam); + } + + final boolean waitForCompletion = request.paramAsBoolean("wait_for_completion", true); + final CancelReindexRequest cancelRequest = new CancelReindexRequest(waitForCompletion); + cancelRequest.setTargetTaskId(taskId); + + return channel -> client.execute(TransportCancelReindexAction.TYPE, cancelRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/TransportCancelReindexAction.java b/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/TransportCancelReindexAction.java new file mode 100644 index 0000000000000..511145e55edea --- /dev/null +++ b/modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/TransportCancelReindexAction.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.reindex.management; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.NoSuchNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.tasks.TransportTasksProjectAction; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.index.reindex.BulkByScrollTask; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.List; + +/** Transport action that cancels an in-flight reindex task and its descendants. */ +public class TransportCancelReindexAction extends TransportTasksProjectAction< + CancellableTask, + CancelReindexRequest, + CancelReindexResponse, + CancelReindexTaskResponse> { + + public static final ActionType TYPE = new ActionType<>("cluster:admin/reindex/cancel"); + + @Inject + public TransportCancelReindexAction( + final ClusterService clusterService, + final TransportService transportService, + final ActionFilters actionFilters, + final ProjectResolver projectResolver + ) { + super( + TYPE.name(), + clusterService, + transportService, + actionFilters, + CancelReindexRequest::new, + CancelReindexTaskResponse::new, + transportService.getThreadPool().executor(ThreadPool.Names.GENERIC), + projectResolver + ); + } + + @Override + protected List processTasks(final CancelReindexRequest request) { + final CancellableTask requestedTask = taskManager.getCancellableTask(request.getTargetTaskId().getId()); + if (requestedTask != null && super.match(requestedTask) && request.match(requestedTask)) { + return List.of(requestedTask); + } + return List.of(); + } + + @Override + protected void taskOperation( + final CancellableTask actionTask, + final CancelReindexRequest request, + final CancellableTask task, + final ActionListener listener + ) { + assert task instanceof BulkByScrollTask : "Task should be a BulkByScrollTask"; + + taskManager.cancelTaskAndDescendants( + task, + CancelTasksRequest.DEFAULT_REASON, + request.waitForCompletion(), + ActionListener.wrap(ignored -> listener.onResponse(new CancelReindexTaskResponse()), listener::onFailure) + ); + } + + @Override + protected CancelReindexResponse newResponse( + final CancelReindexRequest request, + final List tasks, + final List taskFailures, + final List nodeExceptions + ) { + assert tasks.size() + taskFailures.size() + nodeExceptions.size() <= 1 : "currently only supports cancelling one task max"; + // check whether node in requested TaskId doesn't exist and throw 404 + for (final FailedNodeException e : nodeExceptions) { + if (ExceptionsHelper.unwrap(e, NoSuchNodeException.class) != null) { + throw reindexWithTaskIdNotFoundException(request.getTargetTaskId()); + } + } + + final var response = new CancelReindexResponse(taskFailures, nodeExceptions); + response.rethrowFailures("cancel_reindex"); // if we haven't handled any exception already, throw here + if (tasks.isEmpty()) { + throw reindexWithTaskIdNotFoundException(request.getTargetTaskId()); + } + return response; + } + + private static ResourceNotFoundException reindexWithTaskIdNotFoundException(final TaskId requestedTaskId) { + return new ResourceNotFoundException("reindex task [{}] either not found or completed", requestedTaskId); + } +} diff --git a/modules/reindex-management/src/test/java/org/elasticsearch/reindex/management/CancelReindexRequestTests.java b/modules/reindex-management/src/test/java/org/elasticsearch/reindex/management/CancelReindexRequestTests.java new file mode 100644 index 0000000000000..c61c64c069237 --- /dev/null +++ b/modules/reindex-management/src/test/java/org/elasticsearch/reindex/management/CancelReindexRequestTests.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.reindex.management; + +import org.elasticsearch.index.reindex.ReindexAction; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESTestCase; + +import java.util.Map; + +public class CancelReindexRequestTests extends ESTestCase { + + public void testNotReindexTaskIsNotEligibleForCancellation() { + final CancelReindexRequest request = new CancelReindexRequest(randomBoolean()); + final CancellableTask task = taskWithActionAndParent("indices:data/write/update", null); + assertFalse(request.match(task)); + } + + public void testTaskWithParentIsNotEligibleForCancellation() { + final CancelReindexRequest request = new CancelReindexRequest(randomBoolean()); + final CancellableTask task = taskWithActionAndParent(ReindexAction.NAME, new TaskId("node", 0)); + assertFalse(request.match(task)); + } + + public void testReindexTaskWithNoParentIsEligibleForCancellation() { + final CancelReindexRequest request = new CancelReindexRequest(randomBoolean()); + final CancellableTask task = taskWithActionAndParent(ReindexAction.NAME, TaskId.EMPTY_TASK_ID); + assertTrue(request.match(task)); + } + + private static CancellableTask taskWithActionAndParent(final String action, final TaskId parent) { + return new CancellableTask(1L, "type", action, "desc", parent, Map.of()); + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/reindex_cancel.json b/rest-api-spec/src/main/resources/rest-api-spec/api/reindex_cancel.json new file mode 100644 index 0000000000000..f1871dab50f8d --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/reindex_cancel.json @@ -0,0 +1,40 @@ +{ + "reindex_cancel": { + "documentation": { + "url": "https://www.elastic.co/docs/api/doc/elasticsearch#TODO", + "description": "Cancel a reindex operation" + }, + "stability": "beta", + "visibility": "feature_flag", + "feature_flag": "reindex_resilience", + "headers": { + "accept": [ + "application/json" + ] + }, + "url": { + "paths": [ + { + "path": "/_reindex/{task_id}/_cancel", + "methods": [ + "POST" + ], + "parts": { + "task_id": { + "type": "string", + "description": "Cancel the reindex operation with specified id" + } + } + } + ] + }, + "params": { + "wait_for_completion": { + "type": "boolean", + "default": true, + "description": "Should the request block until the cancellation of the reindex operation is completed. Defaults to true" + } + } + } +} + diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 80f6028901061..bde60506cecba 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -57,6 +57,7 @@ public class Constants { "cluster:admin/persistent/remove", "cluster:admin/persistent/start", "cluster:admin/persistent/update_status", + "cluster:admin/reindex/cancel", "cluster:admin/reindex/rethrottle", "cluster:admin/repository/_cleanup", "cluster:admin/repository/analyze",