Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/143016.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
area: ES|QL
issues:
- 142662
pr: 143016
summary: Cancel async query on expiry
type: bug
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.asyncEsqlQueryRequest;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -326,6 +327,51 @@ public void testUpdateKeepAlive() throws Exception {
}
}

/**
 * Starts an async query with a keep_alive of one to two seconds, then checks that the listed driver
 * tasks and the single ES|QL query task are reported as cancelled, and that fetching the async result
 * afterwards fails with a {@link TaskCancelledException} mentioning the expired keep_alive.
 */
public void testCancelOnExpiry() throws Exception {
    final TimeValue keepAlive = timeValueMillis(between(1000, 2000));
    final var request = asyncEsqlQueryRequest("from test | stats sum(pause_me)").pragmas(queryPragmas())
        .waitForCompletionTimeout(timeValueMillis(between(1, 10)))
        .keepOnCompletion(randomBoolean())
        .keepAlive(keepAlive);
    final String asyncId;
    try {
        // The wait-for-completion timeout is at most 10ms, so the initial response must still be running.
        try (EsqlQueryResponse started = client().execute(EsqlQueryAction.INSTANCE, request).actionGet(60, TimeUnit.SECONDS)) {
            assertThat(started.isRunning(), is(true));
            assertTrue(started.asyncExecutionId().isPresent());
            asyncId = started.asyncExecutionId().get();
        }
        // Every driver task that is listed must be flagged as cancelled.
        // NOTE(review): this check passes vacuously when the listing is empty - confirm that is intended.
        assertBusy(() -> {
            List<TaskInfo> driverTasks = client().admin()
                .cluster()
                .prepareListTasks()
                .setActions(DriverTaskRunner.ACTION_NAME)
                .setDetailed(true)
                .get()
                .getTasks();
            assertTrue(driverTasks.stream().allMatch(TaskInfo::cancelled));
        });
        // Exactly one ES|QL query task is expected, and it must be flagged as cancelled too.
        assertBusy(() -> {
            List<TaskInfo> esqlTasks = getEsqlQueryTasks();
            assertThat(esqlTasks, hasSize(1));
            assertTrue(esqlTasks.get(0).cancelled());
        });
    } finally {
        // Always hand the permits back, even when an assertion above failed.
        scriptPermits.release(numberOfDocs());
    }
    TaskCancelledException failure = expectThrows(TaskCancelledException.class, () -> {
        var fetch = new GetAsyncResultRequest(asyncId).setWaitForCompletionTimeout(timeValueSeconds(10))
            .setKeepAlive(timeValueSeconds(30));
        try (var result = client().execute(EsqlAsyncGetResultAction.INSTANCE, fetch).actionGet()) {
            assertThat(result.isRunning(), is(false));
        }
    });
    assertThat(failure.getMessage(), containsString("keep_alive expired"));
}

private static long getExpirationFromTask(String asyncId) {
List<EsqlQueryTask> tasks = new ArrayList<>();
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,8 @@
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
Expand Down Expand Up @@ -360,18 +358,7 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
true,
0
);
CancellableTask parentTask = new EsqlQueryTask(
"test-session",
1,
"test",
"test",
"test",
null,
Map.of(),
Map.of(),
new AsyncExecutionId("test", TaskId.EMPTY_TASK_ID),
TEST_REQUEST_TIMEOUT
);
CancellableTask parentTask = new CancellableTask(1, "test", "test", "test", null, Map.of());
final String finalNodeWithShard = nodeWithShard;
boolean expressionJoin = EsqlCapabilities.Cap.LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION.isEnabled() ? randomBoolean() : false;
List<MatchConfig> matchFields = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@

import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.StoredAsyncTask;

