Skip to content

Commit 5a95b0f

Browse files
authored
EQL: Adds delete async EQL search result action (#57258)
Adds support for deleting async EQL search results to EQL search API. Relates to #49638
1 parent b30cc2b commit 5a95b0f

File tree

32 files changed

+500
-255
lines changed

32 files changed

+500
-255
lines changed

x-pack/plugin/async-search/qa/security/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
3030
import static org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField.RUN_AS_USER_HEADER;
3131
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
32-
import static org.elasticsearch.xpack.search.AsyncSearch.INDEX;
32+
import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX;
3333
import static org.hamcrest.Matchers.containsString;
3434
import static org.hamcrest.Matchers.equalTo;
3535

@@ -84,7 +84,7 @@ private void testCase(String user, String other) throws Exception {
8484
// other and user cannot access the result from direct get calls
8585
AsyncExecutionId searchId = AsyncExecutionId.decode(id);
8686
for (String runAs : new String[] {user, other}) {
87-
exc = expectThrows(ResponseException.class, () -> get(INDEX, searchId.getDocId(), runAs));
87+
exc = expectThrows(ResponseException.class, () -> get(ASYNC_RESULTS_INDEX, searchId.getDocId(), runAs));
8888
assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403));
8989
assertThat(exc.getMessage(), containsString("unauthorized"));
9090
}

x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.search.builder.SearchSourceBuilder;
1919
import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase;
2020
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
21+
import org.elasticsearch.xpack.core.XPackPlugin;
2122
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
2223
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
2324

