diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index a3f2c7256090c..29ebb532c01cc 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -37,7 +37,6 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Supplier; @@ -47,7 +46,6 @@ * Task that tracks the progress of a currently running {@link SearchRequest}. */ final class AsyncSearchTask extends SearchTask implements AsyncTask { - private final BooleanSupplier checkSubmitCancellation; private final AsyncExecutionId searchId; private final Client client; private final ThreadPool threadPool; @@ -74,7 +72,6 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { * @param type The type of the task. * @param action The action name. * @param parentTaskId The parent task id. - * @param checkSubmitCancellation A boolean supplier that checks if the submit task has been cancelled. * @param originHeaders All the request context headers. * @param taskHeaders The filtered request headers for the task. * @param searchId The {@link AsyncExecutionId} of the task. @@ -85,7 +82,6 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { String type, String action, TaskId parentTaskId, - BooleanSupplier checkSubmitCancellation, TimeValue keepAlive, Map originHeaders, Map taskHeaders, @@ -94,7 +90,6 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { ThreadPool threadPool, Supplier aggReduceContextSupplier) { super(id, type, action, "async_search", parentTaskId, taskHeaders); - this.checkSubmitCancellation = checkSubmitCancellation; this.expirationTimeMillis = getStartTime() + keepAlive.getMillis(); this.originHeaders = originHeaders; this.searchId = searchId; @@ -326,12 +321,9 @@ private AsyncSearchResponse getResponseWithHeaders() { // checks if the search task should be cancelled private synchronized void checkCancellation() { long now = System.currentTimeMillis(); - if (hasCompleted == false && - expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) { - // we cancel the search task if the initial submit task was cancelled, - // this is needed because the task cancellation mechanism doesn't - // handle the cancellation of grand-children. - cancelTask(() -> {}, checkSubmitCancellation.getAsBoolean() ? "submit was cancelled" : "async search has expired"); + if (hasCompleted == false && expirationTimeMillis < now) { + // we cancel expired search task even if they are still running + cancelTask(() -> {}, "async search has expired"); } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index da3adf12b9448..ffb9e03a5cd74 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -29,9 +29,7 @@ import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackPlugin; @@ -75,8 +73,7 @@ public TransportSubmitAsyncSearchAction(ClusterService clusterService, } @Override - protected void doExecute(Task task, SubmitAsyncSearchRequest request, ActionListener submitListener) { - CancellableTask submitTask = (CancellableTask) task; + protected void doExecute(Task submitTask, SubmitAsyncSearchRequest request, ActionListener submitListener) { final SearchRequest searchRequest = createSearchRequest(request, submitTask, request.getKeepAlive()); AsyncSearchTask searchTask = (AsyncSearchTask) taskManager.register("transport", SearchAction.INSTANCE.name(), searchRequest); searchAction.execute(searchTask, searchRequest, searchTask.getSearchProgressActionListener()); @@ -88,42 +85,34 @@ public void onResponse(AsyncSearchResponse searchResponse) { // the task is still running and the user cannot wait more so we create // a document for further retrieval try { - if (submitTask.isCancelled()) { - // the user cancelled the submit so we don't store anything - // and propagate the failure - Exception cause = new TaskCancelledException(submitTask.getReasonCancelled()); - onFatalFailure(searchTask, cause, searchResponse.isRunning(), - "submit task is cancelled", submitListener); - } else { - final String docId = searchTask.getExecutionId().getDocId(); - // creates the fallback response if the node crashes/restarts in the middle of the request - // TODO: store intermediate results ? - AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId()); - store.createResponse(docId, searchTask.getOriginHeaders(), initialResp, - new ActionListener<>() { - @Override - public void onResponse(IndexResponse r) { - if (searchResponse.isRunning()) { - try { - // store the final response on completion unless the submit is cancelled - searchTask.addCompletionListener(finalResponse -> - onFinalResponse(submitTask, searchTask, finalResponse, () -> {})); - } finally { - submitListener.onResponse(searchResponse); - } - } else { - onFinalResponse(submitTask, searchTask, searchResponse, - () -> submitListener.onResponse(searchResponse)); + final String docId = searchTask.getExecutionId().getDocId(); + // creates the fallback response if the node crashes/restarts in the middle of the request + // TODO: store intermediate results ? + AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId()); + store.createResponse(docId, searchTask.getOriginHeaders(), initialResp, + new ActionListener<>() { + @Override + public void onResponse(IndexResponse r) { + if (searchResponse.isRunning()) { + try { + // store the final response on completion unless the submit is cancelled + searchTask.addCompletionListener(finalResponse -> + onFinalResponse(searchTask, finalResponse, () -> { + })); + } finally { + submitListener.onResponse(searchResponse); } + } else { + onFinalResponse(searchTask, searchResponse, () -> submitListener.onResponse(searchResponse)); } + } - @Override - public void onFailure(Exception exc) { - onFatalFailure(searchTask, exc, searchResponse.isRunning(), - "unable to store initial response", submitListener); - } - }); - } + @Override + public void onFailure(Exception exc) { + onFatalFailure(searchTask, exc, searchResponse.isRunning(), + "unable to store initial response", submitListener); + } + }); } catch (Exception exc) { onFatalFailure(searchTask, exc, searchResponse.isRunning(), "generic error", submitListener); } @@ -142,7 +131,7 @@ public void onFailure(Exception exc) { }, request.getWaitForCompletionTimeout()); } - private SearchRequest createSearchRequest(SubmitAsyncSearchRequest request, CancellableTask submitTask, TimeValue keepAlive) { + private SearchRequest createSearchRequest(SubmitAsyncSearchRequest request, Task submitTask, TimeValue keepAlive) { String docID = UUIDs.randomBase64UUID(); Map originHeaders = nodeClient.threadPool().getThreadContext().getHeaders(); SearchRequest searchRequest = new SearchRequest(request.getSearchRequest()) { @@ -151,9 +140,8 @@ public AsyncSearchTask createTask(long id, String type, String action, TaskId pa AsyncExecutionId searchId = new AsyncExecutionId(docID, new TaskId(nodeClient.getLocalNodeId(), id)); Supplier aggReduceContextSupplier = () -> requestToAggReduceContextBuilder.apply(request.getSearchRequest()); - return new AsyncSearchTask(id, type, action, parentTaskId, - submitTask::isCancelled, keepAlive, originHeaders, taskHeaders, searchId, store.getClient(), - nodeClient.threadPool(), aggReduceContextSupplier); + return new AsyncSearchTask(id, type, action, parentTaskId, keepAlive, + originHeaders, taskHeaders, searchId, store.getClient(), nodeClient.threadPool(), aggReduceContextSupplier); } }; searchRequest.setParentTask(new TaskId(nodeClient.getLocalNodeId(), submitTask.getId())); @@ -179,11 +167,10 @@ private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shoul } } - private void onFinalResponse(CancellableTask submitTask, - AsyncSearchTask searchTask, + private void onFinalResponse(AsyncSearchTask searchTask, AsyncSearchResponse response, Runnable nextAction) { - if (submitTask.isCancelled() || searchTask.isCancelled()) { + if (searchTask.isCancelled()) { // the task was cancelled so we ensure that there is nothing stored in the response index. store.deleteResponse(searchTask.getExecutionId(), ActionListener.wrap( resp -> unregisterTaskAndMoveOn(searchTask, nextAction), diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java index bc54bc0ac9afe..35e2e32077a81 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java @@ -51,13 +51,13 @@ public void afterTest() { } private AsyncSearchTask createAsyncSearchTask() { - return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> false, TimeValue.timeValueHours(1), + return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1), Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)), new NoOpClient(threadPool), threadPool, null); } public void testWaitForInit() throws InterruptedException { - AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> false, TimeValue.timeValueHours(1), + AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1), Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)), new NoOpClient(threadPool), threadPool, null); int numShards = randomIntBetween(0, 10); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/SubmitAsyncSearchRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/SubmitAsyncSearchRequest.java index f7459c848803a..2bf8019d606bd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/SubmitAsyncSearchRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/SubmitAsyncSearchRequest.java @@ -154,8 +154,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, return new CancellableTask(id, type, action, null, parentTaskId, headers) { @Override public boolean shouldCancelChildrenOnCancellation() { - // we cancel the underlying search action explicitly in the submit action - return false; + return true; } @Override