From f4f0e51770808d2c2a972446ddbf225d555a04c6 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 2 Mar 2026 08:50:40 -0800 Subject: [PATCH] Cancel async query on expiry (#143016) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Today, we don’t cancel async queries when they expire. This means an async query continues running until it completes, unless the user explicitly deletes it. With this change, we will schedule a cancellation when an async query starts and reschedule the cancellation whenever the keep-alive is updated via get. Closes #142662 (cherry picked from commit 8feeac89aa6748f5d6ecd9dd44a61ac01a28d976) --- docs/changelog/143016.yaml | 6 +++ .../esql/action/AsyncEsqlQueryActionIT.java | 46 +++++++++++++++++++ .../xpack/esql/action/LookupFromIndexIT.java | 14 +----- .../xpack/esql/action/EsqlQueryTask.java | 42 ++++++++++++++++- .../esql/plugin/TransportEsqlQueryAction.java | 25 ++++++++++ 5 files changed, 119 insertions(+), 14 deletions(-) create mode 100644 docs/changelog/143016.yaml diff --git a/docs/changelog/143016.yaml b/docs/changelog/143016.yaml new file mode 100644 index 0000000000000..f10d3ea8ed24e --- /dev/null +++ b/docs/changelog/143016.yaml @@ -0,0 +1,6 @@ +area: ES|QL +issues: + - 142662 +pr: 143016 +summary: Cancel async query on expiry +type: bug diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java index 9164ff4582ce2..9ffdefd65adb5 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java @@ -44,6 +44,7 @@ import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.asyncEsqlQueryRequest; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -326,6 +327,51 @@ public void testUpdateKeepAlive() throws Exception { } } + public void testCancelOnExpiry() throws Exception { + TimeValue keepAlive = timeValueMillis(between(1000, 2000)); + var request = asyncEsqlQueryRequest("from test | stats sum(pause_me)").pragmas(queryPragmas()) + .waitForCompletionTimeout(TimeValue.timeValueMillis(between(1, 10))) + .keepOnCompletion(randomBoolean()) + .keepAlive(keepAlive); + final String asyncId; + try { + try (EsqlQueryResponse initialResponse = client().execute(EsqlQueryAction.INSTANCE, request).actionGet(60, TimeUnit.SECONDS)) { + assertThat(initialResponse.isRunning(), is(true)); + assertTrue(initialResponse.asyncExecutionId().isPresent()); + asyncId = initialResponse.asyncExecutionId().get(); + } + // all the started drivers were canceled + assertBusy(() -> { + List tasks = client().admin() + .cluster() + .prepareListTasks() + .setActions(DriverTaskRunner.ACTION_NAME) + .setDetailed(true) + .get() + .getTasks(); + for (TaskInfo task : tasks) { + assertTrue(task.cancelled()); + } + }); + // the async task was canceled + assertBusy(() -> { + List queryTasks = getEsqlQueryTasks(); + assertThat(queryTasks, hasSize(1)); + assertTrue(queryTasks.get(0).cancelled()); + }); + } finally { + scriptPermits.release(numberOfDocs()); + } + TaskCancelledException error = expectThrows(TaskCancelledException.class, () -> { + var getRequest = new GetAsyncResultRequest(asyncId).setWaitForCompletionTimeout(timeValueSeconds(10)) + .setKeepAlive(timeValueSeconds(30)); + try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) { + assertThat(resp.isRunning(), is(false)); + } + }); + assertThat(error.getMessage(), containsString("keep_alive expired")); + } + private static long getExpirationFromTask(String asyncId) { List tasks = new ArrayList<>(); for (TransportService ts : internalCluster().getInstances(TransportService.class)) { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index 10da31c79050b..6a281379a88c8 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -51,10 +51,8 @@ import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.lookup.SearchLookup; import org.elasticsearch.tasks.CancellableTask; -import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; @@ -360,17 +358,7 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices, ), 0 ); - CancellableTask parentTask = new EsqlQueryTask( - 1, - "test", - "test", - "test", - null, - Map.of(), - Map.of(), - new AsyncExecutionId("test", TaskId.EMPTY_TASK_ID), - TEST_REQUEST_TIMEOUT - ); + CancellableTask parentTask = new CancellableTask(1, "test", "test", "test", null, Map.of()); final String finalNodeWithShard = nodeWithShard; boolean expressionJoin = EsqlCapabilities.Cap.LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION.isEnabled() ? randomBoolean() : false; List matchFields = new ArrayList<>(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java index e759633c69df5..7d8b4345ede6a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java @@ -9,15 +9,18 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.async.StoredAsyncTask; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; -public class EsqlQueryTask extends StoredAsyncTask { +public abstract class EsqlQueryTask extends StoredAsyncTask { private EsqlExecutionInfo executionInfo; + private final AtomicReference scheduledCancellation = new AtomicReference<>(); public EsqlQueryTask( long id, @@ -60,4 +63,41 @@ public EsqlQueryResponse getCurrentResult() { executionInfo ); } + + @Override + public void onResponse(EsqlQueryResponse response) { + removeScheduledCancellation(); + super.onResponse(response); + } + + @Override + public void onFailure(Exception e) { + removeScheduledCancellation(); + super.onFailure(e); + } + + @Override + public void setExpirationTime(long expirationTime) { + super.setExpirationTime(expirationTime); + rescheduleCancellationOnExpiry(); + } + + private void removeScheduledCancellation() { + var prev = scheduledCancellation.getAndSet(null); + if (prev != null) { + prev.cancel(); + } + } + + /** + * Schedules task cancellation at the given expiration time + */ + protected abstract Scheduler.ScheduledCancellable scheduleCancellationOnExpiry(long expirationTimeMillis); + + public void rescheduleCancellationOnExpiry() { + var prev = scheduledCancellation.getAndSet(scheduleCancellationOnExpiry(getExpirationTimeMillis())); + if (prev != null) { + prev.cancel(); + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index e17f354c0611e..d290eddaa88bb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -21,16 +21,21 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.data.BlockFactoryProvider; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.crossproject.CrossProjectModeDecider; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; @@ -73,6 +78,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction { + private static final Logger logger = LogManager.getLogger(TransportEsqlQueryAction.class); + private final ThreadPool threadPool; private final PlanExecutor planExecutor; private final ComputeService computeService; @@ -234,6 +241,7 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener listener) { // set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running task.setExecutionInfo(createEsqlExecutionInfo(request)); + task.rescheduleCancellationOnExpiry(); ActionListener.run(listener, l -> innerExecute(task, request, l)); } @@ -475,6 +483,23 @@ public EsqlQueryTask createTask( public Status getStatus() { return new EsqlQueryStatus(asyncExecutionId); } + + @Override + protected Scheduler.ScheduledCancellable scheduleCancellationOnExpiry(long expirationTimeMillis) { + final long delay = Math.max(expirationTimeMillis - threadPool.absoluteTimeInMillis(), 0); + final CancellableTask task = this; + return threadPool.schedule(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.warn("failed to cancel async-query on expiry", e); + } + + @Override + protected void doRun() { + taskManager.cancelTaskAndDescendants(task, "keep_alive expired", false, ActionListener.noop()); + } + }, TimeValue.timeValueMillis(delay), threadPool.generic()); + } }; }