diff --git a/docs/changelog/143016.yaml b/docs/changelog/143016.yaml new file mode 100644 index 0000000000000..f10d3ea8ed24e --- /dev/null +++ b/docs/changelog/143016.yaml @@ -0,0 +1,6 @@ +area: ES|QL +issues: + - 142662 +pr: 143016 +summary: Cancel async query on expiry +type: bug diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java index 9164ff4582ce2..9ffdefd65adb5 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java @@ -44,6 +44,7 @@ import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.asyncEsqlQueryRequest; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -326,6 +327,51 @@ public void testUpdateKeepAlive() throws Exception { } } + public void testCancelOnExpiry() throws Exception { + TimeValue keepAlive = timeValueMillis(between(1000, 2000)); + var request = asyncEsqlQueryRequest("from test | stats sum(pause_me)").pragmas(queryPragmas()) + .waitForCompletionTimeout(TimeValue.timeValueMillis(between(1, 10))) + .keepOnCompletion(randomBoolean()) + .keepAlive(keepAlive); + final String asyncId; + try { + try (EsqlQueryResponse initialResponse = client().execute(EsqlQueryAction.INSTANCE, request).actionGet(60, TimeUnit.SECONDS)) { + assertThat(initialResponse.isRunning(), is(true)); + assertTrue(initialResponse.asyncExecutionId().isPresent()); + asyncId = initialResponse.asyncExecutionId().get(); + } + // all the started drivers were canceled + assertBusy(() -> { + List tasks = client().admin() + .cluster() + .prepareListTasks() + .setActions(DriverTaskRunner.ACTION_NAME) + .setDetailed(true) + .get() + .getTasks(); + for (TaskInfo task : tasks) { + assertTrue(task.cancelled()); + } + }); + // the async task was canceled + assertBusy(() -> { + List queryTasks = getEsqlQueryTasks(); + assertThat(queryTasks, hasSize(1)); + assertTrue(queryTasks.get(0).cancelled()); + }); + } finally { + scriptPermits.release(numberOfDocs()); + } + TaskCancelledException error = expectThrows(TaskCancelledException.class, () -> { + var getRequest = new GetAsyncResultRequest(asyncId).setWaitForCompletionTimeout(timeValueSeconds(10)) + .setKeepAlive(timeValueSeconds(30)); + try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) { + assertThat(resp.isRunning(), is(false)); + } + }); + assertThat(error.getMessage(), containsString("keep_alive expired")); + } + private static long getExpirationFromTask(String asyncId) { List tasks = new ArrayList<>(); for (TransportService ts : internalCluster().getInstances(TransportService.class)) { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index 2417428a13923..8019e41787143 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -50,10 +50,8 @@ import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.tasks.CancellableTask; -import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.Literal; @@ -360,18 +358,7 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices, true, 0 ); - CancellableTask parentTask = new EsqlQueryTask( - "test-session", - 1, - "test", - "test", - "test", - null, - Map.of(), - Map.of(), - new AsyncExecutionId("test", TaskId.EMPTY_TASK_ID), - TEST_REQUEST_TIMEOUT - ); + CancellableTask parentTask = new CancellableTask(1, "test", "test", "test", null, Map.of()); final String finalNodeWithShard = nodeWithShard; boolean expressionJoin = EsqlCapabilities.Cap.LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION.isEnabled() ? randomBoolean() : false; List matchFields = new ArrayList<>(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java index c7b78383160dc..b7a26b8248ef2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java @@ -9,17 +9,20 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.async.StoredAsyncTask; import java.time.ZoneOffset; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; -public class EsqlQueryTask extends StoredAsyncTask { +public abstract class EsqlQueryTask extends StoredAsyncTask { private final String sessionId; private EsqlExecutionInfo executionInfo; + private final AtomicReference scheduledCancellation = new AtomicReference<>(); public EsqlQueryTask( String sessionId, @@ -69,4 +72,41 @@ public EsqlQueryResponse getCurrentResult() { executionInfo ); } + + @Override + public void onResponse(EsqlQueryResponse response) { + removeScheduledCancellation(); + super.onResponse(response); + } + + @Override + public void onFailure(Exception e) { + removeScheduledCancellation(); + super.onFailure(e); + } + + @Override + public void setExpirationTime(long expirationTime) { + super.setExpirationTime(expirationTime); + rescheduleCancellationOnExpiry(); + } + + private void removeScheduledCancellation() { + var prev = scheduledCancellation.getAndSet(null); + if (prev != null) { + prev.cancel(); + } + } + + /** + * Schedules task cancellation at the given expiration time + */ + protected abstract Scheduler.ScheduledCancellable scheduleCancellationOnExpiry(long expirationTimeMillis); + + public void rescheduleCancellationOnExpiry() { + var prev = scheduledCancellation.getAndSet(scheduleCancellationOnExpiry(getExpirationTimeMillis())); + if (prev != null) { + prev.cancel(); + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 15fb73daec89e..7397472d7dd46 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -24,17 +24,22 @@ import org.elasticsearch.common.logging.activity.ActivityLogWriterProvider; import org.elasticsearch.common.logging.activity.ActivityLogger; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.data.BlockFactoryProvider; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.ActionLoggingFieldsProvider; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.crossproject.CrossProjectModeDecider; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; @@ -84,6 +89,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction { + private static final Logger logger = LogManager.getLogger(TransportEsqlQueryAction.class); + private final ThreadPool threadPool; private final PlanExecutor planExecutor; private final ComputeService computeService; @@ -265,6 +272,7 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener listener) { // set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running task.setExecutionInfo(createEsqlExecutionInfo(request)); + task.rescheduleCancellationOnExpiry(); ActionListener.run(listener, l -> innerExecute(task, request, l)); } @@ -528,6 +536,23 @@ public EsqlQueryTask createTask( public Status getStatus() { return new EsqlQueryStatus(asyncExecutionId); } + + @Override + protected Scheduler.ScheduledCancellable scheduleCancellationOnExpiry(long expirationTimeMillis) { + final long delay = Math.max(expirationTimeMillis - threadPool.absoluteTimeInMillis(), 0); + final CancellableTask task = this; + return threadPool.schedule(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.warn("failed to cancel async-query on expiry", e); + } + + @Override + protected void doRun() { + taskManager.cancelTaskAndDescendants(task, "keep_alive expired", false, ActionListener.noop()); + } + }, TimeValue.timeValueMillis(delay), threadPool.generic()); + } }; }