Cancel async ES|QL queries when their keep_alive expires (PR 143016, issue 142662).

What the hunks below do:
  * EsqlQueryTask becomes abstract. It keeps at most one pending
    Scheduler.ScheduledCancellable in an AtomicReference, replaces it in
    rescheduleCancellationOnExpiry() (called from setExpirationTime() and from
    TransportEsqlQueryAction.execute()), and cancels it in onResponse() and
    onFailure() before delegating to the superclass.
  * TransportEsqlQueryAction.createTask() implements scheduleCancellationOnExpiry():
    it schedules, on the generic executor, a call to
    taskManager.cancelTaskAndDescendants(task, "keep_alive expired", ...) after
    max(expirationTimeMillis - threadPool.absoluteTimeInMillis(), 0) milliseconds.
  * AsyncEsqlQueryActionIT gains testCancelOnExpiry, which starts a paused query with a
    1-2 second keep_alive and expects the driver tasks and the async task to be
    cancelled, and a later GET to fail with TaskCancelledException.
  * LookupFromIndexIT builds a plain CancellableTask instead of an EsqlQueryTask.

About this copy of the patch:
  * The line structure was rebuilt from a whitespace-collapsed copy. Indentation is
    inferred from nesting, not recovered; if context fails to match, apply with
    "git apply --ignore-whitespace".
  * Generic type arguments were restored only where the surrounding code fixes the type:
    List<TaskInfo> (the loop variable and the cancelled() call),
    StoredAsyncTask<EsqlQueryResponse> (the @Override of onResponse), and
    AtomicReference<Scheduler.ScheduledCancellable> (the value stored and prev.cancel()).
  * Context lines whose type arguments or text could not be recovered were dropped and the
    hunk ranges narrowed to match: AsyncEsqlQueryActionIT (-326), LookupFromIndexIT (-360),
    TransportEsqlQueryAction (-76 and -235). Text after the second "@@" of a hunk header is
    informational only and may still show stripped or truncated type arguments.

NOTE(review): rescheduleCancellationOnExpiry() schedules unconditionally. If
setExpirationTime() can run after onResponse()/onFailure() has already cleared the
reference, the newly scheduled cancellation is not removed by anything in this patch.
Confirm against the callers of setExpirationTime().

diff --git a/docs/changelog/143016.yaml b/docs/changelog/143016.yaml
new file mode 100644
index 0000000000000..f10d3ea8ed24e
--- /dev/null
+++ b/docs/changelog/143016.yaml
@@ -0,0 +1,6 @@
+area: ES|QL
+issues:
+ - 142662
+pr: 143016
+summary: Cancel async query on expiry
+type: bug
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java
index 9164ff4582ce2..9ffdefd65adb5 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java
@@ -44,6 +44,7 @@
 import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
 import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.asyncEsqlQueryRequest;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -326,4 +327,49 @@ public void testUpdateKeepAlive() throws Exception {
         }
     }
 
+    public void testCancelOnExpiry() throws Exception {
+        TimeValue keepAlive = timeValueMillis(between(1000, 2000));
+        var request = asyncEsqlQueryRequest("from test | stats sum(pause_me)").pragmas(queryPragmas())
+            .waitForCompletionTimeout(TimeValue.timeValueMillis(between(1, 10)))
+            .keepOnCompletion(randomBoolean())
+            .keepAlive(keepAlive);
+        final String asyncId;
+        try {
+            try (EsqlQueryResponse initialResponse = client().execute(EsqlQueryAction.INSTANCE, request).actionGet(60, TimeUnit.SECONDS)) {
+                assertThat(initialResponse.isRunning(), is(true));
+                assertTrue(initialResponse.asyncExecutionId().isPresent());
+                asyncId = initialResponse.asyncExecutionId().get();
+            }
+            // all the started drivers were canceled
+            assertBusy(() -> {
+                List<TaskInfo> tasks = client().admin()
+                    .cluster()
+                    .prepareListTasks()
+                    .setActions(DriverTaskRunner.ACTION_NAME)
+                    .setDetailed(true)
+                    .get()
+                    .getTasks();
+                for (TaskInfo task : tasks) {
+                    assertTrue(task.cancelled());
+                }
+            });
+            // the async task was canceled
+            assertBusy(() -> {
+                List<TaskInfo> queryTasks = getEsqlQueryTasks();
+                assertThat(queryTasks, hasSize(1));
+                assertTrue(queryTasks.get(0).cancelled());
+            });
+        } finally {
+            scriptPermits.release(numberOfDocs());
+        }
+        TaskCancelledException error = expectThrows(TaskCancelledException.class, () -> {
+            var getRequest = new GetAsyncResultRequest(asyncId).setWaitForCompletionTimeout(timeValueSeconds(10))
+                .setKeepAlive(timeValueSeconds(30));
+            try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {
+                assertThat(resp.isRunning(), is(false));
+            }
+        });
+        assertThat(error.getMessage(), containsString("keep_alive expired"));
+    }
+
     private static long getExpirationFromTask(String asyncId) {
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java
index 10da31c79050b..6a281379a88c8 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java
@@ -51,10 +51,8 @@
 import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.search.lookup.SearchLookup;
 import org.elasticsearch.tasks.CancellableTask;
-import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.xpack.core.async.AsyncExecutionId;
 import org.elasticsearch.xpack.esql.core.expression.Alias;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
@@ -360,16 +358,6 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices,
                 ),
                 0
             );
