Skip to content

Commit 7b18c5c

Browse files
author
Christoph Büscher
committed
HLRC: Add throttling for update & delete-by-query (#33951)
This change adds throttling to the update-by-query and delete-by-query cases similar to throttling for reindex. This mostly means additional methods on the client class itself, since the request hits the same RestHandler, just with slightly different endpoints, and also the return values are similar.
1 parent e6b9669 commit 7b18c5c

File tree

10 files changed

+288
-47
lines changed

10 files changed

+288
-47
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -544,8 +544,20 @@ static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws I
544544
return request;
545545
}
546546

547-
static Request rethrottle(RethrottleRequest rethrottleRequest) throws IOException {
548-
String endpoint = new EndpointBuilder().addPathPart("_reindex").addPathPart(rethrottleRequest.getTaskId().toString())
547+
static Request rethrottleReindex(RethrottleRequest rethrottleRequest) {
548+
return rethrottle(rethrottleRequest, "_reindex");
549+
}
550+
551+
static Request rethrottleUpdateByQuery(RethrottleRequest rethrottleRequest) {
552+
return rethrottle(rethrottleRequest, "_update_by_query");
553+
}
554+
555+
static Request rethrottleDeleteByQuery(RethrottleRequest rethrottleRequest) {
556+
return rethrottle(rethrottleRequest, "_delete_by_query");
557+
}
558+
559+
private static Request rethrottle(RethrottleRequest rethrottleRequest, String firstPathPart) {
560+
String endpoint = new EndpointBuilder().addPathPart(firstPathPart).addPathPart(rethrottleRequest.getTaskId().toString())
549561
.addPathPart("_rethrottle").build();
550562
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
551563
Params params = new Params(request)

client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,62 @@ public final void deleteByQueryAsync(DeleteByQueryRequest deleteByQueryRequest,
536536
);
537537
}
538538

539+
/**
540+
* Executes a delete by query rethrottle request.
541+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
542+
* Delete By Query API on elastic.co</a>
543+
* @param rethrottleRequest the request
544+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
545+
* @return the response
546+
* @throws IOException in case there is a problem sending the request or parsing back the response
547+
*/
548+
public final ListTasksResponse deleteByQueryRethrottle(RethrottleRequest rethrottleRequest, RequestOptions options) throws IOException {
549+
return performRequestAndParseEntity(rethrottleRequest, RequestConverters::rethrottleDeleteByQuery, options,
550+
ListTasksResponse::fromXContent, emptySet());
551+
}
552+
553+
/**
554+
* Asynchronously execute an delete by query rethrottle request.
555+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
556+
* Delete By Query API on elastic.co</a>
557+
* @param rethrottleRequest the request
558+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
559+
* @param listener the listener to be notified upon request completion
560+
*/
561+
public final void deleteByQueryRethrottleAsync(RethrottleRequest rethrottleRequest, RequestOptions options,
562+
ActionListener<ListTasksResponse> listener) {
563+
performRequestAsyncAndParseEntity(rethrottleRequest, RequestConverters::rethrottleDeleteByQuery, options,
564+
ListTasksResponse::fromXContent, listener, emptySet());
565+
}
566+
567+
/**
568+
* Executes a update by query rethrottle request.
569+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
570+
* Update By Query API on elastic.co</a>
571+
* @param rethrottleRequest the request
572+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
573+
* @return the response
574+
* @throws IOException in case there is a problem sending the request or parsing back the response
575+
*/
576+
public final ListTasksResponse updateByQueryRethrottle(RethrottleRequest rethrottleRequest, RequestOptions options) throws IOException {
577+
return performRequestAndParseEntity(rethrottleRequest, RequestConverters::rethrottleUpdateByQuery, options,
578+
ListTasksResponse::fromXContent, emptySet());
579+
}
580+
581+
/**
582+
* Asynchronously execute an update by query rethrottle request.
583+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
584+
* Update By Query API on elastic.co</a>
585+
* @param rethrottleRequest the request
586+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
587+
* @param listener the listener to be notified upon request completion
588+
*/
589+
public final void updateByQueryRethrottleAsync(RethrottleRequest rethrottleRequest, RequestOptions options,
590+
ActionListener<ListTasksResponse> listener) {
591+
performRequestAsyncAndParseEntity(rethrottleRequest, RequestConverters::rethrottleUpdateByQuery, options,
592+
ListTasksResponse::fromXContent, listener, emptySet());
593+
}
594+
539595
/**
540596
* Executes a reindex rethrottling request.
541597
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-rethrottle">
@@ -547,8 +603,8 @@ public final void deleteByQueryAsync(DeleteByQueryRequest deleteByQueryRequest,
547603
* @throws IOException in case there is a problem sending the request or parsing back the response
548604
*/
549605
public final ListTasksResponse reindexRethrottle(RethrottleRequest rethrottleRequest, RequestOptions options) throws IOException {
550-
return performRequestAndParseEntity(rethrottleRequest, RequestConverters::rethrottle, options, ListTasksResponse::fromXContent,
551-
emptySet());
606+
return performRequestAndParseEntity(rethrottleRequest, RequestConverters::rethrottleReindex, options,
607+
ListTasksResponse::fromXContent, emptySet());
552608
}
553609

554610
/**
@@ -561,9 +617,9 @@ public final ListTasksResponse reindexRethrottle(RethrottleRequest rethrottleReq
561617
* @param listener the listener to be notified upon request completion
562618
*/
563619
public final void reindexRethrottleAsync(RethrottleRequest rethrottleRequest, RequestOptions options,
564-
ActionListener<ListTasksResponse> listener) {
565-
performRequestAsyncAndParseEntity(rethrottleRequest, RequestConverters::rethrottle, options, ListTasksResponse::fromXContent,
566-
listener, emptySet());
620+
ActionListener<ListTasksResponse> listener) {
621+
performRequestAsyncAndParseEntity(rethrottleRequest, RequestConverters::rethrottleReindex, options, ListTasksResponse::fromXContent,
622+
listener, emptySet());
567623
}
568624

569625
/**

client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java

Lines changed: 101 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,11 @@
5555
import org.elasticsearch.index.get.GetResult;
5656
import org.elasticsearch.index.query.IdsQueryBuilder;
5757
import org.elasticsearch.index.reindex.BulkByScrollResponse;
58+
import org.elasticsearch.index.reindex.DeleteByQueryAction;
5859
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
5960
import org.elasticsearch.index.reindex.ReindexAction;
6061
import org.elasticsearch.index.reindex.ReindexRequest;
62+
import org.elasticsearch.index.reindex.UpdateByQueryAction;
6163
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
6264
import org.elasticsearch.rest.RestStatus;
6365
import org.elasticsearch.script.Script;
@@ -792,10 +794,7 @@ public void onFailure(Exception e) {
792794
}
793795
});
794796

795-
TaskGroup taskGroupToRethrottle = findTaskToRethrottle();
796-
assertThat(taskGroupToRethrottle.getChildTasks(), empty());
797-
TaskId taskIdToRethrottle = taskGroupToRethrottle.getTaskInfo().getTaskId();
798-
797+
TaskId taskIdToRethrottle = findTaskToRethrottle(ReindexAction.NAME);
799798
float requestsPerSecond = 1000f;
800799
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
801800
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
@@ -817,10 +816,10 @@ public void onFailure(Exception e) {
817816
}
818817
}
819818

820-
private TaskGroup findTaskToRethrottle() throws IOException {
819+
private TaskId findTaskToRethrottle(String actionName) throws IOException {
821820
long start = System.nanoTime();
822821
ListTasksRequest request = new ListTasksRequest();
823-
request.setActions(ReindexAction.NAME);
822+
request.setActions(actionName);
824823
request.setDetailed(true);
825824
do {
826825
ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
@@ -831,13 +830,15 @@ private TaskGroup findTaskToRethrottle() throws IOException {
831830
// The parent task hasn't started yet
832831
continue;
833832
}
834-
return list.getTaskGroups().get(0);
833+
TaskGroup taskGroup = list.getTaskGroups().get(0);
834+
assertThat(taskGroup.getChildTasks(), empty());
835+
return taskGroup.getTaskInfo().getTaskId();
835836
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
836837
throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
837838
highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
838839
}
839840

840-
public void testUpdateByQuery() throws IOException {
841+
public void testUpdateByQuery() throws Exception {
841842
final String sourceIndex = "source1";
842843
{
843844
// Prepare
@@ -901,9 +902,53 @@ public void testUpdateByQuery() throws IOException {
901902
.getSourceAsMap().get("foo"))
902903
);
903904
}
905+
{
906+
// test update-by-query rethrottling
907+
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
908+
updateByQueryRequest.indices(sourceIndex);
909+
updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1").types("type"));
910+
updateByQueryRequest.setRefresh(true);
911+
912+
// this following settings are supposed to halt reindexing after first document
913+
updateByQueryRequest.setBatchSize(1);
914+
updateByQueryRequest.setRequestsPerSecond(0.00001f);
915+
final CountDownLatch taskFinished = new CountDownLatch(1);
916+
highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
917+
918+
@Override
919+
public void onResponse(BulkByScrollResponse response) {
920+
taskFinished.countDown();
921+
}
922+
923+
@Override
924+
public void onFailure(Exception e) {
925+
fail(e.toString());
926+
}
927+
});
928+
929+
TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME);
930+
float requestsPerSecond = 1000f;
931+
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
932+
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
933+
assertThat(response.getTasks(), hasSize(1));
934+
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
935+
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
936+
assertEquals(Float.toString(requestsPerSecond),
937+
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
938+
taskFinished.await(2, TimeUnit.SECONDS);
939+
940+
// any rethrottling after the update-by-query is done performed with the same taskId should result in a failure
941+
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
942+
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
943+
assertTrue(response.getTasks().isEmpty());
944+
assertFalse(response.getNodeFailures().isEmpty());
945+
assertEquals(1, response.getNodeFailures().size());
946+
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
947+
response.getNodeFailures().get(0).getCause().getMessage());
948+
}
904949
}
905950

906-
public void testDeleteByQuery() throws IOException {
951+
public void testDeleteByQuery() throws Exception {
907952
final String sourceIndex = "source1";
908953
{
909954
// Prepare
@@ -920,6 +965,8 @@ public void testDeleteByQuery() throws IOException {
920965
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
921966
.add(new IndexRequest(sourceIndex, "type", "2")
922967
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
968+
.add(new IndexRequest(sourceIndex, "type", "3")
969+
.source(Collections.singletonMap("foo", 3), XContentType.JSON))
923970
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
924971
RequestOptions.DEFAULT
925972
).status()
@@ -943,10 +990,54 @@ public void testDeleteByQuery() throws IOException {
943990
assertEquals(0, bulkResponse.getBulkFailures().size());
944991
assertEquals(0, bulkResponse.getSearchFailures().size());
945992
assertEquals(
946-
1,
993+
2,
947994
highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().totalHits
948995
);
949996
}
997+
{
998+
// test delete-by-query rethrottling
999+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
1000+
deleteByQueryRequest.indices(sourceIndex);
1001+
deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("2", "3").types("type"));
1002+
deleteByQueryRequest.setRefresh(true);
1003+
1004+
// this following settings are supposed to halt reindexing after first document
1005+
deleteByQueryRequest.setBatchSize(1);
1006+
deleteByQueryRequest.setRequestsPerSecond(0.00001f);
1007+
final CountDownLatch taskFinished = new CountDownLatch(1);
1008+
highLevelClient().deleteByQueryAsync(deleteByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
1009+
1010+
@Override
1011+
public void onResponse(BulkByScrollResponse response) {
1012+
taskFinished.countDown();
1013+
}
1014+
1015+
@Override
1016+
public void onFailure(Exception e) {
1017+
fail(e.toString());
1018+
}
1019+
});
1020+
1021+
TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME);
1022+
float requestsPerSecond = 1000f;
1023+
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
1024+
highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
1025+
assertThat(response.getTasks(), hasSize(1));
1026+
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
1027+
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
1028+
assertEquals(Float.toString(requestsPerSecond),
1029+
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
1030+
taskFinished.await(2, TimeUnit.SECONDS);
1031+
1032+
// any rethrottling after the delete-by-query is done performed with the same taskId should result in a failure
1033+
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
1034+
highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
1035+
assertTrue(response.getTasks().isEmpty());
1036+
assertFalse(response.getNodeFailures().isEmpty());
1037+
assertEquals(1, response.getNodeFailures().size());
1038+
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
1039+
response.getNodeFailures().get(0).getCause().getMessage());
1040+
}
9501041
}
9511042

9521043
public void testBulkProcessorIntegration() throws IOException {

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.elasticsearch.common.Strings;
6060
import org.elasticsearch.common.bytes.BytesArray;
6161
import org.elasticsearch.common.bytes.BytesReference;
62+
import org.elasticsearch.common.collect.Tuple;
6263
import org.elasticsearch.common.io.Streams;
6364
import org.elasticsearch.common.lucene.uid.Versions;
6465
import org.elasticsearch.common.unit.TimeValue;
@@ -478,7 +479,7 @@ public void testDeleteByQuery() throws IOException {
478479
assertToXContentBody(deleteByQueryRequest, request.getEntity());
479480
}
480481

481-
public void testRethrottle() throws IOException {
482+
public void testRethrottle() {
482483
TaskId taskId = new TaskId(randomAlphaOfLength(10), randomIntBetween(1, 100));
483484
RethrottleRequest rethrottleRequest;
484485
Float requestsPerSecond;
@@ -492,11 +493,20 @@ public void testRethrottle() throws IOException {
492493
expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, "-1");
493494
}
494495
expectedParams.put("group_by", "none");
495-
Request request = RequestConverters.rethrottle(rethrottleRequest);
496-
assertEquals("/_reindex/" + taskId + "/_rethrottle", request.getEndpoint());
497-
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
498-
assertEquals(expectedParams, request.getParameters());
499-
assertNull(request.getEntity());
496+
List<Tuple<String, Supplier<Request>>> variants = new ArrayList<>();
497+
variants.add(new Tuple<String, Supplier<Request>>("_reindex", () -> RequestConverters.rethrottleReindex(rethrottleRequest)));
498+
variants.add(new Tuple<String, Supplier<Request>>("_update_by_query",
499+
() -> RequestConverters.rethrottleUpdateByQuery(rethrottleRequest)));
500+
variants.add(new Tuple<String, Supplier<Request>>("_delete_by_query",
501+
() -> RequestConverters.rethrottleDeleteByQuery(rethrottleRequest)));
502+
503+
for (Tuple<String, Supplier<Request>> variant : variants) {
504+
Request request = variant.v2().get();
505+
assertEquals("/" + variant.v1() + "/" + taskId + "/_rethrottle", request.getEndpoint());
506+
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
507+
assertEquals(expectedParams, request.getParameters());
508+
assertNull(request.getEntity());
509+
}
500510

501511
// test illegal RethrottleRequest values
502512
Exception e = expectThrows(NullPointerException.class, () -> new RethrottleRequest(null, 1.0f));

0 commit comments

Comments
 (0)