@@ -371,7 +372,7 @@ public void testRemoveAsyncIndex() throws Exception {
371372
assertThat(response.getExpirationTime(), greaterThan(now));
372373

373374
// remove the async search index
374-
client().admin().indices().prepareDelete(AsyncSearch.INDEX).get();
375+
client().admin().indices().prepareDelete(XPackPlugin.ASYNC_RESULTS_INDEX).get();
375376

376377
Exception exc = expectThrows(Exception.class, () -> getAsyncSearch(response.getId()));
377378
Throwable cause = exc instanceof ExecutionException ?

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
import org.elasticsearch.script.ScriptService;
3030
import org.elasticsearch.threadpool.ThreadPool;
3131
import org.elasticsearch.watcher.ResourceWatcherService;
32+
import org.elasticsearch.xpack.core.XPackPlugin;
3233
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
3334
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
34-
import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction;
3535
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
3636
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
3737

@@ -45,7 +45,6 @@
4545
import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
4646

4747
public final class AsyncSearch extends Plugin implements ActionPlugin {
48-
public static final String INDEX = ".async-search";
4948
private final Settings settings;
5049

5150
public AsyncSearch(Settings settings) {
@@ -56,8 +55,7 @@ public AsyncSearch(Settings settings) {
5655
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
5756
return Arrays.asList(
5857
new ActionHandler<>(SubmitAsyncSearchAction.INSTANCE, TransportSubmitAsyncSearchAction.class),
59-
new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class),
60-
new ActionHandler<>(DeleteAsyncSearchAction.INSTANCE, TransportDeleteAsyncSearchAction.class)
58+
new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class)
6159
);
6260
}
6361

@@ -88,8 +86,8 @@ public Collection<Object> createComponents(Client client,
8886
if (DiscoveryNode.isDataNode(environment.settings())) {
8987
// only data nodes should be eligible to run the maintenance service.
9088
AsyncTaskIndexService<AsyncSearchResponse> indexService =
91-
new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN,
92-
AsyncSearchResponse::new, namedWriteableRegistry);
89+
new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, threadPool.getThreadContext(), client,
90+
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, namedWriteableRegistry);
9391
AsyncSearchMaintenanceService maintenanceService =
9492
new AsyncSearchMaintenanceService(clusterService, nodeEnvironment.nodeId(), settings, threadPool, indexService);
9593
return Collections.singletonList(maintenanceService);

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.common.settings.Settings;
1212
import org.elasticsearch.common.unit.TimeValue;
1313
import org.elasticsearch.threadpool.ThreadPool;
14+
import org.elasticsearch.xpack.core.XPackPlugin;
1415
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
1516
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
1617

@@ -30,7 +31,7 @@ public class AsyncSearchMaintenanceService extends AsyncTaskMaintenanceService {
3031
Settings nodeSettings,
3132
ThreadPool threadPool,
3233
AsyncTaskIndexService<?> indexService) {
33-
super(clusterService, AsyncSearch.INDEX, localNodeId, threadPool, indexService,
34+
super(clusterService, XPackPlugin.ASYNC_RESULTS_INDEX, localNodeId, threadPool, indexService,
3435
ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings));
3536
}
3637
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.search.aggregations.InternalAggregation;
2525
import org.elasticsearch.search.aggregations.InternalAggregations;
2626
import org.elasticsearch.tasks.TaskId;
27+
import org.elasticsearch.tasks.TaskManager;
2728
import org.elasticsearch.threadpool.Scheduler.Cancellable;
2829
import org.elasticsearch.threadpool.ThreadPool;
2930
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
@@ -132,6 +133,11 @@ public void setExpirationTime(long expirationTimeMillis) {
132133
this.expirationTimeMillis = expirationTimeMillis;
133134
}
134135

136+
@Override
137+
public void cancelTask(TaskManager taskManager, Runnable runnable) {
138+
cancelTask(runnable);
139+
}
140+
135141
/**
136142
* Cancels the running task and its children.
137143
*/

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestDeleteAsyncSearchAction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77

88
import org.elasticsearch.client.node.NodeClient;
99
import org.elasticsearch.rest.BaseRestHandler;
10-
import org.elasticsearch.rest.RestHandler.Route;
1110
import org.elasticsearch.rest.RestRequest;
1211
import org.elasticsearch.rest.action.RestToXContentListener;
13-
import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction;
12+
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
13+
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
1414

1515

1616
import java.io.IOException;
@@ -34,7 +34,7 @@ public String getName() {
3434

3535
@Override
3636
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
37-
DeleteAsyncSearchAction.Request delete = new DeleteAsyncSearchAction.Request(request.param("id"));
38-
return channel -> client.execute(DeleteAsyncSearchAction.INSTANCE, delete, new RestToXContentListener<>(channel));
37+
DeleteAsyncResultRequest delete = new DeleteAsyncResultRequest(request.param("id"));
38+
return channel -> client.execute(DeleteAsyncResultAction.INSTANCE, delete, new RestToXContentListener<>(channel));
3939
}
4040
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java

Lines changed: 0 additions & 115 deletions
This file was deleted.

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.threadpool.ThreadPool;
1919
import org.elasticsearch.transport.TransportRequestOptions;
2020
import org.elasticsearch.transport.TransportService;
21+
import org.elasticsearch.xpack.core.XPackPlugin;
2122
import org.elasticsearch.xpack.core.async.AsyncResultsService;
2223
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
2324
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
@@ -39,9 +40,17 @@ public TransportGetAsyncSearchAction(TransportService transportService,
3940
ThreadPool threadPool) {
4041
super(GetAsyncSearchAction.NAME, transportService, actionFilters, GetAsyncResultRequest::new);
4142
this.transportService = transportService;
42-
AsyncTaskIndexService<AsyncSearchResponse> store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService,
43+
this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool);
44+
}
45+
46+
static AsyncResultsService<AsyncSearchTask, AsyncSearchResponse> createResultsService(TransportService transportService,
47+
ClusterService clusterService,
48+
NamedWriteableRegistry registry,
49+
Client client,
50+
ThreadPool threadPool) {
51+
AsyncTaskIndexService<AsyncSearchResponse> store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
4352
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
44-
resultsService = new AsyncResultsService<>(store, true, AsyncSearchTask.class, AsyncSearchTask::addCompletionListener,
53+
return new AsyncResultsService<>(store, true, AsyncSearchTask.class, AsyncSearchTask::addCompletionListener,
4554
transportService.getTaskManager(), clusterService);
4655
}
4756

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.tasks.TaskCancelledException;
3535
import org.elasticsearch.tasks.TaskId;
3636
import org.elasticsearch.transport.TransportService;
37+
import org.elasticsearch.xpack.core.XPackPlugin;
3738
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
3839
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
3940
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
@@ -69,7 +70,7 @@ public TransportSubmitAsyncSearchAction(ClusterService clusterService,
6970
this.requestToAggReduceContextBuilder = request -> searchService.aggReduceContextBuilder(request).forFinalReduction();
7071
this.searchAction = searchAction;
7172
this.threadContext = transportService.getThreadPool().getThreadContext();
72-
this.store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadContext, client,
73+
this.store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, threadContext, client,
7374
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
7475
}
7576

x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@
3030
import org.elasticsearch.test.InternalTestCluster;
3131
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
3232
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
33+
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
3334
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
3435
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
35-
import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction;
36+
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
3637
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
3738
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
3839
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
@@ -48,7 +49,7 @@
4849
import java.util.List;
4950
import java.util.concurrent.ExecutionException;
5051

51-
import static org.elasticsearch.xpack.search.AsyncSearch.INDEX;
52+
import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX;
5253
import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
5354
import static org.hamcrest.Matchers.equalTo;
5455
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -125,7 +126,7 @@ protected void restartTaskNode(String id, String indexName) throws Exception {
125126
stopMaintenanceService();
126127
internalCluster().restartNode(node.getName(), new InternalTestCluster.RestartCallback() {});
127128
startMaintenanceService();
128-
ensureYellow(INDEX, indexName);
129+
ensureYellow(ASYNC_RESULTS_INDEX, indexName);
129130
}
130131

131132
protected AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request) throws ExecutionException, InterruptedException {
@@ -141,7 +142,7 @@ protected AsyncSearchResponse getAsyncSearch(String id, TimeValue keepAlive) thr
141142
}
142143

143144
protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException {
144-
return client().execute(DeleteAsyncSearchAction.INSTANCE, new DeleteAsyncSearchAction.Request(id)).get();
145+
return client().execute(DeleteAsyncResultAction.INSTANCE, new DeleteAsyncResultRequest(id)).get();
145146
}
146147

147148
/**
@@ -151,7 +152,7 @@ protected void ensureTaskRemoval(String id) throws Exception {
151152
AsyncExecutionId searchId = AsyncExecutionId.decode(id);
152153
assertBusy(() -> {
153154
GetResponse resp = client().prepareGet()
154-
.setIndex(INDEX)
155+
.setIndex(ASYNC_RESULTS_INDEX)
155156
.setId(searchId.getDocId())
156157
.get();
157158
assertFalse(resp.isExists());

0 commit comments

Comments
 (0)