Skip to content

Commit e999819

Browse files
authored
EQL: Adds get async EQL search result action (#56852)
Adds support for retrieving async EQL search result s to eql search API. Relates to #49638
1 parent 39df45e commit e999819

File tree

36 files changed

+1396
-384
lines changed

36 files changed

+1396
-384
lines changed

server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.threadpool.ThreadPool;
2727

2828
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.function.Consumer;
2930

3031
public class ListenerTimeouts {
3132

@@ -41,9 +42,29 @@ public class ListenerTimeouts {
4142
* @param listenerName name of the listener for timeout exception
4243
* @return the wrapped listener that will timeout
4344
*/
44-
public static <Response> ActionListener<Response> wrapWithTimeout(ThreadPool threadPool, ActionListener<Response> listener,
45+
public static <Response> ActionListener<Response> wrapWithTimeout(ThreadPool threadPool, ActionListener<Response> listener,
4546
TimeValue timeout, String executor, String listenerName) {
46-
TimeoutableListener<Response> wrappedListener = new TimeoutableListener<>(listener, timeout, listenerName);
47+
return wrapWithTimeout(threadPool, timeout, executor, listener, (ignore) -> {
48+
String timeoutMessage = "[" + listenerName + "]" + " timed out after [" + timeout + "]";
49+
listener.onFailure(new ElasticsearchTimeoutException(timeoutMessage));
50+
});
51+
}
52+
53+
/**
54+
* Wraps a listener with a listener that can timeout. After the timeout period the
55+
* onTimeout Runnable will be called.
56+
*
57+
* @param threadPool used to schedule the timeout
58+
* @param timeout period before listener failed
59+
* @param executor to use for scheduling timeout
60+
* @param listener to that can timeout
61+
* @param onTimeout consumer will be called and the resulting wrapper will be passed to it as a parameter
62+
* @return the wrapped listener that will timeout
63+
*/
64+
public static <Response> ActionListener<Response> wrapWithTimeout(ThreadPool threadPool, TimeValue timeout, String executor,
65+
ActionListener<Response> listener,
66+
Consumer<ActionListener<Response>> onTimeout) {
67+
TimeoutableListener<Response> wrappedListener = new TimeoutableListener<>(listener, onTimeout);
4768
wrappedListener.cancellable = threadPool.schedule(wrappedListener, timeout, executor);
4869
return wrappedListener;
4970
}
@@ -52,14 +73,12 @@ private static class TimeoutableListener<Response> implements ActionListener<Res
5273

5374
private final AtomicBoolean isDone = new AtomicBoolean(false);
5475
private final ActionListener<Response> delegate;
55-
private final TimeValue timeout;
56-
private final String listenerName;
76+
private final Consumer<ActionListener<Response>> onTimeout;
5777
private volatile Scheduler.ScheduledCancellable cancellable;
5878

59-
private TimeoutableListener(ActionListener<Response> delegate, TimeValue timeout, String listenerName) {
79+
private TimeoutableListener(ActionListener<Response> delegate, Consumer<ActionListener<Response>> onTimeout) {
6080
this.delegate = delegate;
61-
this.timeout = timeout;
62-
this.listenerName = listenerName;
81+
this.onTimeout = onTimeout;
6382
}
6483

6584
@Override
@@ -81,8 +100,7 @@ public void onFailure(Exception e) {
81100
@Override
82101
public void run() {
83102
if (isDone.compareAndSet(false, true)) {
84-
String timeoutMessage = "[" + listenerName + "]" + " timed out after [" + timeout + "]";
85-
delegate.onFailure(new ElasticsearchTimeoutException(timeoutMessage));
103+
onTimeout.accept(this);
86104
}
87105
}
88106
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ Listener getSearchProgressActionListener() {
127127
/**
128128
* Update the expiration time of the (partial) response.
129129
*/
130+
@Override
130131
public void setExpirationTime(long expirationTimeMillis) {
131132
this.expirationTimeMillis = expirationTimeMillis;
132133
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestGetAsyncSearchAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.rest.BaseRestHandler;
1010
import org.elasticsearch.rest.RestRequest;
1111
import org.elasticsearch.rest.action.RestStatusToXContentListener;
12+
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
1213
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
1314

1415
import java.util.List;
@@ -33,7 +34,7 @@ public String getName() {
3334

3435
@Override
3536
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
36-
GetAsyncSearchAction.Request get = new GetAsyncSearchAction.Request(request.param("id"));
37+
GetAsyncResultRequest get = new GetAsyncResultRequest(request.param("id"));
3738
if (request.hasParam("wait_for_completion_timeout")) {
3839
get.setWaitForCompletion(request.paramAsTime("wait_for_completion_timeout", get.getWaitForCompletion()));
3940
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java

Lines changed: 17 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,6 @@
55
*/
66
package org.elasticsearch.xpack.search;
77

8-
import org.apache.logging.log4j.LogManager;
9-
import org.apache.logging.log4j.Logger;
10-
import org.apache.logging.log4j.message.ParameterizedMessage;
11-
import org.elasticsearch.ExceptionsHelper;
12-
import org.elasticsearch.ResourceNotFoundException;
138
import org.elasticsearch.action.ActionListener;
149
import org.elasticsearch.action.ActionListenerResponseHandler;
1510
import org.elasticsearch.action.support.ActionFilters;
@@ -19,23 +14,21 @@
1914
import org.elasticsearch.cluster.service.ClusterService;
2015
import org.elasticsearch.common.inject.Inject;
2116
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
22-
import org.elasticsearch.rest.RestStatus;
2317
import org.elasticsearch.tasks.Task;
2418
import org.elasticsearch.threadpool.ThreadPool;
2519
import org.elasticsearch.transport.TransportRequestOptions;
2620
import org.elasticsearch.transport.TransportService;
27-
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
21+
import org.elasticsearch.xpack.core.async.AsyncResultsService;
2822
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
23+
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
2924
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
3025
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
3126

3227
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
3328

34-
public class TransportGetAsyncSearchAction extends HandledTransportAction<GetAsyncSearchAction.Request, AsyncSearchResponse> {
35-
private final Logger logger = LogManager.getLogger(TransportGetAsyncSearchAction.class);
36-
private final ClusterService clusterService;
29+
public class TransportGetAsyncSearchAction extends HandledTransportAction<GetAsyncResultRequest, AsyncSearchResponse> {
30+
private final AsyncResultsService<AsyncSearchTask, AsyncSearchResponse> resultsService;
3731
private final TransportService transportService;
38-
private final AsyncTaskIndexService<AsyncSearchResponse> store;
3932

4033
@Inject
4134
public TransportGetAsyncSearchAction(TransportService transportService,
@@ -44,113 +37,23 @@ public TransportGetAsyncSearchAction(TransportService transportService,
4437
NamedWriteableRegistry registry,
4538
Client client,
4639
ThreadPool threadPool) {
47-
super(GetAsyncSearchAction.NAME, transportService, actionFilters, GetAsyncSearchAction.Request::new);
48-
this.clusterService = clusterService;
40+
super(GetAsyncSearchAction.NAME, transportService, actionFilters, GetAsyncResultRequest::new);
4941
this.transportService = transportService;
50-
this.store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client,
51-
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
42+
AsyncTaskIndexService<AsyncSearchResponse> store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService,
43+
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
44+
resultsService = new AsyncResultsService<>(store, true, AsyncSearchTask.class, AsyncSearchTask::addCompletionListener,
45+
transportService.getTaskManager(), clusterService);
5246
}
5347

5448
@Override
55-
protected void doExecute(Task task, GetAsyncSearchAction.Request request, ActionListener<AsyncSearchResponse> listener) {
56-
try {
57-
long nowInMillis = System.currentTimeMillis();
58-
AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId());
59-
DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId());
60-
if (clusterService.localNode().getId().equals(searchId.getTaskId().getNodeId()) || node == null) {
61-
if (request.getKeepAlive().getMillis() > 0) {
62-
long expirationTime = nowInMillis + request.getKeepAlive().getMillis();
63-
store.updateExpirationTime(searchId.getDocId(), expirationTime,
64-
ActionListener.wrap(
65-
p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener),
66-
exc -> {
67-
//don't log when: the async search document or its index is not found. That can happen if an invalid
68-
//search id is provided or no async search initial response has been stored yet.
69-
RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc));
70-
if (status != RestStatus.NOT_FOUND) {
71-
logger.error(() -> new ParameterizedMessage("failed to update expiration time for async-search [{}]",
72-
searchId.getEncoded()), exc);
73-
}
74-
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
75-
}
76-
));
77-
} else {
78-
getSearchResponseFromTask(searchId, request, nowInMillis, -1, listener);
79-
}
80-
} else {
81-
TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
82-
transportService.sendRequest(node, GetAsyncSearchAction.NAME, request, builder.build(),
83-
new ActionListenerResponseHandler<>(listener, AsyncSearchResponse::new, ThreadPool.Names.SAME));
84-
}
85-
} catch (Exception exc) {
86-
listener.onFailure(exc);
49+
protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener<AsyncSearchResponse> listener) {
50+
DiscoveryNode node = resultsService.getNode(request.getId());
51+
if (node == null || resultsService.isLocalNode(node)) {
52+
resultsService.retrieveResult(request, listener);
53+
} else {
54+
TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
55+
transportService.sendRequest(node, GetAsyncSearchAction.NAME, request, builder.build(),
56+
new ActionListenerResponseHandler<>(listener, AsyncSearchResponse::new, ThreadPool.Names.SAME));
8757
}
8858
}
89-
90-
private void getSearchResponseFromTask(AsyncExecutionId searchId,
91-
GetAsyncSearchAction.Request request,
92-
long nowInMillis,
93-
long expirationTimeMillis,
94-
ActionListener<AsyncSearchResponse> listener) {
95-
try {
96-
final AsyncSearchTask task = store.getTask(taskManager, searchId, AsyncSearchTask.class);
97-
if (task == null) {
98-
getSearchResponseFromIndex(searchId, request, nowInMillis, listener);
99-
return;
100-
}
101-
102-
if (task.isCancelled()) {
103-
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
104-
return;
105-
}
106-
107-
if (expirationTimeMillis != -1) {
108-
task.setExpirationTime(expirationTimeMillis);
109-
}
110-
task.addCompletionListener(new ActionListener<>() {
111-
@Override
112-
public void onResponse(AsyncSearchResponse response) {
113-
sendFinalResponse(request, response, nowInMillis, listener);
114-
}
115-
116-
@Override
117-
public void onFailure(Exception exc) {
118-
listener.onFailure(exc);
119-
}
120-
}, request.getWaitForCompletion());
121-
} catch (Exception exc) {
122-
listener.onFailure(exc);
123-
}
124-
}
125-
126-
private void getSearchResponseFromIndex(AsyncExecutionId searchId,
127-
GetAsyncSearchAction.Request request,
128-
long nowInMillis,
129-
ActionListener<AsyncSearchResponse> listener) {
130-
store.getResponse(searchId, true,
131-
new ActionListener<>() {
132-
@Override
133-
public void onResponse(AsyncSearchResponse response) {
134-
sendFinalResponse(request, response, nowInMillis, listener);
135-
}
136-
137-
@Override
138-
public void onFailure(Exception e) {
139-
listener.onFailure(e);
140-
}
141-
});
142-
}
143-
144-
private void sendFinalResponse(GetAsyncSearchAction.Request request,
145-
AsyncSearchResponse response,
146-
long nowInMillis,
147-
ActionListener<AsyncSearchResponse> listener) {
148-
// check if the result has expired
149-
if (response.getExpirationTime() < nowInMillis) {
150-
listener.onFailure(new ResourceNotFoundException(request.getId()));
151-
return;
152-
}
153-
154-
listener.onResponse(response);
155-
}
15659
}

x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.test.InternalTestCluster;
3131
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
3232
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
33+
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
3334
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
3435
import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction;
3536
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
@@ -132,11 +133,11 @@ protected AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request
132133
}
133134

134135
protected AsyncSearchResponse getAsyncSearch(String id) throws ExecutionException, InterruptedException {
135-
return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncSearchAction.Request(id)).get();
136+
return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id)).get();
136137
}
137138

138139
protected AsyncSearchResponse getAsyncSearch(String id, TimeValue keepAlive) throws ExecutionException, InterruptedException {
139-
return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncSearchAction.Request(id).setKeepAlive(keepAlive)).get();
140+
return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id).setKeepAlive(keepAlive)).get();
140141
}
141142

142143
protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException {
@@ -236,7 +237,7 @@ private AsyncSearchResponse doNext() throws Exception {
236237
}
237238
queryLatch.countDownAndReset();
238239
AsyncSearchResponse newResponse = client().execute(GetAsyncSearchAction.INSTANCE,
239-
new GetAsyncSearchAction.Request(response.getId())
240+
new GetAsyncResultRequest(response.getId())
240241
.setWaitForCompletion(TimeValue.timeValueMillis(10))).get();
241242

242243
if (newResponse.isRunning()) {

x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchResponseTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import java.util.List;
3737

3838
import static java.util.Collections.emptyList;
39-
import static org.elasticsearch.xpack.search.GetAsyncSearchRequestTests.randomSearchId;
39+
import static org.elasticsearch.xpack.core.async.GetAsyncResultRequestTests.randomSearchId;
4040

4141
public class AsyncSearchResponseTests extends ESTestCase {
4242
private SearchResponse searchResponse = randomSearchResponse();

x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/DeleteAsyncSearchRequestTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import org.elasticsearch.test.AbstractWireSerializingTestCase;
1010
import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction;
1111

12-
import static org.elasticsearch.xpack.search.GetAsyncSearchRequestTests.randomSearchId;
12+
import static org.elasticsearch.xpack.core.async.GetAsyncResultRequestTests.randomSearchId;
1313

1414
public class DeleteAsyncSearchRequestTests extends AbstractWireSerializingTestCase<DeleteAsyncSearchAction.Request> {
1515
@Override

x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/GetAsyncSearchRequestTests.java

Lines changed: 0 additions & 38 deletions
This file was deleted.

0 commit comments

Comments
 (0)