Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
487eed3
Add parsed statement support to EsqlQueryRequest
felixbarny Mar 17, 2026
6ee6ef3
Fix testRowWithParsedStatement failing in EsqlAsyncActionIT
felixbarny Mar 17, 2026
140efde
Collect PlanTelemetry from pre-built logical plans
felixbarny Mar 20, 2026
08a1ae8
Validate QuerySettings for pre-built parsed statements
felixbarny Mar 20, 2026
c01ea41
Merge remote-tracking branch 'origin/main' into promql-query-range-pa…
felixbarny Mar 20, 2026
0c63022
Move plan telemetry collection out of the parser and into a single po…
felixbarny Mar 21, 2026
d6d25cc
Introduce PreparedEsqlQueryRequest for pre-built ES|QL statements
felixbarny Mar 23, 2026
bf9db00
Merge branch 'main' into promql-query-range-parsed-statement
felixbarny Mar 23, 2026
43e9e7d
Fix benchmark compilation errors after PlanTelemetry was removed from…
felixbarny Mar 23, 2026
3abafe7
[CI] Auto commit changes from spotless
Mar 23, 2026
647c995
Address review comments
felixbarny Mar 23, 2026
7178423
Require callers to provide a query description rather than materialis…
felixbarny Mar 23, 2026
99cd9bb
Test Source deserialization with a null-query Configuration
felixbarny Mar 23, 2026
0c5cfeb
Fix telemetry collection for settings (fixes TelemetryIT)
felixbarny Mar 23, 2026
6cbadd3
Merge branch 'main' into promql-query-range-parsed-statement
felixbarny Mar 23, 2026
e2d5bc9
Fix maybeWrapAsPrepared to use real cluster settings when pre-parsing
felixbarny Mar 24, 2026
95a44de
Fix gatherPlanTelemetry counting STATS for INLINE STATS commands
felixbarny Mar 24, 2026
8bf9e81
Fix testExplainMultiNode flakiness when maybeWrapAsPrepared is used
felixbarny Mar 24, 2026
352ed86
Merge branch 'main' into promql-query-range-parsed-statement
felixbarny Mar 24, 2026
c1c96d8
Fix TelemetryIT: track SUBQUERY command in telemetry
felixbarny Mar 25, 2026
899b63b
Merge remote-tracking branch 'origin/main' into promql-query-range-pa…
felixbarny Mar 25, 2026
c5981fa
Merge remote-tracking branch 'felixbarny/promql-query-range-parsed-st…
felixbarny Mar 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.telemetry.Metrics;
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand Down Expand Up @@ -74,7 +73,6 @@ public class QueryPlanningBenchmark {
Utils.configureBenchmarkLogging();
}

