From dd2707babbb36a42b4f8eb9f30f27096cb6e020f Mon Sep 17 00:00:00 2001 From: Pete Gillin Date: Fri, 6 Mar 2026 19:41:47 +0000 Subject: [PATCH 1/6] Improve reindex rethrottle API in stateless This makes a number of improvements to the reindex rethrottle API, in stateless only, in preparation for making it public in serverless as part of the reindex managament API work. The changes are: 1. The `group_by` request parameter is no longer supported. This was never very useful in this API, since the `ListTasksResponse` will only every contain one task. 2. The API never groups the tasks in the response, i.e. it acts as though `group_by=none` (contrast with stateful, which defaults to `group_by=nodes`). Again, grouping is not useful in this case. This also means that it omits the node information which would be present with `group_by=nodes`, and which we do not want to expose in serverless. 3. The `node` property of the task in the response is redacted with `stateless` instead of giving the node ID. (The get task API behaves similarly in serverless.) The API is unchanged in stateful, for backwards compatiblity reasons. Implementation note: This change is done in the REST layer rather, because the `group_by` parameter is only entirely implemented in that layer. To implement changes 1 and 2 in the transport layer would mean passing the requested `group_by` from the REST layer to the transport layer for validation, and passing the `group_by` to use from the transport layer back to the REST layer. Although change 3 could be done in the transport layer, it seems neater to keep the three changes together. Testing note: Adding a YAML REST test for this would require a whole new base class, and a whole new cluster with the stateless setting enabled. This seems unnecessarily heavyweight. Instead, a unit test for the REST action is added. This uses a real `RestController`, a fake `RestChannel`, and a fake `NodeClient` (i.e. the transport layer is faked out). --- .../elasticsearch/reindex/ReindexPlugin.java | 2 +- .../reindex/RestReindexRethrottleAction.java | 58 ++++- .../RestReindexRethrottleActionTests.java | 210 ++++++++++++++++++ 3 files changed, 263 insertions(+), 7 deletions(-) create mode 100644 modules/reindex/src/test/java/org/elasticsearch/reindex/RestReindexRethrottleActionTests.java diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexPlugin.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexPlugin.java index 1d11eacd67364..6f9c0b4aee93f 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexPlugin.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexPlugin.java @@ -109,7 +109,7 @@ public List getRestHandlers( new RestUpdateByQueryAction(clusterSupportsFeature), new RestDeleteByQueryAction(clusterSupportsFeature), new RestUpdateAndDeleteByQueryRethrottleAction(nodesInCluster), - new RestReindexRethrottleAction(nodesInCluster) + new RestReindexRethrottleAction(nodesInCluster, settings) ); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java index 0596dc0177071..07f20e0cb9237 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java @@ -9,13 +9,19 @@ package org.elasticsearch.reindex; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskInfo; import java.util.List; import java.util.function.Supplier; @@ -25,10 +31,15 @@ @ServerlessScope(Scope.INTERNAL) public class RestReindexRethrottleAction extends BaseRestHandler { + + static final String REDACTED_NODE_ID_IN_STATELESS = "stateless"; + private final Supplier nodesInCluster; + private final boolean isStateless; - public RestReindexRethrottleAction(Supplier nodesInCluster) { + public RestReindexRethrottleAction(Supplier nodesInCluster, Settings settings) { this.nodesInCluster = nodesInCluster; + this.isStateless = DiscoveryNode.isStateless(settings); } @Override @@ -50,11 +61,46 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC throw new IllegalArgumentException("requests_per_second is a required parameter"); } internalRequest.setRequestsPerSecond(requestsPerSecond); - final String groupBy = request.param("group_by", "nodes"); - return channel -> client.execute( - ReindexPlugin.RETHROTTLE_ACTION, - internalRequest, - listTasksResponseListener(nodesInCluster, groupBy, channel) + // This ListTasksResponse will only ever contain a single task, so grouping them is not very useful. + // In stateful, we allow the group_by parameter and default to "nodes", for historical reasons. + // In stateless, we don't allow group_by, we never group, and we redact the node IDs: this minimizes the visibility of node IDs. + final String groupBy = isStateless ? "none" : request.param("group_by", "nodes"); + return channel -> { + ActionListener responseListener = listTasksResponseListener(nodesInCluster, groupBy, channel); + client.execute( + ReindexPlugin.RETHROTTLE_ACTION, + internalRequest, + isStateless ? responseListener.map(RestReindexRethrottleAction::redactNodeIdsInListTasksResponse) : responseListener + ); + }; + } + + private static ListTasksResponse redactNodeIdsInListTasksResponse(ListTasksResponse originalResponse) { + return new ListTasksResponse( + originalResponse.getTasks().stream().map(RestReindexRethrottleAction::redactNodeIdInTaskInfo).toList(), + originalResponse.getTaskFailures().stream().map(RestReindexRethrottleAction::redactNodeIdInTaskOperationFailure).toList(), + originalResponse.getNodeFailures() ); } + + private static TaskInfo redactNodeIdInTaskInfo(TaskInfo originalTaskInfo) { + return new TaskInfo( + originalTaskInfo.taskId(), + originalTaskInfo.type(), + REDACTED_NODE_ID_IN_STATELESS, + originalTaskInfo.action(), + originalTaskInfo.description(), + originalTaskInfo.status(), + originalTaskInfo.startTime(), + originalTaskInfo.runningTimeNanos(), + originalTaskInfo.cancellable(), + originalTaskInfo.cancelled(), + originalTaskInfo.parentTaskId(), + originalTaskInfo.headers() + ); + } + + private static TaskOperationFailure redactNodeIdInTaskOperationFailure(TaskOperationFailure taskOperationFailure) { + return new TaskOperationFailure(REDACTED_NODE_ID_IN_STATELESS, taskOperationFailure.getTaskId(), taskOperationFailure.getCause()); + } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/RestReindexRethrottleActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/RestReindexRethrottleActionTests.java new file mode 100644 index 0000000000000..ac59855563d04 --- /dev/null +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/RestReindexRethrottleActionTests.java @@ -0,0 +1,210 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.reindex; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.reindex.BulkByScrollTask; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestResponseUtils; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.test.rest.FakeRestChannel; +import org.elasticsearch.test.rest.FakeRestRequest; +import org.elasticsearch.test.rest.ObjectPath; +import org.elasticsearch.test.rest.RestActionTestCase; +import org.elasticsearch.transport.Transports; +import org.elasticsearch.xcontent.XContent; +import org.elasticsearch.xcontent.XContentType; +import org.junit.After; +import org.junit.Before; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class RestReindexRethrottleActionTests extends RestActionTestCase { + + private static final Settings STATEFUL_SETTINGS = Settings.EMPTY; + private static final Settings STATELESS_SETTINGS = Settings.builder().put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true).build(); + + private String nodeId; + private TaskId taskId; + private float requestsPerSecond; + private ExecutorService transportExecutor; + + @Before + public void initializeRequestParameters() { + nodeId = randomIdentifier(); + taskId = new TaskId(nodeId, randomLongBetween(1, 1000)); + requestsPerSecond = randomIntBetween(1, 1000); // use an int to be sure it round-trips via the URL parameter + } + + @Before + public void setUpFakeTransportLayer() { + verifyingClient.setExecuteVerifier(this::fakeTransportAction); + } + + @Before + public void initializeTransportExecutor() { + transportExecutor = newSingleThreadExecutor(this::createTransportThread); + } + + @After + public void shutdownTransportExecutor() { + transportExecutor.shutdown(); + } + + public void testStateful_groupByDefault_isGroupedByNode() throws Exception { + registerHandler(STATEFUL_SETTINGS); + RestRequest request = createRestRequest(null); + ObjectPath body = execute(request, RestStatus.OK); + // Expect structure "nodes" -> nodeId -> "tasks" -> taskId -> taskInfo, with correct node ID in taskInfo + assertTaskInfo(body.evaluate("nodes." + nodeId + ".tasks." + taskId), nodeId); + } + + public void testStateful_groupByNone() throws Exception { + registerHandler(STATEFUL_SETTINGS); + RestRequest request = createRestRequest("none"); + ObjectPath body = execute(request, RestStatus.OK); + // Expect structure "nodes" -> list of taskInfo, with correct node ID in taskInfo + assertTaskInfo(body.evaluate("tasks.0"), nodeId); + } + + public void testStateless_groupByDefault_isNotGroupedAndNodeIdIsRedacted() throws Exception { + registerHandler(STATELESS_SETTINGS); + RestRequest request = createRestRequest(null); + ObjectPath body = execute(request, RestStatus.OK); + // Expect structure "nodes" -> list of taskInfo, with redacted node ID in taskInfo + assertTaskInfo(body.evaluate("tasks.0"), RestReindexRethrottleAction.REDACTED_NODE_ID_IN_STATELESS); + } + + public void testStateless_groupByNodes_fails() throws Exception { + registerHandler(STATELESS_SETTINGS); + RestRequest request = createRestRequest("nodes"); + // Expect a 400 response, with a body where "error" -> "reason" mentions the illegal "group_by" parameter + ObjectPath body = execute(request, RestStatus.BAD_REQUEST); + String reason = body.evaluate("error.reason"); + assertThat(reason, containsString("group_by")); + } + + /** + * Fakes the behavior of the rethrottle transport action. Returns a {@link ListTasksResponse} containing a single {@link TaskInfo} with + * the task ID and requests-per-second values from the {@link RethrottleRequest}, just like the real transport action would. + */ + private ListTasksResponse fakeTransportAction(ActionType actionType, ActionRequest actionRequest) { + assertThat(actionType, equalTo(ReindexPlugin.RETHROTTLE_ACTION)); + RethrottleRequest rethrottleRequest = asInstanceOf(RethrottleRequest.class, actionRequest); + TaskInfo task = new TaskInfo( + rethrottleRequest.getTargetTaskId(), + "transport", + nodeId, + ReindexPlugin.RETHROTTLE_ACTION.name(), + "doing a reindex", + new BulkByScrollTask.Status( + 0, + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomTimeValue(), + rethrottleRequest.getRequestsPerSecond(), + null, + randomTimeValue() + ), + randomMillisUpToYear9999(), + randomLongBetween(1, 1000000000), + true, + false, + TaskId.EMPTY_TASK_ID, + Map.of() + ); + return new ListTasksResponse(List.of(task), List.of(), List.of()); + } + + /** + * Registers the rethrottle REST handler. + */ + private void registerHandler(Settings settings) { + controller().registerHandler(new RestReindexRethrottleAction(() -> DiscoveryNodes.EMPTY_NODES, settings)); + } + + /** + * Creates a REST request to the rethrottle endpoint, using the stored {@link #taskId} and {@link #requestsPerSecond}, and the + * {@code groupBy} specified if non-null. + */ + private RestRequest createRestRequest(@Nullable String groupBy) { + HashMap params = new HashMap<>(); + params.put("requests_per_second", Float.toString(requestsPerSecond)); + if (groupBy != null) { + params.put("group_by", groupBy); + } + return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) + .withPath("/_reindex/" + taskId + "/_rethrottle") + .withParams(params) + .build(); + } + + /** + * Dispatches the given {@link RestRequest} to the REST controller, asserts that the response has the expected status, and returns the + * contents of the response body as an {@link ObjectPath}. + */ + private ObjectPath execute(RestRequest request, RestStatus expectedStatus) throws Exception { + FakeRestChannel channel = new FakeRestChannel(request, true, 1); + ThreadContext threadContext = verifyingClient.threadPool().getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + controller().dispatchRequest(request, channel, threadContext); + try (RestResponse response = channel.capturedResponse()) { + assertThat(response.status(), equalTo(expectedStatus)); + // The response is chunked, so we have to extract it on a thread which the RestController thinks is a transport thread: + BytesReference bodyContent = transportExecutor.submit(() -> RestResponseUtils.getBodyContent(response)).get(); + XContent xContent = requireNonNull(XContentType.fromMediaType(response.contentType())).xContent(); + return ObjectPath.createFromXContent(xContent, bodyContent); + } + } + } + + /** + * Creates a thread to execute the given {@link Runnable}. The thread will have a name which means the REST controller will recognize as + * a transport thread. + */ + private Thread createTransportThread(Runnable runnable) { + return new Thread(runnable, Transports.TEST_MOCK_TRANSPORT_THREAD_PREFIX + "_" + randomIdentifier()); + } + + private void assertTaskInfo(Map taskInfoMap, String expectedNodeId) { + assertThat(taskInfoMap, notNullValue()); + assertThat(taskInfoMap.get("node"), equalTo(expectedNodeId)); + assertThat(asInstanceOf(Number.class, taskInfoMap.get("id")).longValue(), equalTo(taskId.getId())); + Map status = asInstanceOf(Map.class, taskInfoMap.get("status")); + assertThat(asInstanceOf(Number.class, status.get("requests_per_second")).floatValue(), equalTo(requestsPerSecond)); + } +} From 4f562ac1236e5fa4082c3dc61078fb092d347816 Mon Sep 17 00:00:00 2001 From: Pete Gillin Date: Mon, 9 Mar 2026 17:12:49 +0000 Subject: [PATCH 2/6] Remove the node ID redaction, just keep the group_by changes --- .../reindex/RestReindexRethrottleAction.java | 34 +------------------ .../RestReindexRethrottleActionTests.java | 18 +++++----- 2 files changed, 10 insertions(+), 42 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java index 07f20e0cb9237..936ed0e42decc 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java @@ -32,8 +32,6 @@ @ServerlessScope(Scope.INTERNAL) public class RestReindexRethrottleAction extends BaseRestHandler { - static final String REDACTED_NODE_ID_IN_STATELESS = "stateless"; - private final Supplier nodesInCluster; private final boolean isStateless; @@ -66,41 +64,11 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC // In stateless, we don't allow group_by, we never group, and we redact the node IDs: this minimizes the visibility of node IDs. final String groupBy = isStateless ? "none" : request.param("group_by", "nodes"); return channel -> { - ActionListener responseListener = listTasksResponseListener(nodesInCluster, groupBy, channel); client.execute( ReindexPlugin.RETHROTTLE_ACTION, internalRequest, - isStateless ? responseListener.map(RestReindexRethrottleAction::redactNodeIdsInListTasksResponse) : responseListener + listTasksResponseListener(nodesInCluster, groupBy, channel) ); }; } - - private static ListTasksResponse redactNodeIdsInListTasksResponse(ListTasksResponse originalResponse) { - return new ListTasksResponse( - originalResponse.getTasks().stream().map(RestReindexRethrottleAction::redactNodeIdInTaskInfo).toList(), - originalResponse.getTaskFailures().stream().map(RestReindexRethrottleAction::redactNodeIdInTaskOperationFailure).toList(), - originalResponse.getNodeFailures() - ); - } - - private static TaskInfo redactNodeIdInTaskInfo(TaskInfo originalTaskInfo) { - return new TaskInfo( - originalTaskInfo.taskId(), - originalTaskInfo.type(), - REDACTED_NODE_ID_IN_STATELESS, - originalTaskInfo.action(), - originalTaskInfo.description(), - originalTaskInfo.status(), - originalTaskInfo.startTime(), - originalTaskInfo.runningTimeNanos(), - originalTaskInfo.cancellable(), - originalTaskInfo.cancelled(), - originalTaskInfo.parentTaskId(), - originalTaskInfo.headers() - ); - } - - private static TaskOperationFailure redactNodeIdInTaskOperationFailure(TaskOperationFailure taskOperationFailure) { - return new TaskOperationFailure(REDACTED_NODE_ID_IN_STATELESS, taskOperationFailure.getTaskId(), taskOperationFailure.getCause()); - } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/RestReindexRethrottleActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/RestReindexRethrottleActionTests.java index ac59855563d04..3eb66df619730 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/RestReindexRethrottleActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/RestReindexRethrottleActionTests.java @@ -82,24 +82,24 @@ public void testStateful_groupByDefault_isGroupedByNode() throws Exception { registerHandler(STATEFUL_SETTINGS); RestRequest request = createRestRequest(null); ObjectPath body = execute(request, RestStatus.OK); - // Expect structure "nodes" -> nodeId -> "tasks" -> taskId -> taskInfo, with correct node ID in taskInfo - assertTaskInfo(body.evaluate("nodes." + nodeId + ".tasks." + taskId), nodeId); + // Expect structure "nodes" -> nodeId -> "tasks" -> taskId -> taskInfo + assertTaskInfo(body.evaluate("nodes." + nodeId + ".tasks." + taskId)); } public void testStateful_groupByNone() throws Exception { registerHandler(STATEFUL_SETTINGS); RestRequest request = createRestRequest("none"); ObjectPath body = execute(request, RestStatus.OK); - // Expect structure "nodes" -> list of taskInfo, with correct node ID in taskInfo - assertTaskInfo(body.evaluate("tasks.0"), nodeId); + // Expect structure "nodes" -> list of taskInfo + assertTaskInfo(body.evaluate("tasks.0")); } - public void testStateless_groupByDefault_isNotGroupedAndNodeIdIsRedacted() throws Exception { + public void testStateless_groupByDefault_isNotGrouped() throws Exception { registerHandler(STATELESS_SETTINGS); RestRequest request = createRestRequest(null); ObjectPath body = execute(request, RestStatus.OK); - // Expect structure "nodes" -> list of taskInfo, with redacted node ID in taskInfo - assertTaskInfo(body.evaluate("tasks.0"), RestReindexRethrottleAction.REDACTED_NODE_ID_IN_STATELESS); + // Expect structure "nodes" -> list of taskInfo + assertTaskInfo(body.evaluate("tasks.0")); } public void testStateless_groupByNodes_fails() throws Exception { @@ -200,9 +200,9 @@ private Thread createTransportThread(Runnable runnable) { return new Thread(runnable, Transports.TEST_MOCK_TRANSPORT_THREAD_PREFIX + "_" + randomIdentifier()); } - private void assertTaskInfo(Map taskInfoMap, String expectedNodeId) { + private void assertTaskInfo(Map taskInfoMap) { assertThat(taskInfoMap, notNullValue()); - assertThat(taskInfoMap.get("node"), equalTo(expectedNodeId)); + assertThat(taskInfoMap.get("node"), equalTo(nodeId)); assertThat(asInstanceOf(Number.class, taskInfoMap.get("id")).longValue(), equalTo(taskId.getId())); Map status = asInstanceOf(Map.class, taskInfoMap.get("status")); assertThat(asInstanceOf(Number.class, status.get("requests_per_second")).floatValue(), equalTo(requestsPerSecond)); From cc7ea4c0a2d663873d61fa2040a9b8ea629c1e32 Mon Sep 17 00:00:00 2001 From: Pete Gillin Date: Mon, 9 Mar 2026 17:14:40 +0000 Subject: [PATCH 3/6] Update comment --- .../org/elasticsearch/reindex/RestReindexRethrottleAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java index 936ed0e42decc..d1a7680bb753a 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java @@ -61,7 +61,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC internalRequest.setRequestsPerSecond(requestsPerSecond); // This ListTasksResponse will only ever contain a single task, so grouping them is not very useful. // In stateful, we allow the group_by parameter and default to "nodes", for historical reasons. - // In stateless, we don't allow group_by, we never group, and we redact the node IDs: this minimizes the visibility of node IDs. + // In stateless, we don't allow group_by, we never group, so that we don't include the unwanted layers and node info. final String groupBy = isStateless ? "none" : request.param("group_by", "nodes"); return channel -> { client.execute( From 49851a64c78ff4ff9d7e24082b3f6fdeff993ff3 Mon Sep 17 00:00:00 2001 From: Pete Gillin Date: Mon, 9 Mar 2026 17:15:22 +0000 Subject: [PATCH 4/6] Reformat --- .../elasticsearch/reindex/RestReindexRethrottleAction.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java index d1a7680bb753a..5abcd2c465fff 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java @@ -64,11 +64,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC // In stateless, we don't allow group_by, we never group, so that we don't include the unwanted layers and node info. final String groupBy = isStateless ? "none" : request.param("group_by", "nodes"); return channel -> { - client.execute( - ReindexPlugin.RETHROTTLE_ACTION, - internalRequest, - listTasksResponseListener(nodesInCluster, groupBy, channel) - ); + client.execute(ReindexPlugin.RETHROTTLE_ACTION, internalRequest, listTasksResponseListener(nodesInCluster, groupBy, channel)); }; } } From 9dd91b3150245d68233a8643555b9acc5af781cf Mon Sep 17 00:00:00 2001 From: Pete Gillin Date: Mon, 9 Mar 2026 17:16:07 +0000 Subject: [PATCH 5/6] Reformat --- .../elasticsearch/reindex/RestReindexRethrottleAction.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java index 5abcd2c465fff..2880a8dd08d7e 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java @@ -63,8 +63,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC // In stateful, we allow the group_by parameter and default to "nodes", for historical reasons. // In stateless, we don't allow group_by, we never group, so that we don't include the unwanted layers and node info. final String groupBy = isStateless ? "none" : request.param("group_by", "nodes"); - return channel -> { - client.execute(ReindexPlugin.RETHROTTLE_ACTION, internalRequest, listTasksResponseListener(nodesInCluster, groupBy, channel)); - }; + return channel -> client.execute(ReindexPlugin.RETHROTTLE_ACTION, internalRequest, listTasksResponseListener(nodesInCluster, groupBy, channel)); } } From 29c14fffda65635ba24dbfb5e274b3d1d6a79665 Mon Sep 17 00:00:00 2001 From: Pete Gillin Date: Mon, 9 Mar 2026 17:16:54 +0000 Subject: [PATCH 6/6] Reformat --- .../reindex/RestReindexRethrottleAction.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java index 2880a8dd08d7e..59e0813c8e054 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/RestReindexRethrottleAction.java @@ -9,9 +9,6 @@ package org.elasticsearch.reindex; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.TaskOperationFailure; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -21,7 +18,6 @@ import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.tasks.TaskInfo; import java.util.List; import java.util.function.Supplier; @@ -63,6 +59,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC // In stateful, we allow the group_by parameter and default to "nodes", for historical reasons. // In stateless, we don't allow group_by, we never group, so that we don't include the unwanted layers and node info. final String groupBy = isStateless ? "none" : request.param("group_by", "nodes"); - return channel -> client.execute(ReindexPlugin.RETHROTTLE_ACTION, internalRequest, listTasksResponseListener(nodesInCluster, groupBy, channel)); + return channel -> client.execute( + ReindexPlugin.RETHROTTLE_ACTION, + internalRequest, + listTasksResponseListener(nodesInCluster, groupBy, channel) + ); } }