Skip to content

Commit 4478f10

Browse files
Van0SSChristoph Büscher
authored andcommitted
Rest High Level client: Add List Tasks (#29546)
This change adds a `listTasks` method to the high level java ClusterClient which allows listing running tasks through the task management API. Related to #27205
1 parent a75b8ad commit 4478f10

File tree

19 files changed

+808
-140
lines changed

19 files changed

+808
-140
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.http.Header;
2323
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
25+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
2426
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
2527
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
2628

@@ -63,4 +65,26 @@ public void putSettingsAsync(ClusterUpdateSettingsRequest clusterUpdateSettingsR
6365
restHighLevelClient.performRequestAsyncAndParseEntity(clusterUpdateSettingsRequest, RequestConverters::clusterPutSettings,
6466
ClusterUpdateSettingsResponse::fromXContent, listener, emptySet(), headers);
6567
}
68+
69+
/**
70+
* Get current tasks using the Task Management API
71+
* <p>
72+
* See
73+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
74+
*/
75+
public ListTasksResponse listTasks(ListTasksRequest request, Header... headers) throws IOException {
76+
return restHighLevelClient.performRequestAndParseEntity(request, RequestConverters::listTasks, ListTasksResponse::fromXContent,
77+
emptySet(), headers);
78+
}
79+
80+
/**
81+
* Asynchronously get current tasks using the Task Management API
82+
* <p>
83+
* See
84+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
85+
*/
86+
public void listTasksAsync(ListTasksRequest request, ActionListener<ListTasksResponse> listener, Header... headers) {
87+
restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::listTasks, ListTasksResponse::fromXContent,
88+
listener, emptySet(), headers);
89+
}
6690
}

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.http.entity.ContentType;
3030
import org.apache.lucene.util.BytesRef;
3131
import org.elasticsearch.action.DocWriteRequest;
32+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
3233
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
3334
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
3435
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
@@ -45,8 +46,8 @@
4546
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
4647
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
4748
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
48-
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
4949
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
50+
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
5051
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
5152
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
5253
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
@@ -83,6 +84,7 @@
8384
import org.elasticsearch.rest.action.search.RestSearchAction;
8485
import org.elasticsearch.script.mustache.SearchTemplateRequest;
8586
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
87+
import org.elasticsearch.tasks.TaskId;
8688

8789
import java.io.ByteArrayOutputStream;
8890
import java.io.IOException;
@@ -606,6 +608,22 @@ static Request clusterPutSettings(ClusterUpdateSettingsRequest clusterUpdateSett
606608
return request;
607609
}
608610

611+
static Request listTasks(ListTasksRequest listTaskRequest) {
612+
if (listTaskRequest.getTaskId() != null && listTaskRequest.getTaskId().isSet()) {
613+
throw new IllegalArgumentException("TaskId cannot be used for list tasks request");
614+
}
615+
Request request = new Request(HttpGet.METHOD_NAME, "/_tasks");
616+
Params params = new Params(request);
617+
params.withTimeout(listTaskRequest.getTimeout())
618+
.withDetailed(listTaskRequest.getDetailed())
619+
.withWaitForCompletion(listTaskRequest.getWaitForCompletion())
620+
.withParentTaskId(listTaskRequest.getParentTaskId())
621+
.withNodes(listTaskRequest.getNodes())
622+
.withActions(listTaskRequest.getActions())
623+
.putParam("group_by", "none");
624+
return request;
625+
}
626+
609627
static Request rollover(RolloverRequest rolloverRequest) throws IOException {
610628
String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
611629
.addPathPart(rolloverRequest.getNewIndexName()).build();
@@ -932,6 +950,41 @@ Params withPreserveExisting(boolean preserveExisting) {
932950
return this;
933951
}
934952

953+
Params withDetailed(boolean detailed) {
954+
if (detailed) {
955+
return putParam("detailed", Boolean.TRUE.toString());
956+
}
957+
return this;
958+
}
959+
960+
Params withWaitForCompletion(boolean waitForCompletion) {
961+
if (waitForCompletion) {
962+
return putParam("wait_for_completion", Boolean.TRUE.toString());
963+
}
964+
return this;
965+
}
966+
967+
Params withNodes(String[] nodes) {
968+
if (nodes != null && nodes.length > 0) {
969+
return putParam("nodes", String.join(",", nodes));
970+
}
971+
return this;
972+
}
973+
974+
Params withActions(String[] actions) {
975+
if (actions != null && actions.length > 0) {
976+
return putParam("actions", String.join(",", actions));
977+
}
978+
return this;
979+
}
980+
981+
Params withParentTaskId(TaskId parentTaskId) {
982+
if (parentTaskId != null && parentTaskId.isSet()) {
983+
return putParam("parent_task_id", parentTaskId.toString());
984+
}
985+
return this;
986+
}
987+
935988
Params withVerify(boolean verify) {
936989
if (verify) {
937990
return putParam("verify", Boolean.TRUE.toString());

client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
package org.elasticsearch.client;
2121

2222
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
24+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
25+
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
2326
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
2427
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
2528
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -29,13 +32,16 @@
2932
import org.elasticsearch.common.xcontent.support.XContentMapValues;
3033
import org.elasticsearch.indices.recovery.RecoverySettings;
3134
import org.elasticsearch.rest.RestStatus;
35+
import org.elasticsearch.tasks.TaskInfo;
3236

3337
import java.io.IOException;
3438
import java.util.HashMap;
3539
import java.util.Map;
3640

41+
import static java.util.Collections.emptyList;
3742
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3843
import static org.hamcrest.Matchers.equalTo;
44+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3945
import static org.hamcrest.Matchers.notNullValue;
4046
import static org.hamcrest.Matchers.nullValue;
4147

@@ -105,4 +111,29 @@ public void testClusterUpdateSettingNonExistent() {
105111
assertThat(exception.getMessage(), equalTo(
106112
"Elasticsearch exception [type=illegal_argument_exception, reason=transient setting [" + setting + "], not recognized]"));
107113
}
114+
115+
public void testListTasks() throws IOException {
116+
ListTasksRequest request = new ListTasksRequest();
117+
ListTasksResponse response = execute(request, highLevelClient().cluster()::listTasks, highLevelClient().cluster()::listTasksAsync);
118+
119+
assertThat(response, notNullValue());
120+
assertThat(response.getNodeFailures(), equalTo(emptyList()));
121+
assertThat(response.getTaskFailures(), equalTo(emptyList()));
122+
// It's possible that there are other tasks except 'cluster:monitor/tasks/lists[n]' and 'action":"cluster:monitor/tasks/lists'
123+
assertThat(response.getTasks().size(), greaterThanOrEqualTo(2));
124+
boolean listTasksFound = false;
125+
for (TaskGroup taskGroup : response.getTaskGroups()) {
126+
TaskInfo parent = taskGroup.getTaskInfo();
127+
if ("cluster:monitor/tasks/lists".equals(parent.getAction())) {
128+
assertThat(taskGroup.getChildTasks().size(), equalTo(1));
129+
TaskGroup childGroup = taskGroup.getChildTasks().iterator().next();
130+
assertThat(childGroup.getChildTasks().isEmpty(), equalTo(true));
131+
TaskInfo child = childGroup.getTaskInfo();
132+
assertThat(child.getAction(), equalTo("cluster:monitor/tasks/lists[n]"));
133+
assertThat(child.getParentTaskId(), equalTo(parent.getTaskId()));
134+
listTasksFound = true;
135+
}
136+
}
137+
assertTrue("List tasks were not found", listTasksFound);
138+
}
108139
}

0 commit comments

Comments
 (0)