private PlanTelemetry telemetry;
private Analyzer manyFieldsAnalyzer;
private LogicalPlanOptimizer defaultOptimizer;
private Configuration config;
Expand Down Expand Up @@ -118,7 +116,6 @@ public void setup() {
// Assume all nodes are on the current version for the benchmark.
TransportVersion minimumVersion = TransportVersion.current();

telemetry = new PlanTelemetry(functionRegistry);
manyFieldsAnalyzer = new Analyzer(
new AnalyzerContext(
config,
Expand All @@ -136,7 +133,7 @@ public void setup() {
}

private LogicalPlan plan(EsqlParser parser, Analyzer analyzer, LogicalPlanOptimizer optimizer, String query) {
var parsed = parser.parseQuery(query, new QueryParams(), telemetry, new InferenceSettings(Settings.EMPTY));
var parsed = parser.parseQuery(query, new QueryParams(), new InferenceSettings(Settings.EMPTY));
var analyzed = analyzer.analyze(parsed);
var optimized = optimizer.optimize(analyzed);
return optimized;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.telemetry.Metrics;
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
import org.elasticsearch.xpack.esql.view.ViewResolutionService;
import org.elasticsearch.xpack.esql.view.ViewResolver;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down Expand Up @@ -139,7 +138,6 @@ public abstract class ViewResolutionBenchmarkBase {
private LogicalPlan preParsedPlan;
private String queryString;
private BiFunction<String, String, LogicalPlan> viewParser;
private PlanTelemetry telemetry;
private Analyzer analyzer;
private LogicalPlanOptimizer optimizer;
private ThreadPool threadPool;
Expand All @@ -156,10 +154,8 @@ public void setup() {
SettingsValidationContext validationCtx = new SettingsValidationContext(false, false);
TransportVersion minimumVersion = TransportVersion.current();

telemetry = new PlanTelemetry(functionRegistry);
parser = new EsqlParser(new EsqlConfig(functionRegistry));
viewParser = (query, viewName) -> parser.parseView(query, new QueryParams(), validationCtx, telemetry, inferenceSettings, viewName)
.plan();
viewParser = (query, viewName) -> parser.parseView(query, new QueryParams(), validationCtx, inferenceSettings, viewName).plan();

LinkedHashMap<String, EsField> mapping = new LinkedHashMap<>();
for (int i = 0; i < 5; i++) {
Expand Down Expand Up @@ -268,7 +264,7 @@ static ViewMetadata buildViewChain(int depth, boolean withFilter) {
}

private LogicalPlan parsePlan(String query) {
return parser.parseQuery(query, new QueryParams(), telemetry, new InferenceSettings(Settings.EMPTY));
return parser.parseQuery(query, new QueryParams(), new InferenceSettings(Settings.EMPTY));
}

private ViewResolver createResolver(boolean viewsEnabled, ViewMetadata viewMetadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.action.LegacyActionRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.QueryBuilder;

import java.io.IOException;
Expand All @@ -21,6 +22,7 @@ protected EsqlQueryRequest(StreamInput in) throws IOException {
super(in);
}

@Nullable
public abstract String query();

public abstract QueryBuilder filter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.inference.InferenceSettings;
import org.elasticsearch.xpack.esql.parser.EsqlConfig;
import org.elasticsearch.xpack.esql.parser.EsqlParser;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.plugin.TransportEsqlQueryAction;
Expand Down Expand Up @@ -176,12 +180,26 @@ protected QueryPragmas getPragmas() {

public EsqlQueryResponse run(EsqlQueryRequest request) {
try {
return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
return client().execute(EsqlQueryAction.INSTANCE, maybeWrapAsPrepared(request)).actionGet(30, TimeUnit.SECONDS);
} catch (ElasticsearchTimeoutException e) {
throw new AssertionError("timeout", e);
}
}

/**
* Randomly wraps the given request in a {@link PreparedEsqlQueryRequest} to exercise the
* pre-built-plan code path.
*/
private EsqlQueryRequest maybeWrapAsPrepared(EsqlQueryRequest request) {
if (request instanceof PreparedEsqlQueryRequest || randomBoolean()) {
return request;
}
var parser = new EsqlParser(new EsqlConfig(new EsqlFunctionRegistry()));
var inferenceSettings = new InferenceSettings(clusterService().state().metadata().settings());
var statement = parser.parse(request.query(), request.params(), inferenceSettings);
return PreparedEsqlQueryRequest.from(request, statement, "pre-built statement for testing");
}

protected static QueryPragmas randomPragmas() {
Settings.Builder settings = Settings.builder();
if (canUseQueryPragmas()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2214,6 +2214,8 @@ private String determinizePlanString(String plan) {
// Normalize source position references like @1:19 or @_:19
.replaceAll("@\\d+:\\d+", "@_:_")
.replaceAll("@_:\\d+", "@_:_")
// Normalize source text (may be absent when query is null, e.g. PreparedEsqlQueryRequest)
.replaceAll("(\"source\":\"?)[^@\"]*(@_:_)", "$1$2")
// Remove memory addresses
.replaceAll("@[0-9a-f]+", "@_")
// Remove UUIDs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

@Override
public EsqlQueryResponse run(EsqlQueryRequest original) {
EsqlQueryRequest request = EsqlQueryRequest.asyncEsqlQueryRequest(original.query());
EsqlQueryRequest request;
if (original instanceof PreparedEsqlQueryRequest prepared) {
request = PreparedEsqlQueryRequest.async(prepared.statement(), "pre-built async statement for testing");
} else {
request = EsqlQueryRequest.asyncEsqlQueryRequest(original.query());
}
request.pragmas(original.pragmas());
request.profile(original.profile());
request.acceptedPragmaRisks(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
@Override
public EsqlQueryResponse run(EsqlQueryRequest request) {
// IndexResolver currently ignores failures from field-caps responses and can resolve to a smaller set of concrete indices.
boolean singleIndex = request.query().startsWith("from test |");
boolean singleIndex = request.queryDescription().startsWith("from test |");
if (singleIndex && randomIntBetween(0, 100) <= 20) {
return runQueryWithDisruption(request);
} else {
Expand All @@ -82,7 +82,7 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) {
final ServiceDisruptionScheme disruptionScheme = addRandomDisruptionScheme();
logger.info("--> start disruption scheme [{}]", disruptionScheme);
disruptionScheme.startDisrupting();
logger.info("--> executing esql query with disruption {} ", request.query());
logger.info("--> executing esql query with disruption {} ", request.queryDescription());
if (randomBoolean()) {
request.allowPartialResults(randomBoolean());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import org.junit.BeforeClass;

import static org.elasticsearch.common.logging.activity.QueryLogging.QUERY_FIELD_INDICES;
import static org.elasticsearch.common.logging.activity.QueryLogging.QUERY_FIELD_QUERY;
import static org.elasticsearch.common.logging.activity.QueryLogging.QUERY_FIELD_RESULT_COUNT;
import static org.elasticsearch.common.logging.activity.QueryLogging.QUERY_FIELD_SHARDS;
import static org.elasticsearch.test.ActivityLoggingUtils.assertMessageFailure;
import static org.elasticsearch.test.ActivityLoggingUtils.assertMessageSuccess;
import static org.elasticsearch.test.ActivityLoggingUtils.getMessageData;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.syncEsqlQueryRequest;
import static org.elasticsearch.xpack.esql.action.PreparedEsqlQueryRequest.PREPARED_QUERY_PREFIX;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -88,7 +90,11 @@ public void testLogging() throws Exception {
private void assertQuery(String query, long hits) {
try (var resp = run(query)) {
var message = getMessageData(appender.getLastEventAndReset());
assertMessageSuccess(message, EsqlLogContext.TYPE, query);
// When the request was randomly promoted to a PreparedEsqlQueryRequest the logged
// query will be the plan representation rather than the original query string.
var loggedQuery = message.get(QUERY_FIELD_QUERY);
var queryForAssert = loggedQuery != null && loggedQuery.startsWith(PREPARED_QUERY_PREFIX) ? PREPARED_QUERY_PREFIX : query;
assertMessageSuccess(message, EsqlLogContext.TYPE, queryForAssert);
assertThat(Integer.valueOf(message.get(QUERY_FIELD_SHARDS + "successful")), greaterThanOrEqualTo(1));
assertThat(Integer.valueOf(message.getOrDefault(QUERY_FIELD_SHARDS + "skipped", "0")), greaterThanOrEqualTo(0));
assertThat(message.getOrDefault(QUERY_FIELD_SHARDS + "failed", "0"), equalTo("0"));
Expand All @@ -108,7 +114,9 @@ private void assertQuery(String query, long hits) {
private void assertFailedQuery(String query, String expectedMessage, Class<? extends Throwable> expectedException) {
expectThrows(VerificationException.class, () -> run(query));
var message = getMessageData(appender.getLastEventAndReset());
assertMessageFailure(message, EsqlLogContext.TYPE, query, expectedException, expectedMessage);
var loggedQuery = message.get(QUERY_FIELD_QUERY);
var queryForAssert = loggedQuery != null && loggedQuery.startsWith(PREPARED_QUERY_PREFIX) ? PREPARED_QUERY_PREFIX : query;
assertMessageFailure(message, EsqlLogContext.TYPE, queryForAssert, expectedException, expectedMessage);
}

private int setupIndex(String name, String prefix) {
Expand Down Expand Up @@ -151,7 +159,11 @@ public void testLoggingPartialShardFailure() throws Exception {
var event = appender.getLastEventAndReset();
assertNotNull(event);
var message = getMessageData(event);
assertMessageSuccess(message, EsqlLogContext.TYPE, "FROM esql_partial_test | LIMIT 100");
var loggedQuery = message.get(QUERY_FIELD_QUERY);
var queryForAssert = loggedQuery != null && loggedQuery.startsWith(PREPARED_QUERY_PREFIX)
? PREPARED_QUERY_PREFIX
: "FROM esql_partial_test | LIMIT 100";
assertMessageSuccess(message, EsqlLogContext.TYPE, queryForAssert);
assertThat(Integer.valueOf(message.get(QUERY_FIELD_SHARDS + "successful")), greaterThanOrEqualTo(1));
assertThat(Integer.valueOf(message.getOrDefault(QUERY_FIELD_SHARDS + "skipped", "0")), equalTo(0));
assertThat(Integer.valueOf(message.get(QUERY_FIELD_SHARDS + "failed")), greaterThanOrEqualTo(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public static Iterable<Object[]> parameters() {
| WHERE id > 10
""",
EsqlCapabilities.Cap.SUBQUERY_IN_FROM_COMMAND.isEnabled()
? Map.of("FROM", 2, "UNIONALL", 1, "WHERE", 2)
? Map.of("FROM", 2, "UNIONALL", 1, "WHERE", 2, "SUBQUERY", 1)
: Collections.emptyMap(),
Collections.emptyMap(),
EsqlCapabilities.Cap.SUBQUERY_IN_FROM_COMMAND.isEnabled()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
Expand All @@ -24,7 +25,12 @@
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.esql.Column;
import org.elasticsearch.xpack.esql.approximation.ApproximationSettings;
import org.elasticsearch.xpack.esql.inference.InferenceSettings;
import org.elasticsearch.xpack.esql.parser.EsqlParser;
import org.elasticsearch.xpack.esql.parser.QueryParams;
import org.elasticsearch.xpack.esql.plan.EsqlStatement;
import org.elasticsearch.xpack.esql.plan.QuerySettings;
import org.elasticsearch.xpack.esql.plan.SettingsValidationContext;
import org.elasticsearch.xpack.esql.plugin.EsqlQueryStatus;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

Expand Down Expand Up @@ -76,11 +82,40 @@ public static EsqlQueryRequest asyncEsqlQueryRequest(String query) {
return new EsqlQueryRequest(true, query);
}

private EsqlQueryRequest(boolean async, String query) {
EsqlQueryRequest(boolean async, String query) {
this.async = async;
this.query = query;
}

/**
* Copy constructor. Copies all fields from {@code source}. Subclasses that need to override
* specific fields (e.g. {@link PreparedEsqlQueryRequest} overrides {@code query}) should do
* so after calling this constructor. If a new field is added to this class, it must also be
* added here.
*/
EsqlQueryRequest(EsqlQueryRequest source) {
this.async = source.async;
this.query = source.query;
this.columnar = source.columnar;
this.profile = source.profile;
this.includeCCSMetadata = source.includeCCSMetadata;
this.includeExecutionMetadata = source.includeExecutionMetadata;
this.timeZone = source.timeZone;
this.locale = source.locale;
this.filter = source.filter;
this.pragmas = source.pragmas;
this.params = source.params;
this.waitForCompletionTimeout = source.waitForCompletionTimeout;
this.keepAlive = source.keepAlive;
this.keepOnCompletion = source.keepOnCompletion;
this.onSnapshotBuild = source.onSnapshotBuild;
this.acceptedPragmaRisks = source.acceptedPragmaRisks;
this.allowPartialResults = source.allowPartialResults;
this.projectRouting = source.projectRouting;
this.approximation = source.approximation;
this.tables.putAll(source.tables);
}

public EsqlQueryRequest() {}

public EsqlQueryRequest(StreamInput in) throws IOException {
Expand All @@ -89,11 +124,7 @@ public EsqlQueryRequest(StreamInput in) throws IOException {

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (Strings.hasText(query) == false) {
validationException = addValidationError("[" + RequestXContent.QUERY_FIELD + "] is required", validationException);
}

ActionRequestValidationException validationException = validateQuery();
if (onSnapshotBuild == false) {
if (pragmas.isEmpty() == false && acceptedPragmaRisks == false) {
validationException = addValidationError(
Expand All @@ -111,16 +142,43 @@ public ActionRequestValidationException validate() {
return validationException;
}

protected ActionRequestValidationException validateQuery() {
if (Strings.hasText(query) == false) {
return addValidationError("[" + RequestXContent.QUERY_FIELD + "] is required", null);
}
return null;
}

public EsqlQueryRequest query(String query) {
this.query = query;
return this;
}

@Override
@Nullable
public String query() {
return query;
}

/**
* Returns a non-null human-readable description of the query for logging, task descriptions, and error messages.
* For regular requests this is the same as {@link #query()}. Overridden by {@link PreparedEsqlQueryRequest}
* to return a display string when there is no query text.
*/
public String queryDescription() {
return query();
}

/**
* Parses the query string into an {@link EsqlStatement} and validates its settings.
* Overridden by {@link PreparedEsqlQueryRequest} to return a pre-built statement directly.
*/
public EsqlStatement parse(EsqlParser parser, SettingsValidationContext settingsValidationCtx, InferenceSettings inferenceSettings) {
EsqlStatement statement = parser.parse(query(), params(), inferenceSettings);
QuerySettings.validate(statement, settingsValidationCtx);
return statement;
}

public boolean async() {
return async;
}
Expand Down Expand Up @@ -280,7 +338,7 @@ public EsqlQueryRequest allowPartialResults(boolean allowPartialResults) {
@Override
public Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
var status = new EsqlQueryStatus(new AsyncExecutionId(UUIDs.randomBase64UUID(), taskId), keepAlive);
return new EsqlQueryRequestTask(query, taskId.getId(), type, action, parentTaskId, headers, status);
return new EsqlQueryRequestTask(queryDescription(), taskId.getId(), type, action, parentTaskId, headers, status);
}

private static class EsqlQueryRequestTask extends CancellableTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public TimeValue stop() {
* To correctly time the execution of a request, a {@link EsqlResponseListener} must be constructed immediately before execution begins.
*/
public EsqlResponseListener(RestChannel channel, RestRequest restRequest, EsqlQueryRequest esqlRequest) {
this(channel, restRequest, esqlRequest.query(), EsqlMediaTypeParser.getResponseMediaType(restRequest, esqlRequest));
this(channel, restRequest, esqlRequest.queryDescription(), EsqlMediaTypeParser.getResponseMediaType(restRequest, esqlRequest));
}

/**
Expand Down
Loading
Loading