Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
eb32d91
Add disabled `POST _reindex/{taskId}/_cancel` API
szybia Dec 8, 2025
c2219a4
self-review
szybia Dec 9, 2025
10bf116
fix permissions test
szybia Dec 9, 2025
b2bc660
use projectResolver in transport
szybia Dec 11, 2025
81d455a
Merge remote-tracking branch 'upstream/main' into reindex-cancel-api
szybia Dec 11, 2025
8c4b8b8
Add filtering test
szybia Dec 10, 2025
1a93ff3
copilot review
szybia Dec 18, 2025
904742a
Merge remote-tracking branch 'upstream/main' into reindex-cancel-api
szybia Dec 18, 2025
63c1aba
move impl on-top-of `reindex-management`
szybia Dec 19, 2025
3531fe7
add API spec
szybia Dec 19, 2025
0df38bd
spotless
szybia Dec 19, 2025
f22aa74
consistent task_id > taskId
szybia Dec 19, 2025
2d2c3da
Merge remote-tracking branch 'upstream/main' into reindex-cancel-api
szybia Dec 19, 2025
3da0d6b
Revert "consistent task_id > taskId"
szybia Dec 19, 2025
5a8f4f9
Merge remote-tracking branch 'upstream/main' into reindex-cancel-api
szybia Dec 19, 2025
9c4638d
remove timeout
szybia Dec 23, 2025
9b5c32d
sam comments
szybia Jan 7, 2026
b204b47
Merge remote-tracking branch 'upstream/main' into reindex-cancel-api
szybia Jan 7, 2026
753c91d
fix after merge
szybia Jan 7, 2026
cd62f84
spotless
szybia Jan 7, 2026
6c1daba
Merge remote-tracking branch 'upstream/main' into reindex-cancel-api
szybia Jan 7, 2026
13e25c5
Pete comment to remove enum
szybia Jan 8, 2026
9cf26cf
fix flaky test
szybia Jan 9, 2026
39c2c66
remove dangling logger
szybia Jan 9, 2026
c983cb7
`s/reindex.cancel/reindex_cancel/g`
szybia Jan 9, 2026
63adb15
Merge remote-tracking branch 'upstream/main' into reindex-cancel-api
szybia Jan 9, 2026
7925b2f
test was still flaky...
szybia Jan 9, 2026
2090620
spotless
szybia Jan 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions modules/reindex-management/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* License v 3.0 only", or the "Server Side Public License, v 1".
*/

apply plugin: 'elasticsearch.internal-cluster-test'
apply plugin: 'elasticsearch.internal-yaml-rest-test'
apply plugin: 'elasticsearch.yaml-rest-compat-test'

