diff --git a/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java b/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java index df9afd32ca21c..3305ae891a802 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java +++ b/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java @@ -26,6 +26,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; public class ListenerTimeouts { @@ -41,9 +42,29 @@ public class ListenerTimeouts { * @param listenerName name of the listener for timeout exception * @return the wrapped listener that will timeout */ - public static ActionListener wrapWithTimeout(ThreadPool threadPool, ActionListener listener, + public static ActionListener wrapWithTimeout(ThreadPool threadPool, ActionListener listener, TimeValue timeout, String executor, String listenerName) { - TimeoutableListener wrappedListener = new TimeoutableListener<>(listener, timeout, listenerName); + return wrapWithTimeout(threadPool, timeout, executor, listener, (ignore) -> { + String timeoutMessage = "[" + listenerName + "]" + " timed out after [" + timeout + "]"; + listener.onFailure(new ElasticsearchTimeoutException(timeoutMessage)); + }); + } + + /** + * Wraps a listener with a listener that can timeout. After the timeout period the + * onTimeout Runnable will be called. + * + * @param threadPool used to schedule the timeout + * @param timeout period before listener failed + * @param executor to use for scheduling timeout + * @param listener to that can timeout + * @param onTimeout consumer will be called and the resulting wrapper will be passed to it as a parameter + * @return the wrapped listener that will timeout + */ + public static ActionListener wrapWithTimeout(ThreadPool threadPool, TimeValue timeout, String executor, + ActionListener listener, + Consumer> onTimeout) { + TimeoutableListener wrappedListener = new TimeoutableListener<>(listener, onTimeout); wrappedListener.cancellable = threadPool.schedule(wrappedListener, timeout, executor); return wrappedListener; } @@ -52,14 +73,12 @@ private static class TimeoutableListener implements ActionListener delegate; - private final TimeValue timeout; - private final String listenerName; + private final Consumer> onTimeout; private volatile Scheduler.ScheduledCancellable cancellable; - private TimeoutableListener(ActionListener delegate, TimeValue timeout, String listenerName) { + private TimeoutableListener(ActionListener delegate, Consumer> onTimeout) { this.delegate = delegate; - this.timeout = timeout; - this.listenerName = listenerName; + this.onTimeout = onTimeout; } @Override @@ -81,8 +100,7 @@ public void onFailure(Exception e) { @Override public void run() { if (isDone.compareAndSet(false, true)) { - String timeoutMessage = "[" + listenerName + "]" + " timed out after [" + timeout + "]"; - delegate.onFailure(new ElasticsearchTimeoutException(timeoutMessage)); + onTimeout.accept(this); } } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index 6954e2d47238b..4c05273c720e0 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -127,6 +127,7 @@ Listener getSearchProgressActionListener() { /** * Update the expiration time of the (partial) response. */ + @Override public void setExpirationTime(long expirationTimeMillis) { this.expirationTimeMillis = expirationTimeMillis; } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestGetAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestGetAsyncSearchAction.java index 466adfa798abf..155f772c5b701 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestGetAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestGetAsyncSearchAction.java @@ -9,6 +9,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestStatusToXContentListener; +import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; import java.util.List; @@ -33,7 +34,7 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { - GetAsyncSearchAction.Request get = new GetAsyncSearchAction.Request(request.param("id")); + GetAsyncResultRequest get = new GetAsyncResultRequest(request.param("id")); if (request.hasParam("wait_for_completion_timeout")) { get.setWaitForCompletion(request.paramAsTime("wait_for_completion_timeout", get.getWaitForCompletion())); } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java index 2bcf053fb9725..2f8d3a5cd386e 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java @@ -5,11 +5,6 @@ */ package org.elasticsearch.xpack.search; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.ActionFilters; @@ -19,23 +14,21 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.AsyncResultsService; import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; +import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; -public class TransportGetAsyncSearchAction extends HandledTransportAction { - private final Logger logger = LogManager.getLogger(TransportGetAsyncSearchAction.class); - private final ClusterService clusterService; +public class TransportGetAsyncSearchAction extends HandledTransportAction { + private final AsyncResultsService resultsService; private final TransportService transportService; - private final AsyncTaskIndexService store; @Inject public TransportGetAsyncSearchAction(TransportService transportService, @@ -44,113 +37,23 @@ public TransportGetAsyncSearchAction(TransportService transportService, NamedWriteableRegistry registry, Client client, ThreadPool threadPool) { - super(GetAsyncSearchAction.NAME, transportService, actionFilters, GetAsyncSearchAction.Request::new); - this.clusterService = clusterService; + super(GetAsyncSearchAction.NAME, transportService, actionFilters, GetAsyncResultRequest::new); this.transportService = transportService; - this.store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client, - ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry); + AsyncTaskIndexService store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, + threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry); + resultsService = new AsyncResultsService<>(store, true, AsyncSearchTask.class, AsyncSearchTask::addCompletionListener, + transportService.getTaskManager(), clusterService); } @Override - protected void doExecute(Task task, GetAsyncSearchAction.Request request, ActionListener listener) { - try { - long nowInMillis = System.currentTimeMillis(); - AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId()); - DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId()); - if (clusterService.localNode().getId().equals(searchId.getTaskId().getNodeId()) || node == null) { - if (request.getKeepAlive().getMillis() > 0) { - long expirationTime = nowInMillis + request.getKeepAlive().getMillis(); - store.updateExpirationTime(searchId.getDocId(), expirationTime, - ActionListener.wrap( - p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener), - exc -> { - //don't log when: the async search document or its index is not found. That can happen if an invalid - //search id is provided or no async search initial response has been stored yet. - RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc)); - if (status != RestStatus.NOT_FOUND) { - logger.error(() -> new ParameterizedMessage("failed to update expiration time for async-search [{}]", - searchId.getEncoded()), exc); - } - listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); - } - )); - } else { - getSearchResponseFromTask(searchId, request, nowInMillis, -1, listener); - } - } else { - TransportRequestOptions.Builder builder = TransportRequestOptions.builder(); - transportService.sendRequest(node, GetAsyncSearchAction.NAME, request, builder.build(), - new ActionListenerResponseHandler<>(listener, AsyncSearchResponse::new, ThreadPool.Names.SAME)); - } - } catch (Exception exc) { - listener.onFailure(exc); + protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener listener) { + DiscoveryNode node = resultsService.getNode(request.getId()); + if (node == null || resultsService.isLocalNode(node)) { + resultsService.retrieveResult(request, listener); + } else { + TransportRequestOptions.Builder builder = TransportRequestOptions.builder(); + transportService.sendRequest(node, GetAsyncSearchAction.NAME, request, builder.build(), + new ActionListenerResponseHandler<>(listener, AsyncSearchResponse::new, ThreadPool.Names.SAME)); } } - - private void getSearchResponseFromTask(AsyncExecutionId searchId, - GetAsyncSearchAction.Request request, - long nowInMillis, - long expirationTimeMillis, - ActionListener listener) { - try { - final AsyncSearchTask task = store.getTask(taskManager, searchId, AsyncSearchTask.class); - if (task == null) { - getSearchResponseFromIndex(searchId, request, nowInMillis, listener); - return; - } - - if (task.isCancelled()) { - listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); - return; - } - - if (expirationTimeMillis != -1) { - task.setExpirationTime(expirationTimeMillis); - } - task.addCompletionListener(new ActionListener<>() { - @Override - public void onResponse(AsyncSearchResponse response) { - sendFinalResponse(request, response, nowInMillis, listener); - } - - @Override - public void onFailure(Exception exc) { - listener.onFailure(exc); - } - }, request.getWaitForCompletion()); - } catch (Exception exc) { - listener.onFailure(exc); - } - } - - private void getSearchResponseFromIndex(AsyncExecutionId searchId, - GetAsyncSearchAction.Request request, - long nowInMillis, - ActionListener listener) { - store.getResponse(searchId, true, - new ActionListener<>() { - @Override - public void onResponse(AsyncSearchResponse response) { - sendFinalResponse(request, response, nowInMillis, listener); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } - - private void sendFinalResponse(GetAsyncSearchAction.Request request, - AsyncSearchResponse response, - long nowInMillis, - ActionListener listener) { - // check if the result has expired - if (response.getExpirationTime() < nowInMillis) { - listener.onFailure(new ResourceNotFoundException(request.getId())); - return; - } - - listener.onResponse(response); - } } diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java index 65d55d3910628..1aa22778e2d2d 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java @@ -30,6 +30,7 @@ import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; @@ -132,11 +133,11 @@ protected AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request } protected AsyncSearchResponse getAsyncSearch(String id) throws ExecutionException, InterruptedException { - return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncSearchAction.Request(id)).get(); + return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id)).get(); } protected AsyncSearchResponse getAsyncSearch(String id, TimeValue keepAlive) throws ExecutionException, InterruptedException { - return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncSearchAction.Request(id).setKeepAlive(keepAlive)).get(); + return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id).setKeepAlive(keepAlive)).get(); } protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException { @@ -236,7 +237,7 @@ private AsyncSearchResponse doNext() throws Exception { } queryLatch.countDownAndReset(); AsyncSearchResponse newResponse = client().execute(GetAsyncSearchAction.INSTANCE, - new GetAsyncSearchAction.Request(response.getId()) + new GetAsyncResultRequest(response.getId()) .setWaitForCompletion(TimeValue.timeValueMillis(10))).get(); if (newResponse.isRunning()) { diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchResponseTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchResponseTests.java index 4b1c7382c94fd..9b8c459e1c6cc 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchResponseTests.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchResponseTests.java @@ -36,7 +36,7 @@ import java.util.List; import static java.util.Collections.emptyList; -import static org.elasticsearch.xpack.search.GetAsyncSearchRequestTests.randomSearchId; +import static org.elasticsearch.xpack.core.async.GetAsyncResultRequestTests.randomSearchId; public class AsyncSearchResponseTests extends ESTestCase { private SearchResponse searchResponse = randomSearchResponse(); diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/DeleteAsyncSearchRequestTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/DeleteAsyncSearchRequestTests.java index f71d859f648a3..7e1b220b51ca1 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/DeleteAsyncSearchRequestTests.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/DeleteAsyncSearchRequestTests.java @@ -9,7 +9,7 @@ import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; -import static org.elasticsearch.xpack.search.GetAsyncSearchRequestTests.randomSearchId; +import static org.elasticsearch.xpack.core.async.GetAsyncResultRequestTests.randomSearchId; public class DeleteAsyncSearchRequestTests extends AbstractWireSerializingTestCase { @Override diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/GetAsyncSearchRequestTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/GetAsyncSearchRequestTests.java deleted file mode 100644 index 533ae8210ae5f..0000000000000 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/GetAsyncSearchRequestTests.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.search; - -import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.test.AbstractWireSerializingTestCase; -import org.elasticsearch.xpack.core.async.AsyncExecutionId; -import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; - -public class GetAsyncSearchRequestTests extends AbstractWireSerializingTestCase { - @Override - protected Writeable.Reader instanceReader() { - return GetAsyncSearchAction.Request::new; - } - - @Override - protected GetAsyncSearchAction.Request createTestInstance() { - GetAsyncSearchAction.Request req = new GetAsyncSearchAction.Request(randomSearchId()); - if (randomBoolean()) { - req.setWaitForCompletion(TimeValue.timeValueMillis(randomIntBetween(1, 10000))); - } - if (randomBoolean()) { - req.setKeepAlive(TimeValue.timeValueMillis(randomIntBetween(1, 10000))); - } - return req; - } - - static String randomSearchId() { - return AsyncExecutionId.encode(UUIDs.randomBase64UUID(), - new TaskId(randomAlphaOfLengthBetween(10, 20), randomLongBetween(0, Long.MAX_VALUE))); - } -} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java new file mode 100644 index 0000000000000..81ad3fad44983 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java @@ -0,0 +1,172 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.async; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.TriConsumer; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.TaskManager; + +import java.util.Objects; + +/** + * Service that is capable of retrieving and cleaning up AsyncTasks regardless of their state. It works with the TaskManager, if a task + * is still running and AsyncTaskIndexService if task results already stored there. + */ +public class AsyncResultsService> { + private final Logger logger = LogManager.getLogger(AsyncResultsService.class); + private final Class asyncTaskClass; + private final TaskManager taskManager; + private final ClusterService clusterService; + private final AsyncTaskIndexService store; + private final boolean updateInitialResultsInStore; + private final TriConsumer, TimeValue> addCompletionListener; + + /** + * Creates async results service + * + * @param store AsyncTaskIndexService for the response we are working with + * @param updateInitialResultsInStore true if initial results are stored (Async Search) or false otherwise (EQL Search) + * @param asyncTaskClass async task class + * @param addCompletionListener function that registers a completion listener with the task + * @param taskManager task manager + * @param clusterService cluster service + */ + public AsyncResultsService(AsyncTaskIndexService store, + boolean updateInitialResultsInStore, + Class asyncTaskClass, + TriConsumer, TimeValue> addCompletionListener, + TaskManager taskManager, + ClusterService clusterService) { + this.updateInitialResultsInStore = updateInitialResultsInStore; + this.asyncTaskClass = asyncTaskClass; + this.addCompletionListener = addCompletionListener; + this.taskManager = taskManager; + this.clusterService = clusterService; + this.store = store; + + } + + public DiscoveryNode getNode(String id) { + AsyncExecutionId searchId = AsyncExecutionId.decode(id); + return clusterService.state().nodes().get(searchId.getTaskId().getNodeId()); + } + + public boolean isLocalNode(DiscoveryNode node) { + return Objects.requireNonNull(node).equals(clusterService.localNode()); + } + + public void retrieveResult(GetAsyncResultRequest request, ActionListener listener) { + try { + long nowInMillis = System.currentTimeMillis(); + AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId()); + long expirationTime; + if (request.getKeepAlive() != null && request.getKeepAlive().getMillis() > 0) { + expirationTime = nowInMillis + request.getKeepAlive().getMillis(); + } else { + expirationTime = -1; + } + // EQL doesn't store initial or intermediate results so we only need to update expiration time in store for only in case of + // async search + if (updateInitialResultsInStore & expirationTime > 0) { + store.updateExpirationTime(searchId.getDocId(), expirationTime, + ActionListener.wrap( + p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener), + exc -> { + //don't log when: the async search document or its index is not found. That can happen if an invalid + //search id is provided or no async search initial response has been stored yet. + RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc)); + if (status != RestStatus.NOT_FOUND) { + logger.error(() -> new ParameterizedMessage("failed to update expiration time for async-search [{}]", + searchId.getEncoded()), exc); + } + listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); + } + )); + } else { + getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener); + } + } catch (Exception exc) { + listener.onFailure(exc); + } + } + + private void getSearchResponseFromTask(AsyncExecutionId searchId, + GetAsyncResultRequest request, + long nowInMillis, + long expirationTimeMillis, + ActionListener listener) { + try { + final Task task = store.getTask(taskManager, searchId, asyncTaskClass); + if (task == null) { + getSearchResponseFromIndex(searchId, request, nowInMillis, listener); + return; + } + + if (task.isCancelled()) { + listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); + return; + } + + if (expirationTimeMillis != -1) { + task.setExpirationTime(expirationTimeMillis); + } + addCompletionListener.apply(task, new ActionListener<>() { + @Override + public void onResponse(Response response) { + sendFinalResponse(request, response, nowInMillis, listener); + } + + @Override + public void onFailure(Exception exc) { + listener.onFailure(exc); + } + }, request.getWaitForCompletion()); + } catch (Exception exc) { + listener.onFailure(exc); + } + } + + private void getSearchResponseFromIndex(AsyncExecutionId searchId, + GetAsyncResultRequest request, + long nowInMillis, + ActionListener listener) { + store.getResponse(searchId, true, + new ActionListener<>() { + @Override + public void onResponse(Response response) { + sendFinalResponse(request, response, nowInMillis, listener); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + + private void sendFinalResponse(GetAsyncResultRequest request, + Response response, + long nowInMillis, + ActionListener listener) { + // check if the result has expired + if (response.getExpirationTime() < nowInMillis) { + listener.onFailure(new ResourceNotFoundException(request.getId())); + return; + } + + listener.onResponse(response); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java index 18ce4a0fc68bf..e2a8ce6b134e4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java @@ -21,4 +21,14 @@ public interface AsyncTask { * Returns the {@link AsyncExecutionId} of the task */ AsyncExecutionId getExecutionId(); + + /** + * Returns true if the task is cancelled + */ + boolean isCancelled(); + + /** + * Update the expiration time of the (partial) response. + */ + void setExpirationTime(long expirationTimeMillis); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/GetAsyncResultRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/GetAsyncResultRequest.java new file mode 100644 index 0000000000000..d12f4eff88725 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/GetAsyncResultRequest.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.async; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; + +import java.io.IOException; +import java.util.Objects; + +public class GetAsyncResultRequest extends ActionRequest { + private final String id; + private TimeValue waitForCompletion = TimeValue.MINUS_ONE; + private TimeValue keepAlive = TimeValue.MINUS_ONE; + + /** + * Creates a new request + * + * @param id The id of the search progress request. + */ + public GetAsyncResultRequest(String id) { + this.id = id; + } + + public GetAsyncResultRequest(StreamInput in) throws IOException { + super(in); + this.id = in.readString(); + this.waitForCompletion = TimeValue.timeValueMillis(in.readLong()); + this.keepAlive = in.readTimeValue(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + out.writeLong(waitForCompletion.millis()); + out.writeTimeValue(keepAlive); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + /** + * Returns the id of the async search. + */ + public String getId() { + return id; + } + + /** + * Sets the minimum time that the request should wait before returning a partial result (defaults to no wait). + */ + public GetAsyncResultRequest setWaitForCompletion(TimeValue timeValue) { + this.waitForCompletion = timeValue; + return this; + } + + public TimeValue getWaitForCompletion() { + return waitForCompletion; + } + + /** + * Extends the amount of time after which the result will expire (defaults to no extension). + */ + public GetAsyncResultRequest setKeepAlive(TimeValue timeValue) { + this.keepAlive = timeValue; + return this; + } + + public TimeValue getKeepAlive() { + return keepAlive; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GetAsyncResultRequest request = (GetAsyncResultRequest) o; + return Objects.equals(id, request.id) && + waitForCompletion.equals(request.waitForCompletion) && + keepAlive.equals(request.keepAlive); + } + + @Override + public int hashCode() { + return Objects.hash(id, waitForCompletion, keepAlive); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/eql/EqlAsyncActionNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/eql/EqlAsyncActionNames.java new file mode 100644 index 0000000000000..c6ad58472052e --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/eql/EqlAsyncActionNames.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.eql; + +/** + * Exposes EQL async action names for RBACEngine + */ +public final class EqlAsyncActionNames { + public static final String EQL_ASYNC_GET_RESULT_ACTION_NAME = "indices:data/read/eql/async/get"; + public static final String EQL_ASYNC_DELETE_RESULT_ACTION_NAME = "indices:data/read/eql/async/delete"; +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/GetAsyncSearchAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/GetAsyncSearchAction.java index 30aceb8d43d9c..3ef53f712bc72 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/GetAsyncSearchAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/GetAsyncSearchAction.java @@ -5,16 +5,7 @@ */ package org.elasticsearch.xpack.core.search.action; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.unit.TimeValue; - -import java.io.IOException; -import java.util.Objects; public class GetAsyncSearchAction extends ActionType { public static final GetAsyncSearchAction INSTANCE = new GetAsyncSearchAction(); @@ -23,90 +14,4 @@ public class GetAsyncSearchAction extends ActionType { private GetAsyncSearchAction() { super(NAME, AsyncSearchResponse::new); } - - @Override - public Writeable.Reader getResponseReader() { - return AsyncSearchResponse::new; - } - - public static class Request extends ActionRequest { - private final String id; - private TimeValue waitForCompletion = TimeValue.MINUS_ONE; - private TimeValue keepAlive = TimeValue.MINUS_ONE; - - /** - * Creates a new request - * - * @param id The id of the search progress request. - */ - public Request(String id) { - this.id = id; - } - - public Request(StreamInput in) throws IOException { - super(in); - this.id = in.readString(); - this.waitForCompletion = TimeValue.timeValueMillis(in.readLong()); - this.keepAlive = in.readTimeValue(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(id); - out.writeLong(waitForCompletion.millis()); - out.writeTimeValue(keepAlive); - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - /** - * Returns the id of the async search. - */ - public String getId() { - return id; - } - - /** - * Sets the minimum time that the request should wait before returning a partial result (defaults to no wait). - */ - public Request setWaitForCompletion(TimeValue timeValue) { - this.waitForCompletion = timeValue; - return this; - } - - public TimeValue getWaitForCompletion() { - return waitForCompletion; - } - - /** - * Extends the amount of time after which the result will expire (defaults to no extension). - */ - public Request setKeepAlive(TimeValue timeValue) { - this.keepAlive = timeValue; - return this; - } - - public TimeValue getKeepAlive() { - return keepAlive; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Request request = (Request) o; - return Objects.equals(id, request.id) && - waitForCompletion.equals(request.waitForCompletion) && - keepAlive.equals(request.keepAlive); - } - - @Override - public int hashCode() { - return Objects.hash(id, waitForCompletion, keepAlive); - } - } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncExecutionIdTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncExecutionIdTests.java index 6b02edf069749..f4c0496deae96 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncExecutionIdTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncExecutionIdTests.java @@ -28,7 +28,7 @@ public void testEncodeAndDecode() { } } - private static AsyncExecutionId randomAsyncId() { + public static AsyncExecutionId randomAsyncId() { return new AsyncExecutionId(UUIDs.randomBase64UUID(), new TaskId(randomAlphaOfLengthBetween(5, 20), randomNonNegativeLong())); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java new file mode 100644 index 0000000000000..5599d81f2cac2 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java @@ -0,0 +1,254 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.async; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.async.AsyncSearchIndexServiceTests.TestAsyncResponse; +import org.junit.Before; + +import java.util.HashMap; +import java.util.Map; + +import static java.util.Collections.emptyMap; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows; +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; +import static org.elasticsearch.xpack.core.async.AsyncExecutionIdTests.randomAsyncId; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class AsyncResultsServiceTests extends ESSingleNodeTestCase { + private ClusterService clusterService; + private TaskManager taskManager; + private AsyncTaskIndexService indexService; + + public static class TestTask extends CancellableTask implements AsyncTask { + private final AsyncExecutionId executionId; + private final Map, TimeValue> listeners = new HashMap<>(); + private long expirationTimeMillis; + + public TestTask(AsyncExecutionId executionId, long id, String type, String action, String description, TaskId parentTaskId, + Map headers) { + super(id, type, action, description, parentTaskId, headers); + this.executionId = executionId; + } + + @Override + public boolean shouldCancelChildrenOnCancellation() { + return false; + } + + @Override + public Map getOriginHeaders() { + return null; + } + + @Override + public AsyncExecutionId getExecutionId() { + return executionId; + } + + @Override + public void setExpirationTime(long expirationTimeMillis) { + this.expirationTimeMillis = expirationTimeMillis; + } + + public long getExpirationTime() { + return this.expirationTimeMillis; + } + + public synchronized void addListener(ActionListener listener, TimeValue timeout) { + if (timeout.getMillis() < 0) { + listener.onResponse(new TestAsyncResponse(null, expirationTimeMillis)); + } else { + assertThat(listeners.put(listener, timeout), nullValue()); + } + } + + private synchronized void onResponse(String response) { + TestAsyncResponse r = new TestAsyncResponse(response, expirationTimeMillis); + for (ActionListener listener : listeners.keySet()) { + listener.onResponse(r); + } + } + + private synchronized void onFailure(Exception e) { + for (ActionListener listener : listeners.keySet()) { + listener.onFailure(e); + } + } + } + + public class TestRequest extends TransportRequest { + private final String string; + + public TestRequest(String string) { + this.string = string; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + AsyncExecutionId asyncExecutionId = new AsyncExecutionId(randomAlphaOfLength(10), + new TaskId(clusterService.localNode().getId(), id)); + return new TestTask(asyncExecutionId, id, type, action, string, parentTaskId, headers); + } + } + + @Before + public void setup() { + clusterService = getInstanceFromNode(ClusterService.class); + TransportService transportService = getInstanceFromNode(TransportService.class); + taskManager = transportService.getTaskManager(); + indexService = new AsyncTaskIndexService<>("test", clusterService, transportService.getThreadPool().getThreadContext(), + client(), ASYNC_SEARCH_ORIGIN, TestAsyncResponse::new, writableRegistry()); + + } + + private AsyncResultsService createResultsService(boolean updateInitialResultsInStore) { + return new AsyncResultsService<>(indexService, updateInitialResultsInStore, TestTask.class, + TestTask::addListener, taskManager, clusterService); + } + + public void testRecordNotFound() { + AsyncResultsService service = createResultsService(randomBoolean()); + PlainActionFuture listener = new PlainActionFuture<>(); + service.retrieveResult(new GetAsyncResultRequest(randomAsyncId().getEncoded()), listener); + assertFutureThrows(listener, ResourceNotFoundException.class); + } + + public void testRetrieveFromMemoryWithExpiration() throws Exception { + boolean updateInitialResultsInStore = randomBoolean(); + AsyncResultsService service = createResultsService(updateInitialResultsInStore); + TestTask task = (TestTask) taskManager.register("test", "test", new TestRequest("test request")); + try { + boolean shouldExpire = randomBoolean(); + long expirationTime = System.currentTimeMillis() + randomLongBetween(1000, 10000) * (shouldExpire ? -1 : 1); + task.setExpirationTime(expirationTime); + + if (updateInitialResultsInStore) { + // we need to store initial result + PlainActionFuture future = new PlainActionFuture<>(); + indexService.createResponse(task.getExecutionId().getDocId(), task.getOriginHeaders(), + new TestAsyncResponse(null, task.getExpirationTime()), future); + future.actionGet(TimeValue.timeValueSeconds(10)); + } + + PlainActionFuture listener = new PlainActionFuture<>(); + service.retrieveResult(new GetAsyncResultRequest(task.getExecutionId().getEncoded()) + .setWaitForCompletion(TimeValue.timeValueSeconds(5)), listener); + if (randomBoolean()) { + // Test success + String expectedResponse = randomAlphaOfLength(10); + task.onResponse(expectedResponse); + if (shouldExpire) { + assertFutureThrows(listener, ResourceNotFoundException.class); + } else { + TestAsyncResponse response = listener.actionGet(TimeValue.timeValueSeconds(10)); + assertThat(response, notNullValue()); + assertThat(response.test, equalTo(expectedResponse)); + assertThat(response.expirationTimeMillis, equalTo(expirationTime)); + } + } else { + // Test Failure + task.onFailure(new IllegalArgumentException("test exception")); + assertFutureThrows(listener, IllegalArgumentException.class); + } + } finally { + taskManager.unregister(task); + } + } + + public void testAssertExpirationPropagation() throws Exception { + boolean updateInitialResultsInStore = randomBoolean(); + AsyncResultsService service = createResultsService(updateInitialResultsInStore); + TestRequest request = new TestRequest("test request"); + TestTask task = (TestTask) taskManager.register("test", "test", request); + try { + long startTime = System.currentTimeMillis(); + task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis()); + + if (updateInitialResultsInStore) { + // we need to store initial result + PlainActionFuture future = new PlainActionFuture<>(); + indexService.createResponse(task.getExecutionId().getDocId(), task.getOriginHeaders(), + new TestAsyncResponse(null, task.getExpirationTime()), future); + future.actionGet(TimeValue.timeValueSeconds(10)); + } + + TimeValue newKeepAlive = TimeValue.timeValueDays(1); + PlainActionFuture listener = new PlainActionFuture<>(); + // not waiting for completion, so should return immediately with timeout + service.retrieveResult(new GetAsyncResultRequest(task.getExecutionId().getEncoded()).setKeepAlive(newKeepAlive), listener); + listener.actionGet(TimeValue.timeValueSeconds(10)); + assertThat(task.getExpirationTime(), greaterThanOrEqualTo(startTime + newKeepAlive.getMillis())); + assertThat(task.getExpirationTime(), lessThanOrEqualTo(System.currentTimeMillis() + newKeepAlive.getMillis())); + + if (updateInitialResultsInStore) { + PlainActionFuture future = new PlainActionFuture<>(); + indexService.getResponse(task.executionId, randomBoolean(), future); + TestAsyncResponse response = future.actionGet(TimeValue.timeValueMinutes(10)); + assertThat(response.getExpirationTime(), greaterThanOrEqualTo(startTime + newKeepAlive.getMillis())); + assertThat(response.getExpirationTime(), lessThanOrEqualTo(System.currentTimeMillis() + newKeepAlive.getMillis())); + } + } finally { + taskManager.unregister(task); + } + } + + public void testRetrieveFromDisk() throws Exception { + boolean updateInitialResultsInStore = randomBoolean(); + AsyncResultsService service = createResultsService(updateInitialResultsInStore); + TestRequest request = new TestRequest("test request"); + TestTask task = (TestTask) taskManager.register("test", "test", request); + try { + long startTime = System.currentTimeMillis(); + task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis()); + + if (updateInitialResultsInStore) { + // we need to store initial result + PlainActionFuture futureCreate = new PlainActionFuture<>(); + indexService.createResponse(task.getExecutionId().getDocId(), task.getOriginHeaders(), + new TestAsyncResponse(null, task.getExpirationTime()), futureCreate); + futureCreate.actionGet(TimeValue.timeValueSeconds(10)); + + PlainActionFuture futureUpdate = new PlainActionFuture<>(); + indexService.updateResponse(task.getExecutionId().getDocId(), emptyMap(), + new TestAsyncResponse("final_response", task.getExpirationTime()), futureUpdate); + futureUpdate.actionGet(TimeValue.timeValueSeconds(10)); + } else { + PlainActionFuture futureCreate = new PlainActionFuture<>(); + indexService.createResponse(task.getExecutionId().getDocId(), task.getOriginHeaders(), + new TestAsyncResponse("final_response", task.getExpirationTime()), futureCreate); + futureCreate.actionGet(TimeValue.timeValueSeconds(10)); + } + + } finally { + taskManager.unregister(task); + } + + PlainActionFuture listener = new PlainActionFuture<>(); + // not waiting for completion, so should return immediately with timeout + service.retrieveResult(new GetAsyncResultRequest(task.getExecutionId().getEncoded()), listener); + TestAsyncResponse response = listener.actionGet(TimeValue.timeValueSeconds(10)); + assertThat(response.test, equalTo("final_response")); + + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java index 7457acfefcac3..84e101cc0b3d5 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java @@ -23,8 +23,8 @@ public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase { private AsyncTaskIndexService indexService; public static class TestAsyncResponse implements AsyncResponse { - private final String test; - private final long expirationTimeMillis; + public final String test; + public final long expirationTimeMillis; public TestAsyncResponse(String test, long expirationTimeMillis) { this.test = test; @@ -38,7 +38,7 @@ public TestAsyncResponse(StreamInput input) throws IOException { @Override public long getExpirationTime() { - return 0; + return expirationTimeMillis; } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/GetAsyncResultRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/GetAsyncResultRequestTests.java new file mode 100644 index 0000000000000..6ada6d32159c5 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/GetAsyncResultRequestTests.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.async; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import static org.elasticsearch.xpack.core.async.AsyncExecutionIdTests.randomAsyncId; + +public class GetAsyncResultRequestTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return GetAsyncResultRequest::new; + } + + @Override + protected GetAsyncResultRequest createTestInstance() { + GetAsyncResultRequest req = new GetAsyncResultRequest(randomSearchId()); + if (randomBoolean()) { + req.setWaitForCompletion(TimeValue.timeValueMillis(randomIntBetween(1, 10000))); + } + if (randomBoolean()) { + req.setKeepAlive(TimeValue.timeValueMillis(randomIntBetween(1, 10000))); + } + return req; + } + + public static String randomSearchId() { + return randomAsyncId().getEncoded(); + } +} diff --git a/x-pack/plugin/eql/qa/rest/src/test/resources/rest-api-spec/test/eql/10_basic.yml b/x-pack/plugin/eql/qa/rest/src/test/resources/rest-api-spec/test/eql/10_basic.yml index ca8197e57dd94..fa94799009255 100644 --- a/x-pack/plugin/eql/qa/rest/src/test/resources/rest-api-spec/test/eql/10_basic.yml +++ b/x-pack/plugin/eql/qa/rest/src/test/resources/rest-api-spec/test/eql/10_basic.yml @@ -38,4 +38,17 @@ setup: - match: {is_running: true} - match: {is_partial: true} - is_true: id + - set: {id: id} + + - do: + eql.get: + id: $id + wait_for_completion_timeout: "10s" + + - match: {is_running: false} + - match: {is_partial: false} + - match: {timed_out: false} + - match: {hits.total.value: 1} + - match: {hits.total.relation: "eq"} + - match: {hits.events.0._source.user: "SYSTEM"} diff --git a/x-pack/plugin/eql/qa/security/build.gradle b/x-pack/plugin/eql/qa/security/build.gradle new file mode 100644 index 0000000000000..3a402682ec04f --- /dev/null +++ b/x-pack/plugin/eql/qa/security/build.gradle @@ -0,0 +1,27 @@ +import org.elasticsearch.gradle.info.BuildParams + +apply plugin: 'elasticsearch.testclusters' +apply plugin: 'elasticsearch.standalone-rest-test' +apply plugin: 'elasticsearch.rest-test' + +dependencies { +// testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') + testCompile project(path: xpackModule('eql'), configuration: 'runtime') + testCompile project(path: xpackModule('eql:qa:common'), configuration: 'runtime') + testCompile project(':x-pack:plugin:async-search:qa') +} + +testClusters.integTest { + testDistribution = 'DEFAULT' + if (BuildParams.isSnapshotBuild()) { + setting 'xpack.eql.enabled', 'true' + } + setting 'xpack.license.self_generated.type', 'basic' + setting 'xpack.monitoring.collection.enabled', 'true' + setting 'xpack.security.enabled', 'true' + numberOfNodes = 2 + extraConfigFile 'roles.yml', file('roles.yml') + user username: "test-admin", password: 'x-pack-test-password', role: "test-admin" + user username: "user1", password: 'x-pack-test-password', role: "user1" + user username: "user2", password: 'x-pack-test-password', role: "user2" +} diff --git a/x-pack/plugin/eql/qa/security/roles.yml b/x-pack/plugin/eql/qa/security/roles.yml new file mode 100644 index 0000000000000..4ab3be5ff0571 --- /dev/null +++ b/x-pack/plugin/eql/qa/security/roles.yml @@ -0,0 +1,33 @@ +# All cluster rights +# All operations on all indices +# Run as all users +test-admin: + cluster: + - all + indices: + - names: '*' + privileges: [ all ] + run_as: + - '*' + +user1: + cluster: + - cluster:monitor/main + indices: + - names: ['index-user1', 'index' ] + privileges: + - read + - write + - create_index + - indices:admin/refresh + +user2: + cluster: + - cluster:monitor/main + indices: + - names: [ 'index-user2', 'index' ] + privileges: + - read + - write + - create_index + - indices:admin/refresh diff --git a/x-pack/plugin/eql/qa/security/src/test/java/org/elasticsearch/xpack/eql/AsyncEqlSecurityIT.java b/x-pack/plugin/eql/qa/security/src/test/java/org/elasticsearch/xpack/eql/AsyncEqlSecurityIT.java new file mode 100644 index 0000000000000..80fa5dca30dcc --- /dev/null +++ b/x-pack/plugin/eql/qa/security/src/test/java/org/elasticsearch/xpack/eql/AsyncEqlSecurityIT.java @@ -0,0 +1,169 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql; + +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.SecureString; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.junit.Before; + +import java.io.IOException; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField.RUN_AS_USER_HEADER; +import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; +import static org.elasticsearch.xpack.eql.plugin.EqlPlugin.INDEX; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class AsyncEqlSecurityIT extends ESRestTestCase { + /** + * All tests run as a superuser but use es-security-runas-user to become a less privileged user. + */ + @Override + protected Settings restClientSettings() { + String token = basicAuthHeaderValue("test-admin", new SecureString("x-pack-test-password".toCharArray())); + return Settings.builder() + .put(ThreadContext.PREFIX + ".Authorization", token) + .build(); + } + + @Before + public void indexDocuments() throws IOException { + createIndex("index", Settings.EMPTY); + index("index", "0", "event_type", "my_event", "@timestamp", "2020-04-09T12:35:48Z", "val", 0); + refresh("index"); + + createIndex("index-user1", Settings.EMPTY); + index("index-user1", "0", "event_type", "my_event", "@timestamp", "2020-04-09T12:35:48Z", "val", 0); + refresh("index-user1"); + + createIndex("index-user2", Settings.EMPTY); + index("index-user2", "0", "event_type", "my_event", "@timestamp", "2020-04-09T12:35:48Z", "val", 0); + refresh("index-user2"); + } + + public void testWithUsers() throws Exception { + testCase("user1", "user2"); + testCase("user2", "user1"); + } + + private void testCase(String user, String other) throws Exception { + for (String indexName : new String[] {"index", "index-" + user}) { + Response submitResp = submitAsyncEqlSearch(indexName, "my_event where val=0", TimeValue.timeValueSeconds(10), user); + assertOK(submitResp); + String id = extractResponseId(submitResp); + Response getResp = getAsyncEqlSearch(id, user); + assertOK(getResp); + + // other cannot access the result + ResponseException exc = expectThrows(ResponseException.class, () -> getAsyncEqlSearch(id, other)); + assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(404)); + + // other cannot delete the result + exc = expectThrows(ResponseException.class, () -> deleteAsyncEqlSearch(id, other)); + // TODO: This is not implemented yet, should return 404 when it is done + assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(405)); + + // other and user cannot access the result from direct get calls + AsyncExecutionId searchId = AsyncExecutionId.decode(id); + for (String runAs : new String[] {user, other}) { + exc = expectThrows(ResponseException.class, () -> get(INDEX, searchId.getDocId(), runAs)); + assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403)); + assertThat(exc.getMessage(), containsString("unauthorized")); + } + // TODO: Deletion is not implemented yet + // Response delResp = deleteAsyncEqlSearch(id, user); + // assertOK(delResp); + } + ResponseException exc = expectThrows(ResponseException.class, + () -> submitAsyncEqlSearch("index-" + other, "*", TimeValue.timeValueSeconds(10), user)); + assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403)); + assertThat(exc.getMessage(), containsString("unauthorized")); + } + + static String extractResponseId(Response response) throws IOException { + Map map = toMap(response); + return (String) map.get("id"); + } + + static void index(String index, String id, Object... fields) throws IOException { + XContentBuilder document = jsonBuilder().startObject(); + for (int i = 0; i < fields.length; i += 2) { + document.field((String) fields[i], fields[i + 1]); + } + document.endObject(); + final Request request = new Request("POST", "/" + index + "/_doc/" + id); + request.setJsonEntity(Strings.toString(document)); + assertOK(client().performRequest(request)); + } + + static void refresh(String index) throws IOException { + assertOK(adminClient().performRequest(new Request("POST", "/" + index + "/_refresh"))); + } + + static Response get(String index, String id, String user) throws IOException { + final Request request = new Request("GET", "/" + index + "/_doc/" + id); + setRunAsHeader(request, user); + return client().performRequest(request); + } + + static Response submitAsyncEqlSearch(String indexName, String query, TimeValue waitForCompletion, String user) throws IOException { + final Request request = new Request("POST", indexName + "/_eql/search"); + setRunAsHeader(request, user); + request.setJsonEntity(Strings.toString(JsonXContent.contentBuilder() + .startObject() + .field("event_category_field", "event_type") + .field("query", query) + .endObject())); + request.addParameter("wait_for_completion_timeout", waitForCompletion.toString()); + // we do the cleanup explicitly + request.addParameter("keep_on_completion", "true"); + return client().performRequest(request); + } + + static Response getAsyncEqlSearch(String id, String user) throws IOException { + final Request request = new Request("GET", "/_eql/search/" + id); + setRunAsHeader(request, user); + request.addParameter("wait_for_completion_timeout", "0ms"); + return client().performRequest(request); + } + + static Response deleteAsyncEqlSearch(String id, String user) throws IOException { + final Request request = new Request("DELETE", "/_eql/search/" + id); + setRunAsHeader(request, user); + return client().performRequest(request); + } + + static Map toMap(Response response) throws IOException { + return toMap(EntityUtils.toString(response.getEntity())); + } + + static Map toMap(String response) { + return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); + } + + static void setRunAsHeader(Request request, String user) { + final RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); + builder.addHeader(RUN_AS_USER_HEADER, user); + request.setOptions(builder); + } + +} diff --git a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java index 715e99ed31ee8..110f39260419e 100644 --- a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java +++ b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.eql.action; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -24,7 +25,9 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.eql.async.StoredAsyncResponse; +import org.elasticsearch.xpack.eql.plugin.EqlAsyncGetResultAction; import org.elasticsearch.xpack.eql.plugin.EqlPlugin; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; @@ -90,6 +93,50 @@ private void prepareIndex() throws Exception { indexRandom(true, builders); } + public void testBasicAsyncExecution() throws Exception { + prepareIndex(); + + boolean success = randomBoolean(); + String query = success ? "my_event where i=1" : "my_event where 10/i=1"; + EqlSearchRequest request = new EqlSearchRequest().indices("test").query(query).eventCategoryField("event_type") + .waitForCompletionTimeout(TimeValue.timeValueMillis(1)); + + List plugins = initBlockFactory(true, false); + + logger.trace("Starting async search"); + EqlSearchResponse response = client().execute(EqlSearchAction.INSTANCE, request).get(); + assertThat(response.isRunning(), is(true)); + assertThat(response.isPartial(), is(true)); + assertThat(response.id(), notNullValue()); + + logger.trace("Waiting for block to be established"); + awaitForBlockedSearches(plugins, "test"); + logger.trace("Block is established"); + + if (randomBoolean()) { + // let's timeout first + GetAsyncResultRequest getResultsRequest = new GetAsyncResultRequest(response.id()) + .setWaitForCompletion(TimeValue.timeValueMillis(10)); + EqlSearchResponse responseWithTimeout = client().execute(EqlAsyncGetResultAction.INSTANCE, getResultsRequest).get(); + assertThat(responseWithTimeout.isRunning(), is(true)); + assertThat(responseWithTimeout.isPartial(), is(true)); + assertThat(responseWithTimeout.id(), equalTo(response.id())); + } + + // Now we wait + GetAsyncResultRequest getResultsRequest = new GetAsyncResultRequest(response.id()) + .setWaitForCompletion(TimeValue.timeValueSeconds(10)); + ActionFuture future = client().execute(EqlAsyncGetResultAction.INSTANCE, getResultsRequest); + disableBlocks(plugins); + if (success) { + response = future.get(); + assertThat(response, notNullValue()); + assertThat(response.hits().events().size(), equalTo(1)); + } else { + Exception ex = expectThrows(Exception.class, future::actionGet); + assertThat(ex.getCause().getMessage(), containsString("by zero")); + } + } public void testGoingAsync() throws Exception { prepareIndex(); @@ -169,13 +216,16 @@ public void testFinishingBeforeTimeout() throws Exception { assertThat(doc.getResponse(), notNullValue()); assertThat(doc.getResponse().hits().events().size(), equalTo(1)); } + if (keepOnCompletion) { + EqlSearchResponse storedResponse = client().execute(EqlAsyncGetResultAction.INSTANCE, + new GetAsyncResultRequest(response.id())).actionGet(); + assertThat(storedResponse, equalTo(response)); + } } else { Exception ex = expectThrows(Exception.class, () -> client().execute(EqlSearchAction.INSTANCE, request).get()); assertThat(ex.getMessage(), containsString("by zero")); } - - } public StoredAsyncResponse getStoredRecord(String id) throws Exception { @@ -190,10 +240,10 @@ public StoredAsyncResponse getStoredRecord(String id) throws } } } + return null; } catch (IndexNotFoundException | NoShardAvailableActionException ex) { return null; } - return null; } public static org.hamcrest.Matcher eqlSearchResponseMatcherEqualTo(EqlSearchResponse eqlSearchResponse) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java index 5761e088f2914..be76c2f1aac15 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java @@ -385,7 +385,7 @@ public IndicesOptions indicesOptions() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new EqlSearchTask(id, type, action, getDescription(), parentTaskId, headers, null, null); + return new EqlSearchTask(id, type, action, getDescription(), parentTaskId, headers, null, null, keepAlive); } @Override diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java index 5730117f80d9c..844cccab7e098 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java @@ -6,43 +6,28 @@ package org.elasticsearch.xpack.eql.action; -import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.async.AsyncExecutionId; -import org.elasticsearch.xpack.core.async.AsyncTask; +import org.elasticsearch.xpack.eql.async.StoredAsyncTask; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; -public class EqlSearchTask extends CancellableTask implements AsyncTask { - private final String description; - private final AsyncExecutionId asyncExecutionId; - private final Map originHeaders; +public class EqlSearchTask extends StoredAsyncTask { + public volatile AtomicReference finalResponse = new AtomicReference<>(); - public EqlSearchTask(long id, String type, String action, String description, TaskId parentTaskId, - Map headers, Map originHeaders, AsyncExecutionId asyncExecutionId) { - super(id, type, action, null, parentTaskId, headers); - this.description = description; - this.asyncExecutionId = asyncExecutionId; - this.originHeaders = originHeaders; + public EqlSearchTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers, + Map originHeaders, AsyncExecutionId asyncExecutionId, TimeValue keepAlive) { + super(id, type, action, description, parentTaskId, headers, originHeaders, asyncExecutionId, keepAlive); } @Override - public boolean shouldCancelChildrenOnCancellation() { - return true; - } - - @Override - public String getDescription() { - return description; - } - - @Override - public Map getOriginHeaders() { - return originHeaders; - } - - @Override - public AsyncExecutionId getExecutionId() { - return asyncExecutionId; + public EqlSearchResponse getCurrentResult() { + return Objects.requireNonNullElseGet(finalResponse.get(), + // we haven't seen the final response yet sending a initial response + () -> new EqlSearchResponse(EqlSearchResponse.Hits.EMPTY, System.currentTimeMillis() - getStartTime(), false, + getExecutionId().getEncoded(), true, true)); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementService.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementService.java index 7f29294089703..51d74475d309d 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementService.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementService.java @@ -12,6 +12,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ListenerTimeouts; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; @@ -39,7 +40,7 @@ * Service for managing EQL requests */ public class AsyncTaskManagementService { + T extends StoredAsyncTask> { private static final Logger logger = LogManager.getLogger(AsyncTaskManagementService.class); @@ -57,7 +58,7 @@ public interface AsyncOperation headers, Map originHeaders, AsyncExecutionId asyncExecutionId); - void operation(Request request, T task, ActionListener listener); + void execute(Request request, T task, ActionListener listener); Response initialResponse(T task); @@ -121,7 +122,7 @@ public void asyncExecute(Request request, TimeValue waitForCompletionTimeout, Ti T searchTask = (T) taskManager.register("transport", action + "[a]", new AsyncRequestWrapper(request, nodeId)); boolean operationStarted = false; try { - operation.operation(request, searchTask, + operation.execute(request, searchTask, wrapStoringListener(searchTask, waitForCompletionTimeout, keepAlive, keepOnCompletion, listener)); operationStarted = true; } finally { @@ -132,22 +133,6 @@ public void asyncExecute(Request request, TimeValue waitForCompletionTimeout, Ti } } - // TODO: For tests for now, will be merged into comprehensive get operation later - T getTask(AsyncExecutionId asyncExecutionId) throws IOException { - return asyncTaskIndexService.getTask(taskManager, asyncExecutionId, taskClass); - } - - // TODO: For tests for now, will be removed when the final get operation is added - void getResponse(AsyncExecutionId id, ActionListener listener) { - asyncTaskIndexService.getResponse(id, true, ActionListener.wrap(r -> { - if (r.getException() != null) { - listener.onFailure(r.getException()); - } else { - listener.onResponse(r.getResponse()); - } - }, listener::onFailure)); - } - private ActionListener wrapStoringListener(T searchTask, TimeValue waitForCompletionTimeout, TimeValue keepAlive, @@ -173,6 +158,7 @@ private ActionListener wrapStoringListener(T searchTask, ActionListener.wrap(() -> acquiredListener.onResponse(response))); } else { taskManager.unregister(searchTask); + searchTask.onResponse(response); acquiredListener.onResponse(response); } } else { @@ -190,6 +176,7 @@ private ActionListener wrapStoringListener(T searchTask, ActionListener.wrap(() -> acquiredListener.onFailure(e))); } else { taskManager.unregister(searchTask); + searchTask.onFailure(e); acquiredListener.onFailure(e); } } else { @@ -209,15 +196,21 @@ private void storeResults(T searchTask, StoredAsyncResponse storedResp threadPool.getThreadContext().getHeaders(), storedResponse, ActionListener.wrap( // We should only unregister after the result is saved resp -> { - taskManager.unregister(searchTask); logger.trace(() -> new ParameterizedMessage("stored eql search results for [{}]", searchTask.getExecutionId().getEncoded())); + taskManager.unregister(searchTask); + if (storedResponse.getException() != null) { + searchTask.onFailure(storedResponse.getException()); + } else { + searchTask.onResponse(storedResponse.getResponse()); + } if (finalListener != null) { finalListener.onResponse(null); } }, exc -> { taskManager.unregister(searchTask); + searchTask.onFailure(exc); Throwable cause = ExceptionsHelper.unwrapCause(exc); if (cause instanceof DocumentMissingException == false && cause instanceof VersionConflictEngineException == false) { @@ -230,8 +223,43 @@ private void storeResults(T searchTask, StoredAsyncResponse storedResp })); } catch (Exception exc) { taskManager.unregister(searchTask); + searchTask.onFailure(exc); logger.error(() -> new ParameterizedMessage("failed to store eql search results for [{}]", searchTask.getExecutionId().getEncoded()), exc); } } + + /** + * Adds a self-unregistering listener to a task. It works as a normal listener except it retrieves a partial response and unregister + * itself from the task if timeout occurs. + */ + public static > void addCompletionListener( + ThreadPool threadPool, + Task task, + ActionListener> listener, + TimeValue timeout) { + if (timeout.getMillis() <= 0) { + getCurrentResult(task, listener); + } else { + task.addCompletionListener(ListenerTimeouts.wrapWithTimeout(threadPool, timeout, ThreadPool.Names.SEARCH, ActionListener.wrap( + r -> listener.onResponse(new StoredAsyncResponse<>(r, task.getExpirationTimeMillis())), + e -> listener.onResponse(new StoredAsyncResponse<>(e, task.getExpirationTimeMillis())) + ), wrapper -> { + // Timeout was triggered + task.removeCompletionListener(wrapper); + getCurrentResult(task, listener); + })); + } + } + + private static > void getCurrentResult( + Task task, + ActionListener> listener + ) { + try { + listener.onResponse(new StoredAsyncResponse<>(task.getCurrentResult(), task.getExpirationTimeMillis())); + } catch (Exception ex) { + listener.onFailure(ex); + } + } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncResponse.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncResponse.java index 7b2987c8c5634..d027d22c0c417 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncResponse.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncResponse.java @@ -6,9 +6,12 @@ package org.elasticsearch.xpack.eql.async; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.async.AsyncResponse; import java.io.IOException; @@ -17,7 +20,8 @@ /** * Internal class for temporary storage of eql search results */ -public class StoredAsyncResponse implements AsyncResponse> { +public class StoredAsyncResponse extends ActionResponse + implements AsyncResponse>, ToXContentObject { private final R response; private final Exception exception; private final long expirationTimeMillis; @@ -91,4 +95,9 @@ public int hashCode() { return Objects.hash(response, exception == null ? null : exception.getClass(), exception == null ? null : exception.getMessage(), expirationTimeMillis); } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return null; + } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncTask.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncTask.java new file mode 100644 index 0000000000000..df01e3e9064b0 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncTask.java @@ -0,0 +1,97 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.async; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.AsyncTask; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public abstract class StoredAsyncTask extends CancellableTask implements AsyncTask { + + private final AsyncExecutionId asyncExecutionId; + private final Map originHeaders; + private volatile long expirationTimeMillis; + private final List> completionListeners; + + public StoredAsyncTask(long id, String type, String action, String description, TaskId parentTaskId, + Map headers, Map originHeaders, AsyncExecutionId asyncExecutionId, + TimeValue keepAlive) { + super(id, type, action, description, parentTaskId, headers); + this.asyncExecutionId = asyncExecutionId; + this.originHeaders = originHeaders; + this.expirationTimeMillis = getStartTime() + keepAlive.getMillis(); + this.completionListeners = new ArrayList<>(); + } + + @Override + public boolean shouldCancelChildrenOnCancellation() { + return true; + } + + @Override + public Map getOriginHeaders() { + return originHeaders; + } + + @Override + public AsyncExecutionId getExecutionId() { + return asyncExecutionId; + } + + /** + * Update the expiration time of the (partial) response. + */ + @Override + public void setExpirationTime(long expirationTimeMillis) { + this.expirationTimeMillis = expirationTimeMillis; + } + + public long getExpirationTimeMillis() { + return expirationTimeMillis; + } + + public synchronized void addCompletionListener(ActionListener listener) { + completionListeners.add(listener); + } + + public synchronized void removeCompletionListener(ActionListener listener) { + completionListeners.remove(listener); + } + + /** + * This method is called when the task is finished successfully before unregistering the task and storing the results + */ + protected synchronized void onResponse(Response response) { + for (ActionListener listener : completionListeners) { + listener.onResponse(response); + } + } + + /** + * This method is called when the task failed before unregistering the task and storing the results + */ + protected synchronized void onFailure(Exception e) { + for (ActionListener listener : completionListeners) { + listener.onFailure(e); + } + } + + /** + * Return currently available partial or the final results + */ + protected abstract Response getCurrentResult(); + +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncGetResultAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncGetResultAction.java new file mode 100644 index 0000000000000..02f5b1b3f7bad --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncGetResultAction.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plugin; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.xpack.core.eql.EqlAsyncActionNames; +import org.elasticsearch.xpack.eql.action.EqlSearchResponse; + +public class EqlAsyncGetResultAction extends ActionType { + public static final EqlAsyncGetResultAction INSTANCE = new EqlAsyncGetResultAction(); + + private EqlAsyncGetResultAction() { + super(EqlAsyncActionNames.EQL_ASYNC_GET_RESULT_ACTION_NAME, EqlSearchResponse::new); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java index d9a1d956affe8..c4dea0d03d84e 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java @@ -113,6 +113,7 @@ public List> getSettings() { return List.of( new ActionHandler<>(EqlSearchAction.INSTANCE, TransportEqlSearchAction.class), new ActionHandler<>(EqlStatsAction.INSTANCE, TransportEqlStatsAction.class), + new ActionHandler<>(EqlAsyncGetResultAction.INSTANCE, TransportEqlAsyncGetResultAction.class), new ActionHandler<>(XPackUsageFeatureAction.EQL, EqlUsageTransportAction.class), new ActionHandler<>(XPackInfoFeatureAction.EQL, EqlInfoTransportAction.class) ); @@ -142,7 +143,11 @@ public List getRestHandlers(Settings settings, Supplier nodesInCluster) { if (enabled) { - return List.of(new RestEqlSearchAction(), new RestEqlStatsAction()); + return List.of( + new RestEqlSearchAction(), + new RestEqlStatsAction(), + new RestEqlGetAsyncResultAction() + ); } return List.of(); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlGetAsyncResultAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlGetAsyncResultAction.java new file mode 100644 index 0000000000000..13352079df4ab --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlGetAsyncResultAction.java @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plugin; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; + +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +public class RestEqlGetAsyncResultAction extends BaseRestHandler { + @Override + public List routes() { + return List.of(new Route(GET, "/_eql/search/{id}")); + } + + @Override + public String getName() { + return "eql_get_async_result"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + GetAsyncResultRequest get = new GetAsyncResultRequest(request.param("id")); + if (request.hasParam("wait_for_completion_timeout")) { + get.setWaitForCompletion(request.paramAsTime("wait_for_completion_timeout", get.getWaitForCompletion())); + } + if (request.hasParam("keep_alive")) { + get.setKeepAlive(request.paramAsTime("keep_alive", get.getKeepAlive())); + } + return channel -> client.execute(EqlAsyncGetResultAction.INSTANCE, get, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetResultAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetResultAction.java new file mode 100644 index 0000000000000..8d1c637270ad8 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetResultAction.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plugin; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.async.AsyncResultsService; +import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; +import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; +import org.elasticsearch.xpack.core.eql.EqlAsyncActionNames; +import org.elasticsearch.xpack.eql.action.EqlSearchResponse; +import org.elasticsearch.xpack.eql.action.EqlSearchTask; +import org.elasticsearch.xpack.eql.async.StoredAsyncResponse; + +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; +import static org.elasticsearch.xpack.eql.async.AsyncTaskManagementService.addCompletionListener; + +public class TransportEqlAsyncGetResultAction extends HandledTransportAction { + private final AsyncResultsService> resultsService; + private final TransportService transportService; + + @Inject + public TransportEqlAsyncGetResultAction(TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + NamedWriteableRegistry registry, + Client client, + ThreadPool threadPool) { + super(EqlAsyncActionNames.EQL_ASYNC_GET_RESULT_ACTION_NAME, transportService, actionFilters, GetAsyncResultRequest::new); + this.transportService = transportService; + Writeable.Reader> reader = in -> new StoredAsyncResponse<>(EqlSearchResponse::new, in); + AsyncTaskIndexService> store = new AsyncTaskIndexService<>(EqlPlugin.INDEX, clusterService, + threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, reader, registry); + resultsService = new AsyncResultsService<>(store, true, EqlSearchTask.class, + (task, listener, timeout) -> addCompletionListener(threadPool, task, listener, timeout), + transportService.getTaskManager(), clusterService); + } + + @Override + protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener listener) { + DiscoveryNode node = resultsService.getNode(request.getId()); + if (node == null || resultsService.isLocalNode(node)) { + resultsService.retrieveResult(request, ActionListener.wrap( + r -> { + if (r.getException() != null) { + listener.onFailure(r.getException()); + } else { + listener.onResponse(r.getResponse()); + } + }, + listener::onFailure + )); + } else { + TransportRequestOptions.Builder builder = TransportRequestOptions.builder(); + transportService.sendRequest(node, EqlAsyncActionNames.EQL_ASYNC_GET_RESULT_ACTION_NAME, request, builder.build(), + new ActionListenerResponseHandler<>(listener, EqlSearchResponse::new, ThreadPool.Names.SAME)); + } + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java index f712cddcbe16a..ba3da64973446 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java @@ -69,11 +69,12 @@ public TransportEqlSearchAction(Settings settings, ClusterService clusterService @Override public EqlSearchTask createTask(EqlSearchRequest request, long id, String type, String action, TaskId parentTaskId, Map headers, Map originHeaders, AsyncExecutionId asyncExecutionId) { - return new EqlSearchTask(id, type, action, request.getDescription(), parentTaskId, headers, originHeaders, asyncExecutionId); + return new EqlSearchTask(id, type, action, request.getDescription(), parentTaskId, headers, originHeaders, asyncExecutionId, + request.keepAlive()); } @Override - public void operation(EqlSearchRequest request, EqlSearchTask task, ActionListener listener) { + public void execute(EqlSearchRequest request, EqlSearchTask task, ActionListener listener) { operation(planExecutor, task, request, username(securityContext), clusterName(clusterService), clusterService.localNode().getId(), listener); } @@ -115,7 +116,7 @@ public static void operation(PlanExecutor planExecutor, EqlSearchTask task, EqlS .implicitJoinKey(request.implicitJoinKeyField()); EqlConfiguration cfg = new EqlConfiguration(request.indices(), zoneId, username, clusterName, filter, timeout, request.fetchSize(), - includeFrozen, request.isCaseSensitive(), clientId, new TaskId(nodeId, task.getId()), task); + includeFrozen, request.isCaseSensitive(), clientId, new TaskId(nodeId, task.getId()), task); planExecutor.eql(cfg, request.query(), params, wrap(r -> listener.onResponse(createResponse(r, task.getExecutionId())), listener::onFailure)); } diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java index ff0fa90a369b3..d9e67dbfc133f 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java @@ -63,6 +63,6 @@ public static EqlConfiguration randomConfigurationWithCaseSensitive(boolean isCa public static EqlSearchTask randomTask() { return new EqlSearchTask(randomLong(), "transport", EqlSearchAction.NAME, "", null, Collections.emptyMap(), Collections.emptyMap(), - new AsyncExecutionId("", new TaskId(randomAlphaOfLength(10), 1))); + new AsyncExecutionId("", new TaskId(randomAlphaOfLength(10), 1)), TimeValue.timeValueDays(5)); } } diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java index 91686b2ed7b0c..83dfe78f91c9c 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java @@ -13,36 +13,38 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESSingleNodeTestCase; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.async.AsyncExecutionId; -import org.elasticsearch.xpack.core.async.AsyncTask; +import org.elasticsearch.xpack.core.async.AsyncResultsService; +import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; +import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.junit.After; import org.junit.Before; import java.io.IOException; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.xpack.eql.async.AsyncTaskManagementService.addCompletionListener; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -// TODO: test CRUD operations public class AsyncTaskManagementServiceTests extends ESSingleNodeTestCase { private ClusterService clusterService; private TransportService transportService; + private AsyncResultsService> results; private final ExecutorService executorService = Executors.newFixedThreadPool(1); - public static class TestRequest extends ActionRequest { private final String string; @@ -77,31 +79,17 @@ public void writeTo(StreamOutput out) throws IOException { } } - public static class TestTask extends CancellableTask implements AsyncTask { - - private final Map originHeaders; - private final AsyncExecutionId asyncExecutionId; + public static class TestTask extends StoredAsyncTask { + public volatile AtomicReference finalResponse = new AtomicReference<>(); public TestTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers, - Map originHeaders, AsyncExecutionId asyncExecutionId) { - super(id, type, action, description, parentTaskId, headers); - this.originHeaders = originHeaders; - this.asyncExecutionId = asyncExecutionId; - } - - @Override - public boolean shouldCancelChildrenOnCancellation() { - return true; - } - - @Override - public Map getOriginHeaders() { - return originHeaders; + Map originHeaders, AsyncExecutionId asyncExecutionId, TimeValue keepAlive) { + super(id, type, action, description, parentTaskId, headers, originHeaders, asyncExecutionId, keepAlive); } @Override - public AsyncExecutionId getExecutionId() { - return asyncExecutionId; + public TestResponse getCurrentResult() { + return Objects.requireNonNullElseGet(finalResponse.get(), () -> new TestResponse(null, getExecutionId().getEncoded())); } } @@ -110,21 +98,22 @@ public static class TestOperation implements AsyncTaskManagementService.AsyncOpe @Override public TestTask createTask(TestRequest request, long id, String type, String action, TaskId parentTaskId, Map headers, Map originHeaders, AsyncExecutionId asyncExecutionId) { - return new TestTask(id, type, action, request.getDescription(), parentTaskId, headers, originHeaders, asyncExecutionId); + return new TestTask(id, type, action, request.getDescription(), parentTaskId, headers, originHeaders, asyncExecutionId, + TimeValue.timeValueDays(5)); } @Override - public void operation(TestRequest request, TestTask task, ActionListener listener) { + public void execute(TestRequest request, TestTask task, ActionListener listener) { if (request.string.equals("die")) { listener.onFailure(new IllegalArgumentException("test exception")); } else { - listener.onResponse(new TestResponse("response for [" + request.string + "]", task.asyncExecutionId.getEncoded())); + listener.onResponse(new TestResponse("response for [" + request.string + "]", task.getExecutionId().getEncoded())); } } @Override public TestResponse initialResponse(TestTask task) { - return new TestResponse(null, task.asyncExecutionId.getEncoded()); + return new TestResponse(null, task.getExecutionId().getEncoded()); } @Override @@ -139,6 +128,12 @@ public TestResponse readResponse(StreamInput inputStream) throws IOException { public void setup() { clusterService = getInstanceFromNode(ClusterService.class); transportService = getInstanceFromNode(TransportService.class); + AsyncTaskIndexService> store = + new AsyncTaskIndexService<>(index, clusterService, transportService.getThreadPool().getThreadContext(), client(), "test", + in -> new StoredAsyncResponse<>(TestResponse::new, in), writableRegistry()); + results = new AsyncResultsService<>(store, true, TestTask.class, + (task, listener, timeout) -> addCompletionListener(transportService.getThreadPool(), task, listener, timeout), + transportService.getTaskManager(), clusterService); } /** @@ -149,14 +144,14 @@ public void shutdownExec() { executorService.shutdown(); } - private AsyncTaskManagementService createService( + private AsyncTaskManagementService createManagementService( AsyncTaskManagementService.AsyncOperation operation) { return new AsyncTaskManagementService<>(index, client(), "test_origin", writableRegistry(), transportService.getTaskManager(), "test_action", operation, TestTask.class, clusterService, transportService.getThreadPool()); } public void testReturnBeforeTimeout() throws Exception { - AsyncTaskManagementService service = createService(new TestOperation()); + AsyncTaskManagementService service = createManagementService(new TestOperation()); boolean success = randomBoolean(); boolean keepOnCompletion = randomBoolean(); CountDownLatch latch = new CountDownLatch(1); @@ -175,62 +170,118 @@ public void testReturnBeforeTimeout() throws Exception { assertThat(latch.await(10, TimeUnit.SECONDS), equalTo(true)); } - @TestLogging(value = "org.elasticsearch.xpack.core.async:trace", reason = "remove me") public void testReturnAfterTimeout() throws Exception { CountDownLatch executionLatch = new CountDownLatch(1); - AsyncTaskManagementService service = createService(new TestOperation() { + AsyncTaskManagementService service = createManagementService(new TestOperation() { @Override - public void operation(TestRequest request, TestTask task, ActionListener listener) { + public void execute(TestRequest request, TestTask task, ActionListener listener) { executorService.submit(() -> { try { assertThat(executionLatch.await(10, TimeUnit.SECONDS), equalTo(true)); } catch (InterruptedException ex) { fail("Shouldn't be here"); } - super.operation(request, task, listener); + super.execute(request, task, listener); }); } }); boolean success = randomBoolean(); boolean keepOnCompletion = randomBoolean(); + boolean timeoutOnFirstAttempt = randomBoolean(); + boolean waitForCompletion = randomBoolean(); CountDownLatch latch = new CountDownLatch(1); TestRequest request = new TestRequest(success ? randomAlphaOfLength(10) : "die"); AtomicReference responseHolder = new AtomicReference<>(); - service.asyncExecute(request, TimeValue.timeValueMillis(0), TimeValue.timeValueMinutes(10), keepOnCompletion, + service.asyncExecute(request, TimeValue.timeValueMillis(1), TimeValue.timeValueMinutes(10), keepOnCompletion, ActionListener.wrap(r -> { assertThat(r.string, nullValue()); assertThat(r.id, notNullValue()); assertThat(responseHolder.getAndSet(r), nullValue()); latch.countDown(); - }, e -> { - fail("Shouldn't be here"); - })); - assertThat(latch.await(10, TimeUnit.SECONDS), equalTo(true)); - executionLatch.countDown(); - assertThat(responseHolder.get(), notNullValue()); - AsyncExecutionId id = AsyncExecutionId.decode(responseHolder.get().id); - assertThat(service.getTask(id), notNullValue()); + }, e -> fail("Shouldn't be here"))); + assertThat(latch.await(20, TimeUnit.SECONDS), equalTo(true)); + + if (timeoutOnFirstAttempt) { + logger.trace("Getting an in-flight response"); + // try getting results, but fail with timeout because it is not ready yet + StoredAsyncResponse response = getResponse(responseHolder.get().id, TimeValue.timeValueMillis(2)); + assertThat(response.getException(), nullValue()); + assertThat(response.getResponse(), notNullValue()); + assertThat(response.getResponse().id, equalTo(responseHolder.get().id)); + assertThat(response.getResponse().string, nullValue()); + } - CountDownLatch responseLatch = new CountDownLatch(1); + if (waitForCompletion) { + // now we are waiting for the task to finish + logger.trace("Waiting for response to complete"); + AtomicReference> responseRef = new AtomicReference<>(); + CountDownLatch getResponseCountDown = getResponse(responseHolder.get().id, TimeValue.timeValueSeconds(5), + ActionListener.wrap(responseRef::set, e -> fail("Shouldn't be here"))); + + executionLatch.countDown(); + assertThat(getResponseCountDown.await(10, TimeUnit.SECONDS), equalTo(true)); + + StoredAsyncResponse response = responseRef.get(); + if (success) { + assertThat(response.getException(), nullValue()); + assertThat(response.getResponse(), notNullValue()); + assertThat(response.getResponse().id, equalTo(responseHolder.get().id)); + assertThat(response.getResponse().string, equalTo("response for [" + request.string + "]")); + } else { + assertThat(response.getException(), notNullValue()); + assertThat(response.getResponse(), nullValue()); + assertThat(response.getException().getMessage(), equalTo("test exception")); + } + } else { + executionLatch.countDown(); + } - // Wait until task finishes + // finally wait until the task disappears and get the response from the index + logger.trace("Wait for task to disappear "); assertBusy(() -> { - TestTask t = service.getTask(id); - logger.info(t); - assertThat(t, nullValue()); + Task task = transportService.getTaskManager().getTask(AsyncExecutionId.decode(responseHolder.get().id).getTaskId().getId()); + assertThat(task, nullValue()); }); - ensureGreen(index); - logger.info("Getting the the response back"); - service.getResponse(id, ActionListener.wrap( + logger.trace("Getting the the final response from the index"); + StoredAsyncResponse response = getResponse(responseHolder.get().id, TimeValue.ZERO); + if (success) { + assertThat(response.getException(), nullValue()); + assertThat(response.getResponse(), notNullValue()); + assertThat(response.getResponse().string, equalTo("response for [" + request.string + "]")); + } else { + assertThat(response.getException(), notNullValue()); + assertThat(response.getResponse(), nullValue()); + assertThat(response.getException().getMessage(), equalTo("test exception")); + } + } + + private StoredAsyncResponse getResponse(String id, TimeValue timeout) throws InterruptedException { + AtomicReference> response = new AtomicReference<>(); + assertThat( + getResponse(id, timeout, ActionListener.wrap(response::set, e -> fail("Shouldn't be here"))).await(10, TimeUnit.SECONDS), + equalTo(true) + ); + return response.get(); + } + + private CountDownLatch getResponse(String id, + TimeValue timeout, + ActionListener> listener) { + CountDownLatch responseLatch = new CountDownLatch(1); + GetAsyncResultRequest getResultsRequest = new GetAsyncResultRequest(id) + .setWaitForCompletion(timeout); + results.retrieveResult(getResultsRequest, ActionListener.wrap( r -> { - assertThat(r.string, equalTo("response for [" + request.string + "]")); + listener.onResponse(r); responseLatch.countDown(); }, e -> { - assertThat(e.getMessage(), equalTo("test exception")); + listener.onFailure(e); responseLatch.countDown(); - })); - assertThat(responseLatch.await(10, TimeUnit.SECONDS), equalTo(true)); + } + )); + return responseLatch; } + } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java index 81b35e4dfb163..745345aeee3c8 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.xpack.core.eql.EqlAsyncActionNames; import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction; @@ -266,7 +267,7 @@ public void authorizeIndexAction(RequestInfo requestInfo, AuthorizationInfo auth // information such as the index and the incoming address of the request listener.onResponse(new IndexAuthorizationResult(true, IndicesAccessControl.ALLOW_NO_INDICES)); } - } else if (isAsyncSearchRelatedAction(action)) { + } else if (isAsyncRelatedAction(action)) { if (SubmitAsyncSearchAction.NAME.equals(action)) { // we check if the user has any indices permission when submitting an async-search request in order to be // able to fail the request early. Fine grained index-level permissions are handled by the search action @@ -587,9 +588,11 @@ private static boolean isScrollRelatedAction(String action) { action.equals(SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME); } - private static boolean isAsyncSearchRelatedAction(String action) { + private static boolean isAsyncRelatedAction(String action) { return action.equals(SubmitAsyncSearchAction.NAME) || action.equals(GetAsyncSearchAction.NAME) || - action.equals(DeleteAsyncSearchAction.NAME); + action.equals(DeleteAsyncSearchAction.NAME) || + action.equals(EqlAsyncActionNames.EQL_ASYNC_GET_RESULT_ACTION_NAME) || + action.equals(EqlAsyncActionNames.EQL_ASYNC_DELETE_RESULT_ACTION_NAME); } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/eql.get.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/eql.get.json new file mode 100644 index 0000000000000..9271f43edf736 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/eql.get.json @@ -0,0 +1,36 @@ +{ + "eql.get":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/eql-search-api.html", + "description": "Returns async results from previously executed Event Query Language (EQL) search" + }, + "stability": "beta", + "url":{ + "paths":[ + { + "path":"/_eql/search/{id}", + "methods":[ + "GET" + ], + "parts":{ + "id":{ + "type":"string", + "description":"The async search ID" + } + } + } + ] + }, + "params":{ + "wait_for_completion_timeout":{ + "type":"time", + "description":"Specify the time that the request should block waiting for the final response" + }, + "keep_alive": { + "type": "time", + "description": "Update the time interval in which the results (partial or final) for this search will be available", + "default": "5d" + } + } + } +}