import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class EsqlQueryTask extends StoredAsyncTask<EsqlQueryResponse> {
public abstract class EsqlQueryTask extends StoredAsyncTask<EsqlQueryResponse> {

private final String sessionId;
private EsqlExecutionInfo executionInfo;
private final AtomicReference<Scheduler.ScheduledCancellable> scheduledCancellation = new AtomicReference<>();

public EsqlQueryTask(
String sessionId,
Expand Down Expand Up @@ -69,4 +72,41 @@ public EsqlQueryResponse getCurrentResult() {
executionInfo
);
}

/**
 * Drops any pending expiry-triggered cancellation and then hands the response to the parent class.
 *
 * @param response the query response forwarded to {@code super.onResponse}
 */
@Override
public void onResponse(EsqlQueryResponse response) {
// clear and cancel the stored scheduled cancellation (if any) before delegating
removeScheduledCancellation();
super.onResponse(response);
}

/**
 * Drops any pending expiry-triggered cancellation and then hands the failure to the parent class.
 *
 * @param e the failure forwarded to {@code super.onFailure}
 */
@Override
public void onFailure(Exception e) {
// clear and cancel the stored scheduled cancellation (if any) before delegating
removeScheduledCancellation();
super.onFailure(e);
}

/**
 * Stores the new expiration time through the parent class and then re-schedules the
 * cancellation-on-expiry so that it is based on the updated value.
 *
 * @param expirationTime the new expiration time passed to {@code super.setExpirationTime}
 */
@Override
public void setExpirationTime(long expirationTime) {
super.setExpirationTime(expirationTime);
// must run after the super call: rescheduling reads the expiration via getExpirationTimeMillis()
rescheduleCancellationOnExpiry();
}

/**
 * Atomically clears the stored scheduled-cancellation handle and, when one was present, cancels it.
 */
private void removeScheduledCancellation() {
    final var pending = scheduledCancellation.getAndSet(null);
    if (pending == null) {
        // nothing was scheduled, or it has already been removed
        return;
    }
    pending.cancel();
}

/**
 * Schedules cancellation of this task at the given expiration time.
 * <p>
 * This class keeps the returned handle and cancels it when a response or failure arrives, or when the
 * cancellation is re-scheduled.
 *
 * @param expirationTimeMillis the expiration time in milliseconds, as returned by {@code getExpirationTimeMillis()}
 * @return a handle that can be used to cancel the scheduled cancellation
 */
protected abstract Scheduler.ScheduledCancellable scheduleCancellationOnExpiry(long expirationTimeMillis);

/**
 * Schedules a cancellation for the task's current expiration time, installs it as the active handle,
 * and cancels the handle it replaced, if there was one.
 */
public void rescheduleCancellationOnExpiry() {
    final long expiresAtMillis = getExpirationTimeMillis();
    // the new cancellation is scheduled first, then swapped in atomically
    final var replacement = scheduleCancellationOnExpiry(expiresAtMillis);
    final var superseded = scheduledCancellation.getAndSet(replacement);
    if (superseded == null) {
        return;
    }
    superseded.cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,22 @@
import org.elasticsearch.common.logging.activity.ActivityLogWriterProvider;
import org.elasticsearch.common.logging.activity.ActivityLogger;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.compute.data.BlockFactoryProvider;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.ActionLoggingFieldsProvider;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
Expand Down Expand Up @@ -84,6 +89,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
implements
AsyncTaskManagementService.AsyncOperation<EsqlQueryRequest, EsqlQueryResponse, EsqlQueryTask> {

private static final Logger logger = LogManager.getLogger(TransportEsqlQueryAction.class);

private final ThreadPool threadPool;
private final PlanExecutor planExecutor;
private final ComputeService computeService;
Expand Down Expand Up @@ -265,6 +272,7 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener
public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener<EsqlQueryResponse> listener) {
    // Attach the execution info to the task up front so that GET _query/async can read it
    // while the query is still running.
    var executionInfo = createEsqlExecutionInfo(request);
    task.setExecutionInfo(executionInfo);
    // Arm the cancellation-on-expiry for the task before the query starts executing.
    task.rescheduleCancellationOnExpiry();
    ActionListener.run(listener, delegate -> innerExecute(task, request, delegate));
}

Expand Down Expand Up @@ -528,6 +536,23 @@ public EsqlQueryTask createTask(
public Status getStatus() {
// the status reported for this task is an EsqlQueryStatus built from its async execution id
return new EsqlQueryStatus(asyncExecutionId);
}

/**
 * Schedules a runnable on the generic thread pool that cancels this task and its descendants with the
 * reason "keep_alive expired". The delay is the time remaining until {@code expirationTimeMillis},
 * clamped to zero when that time has already passed.
 */
@Override
protected Scheduler.ScheduledCancellable scheduleCancellationOnExpiry(long expirationTimeMillis) {
    final long remainingMillis = Math.max(0L, expirationTimeMillis - threadPool.absoluteTimeInMillis());
    // captured here because `this` inside the runnable below refers to the runnable itself
    final CancellableTask expiringTask = this;
    final AbstractRunnable cancelOnExpiry = new AbstractRunnable() {
        @Override
        protected void doRun() {
            taskManager.cancelTaskAndDescendants(expiringTask, "keep_alive expired", false, ActionListener.noop());
        }

        @Override
        public void onFailure(Exception e) {
            logger.warn("failed to cancel async-query on expiry", e);
        }
    };
    return threadPool.schedule(cancelOnExpiry, TimeValue.timeValueMillis(remainingMillis), threadPool.generic());
}
};
}

Expand Down