Expand All @@ -19,6 +20,7 @@ esplugin {

dependencies {
compileOnly project(path: ':modules:reindex')
internalClusterTestImplementation project(path: ':modules:reindex')

testImplementation project(':modules:rest-root')

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.reindex.management;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.tasks.Task;

import java.io.IOException;

import static org.elasticsearch.action.ValidateActions.addValidationError;

/** A request to cancel an ongoing reindex task. */
public class CancelReindexRequest extends BaseTasksRequest<CancelReindexRequest> {

private final boolean waitForCompletion;

public CancelReindexRequest(StreamInput in) throws IOException {
super(in);
waitForCompletion = in.readBoolean();
}

public CancelReindexRequest(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(waitForCompletion);
}

@Override
public String getDescription() {
return "waitForCompletion[" + waitForCompletion + "], targetTaskId[" + getTargetTaskId() + "]";
}

@Override
public ActionRequestValidationException validate() {
var validationException = super.validate();
if (getTargetTaskId().isSet() == false) {
validationException = addValidationError("task id must be provided", validationException);
}
return validationException;
}

public boolean waitForCompletion() {
return waitForCompletion;
}

@Override
public boolean match(Task task) {
return ReindexAction.NAME.equals(task.getAction()) && task.getParentTaskId().isSet() == false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.reindex.management;

import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;

/**
* Response returned from {@code POST /_reindex/{taskId}/_cancel}.
*/
public class CancelReindexResponse extends BaseTasksResponse implements ToXContentObject {

public CancelReindexResponse(List<TaskOperationFailure> taskFailures, List<FailedNodeException> failedNodes) {
super(taskFailures, failedNodes);
}

public CancelReindexResponse(final StreamInput in) throws IOException {
super(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("acknowledged", true);
builder.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.reindex.management;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

/** Response to a single reindex task cancel action. */
public class CancelReindexTaskResponse implements Writeable {

public CancelReindexTaskResponse() {}

public CancelReindexTaskResponse(final StreamInput in) {}

@Override
public void writeTo(final StreamOutput out) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public List<ActionHandler> getActions() {
if (REINDEX_RESILIENCE_ENABLED) {
return List.of(
new ActionHandler(TransportGetReindexAction.TYPE, TransportGetReindexAction.class),
new ActionHandler(TransportListReindexAction.TYPE, TransportListReindexAction.class)
new ActionHandler(TransportListReindexAction.TYPE, TransportListReindexAction.class),
new ActionHandler(TransportCancelReindexAction.TYPE, TransportCancelReindexAction.class)
);
} else {
return List.of();
Expand All @@ -57,7 +58,11 @@ public List<RestHandler> getRestHandlers(
Predicate<NodeFeature> clusterSupportsFeature
) {
if (REINDEX_RESILIENCE_ENABLED) {
return List.of(new RestGetReindexAction(clusterSupportsFeature), new RestListReindexAction(clusterSupportsFeature));
return List.of(
new RestGetReindexAction(clusterSupportsFeature),
new RestListReindexAction(clusterSupportsFeature),
new RestCancelReindexAction(clusterSupportsFeature)
);
} else {
return List.of();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.reindex.management;

import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.tasks.TaskId;

import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;

import static org.elasticsearch.rest.RestRequest.Method.POST;

/** REST handler for cancelling an ongoing reindex task. */
@ServerlessScope(Scope.PUBLIC)
public class RestCancelReindexAction extends BaseRestHandler {

private final Predicate<NodeFeature> clusterSupportsFeature;

public RestCancelReindexAction(final Predicate<NodeFeature> clusterSupportsFeature) {
this.clusterSupportsFeature = Objects.requireNonNull(clusterSupportsFeature);
}

@Override
public List<Route> routes() {
return List.of(new Route(POST, "/_reindex/{task_id}/_cancel"));
}

@Override
public String getName() {
return "cancel_reindex_action";
}

@Override
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
if (clusterSupportsFeature.test(ReindexManagementFeatures.NEW_ENDPOINTS) == false) {
throw new IllegalArgumentException("endpoint not supported on all nodes in the cluster");
}
final String taskIdParam = request.param("task_id");
final TaskId taskId = new TaskId(taskIdParam);
if (taskId.isSet() == false) {
throw new IllegalArgumentException("invalid taskId provided: " + taskIdParam);
}

final boolean waitForCompletion = request.paramAsBoolean("wait_for_completion", true);
final CancelReindexRequest cancelRequest = new CancelReindexRequest(waitForCompletion);
cancelRequest.setTargetTaskId(taskId);

return channel -> client.execute(TransportCancelReindexAction.TYPE, cancelRequest, new RestToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.reindex.management;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.NoSuchNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksProjectAction;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;

/** Transport action that cancels an in-flight reindex task and its descendants. */
public class TransportCancelReindexAction extends TransportTasksProjectAction<
CancellableTask,
CancelReindexRequest,
CancelReindexResponse,
CancelReindexTaskResponse> {

public static final ActionType<CancelReindexResponse> TYPE = new ActionType<>("cluster:admin/reindex/cancel");

@Inject
public TransportCancelReindexAction(
final ClusterService clusterService,
final TransportService transportService,
final ActionFilters actionFilters,
final ProjectResolver projectResolver
) {
super(
TYPE.name(),
clusterService,
transportService,
actionFilters,
CancelReindexRequest::new,
CancelReindexTaskResponse::new,
transportService.getThreadPool().executor(ThreadPool.Names.GENERIC),
projectResolver
);
}

@Override
protected List<CancellableTask> processTasks(final CancelReindexRequest request) {
final CancellableTask requestedTask = taskManager.getCancellableTask(request.getTargetTaskId().getId());
if (requestedTask != null && super.match(requestedTask) && request.match(requestedTask)) {
return List.of(requestedTask);
}
return List.of();
}

@Override
protected void taskOperation(
final CancellableTask actionTask,
final CancelReindexRequest request,
final CancellableTask task,
final ActionListener<CancelReindexTaskResponse> listener
) {
assert task instanceof BulkByScrollTask : "Task should be a BulkByScrollTask";

taskManager.cancelTaskAndDescendants(
task,
CancelTasksRequest.DEFAULT_REASON,
request.waitForCompletion(),
ActionListener.wrap(ignored -> listener.onResponse(new CancelReindexTaskResponse()), listener::onFailure)
);
}

@Override
protected CancelReindexResponse newResponse(
final CancelReindexRequest request,
final List<CancelReindexTaskResponse> tasks,
final List<TaskOperationFailure> taskFailures,
final List<FailedNodeException> nodeExceptions
) {
assert tasks.size() + taskFailures.size() + nodeExceptions.size() <= 1 : "currently only supports cancelling one task max";
// check whether node in requested TaskId doesn't exist and throw 404
for (final FailedNodeException e : nodeExceptions) {
if (ExceptionsHelper.unwrap(e, NoSuchNodeException.class) != null) {
throw reindexWithTaskIdNotFoundException(request.getTargetTaskId());
}
}

final var response = new CancelReindexResponse(taskFailures, nodeExceptions);
response.rethrowFailures("cancel_reindex"); // if we haven't handled any exception already, throw here
if (tasks.isEmpty()) {
throw reindexWithTaskIdNotFoundException(request.getTargetTaskId());
}
return response;
}

private static ResourceNotFoundException reindexWithTaskIdNotFoundException(final TaskId requestedTaskId) {
return new ResourceNotFoundException("reindex task [{}] either not found or completed", requestedTaskId);
}
}
Loading