-            CancellableTask parentTask = new EsqlQueryTask(
-                1,
-                "test",
-                "test",
-                "test",
-                null,
-                Map.of(),
-                Map.of(),
-                new AsyncExecutionId("test", TaskId.EMPTY_TASK_ID),
-                TEST_REQUEST_TIMEOUT
-            );
+            CancellableTask parentTask = new CancellableTask(1, "test", "test", "test", null, Map.of());
             final String finalNodeWithShard = nodeWithShard;
             boolean expressionJoin = EsqlCapabilities.Cap.LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION.isEnabled() ? randomBoolean() : false;
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java
index e759633c69df5..7d8b4345ede6a 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java
@@ -9,15 +9,18 @@
 
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.xpack.core.async.AsyncExecutionId;
 import org.elasticsearch.xpack.core.async.StoredAsyncTask;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
-public class EsqlQueryTask extends StoredAsyncTask<EsqlQueryResponse> {
+public abstract class EsqlQueryTask extends StoredAsyncTask<EsqlQueryResponse> {
 
     private EsqlExecutionInfo executionInfo;
+    private final AtomicReference<Scheduler.ScheduledCancellable> scheduledCancellation = new AtomicReference<>();
 
     public EsqlQueryTask(
         long id,
@@ -60,4 +63,41 @@ public EsqlQueryResponse getCurrentResult() {
             executionInfo
         );
     }
+
+    @Override
+    public void onResponse(EsqlQueryResponse response) {
+        removeScheduledCancellation();
+        super.onResponse(response);
+    }
+
+    @Override
+    public void onFailure(Exception e) {
+        removeScheduledCancellation();
+        super.onFailure(e);
+    }
+
+    @Override
+    public void setExpirationTime(long expirationTime) {
+        super.setExpirationTime(expirationTime);
+        rescheduleCancellationOnExpiry();
+    }
+
+    private void removeScheduledCancellation() {
+        var prev = scheduledCancellation.getAndSet(null);
+        if (prev != null) {
+            prev.cancel();
+        }
+    }
+
+    /**
+     * Schedules task cancellation at the given expiration time
+     */
+    protected abstract Scheduler.ScheduledCancellable scheduleCancellationOnExpiry(long expirationTimeMillis);
+
+    public void rescheduleCancellationOnExpiry() {
+        var prev = scheduledCancellation.getAndSet(scheduleCancellationOnExpiry(getExpirationTimeMillis()));
+        if (prev != null) {
+            prev.cancel();
+        }
+    }
 }
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
index e17f354c0611e..d290eddaa88bb 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
@@ -21,16 +21,21 @@
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.compute.data.BlockFactoryProvider;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.injection.guice.Inject;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteClusterAware;
 import org.elasticsearch.transport.RemoteClusterService;
@@ -76,3 +81,5 @@ public class TransportEsqlQueryAction extends HandledTransportAction
+    private static final Logger logger = LogManager.getLogger(TransportEsqlQueryAction.class);
+
     private final ThreadPool threadPool;
     private final PlanExecutor planExecutor;
     private final ComputeService computeService;
@@ -235,5 +242,6 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener
         // set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running
         task.setExecutionInfo(createEsqlExecutionInfo(request));
+        task.rescheduleCancellationOnExpiry();
         ActionListener.run(listener, l -> innerExecute(task, request, l));
     }
 
@@ -475,6 +483,23 @@ public EsqlQueryTask createTask(
             public Status getStatus() {
                 return new EsqlQueryStatus(asyncExecutionId);
             }
+
+            @Override
+            protected Scheduler.ScheduledCancellable scheduleCancellationOnExpiry(long expirationTimeMillis) {
+                final long delay = Math.max(expirationTimeMillis - threadPool.absoluteTimeInMillis(), 0);
+                final CancellableTask task = this;
+                return threadPool.schedule(new AbstractRunnable() {
+                    @Override
+                    public void onFailure(Exception e) {
+                        logger.warn("failed to cancel async-query on expiry", e);
+                    }
+
+                    @Override
+                    protected void doRun() {
+                        taskManager.cancelTaskAndDescendants(task, "keep_alive expired", false, ActionListener.noop());
+                    }
+                }, TimeValue.timeValueMillis(delay), threadPool.generic());
+            }
         };
     }
 