diff --git a/docs/changelog/144010.yaml b/docs/changelog/144010.yaml new file mode 100644 index 0000000000000..46d520a00fa49 --- /dev/null +++ b/docs/changelog/144010.yaml @@ -0,0 +1,5 @@ +area: Search +issues: [] +pr: 144010 +summary: Expose keep_alive in async task status +type: enhancement diff --git a/server/src/main/resources/transport/definitions/referable/async_task_keep_alive_status.csv b/server/src/main/resources/transport/definitions/referable/async_task_keep_alive_status.csv new file mode 100644 index 0000000000000..0de9fe271e5f2 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/async_task_keep_alive_status.csv @@ -0,0 +1 @@ +9318000 diff --git a/server/src/main/resources/transport/upper_bounds/9.4.csv b/server/src/main/resources/transport/upper_bounds/9.4.csv index e84e43c0c6345..6855e43e420b1 100644 --- a/server/src/main/resources/transport/upper_bounds/9.4.csv +++ b/server/src/main/resources/transport/upper_bounds/9.4.csv @@ -1 +1 @@ -esql_async_source_bytes_buffered,9317000 +async_task_keep_alive_status,9318000 diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index ecc76d4048014..2a31bae5b9606 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -25,10 +25,12 @@ import org.elasticsearch.search.aggregations.metrics.Min; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.query.ThrowingQueryBuilder; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase; import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest; @@ -417,6 +419,7 @@ public void testUpdateRunningKeepAlive() throws Exception { assertThat(response.getSearchResponse().getFailedShards(), equalTo(0)); assertThat(response.getExpirationTime(), greaterThan(now)); expirationTime = response.getExpirationTime(); + assertThat(getRunningAsyncSearchTask(responseId).toString(), containsString("\"keep_alive\" : \"5d\"")); } finally { response.decRef(); } @@ -445,6 +448,7 @@ public void testUpdateRunningKeepAlive() throws Exception { assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards)); assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0)); assertThat(response.getSearchResponse().getFailedShards(), equalTo(0)); + assertThat(getRunningAsyncSearchTask(response.getId()).toString(), containsString("\"keep_alive\" : \"10d\"")); AsyncStatusResponse statusResponse = getAsyncStatus(response.getId(), TimeValue.timeValueDays(10)); assertTrue(statusResponse.isRunning()); @@ -491,6 +495,22 @@ public void testUpdateRunningKeepAlive() throws Exception { } } + private TaskInfo getRunningAsyncSearchTask(String asyncSearchId) throws Exception { + var targetTaskId = AsyncExecutionId.decode(asyncSearchId).getTaskId(); + TaskInfo found = client().admin() + .cluster() + .prepareListTasks() + .setDetailed(true) + .get() + .getTasks() + .stream() + .filter(taskInfo -> taskInfo.taskId().equals(targetTaskId)) + .findAny() + .orElse(null); + assertNotNull(found); + return found; + } + public void testUpdateStoreKeepAlive() throws Exception { SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName); long now = System.currentTimeMillis(); diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index b70757c481350..13744b3b9fda1 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -24,6 +24,8 @@ import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.Requests; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; @@ -32,16 +34,19 @@ import org.elasticsearch.search.aggregations.AggregationReduceContext; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.tasks.RawTaskStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.async.AsyncTask; import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -75,6 +80,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask, Releasable private final Map> completionListeners = new HashMap<>(); private volatile long expirationTimeMillis; + private volatile TimeValue keepAlive; private final AtomicBoolean isCancelling = new AtomicBoolean(false); private final MutableSearchResponse searchResponse; @@ -112,6 +118,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask, Releasable ) { super(id, type, action, () -> "async_search{" + descriptionSupplier.get() + "}", parentTaskId, taskHeaders); this.expirationTimeMillis = getStartTime() + keepAlive.getMillis(); + this.keepAlive = keepAlive; this.originHeaders = originHeaders; this.searchId = searchId; this.client = client; @@ -147,8 +154,27 @@ Listener getSearchProgressActionListener() { * Update the expiration time of the (partial) response. */ @Override - public void setExpirationTime(long expirationTime) { + public void setExpirationTime(long expirationTime, TimeValue keepAlive) { this.expirationTimeMillis = expirationTime; + this.keepAlive = keepAlive; + } + + @Override + public TimeValue getKeepAlive() { + return keepAlive; + } + + @Override + public Status getStatus() { + try (XContentBuilder builder = XContentBuilder.builder(Requests.INDEX_CONTENT_TYPE.xContent())) { + builder.startObject(); + builder.field("request_id", searchId.getEncoded()); + builder.field("keep_alive", keepAlive.getStringRep()); + builder.endObject(); + return new RawTaskStatus(BytesReference.bytes(builder)); + } catch (IOException e) { + throw new IllegalStateException("failed to build async search task status", e); + } } @Override diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java index da12506d0e2c7..e06c53b3e42da 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java @@ -88,7 +88,7 @@ protected void doExecute(Task task, GetAsyncStatusRequest request, ActionListene store.updateExpirationTime(searchId.getDocId(), expirationTime, ActionListener.wrap(p -> { AsyncSearchTask asyncSearchTask = getTask(taskManager, searchId, AsyncSearchTask.class); if (asyncSearchTask != null) { - asyncSearchTask.setExpirationTime(expirationTime); + asyncSearchTask.setExpirationTime(expirationTime, request.getKeepAlive()); } store.retrieveStatus( request, diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java index 16109df887cc2..63bf6aabea9bc 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java @@ -8,6 +8,7 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; @@ -18,6 +19,7 @@ import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.shard.ShardId; @@ -30,13 +32,18 @@ import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.RawTaskStatus; +import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.AsyncTask; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.junit.After; import org.junit.Before; @@ -45,12 +52,17 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -126,6 +138,29 @@ public void testTaskDescription() { } } + public void testTaskStatusIncludesKeepAlive() { + try (AsyncSearchTask task = createAsyncSearchTask()) { + TaskInfo taskInfo = task.taskInfo("node1", true); + assertThat(taskInfo.status(), notNullValue()); + assertThat(taskInfo, hasToString(containsString("\"request_id\" : \"" + task.getExecutionId().getEncoded() + "\""))); + assertThat(taskInfo, hasToString(containsString("\"keep_alive\" : \"1h\""))); + } + } + + public void testTaskStatusSerializationToPreviousTransportVersionUsesRawTaskStatus() throws IOException { + try (AsyncSearchTask task = createAsyncSearchTask()) { + NamedWriteableRegistry oldRegistry = new NamedWriteableRegistry( + List.of(new NamedWriteableRegistry.Entry(Task.Status.class, RawTaskStatus.NAME, RawTaskStatus::new)) + ); + TaskInfo taskInfo = task.taskInfo("node1", true); + TransportVersion previousVersion = TransportVersionUtils.randomVersionNotSupporting(AsyncTask.ASYNC_TASK_KEEP_ALIVE_STATUS); + TaskInfo serialized = copyWriteable(taskInfo, oldRegistry, TaskInfo::from, previousVersion); + assertThat(serialized.status(), instanceOf(RawTaskStatus.class)); + Map statusMap = ((RawTaskStatus) serialized.status()).toMap(); + assertThat(statusMap, allOf(hasEntry("request_id", task.getExecutionId().getEncoded()), hasEntry("keep_alive", "1h"))); + } + } + public void testWaitForInit() throws InterruptedException { try ( AsyncSearchTask task = new AsyncSearchTask( diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java index a564eac33a103..6cf523cd8c732 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java @@ -115,7 +115,7 @@ private void getSearchResponseFromTask( } if (expirationTimeMillis != -1) { - task.setExpirationTime(expirationTimeMillis); + task.setExpirationTime(expirationTimeMillis, request.getKeepAlive()); } boolean added = addCompletionListener.apply( task, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java index 87983c179f881..7b227ee9e38b5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.core.async; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.TaskManager; import java.util.Map; @@ -15,6 +17,11 @@ * A task that supports asynchronous execution and provides information necessary for safe temporary storage of results */ public interface AsyncTask { + /** + * Transport version that added {@code keep_alive} to async task status payloads. + */ + TransportVersion ASYNC_TASK_KEEP_ALIVE_STATUS = TransportVersion.fromName("async_task_keep_alive_status"); + /** * Returns all of the request contexts headers */ @@ -33,7 +40,12 @@ public interface AsyncTask { /** * Update the expiration time of the (partial) response. */ - void setExpirationTime(long expirationTimeMillis); + void setExpirationTime(long expirationTimeMillis, TimeValue keepAlive); + + /** + * Returns the currently effective keep-alive for this task. + */ + TimeValue getKeepAlive(); /** * Performs necessary checks, cancels the task and calls the runnable upon completion diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/StoredAsyncTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/StoredAsyncTask.java index 69be247ca3608..b9a8eab8f6c16 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/StoredAsyncTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/StoredAsyncTask.java @@ -24,6 +24,7 @@ public abstract class StoredAsyncTask extends C private final AsyncExecutionId asyncExecutionId; private final Map originHeaders; private volatile long expirationTimeMillis; + private volatile TimeValue keepAlive; protected final List> completionListeners; private boolean hasCompleted = false; @@ -43,6 +44,7 @@ public StoredAsyncTask( this.asyncExecutionId = asyncExecutionId; this.originHeaders = originHeaders; this.expirationTimeMillis = getStartTime() + keepAlive.getMillis(); + this.keepAlive = keepAlive; this.completionListeners = new ArrayList<>(); } @@ -60,14 +62,20 @@ public AsyncExecutionId getExecutionId() { * Update the expiration time of the (partial) response. */ @Override - public void setExpirationTime(long expirationTime) { + public void setExpirationTime(long expirationTime, TimeValue keepAlive) { this.expirationTimeMillis = expirationTime; + this.keepAlive = keepAlive; } public long getExpirationTimeMillis() { return expirationTimeMillis; } + @Override + public TimeValue getKeepAlive() { + return keepAlive; + } + public synchronized boolean addCompletionListener(Supplier> listenerSupplier) { if (hasCompleted) { return false; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java index 6b0e4ef979a9f..4b845d553fa9a 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java @@ -53,6 +53,7 @@ public static class TestTask extends CancellableTask implements AsyncTask { private final AsyncExecutionId executionId; private final Map, TimeValue> listeners = new HashMap<>(); private long expirationTimeMillis; + private TimeValue keepAlive = TimeValue.ZERO; public TestTask( AsyncExecutionId executionId, @@ -83,8 +84,14 @@ public AsyncExecutionId getExecutionId() { } @Override - public void setExpirationTime(long expirationTime) { + public void setExpirationTime(long expirationTime, TimeValue keepAlive) { this.expirationTimeMillis = expirationTime; + this.keepAlive = keepAlive; + } + + @Override + public TimeValue getKeepAlive() { + return keepAlive; } @Override @@ -192,7 +199,7 @@ public void testRetrieveFromMemoryWithExpiration() throws Exception { try { boolean shouldExpire = randomBoolean(); long expirationTime = System.currentTimeMillis() + randomLongBetween(100000, 1000000) * (shouldExpire ? -1 : 1); - task.setExpirationTime(expirationTime); + task.setExpirationTime(expirationTime, TimeValue.timeValueMillis(Math.max(expirationTime - System.currentTimeMillis(), 0))); if (updateInitialResultsInStore) { // we need to store initial result @@ -240,7 +247,7 @@ public void testAssertExpirationPropagation() throws Exception { TestTask task = (TestTask) taskManager.register("test", "test", request); try { long startTime = System.currentTimeMillis(); - task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis()); + task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis(), TimeValue.timeValueMinutes(1)); boolean taskCompleted = randomBoolean(); if (taskCompleted) { taskManager.unregister(task); @@ -286,7 +293,7 @@ public void testRetrieveFromDisk() throws Exception { TestTask task = (TestTask) taskManager.register("test", "test", request); try { long startTime = System.currentTimeMillis(); - task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis()); + task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis(), TimeValue.timeValueMinutes(1)); if (updateInitialResultsInStore) { // we need to store initial result @@ -355,7 +362,7 @@ public void testFailWithIncompatibleResults() throws Exception { TestTask task = (TestTask) taskManager.register("test", "test", request); try { long startTime = System.currentTimeMillis(); - task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis()); + task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis(), TimeValue.timeValueMinutes(1)); // we need to store initial result PlainActionFuture futureCreate = new PlainActionFuture<>(); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java index 9ffdefd65adb5..662c280e2a6fe 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction; import org.elasticsearch.xpack.esql.core.async.AsyncTaskManagementService; +import org.elasticsearch.xpack.esql.plugin.EsqlQueryStatus; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.hamcrest.core.IsEqual; @@ -284,6 +285,7 @@ public void testUpdateKeepAlive() throws Exception { } currentExpiration = getExpirationFromTask(asyncId); assertThat(currentExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis())); + assertThat(getTaskKeepAlive(asyncId), equalTo(keepAlive.getStringRep())); // update the expiration while the task is still running int iters = iterations(1, 5); for (int i = 0; i < iters; i++) { @@ -298,6 +300,7 @@ public void testUpdateKeepAlive() throws Exception { long updatedExpiration = getExpirationFromTask(asyncId); assertThat(updatedExpiration, greaterThanOrEqualTo(currentExpiration + extraKeepAlive)); assertThat(updatedExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis())); + assertThat(getTaskKeepAlive(asyncId), equalTo(keepAlive.getStringRep())); currentExpiration = updatedExpiration; } } finally { @@ -388,6 +391,14 @@ private static long getExpirationFromTask(String asyncId) { return tasks.getFirst().getExpirationTimeMillis(); } + private String getTaskKeepAlive(String asyncId) throws Exception { + List tasks = getEsqlQueryTasks(); + assertThat(tasks, hasSize(1)); + EsqlQueryStatus status = (EsqlQueryStatus) tasks.getFirst().status(); + assertThat(status.id().getEncoded(), equalTo(asyncId)); + return status.keepAlive().getStringRep(); + } + private static long getExpirationFromDoc(String asyncId) { String docId = AsyncExecutionId.decode(asyncId).getDocId(); GetResponse doc = client().prepareGet().setIndex(XPackPlugin.ASYNC_RESULTS_INDEX).setId(docId).get(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java index ae5a9aa92a592..eaee8bee2fa9a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java @@ -277,7 +277,7 @@ public EsqlQueryRequest allowPartialResults(boolean allowPartialResults) { @Override public Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map headers) { - var status = new EsqlQueryStatus(new AsyncExecutionId(UUIDs.randomBase64UUID(), taskId)); + var status = new EsqlQueryStatus(new AsyncExecutionId(UUIDs.randomBase64UUID(), taskId), keepAlive); return new EsqlQueryRequestTask(query, taskId.getId(), type, action, parentTaskId, headers, status); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java index b7a26b8248ef2..63d36c7603d02 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java @@ -85,12 +85,6 @@ public void onFailure(Exception e) { super.onFailure(e); } - @Override - public void setExpirationTime(long expirationTime) { - super.setExpirationTime(expirationTime); - rescheduleCancellationOnExpiry(); - } - private void removeScheduledCancellation() { var prev = scheduledCancellation.getAndSet(null); if (prev != null) { @@ -109,4 +103,10 @@ public void rescheduleCancellationOnExpiry() { prev.cancel(); } } + + @Override + public void setExpirationTime(long expirationTime, TimeValue keepAlive) { + super.setExpirationTime(expirationTime, keepAlive); + rescheduleCancellationOnExpiry(); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlQueryStatus.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlQueryStatus.java index bb45b1bf41945..5c61fe5396eeb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlQueryStatus.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlQueryStatus.java @@ -10,13 +10,15 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.Task; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.AsyncTask; import java.io.IOException; -public record EsqlQueryStatus(AsyncExecutionId id) implements Task.Status { +public record EsqlQueryStatus(AsyncExecutionId id, TimeValue keepAlive) implements Task.Status { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( Task.Status.class, "EsqlDocIdStatus", @@ -29,16 +31,26 @@ public String getWriteableName() { } private EsqlQueryStatus(StreamInput stream) throws IOException { - this(AsyncExecutionId.readFrom(stream)); + this( + AsyncExecutionId.readFrom(stream), + stream.getTransportVersion().supports(AsyncTask.ASYNC_TASK_KEEP_ALIVE_STATUS) ? stream.readOptionalTimeValue() : null + ); } @Override public void writeTo(StreamOutput out) throws IOException { id.writeTo(out); + if (out.getTransportVersion().supports(AsyncTask.ASYNC_TASK_KEEP_ALIVE_STATUS)) { + out.writeOptionalTimeValue(keepAlive); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder.startObject().field("request_id", id.getEncoded()).endObject(); + builder.startObject().field("request_id", id.getEncoded()); + if (keepAlive != null) { + builder.field("keep_alive", keepAlive.getStringRep()); + } + return builder.endObject(); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 75bd250df82df..5f9bc41964fa4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -538,7 +538,7 @@ public EsqlQueryTask createTask( ) { @Override public Status getStatus() { - return new EsqlQueryStatus(asyncExecutionId); + return new EsqlQueryStatus(asyncExecutionId, getKeepAlive()); } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestTests.java index 76600b95a92db..fe8d91fa08997 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestTests.java @@ -8,8 +8,10 @@ package org.elasticsearch.xpack.esql.action; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.NoopCircuitBreaker; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.compute.data.BlockFactory; @@ -28,11 +30,14 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.XContentParseException; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.AsyncTask; import org.elasticsearch.xpack.esql.Column; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; @@ -67,6 +72,7 @@ import static org.hamcrest.Matchers.is; public class EsqlQueryRequestTests extends ESTestCase { + private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(List.of(EsqlQueryStatus.ENTRY)); public void testParseFields() throws IOException { String query = randomAlphaOfLengthBetween(1, 100); @@ -835,13 +841,15 @@ private void assertTablesOnlyValidOnSnapshot(EsqlQueryRequest request) { public void testTask() throws IOException { String query = randomAlphaOfLength(10); int id = randomInt(); + TimeValue keepAlive = TimeValue.timeValueDays(2); String requestJson = """ { - "query": "QUERY" - }""".replace("QUERY", query); + "query": "QUERY", + "keep_alive": "KEEP_ALIVE" + }""".replace("QUERY", query).replace("KEEP_ALIVE", keepAlive.getStringRep()); - EsqlQueryRequest request = parseEsqlQueryRequestSync(requestJson); + EsqlQueryRequest request = parseEsqlQueryRequestAsync(requestJson); String localNode = randomAlphaOfLength(2); Task task = request.createTask(new TaskId(localNode, id), "transport", EsqlQueryAction.NAME, TaskId.EMPTY_TASK_ID, Map.of()); assertThat(task.getDescription(), equalTo(query)); @@ -856,7 +864,8 @@ public void testTask() throws IOException { "type" : "transport", "action" : "indices:data/read/esql", "status" : { - "request_id" : "%s" + "request_id" : "%s", + "keep_alive" : "%s" }, "description" : "%s", "start_time" : "%s", @@ -871,6 +880,7 @@ public void testTask() throws IOException { localNode, id, ((EsqlQueryStatus) taskInfo.status()).id().getEncoded(), + keepAlive, query, DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(taskInfo.startTime()), taskInfo.startTime(), @@ -880,6 +890,45 @@ public void testTask() throws IOException { assertThat(json, equalTo(expected)); } + public void testTaskStatusSerializationToPreviousTransportVersionOmitsKeepAlive() throws IOException { + EsqlQueryStatus status = new EsqlQueryStatus( + new AsyncExecutionId(randomAlphaOfLength(10), new TaskId(randomAlphaOfLength(4), randomNonNegativeLong())), + TimeValue.timeValueDays(2) + ); + TransportVersion previousVersion = TransportVersionUtils.randomVersionNotSupporting(AsyncTask.ASYNC_TASK_KEEP_ALIVE_STATUS); + EsqlQueryStatus serialized = copyWriteable( + status, + namedWriteableRegistry, + in -> (EsqlQueryStatus) EsqlQueryStatus.ENTRY.reader.read(in), + previousVersion + ); + assertThat(serialized.id(), equalTo(status.id())); + assertNull(serialized.keepAlive()); + } + + public void testTaskStatusWithNullKeepAliveCanBeSerializedBackToCurrentTransportVersion() throws IOException { + EsqlQueryStatus status = new EsqlQueryStatus( + new AsyncExecutionId(randomAlphaOfLength(10), new TaskId(randomAlphaOfLength(4), randomNonNegativeLong())), + TimeValue.timeValueDays(2) + ); + TransportVersion previousVersion = TransportVersionUtils.randomVersionNotSupporting(AsyncTask.ASYNC_TASK_KEEP_ALIVE_STATUS); + EsqlQueryStatus withoutKeepAlive = copyWriteable( + status, + namedWriteableRegistry, + in -> (EsqlQueryStatus) EsqlQueryStatus.ENTRY.reader.read(in), + previousVersion + ); + + EsqlQueryStatus serializedBack = copyWriteable( + withoutKeepAlive, + namedWriteableRegistry, + in -> (EsqlQueryStatus) EsqlQueryStatus.ENTRY.reader.read(in), + TransportVersion.current() + ); + assertThat(serializedBack.id(), equalTo(status.id())); + assertNull(serializedBack.keepAlive()); + } + public void testProjectRouting() throws IOException { String json = """ {