From a1fa1cba6799ab729b8eea0f76c2fbd88d84f5fe Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Wed, 8 Apr 2026 11:13:40 +0200 Subject: [PATCH 1/9] assert metadata in CsvIT --- .../java/org/elasticsearch/xpack/esql/CsvIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/CsvIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/CsvIT.java index a1d9650b67e96..a368794052d6a 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/CsvIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/CsvIT.java @@ -100,6 +100,7 @@ import java.util.stream.Stream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.esql.CsvAssert.assertMetadata; import static org.elasticsearch.xpack.esql.CsvSpecReader.specParser; import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled; import static org.elasticsearch.xpack.esql.CsvTestUtils.loadCsvSpecValues; @@ -254,6 +255,7 @@ public final void test() throws Throwable { Map.of() ); + CsvAssert.assertMetadata(expected, actual.columnNames(), actual.columnTypes(), logger); CsvAssert.assertDataWithValueConverter( expected, actual.values(), From 4c6194a823b0119aeff2e45d2045d67ff11a6bc4 Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Wed, 8 Apr 2026 11:16:06 +0200 Subject: [PATCH 2/9] approximate lookup join --- .../src/main/resources/approximation.csv-spec | 38 +++++++++++++++++++ .../esql/approximation/Approximation.java | 2 + .../ReplaceSampledStatsBySampleAndStats.java | 13 ++++++- .../ApproximationSupportTests.java | 3 +- 4 files changed, 53 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec index 205a499e3a2be..51c148881c97a 100644 --- 
a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec @@ -614,6 +614,44 @@ c:long | b:date | _approximation_confidence_interval(c):long | ; +Lookup join before stats +required_capability: approximation_v6 +request_stored: skip + +SET approximation={"rows":10000}\; +FROM many_numbers +| EVAL language_code = sv % 4 + 1 +| LOOKUP JOIN languages_lookup ON language_code +| EVAL length = LENGTH(language_name) +| STATS AVG(length) +; + +AVG(length):double | _approximation_confidence_interval(AVG(length)):double | _approximation_certified(AVG(length)):boolean +6.4..6.6 | [6.4..6.6,6.4..6.6] | {any} +; + + +Lookup join after stats by +required_capability: approximation_v6 +request_stored: skip + +SET approximation={"rows":10000}\; +FROM many_numbers + | EVAL language_code = sv % 4 + 1 + | STATS count=COUNT() BY language_code + | LOOKUP JOIN languages_lookup ON language_code + | SORT language_name + | LIMIT 5 +; + +count:long | language_code:integer | language_name:keyword | _approximation_confidence_interval(count):long | _approximation_certified(count):boolean +40000..80000 | 1 | English | [40000..80000,40000..80000] | {any} +40000..80000 | 2 | French | [40000..80000,40000..80000] | {any} +40000..80000 | 4 | German | [40000..80000,40000..80000] | {any} +40000..80000 | 3 | Spanish | [40000..80000,40000..80000] | {any} +; + + Warn on no stats required_capability: approximation_v6 request_stored: skip diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java index a74a3a26e7163..145cf64b45834 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java @@ -64,6 +64,7 @@ import 
org.elasticsearch.xpack.esql.plan.logical.UserAgent; import org.elasticsearch.xpack.esql.plan.logical.inference.Completion; import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank; +import org.elasticsearch.xpack.esql.plan.logical.join.Join; import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; import org.elasticsearch.xpack.esql.session.Result; @@ -147,6 +148,7 @@ public record QueryProperties(boolean hasGrouping, boolean canDecreaseRowCount, Grok.class, Insist.class, LocalRelation.class, + Join.class, MvExpand.class, OrderBy.class, Project.class, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java index 3f709b252c7fd..dd71eadf20389 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.util.Holder; import org.elasticsearch.xpack.esql.expression.Foldables; import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; import org.elasticsearch.xpack.esql.expression.function.aggregate.CountApproximate; @@ -59,7 +60,17 @@ protected PhysicalPlan rule(SampledAggregateExec plan) { double sampleProbability = (double) Foldables.literalValueOf(plan.sampleProbability()); assert sampleProbability < 1.0; - PhysicalPlan child = 
plan.child().transformUp(LeafExec.class, leaf -> new SampleExec(Source.EMPTY, leaf, plan.sampleProbability())); + // The only non-unary plans that are currently supported are Joins. + // For these, only the first index needs to be sampled. + Holder sampledAdded = new Holder<>(false); + PhysicalPlan child = plan.child().transformDown(p -> { + if (p instanceof LeafExec && sampledAdded.get() == false) { + sampledAdded.set(true); + return new SampleExec(Source.EMPTY, p, plan.sampleProbability()); + } else { + return p; + } + }); List sampleCorrections = new ArrayList<>(); List intermediateAttributes = new ArrayList<>(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java index 420b3397a4b3c..7b19aeea57f5e 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java @@ -146,9 +146,7 @@ public class ApproximationSupportTests extends ESTestCase { Fork.class, UnionAll.class, ViewUnionAll.class, - Join.class, InlineJoin.class, - LookupJoin.class, ParameterizedQuery.class, // InlineStats is not supported yet. 
@@ -180,6 +178,7 @@ public class ApproximationSupportTests extends ESTestCase { StubRelation.class, Drop.class, Keep.class, + LookupJoin.class, Rename.class, ResolvingProject.class, SparklineGenerateEmptyBuckets.class, From 9fb202303014c222a0c432b307751184ba5503b0 Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Wed, 8 Apr 2026 11:55:34 +0200 Subject: [PATCH 3/9] approximate inline stats --- .../src/main/resources/approximation.csv-spec | 62 ++++++++++++++ .../org/elasticsearch/xpack/esql/CsvIT.java | 1 - .../esql/approximation/Approximation.java | 37 ++++---- .../xpack/esql/session/EsqlSession.java | 66 +++++++++----- .../ApproximationSupportTests.java | 10 +-- .../approximation/ApproximationTestCase.java | 33 ++----- .../approximation/ApproximationTests.java | 85 ++++++++++--------- 7 files changed, 178 insertions(+), 116 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec index 51c148881c97a..16a136691389e 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec @@ -652,6 +652,68 @@ count:long | language_code:integer | language_name:keyword | _approximation_co ; +Inline stats +required_capability: approximation_v6 +request_stored: skip + +SET approximation={"rows":10000}\; +FROM many_numbers +| KEEP sv +| INLINE STATS AVG(sv) +| SORT sv +| LIMIT 5 +; + +sv:integer | AVG(sv):double | _approximation_confidence_interval(AVG(sv)):double | _approximation_certified(AVG(sv)):boolean +1 | 440..490 | [440..490,440..490] | {any} +2 | 440..490 | [440..490,440..490] | {any} +2 | 440..490 | [440..490,440..490] | {any} +3 | 440..490 | [440..490,440..490] | {any} +3 | 440..490 | [440..490,440..490] | {any} +; + + +Inline stats and where +required_capability: approximation_v6 +request_stored: skip + +SET approximation={"rows":10000}\; 
+FROM many_numbers +| KEEP sv +| WHERE sv >= 400 +| INLINE STATS SUM(sv) +| SORT sv +| LIMIT 401 +| SORT sv DESC +| LIMIT 3 +; + +sv:integer | SUM(sv):long | _approximation_confidence_interval(SUM(sv)):long | _approximation_certified(SUM(sv)):boolean +401 | 80000000..110000000 | [80000000..110000000,80000000..110000000] | {any} +400 | 80000000..110000000 | [80000000..110000000,80000000..110000000] | {any} +400 | 80000000..110000000 | [80000000..110000000,80000000..110000000] | {any} +; + + +Inline stats by +required_capability: approximation_v6 +request_stored: skip + +SET approximation={"rows":10000}\; +FROM many_numbers +| KEEP sv +| INLINE STATS COUNT(), AVG(sv) BY sv +| SORT sv DESC +| LIMIT 3 +; + +COUNT():long | AVG(sv):double | sv:integer |_approximation_confidence_interval(COUNT()):long | _approximation_certified(COUNT()):boolean | _approximation_confidence_interval(AVG(sv)):double | _approximation_certified(AVG(sv)):boolean +100..2000 | 699.999999..700.000001 | 700 | [100..2000,100..2000] | {any} | [699.999999..700.000001,699.999999..700.000001] | {any} +100..2000 | 699.999999..700.000001 | 700 | [100..2000,100..2000] | {any} | [699.999999..700.000001,699.999999..700.000001] | {any} +100..2000 | 699.999999..700.000001 | 700 | [100..2000,100..2000] | {any} | [699.999999..700.000001,699.999999..700.000001] | {any} +; + + Warn on no stats required_capability: approximation_v6 request_stored: skip diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/CsvIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/CsvIT.java index a368794052d6a..9923ae109504b 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/CsvIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/CsvIT.java @@ -100,7 +100,6 @@ import java.util.stream.Stream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static 
org.elasticsearch.xpack.esql.CsvAssert.assertMetadata; import static org.elasticsearch.xpack.esql.CsvSpecReader.specParser; import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled; import static org.elasticsearch.xpack.esql.CsvTestUtils.loadCsvSpecValues; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java index 145cf64b45834..c29d1912354cd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java @@ -64,7 +64,9 @@ import org.elasticsearch.xpack.esql.plan.logical.UserAgent; import org.elasticsearch.xpack.esql.plan.logical.inference.Completion; import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank; +import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; import org.elasticsearch.xpack.esql.plan.logical.join.Join; +import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation; import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; import org.elasticsearch.xpack.esql.session.Result; @@ -146,6 +148,7 @@ public record QueryProperties(boolean hasGrouping, boolean canDecreaseRowCount, Eval.class, Filter.class, Grok.class, + InlineJoin.class, Insist.class, LocalRelation.class, Join.class, @@ -158,6 +161,7 @@ public record QueryProperties(boolean hasGrouping, boolean canDecreaseRowCount, Row.class, Sample.class, SampledAggregate.class, + StubRelation.class, UriParts.class, UserAgent.class ); @@ -316,14 +320,7 @@ public record QueryProperties(boolean hasGrouping, boolean canDecreaseRowCount, private int subPlanIterationCount; private final SetOnce sourceRowCount; - /** - * Creates an Approximation object for a logical plan if it's an approximation 
plan, and returns null otherwise. - */ - public static Approximation create(LogicalPlan logicalPlan, ApproximationSettings approximationSettings) { - return ApproximationPlan.is(logicalPlan) ? new Approximation(logicalPlan, approximationSettings) : null; - } - - Approximation(LogicalPlan logicalPlan, ApproximationSettings settings) { + public Approximation(LogicalPlan logicalPlan, ApproximationSettings settings) { this.queryProperties = verifyPlanOrThrow(logicalPlan); // The plan is executed multiple times. Use CopyingLocalSupplier to // make sure the page is not released between executions. @@ -463,9 +460,11 @@ public LogicalPlan firstSubPlan() { } /** - * Returns the new main plan to execute for approximation after executing a subplan, based on the result of the subplan. + * Processes the subplan results. + * Returns the sample probability suitable for approximation if possible, + * or null if more subplans need to be executed to obtain it. */ - public LogicalPlan newMainPlan(Result result) { + public Double processResult(Result result) { if (sourceRowCount.get() == null) { return processSourceCount(rowCount(result)); } else { @@ -497,13 +496,13 @@ private LogicalPlan sourceCountSubPlan() { * need to the executed, based on the total number of rows in the source * index and the query properties. */ - private LogicalPlan processSourceCount(long sourceRowCount) { + private Double processSourceCount(long sourceRowCount) { logger.debug("total number of source rows: [{}] rows", sourceRowCount); this.sourceRowCount.set(sourceRowCount); if (sourceRowCount == 0) { // If there are no rows, run the original query. 
nextSubPlanSampleProbability = null; - return ApproximationPlan.substituteSampleProbability(logicalPlan, 1.0); + return 1.0; } double sampleProbability = Math.min(1.0, (double) sampleRowCount / sourceRowCount); if (queryProperties.canIncreaseRowCount == false && sampleProbability >= sampleProbabilityThreshold) { @@ -511,15 +510,15 @@ private LogicalPlan processSourceCount(long sourceRowCount) { // we can directly run the original query without sampling. logger.debug("using original plan (too few rows)"); nextSubPlanSampleProbability = null; - return ApproximationPlan.substituteSampleProbability(logicalPlan, 1.0); + return 1.0; } else if (queryProperties.canIncreaseRowCount == false && queryProperties.canDecreaseRowCount == false) { // If the query preserves all rows, we can directly approximate with the sample probability. nextSubPlanSampleProbability = null; - return ApproximationPlan.substituteSampleProbability(logicalPlan, sampleProbability); + return sampleProbability; } else { // Otherwise, we need to sample the number of rows first to obtain a good sample probability. nextSubPlanSampleProbability = Math.min(1.0, (double) ROW_COUNT_FOR_COUNT_ESTIMATION / sourceRowCount); - return logicalPlan; + return null; } } @@ -612,7 +611,7 @@ private LogicalPlan countSubPlan(double sampleProbability) { * To be safe, the maximum iteration count is capped at 10, and an exception is thrown * when this count is exceeded. */ - private LogicalPlan processCount(long rowCount) { + private Double processCount(long rowCount) { subPlanIterationCount += 1; if (subPlanIterationCount > 10) { throw new IllegalStateException("Approximation count iteration limit exceeded"); @@ -627,15 +626,15 @@ private LogicalPlan processCount(long rowCount) { // If the new sample probability is large, run the original query. 
logger.debug("using original plan (too few rows)"); nextSubPlanSampleProbability = null; - return ApproximationPlan.substituteSampleProbability(logicalPlan, 1.0); + return 1.0; } else if (rowCount <= ROW_COUNT_FOR_COUNT_ESTIMATION / 2) { // Not enough rows are sampled yet; increase the sample probability and try again. nextSubPlanSampleProbability = Math.min(1.0, sampleProbability * ROW_COUNT_FOR_COUNT_ESTIMATION / Math.max(1, rowCount)); - return logicalPlan; + return null; } else { // A good sample probability is found; run the approximation plan. nextSubPlanSampleProbability = null; - return ApproximationPlan.substituteSampleProbability(logicalPlan, newSampleProbability); + return newSampleProbability; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 6aee5bf9c7c59..6ce8c6f836f3d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -53,6 +53,7 @@ import org.elasticsearch.xpack.esql.analysis.UnmappedResolution; import org.elasticsearch.xpack.esql.analysis.Verifier; import org.elasticsearch.xpack.esql.approximation.Approximation; +import org.elasticsearch.xpack.esql.approximation.ApproximationPlan; import org.elasticsearch.xpack.esql.approximation.ApproximationSettings; import org.elasticsearch.xpack.esql.capabilities.TelemetryAware; import org.elasticsearch.xpack.esql.core.expression.Attribute; @@ -61,6 +62,7 @@ import org.elasticsearch.xpack.esql.core.expression.function.Function; import org.elasticsearch.xpack.esql.core.querydsl.QueryDslTimestampBoundsExtractor; import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.util.Holder; import org.elasticsearch.xpack.esql.datasources.ExternalSourceResolution; import 
org.elasticsearch.xpack.esql.datasources.ExternalSourceResolver; import org.elasticsearch.xpack.esql.datasources.PartitionFilterHintExtractor; @@ -369,7 +371,7 @@ public void onResponse(Versioned analyzedPlan) { p, finalConfiguration, foldContext, - Approximation.create(p, configuration.approximationSettings()), + new Holder<>(), minimumVersion, planTimeProfile, l @@ -412,7 +414,7 @@ private void executeOptimizedPlan( LogicalPlan optimizedPlan, Configuration configuration, FoldContext foldContext, - Approximation approximation, + Holder approximation, TransportVersion minimumVersion, PlanTimeProfile planTimeProfile, ActionListener listener @@ -549,7 +551,7 @@ private void executeSubPlans( LogicalPlan optimizedPlan, Configuration configuration, FoldContext foldContext, - Approximation approximation, + Holder approximation, PlanRunner runner, EsqlExecutionInfo executionInfo, EsqlQueryRequest request, @@ -558,7 +560,7 @@ private void executeSubPlans( ActionListener listener ) { var subPlansResults = new HashSet(); - var subPlan = firstSubPlan(optimizedPlan, approximation, subPlansResults); + var subPlan = firstSubPlan(optimizedPlan, configuration, approximation, subPlansResults); // TODO: merge into one method if (subPlan != null) { @@ -598,26 +600,46 @@ private record SubPlanAndCallback( Runnable cleanup ) {}; - private SubPlanAndCallback firstSubPlan(LogicalPlan optimizedPlan, Approximation approximation, Set subPlansResults) { - if (approximation != null) { - LogicalPlan subPlan = approximation.firstSubPlan(); + private SubPlanAndCallback firstSubPlan( + LogicalPlan mainPlan, + Configuration configuration, + Holder approximation, + Set subPlansResults + ) { + SubPlanAndCallback subPlanAndCallback = null; + + // InlineJoin must be first, because approximation may need to approximate a subplan of it. 
+ InlineJoin.LogicalPlanTuple subPlans = InlineJoin.firstSubPlan(mainPlan, subPlansResults); + if (subPlans != null) { + AtomicReference localRelationPage = new AtomicReference<>(); + subPlanAndCallback = new SubPlanAndCallback(subPlans.stubReplacedSubPlan(), result -> { + // Translate the subquery into a separate, coordinator based plan and the results 'broadcasted' as a local relation + LocalRelation resultWrapper = resultToPlan(subPlans.stubReplacedSubPlan().source(), result); + localRelationPage.set(resultWrapper.supplier().get()); + subPlansResults.add(resultWrapper); + return InlineJoin.newMainPlan(mainPlan, subPlans, resultWrapper); + }, () -> releaseLocalRelationBlocks(localRelationPage)); + } + + LogicalPlan plan = subPlanAndCallback != null ? subPlanAndCallback.subPlan : mainPlan; + if (ApproximationPlan.is(plan)) { + if (approximation.get() == null) { + approximation.set(new Approximation(plan, configuration.approximationSettings())); + } + LogicalPlan subPlan = approximation.get().firstSubPlan(); if (subPlan != null) { - return new SubPlanAndCallback(subPlan, approximation::newMainPlan, () -> {}); + subPlanAndCallback = new SubPlanAndCallback(subPlan, result -> { + Double sampleProbability = approximation.get().processResult(result); + if (sampleProbability != null) { + return ApproximationPlan.substituteSampleProbability(mainPlan, sampleProbability); + } else { + return mainPlan; + } + }, () -> {}); } } - InlineJoin.LogicalPlanTuple subPlans = InlineJoin.firstSubPlan(optimizedPlan, subPlansResults); - if (subPlans == null) { - return null; - } - AtomicReference localRelationPage = new AtomicReference<>(); - return new SubPlanAndCallback(subPlans.stubReplacedSubPlan(), result -> { - // Translate the subquery into a separate, coordinator based plan and the results 'broadcasted' as a local relation - LocalRelation resultWrapper = resultToPlan(subPlans.stubReplacedSubPlan().source(), result); - localRelationPage.set(resultWrapper.supplier().get()); - 
subPlansResults.add(resultWrapper); - return InlineJoin.newMainPlan(optimizedPlan, subPlans, resultWrapper); - }, () -> releaseLocalRelationBlocks(localRelationPage)); + return subPlanAndCallback; } private void executeSubPlan( @@ -626,7 +648,7 @@ private void executeSubPlan( SubPlanAndCallback subPlan, Configuration configuration, FoldContext foldContext, - Approximation approximation, + Holder approximation, EsqlExecutionInfo executionInfo, PlanRunner runner, EsqlQueryRequest request, @@ -648,7 +670,7 @@ private void executeSubPlan( LogicalPlan newMainPlan = subPlan.newMainPlan.apply(result); // look for the next inlinejoin plan - var newSubPlan = firstSubPlan(newMainPlan, approximation, subPlansResults); + var newSubPlan = firstSubPlan(newMainPlan, configuration, approximation, subPlansResults); if (newSubPlan == null) {// run the final "main" plan executionInfo.finishSubPlans(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java index 7b19aeea57f5e..8862905d4349d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java @@ -85,10 +85,7 @@ import org.elasticsearch.xpack.esql.plan.logical.fuse.Fuse; import org.elasticsearch.xpack.esql.plan.logical.fuse.FuseScoreEval; import org.elasticsearch.xpack.esql.plan.logical.inference.InferencePlan; -import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; -import org.elasticsearch.xpack.esql.plan.logical.join.Join; import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin; -import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation; import org.elasticsearch.xpack.esql.plan.logical.local.ResolvingProject; import 
org.elasticsearch.xpack.esql.plan.logical.promql.AcrossSeriesAggregate; import org.elasticsearch.xpack.esql.plan.logical.promql.PlaceholderRelation; @@ -146,13 +143,8 @@ public class ApproximationSupportTests extends ESTestCase { Fork.class, UnionAll.class, ViewUnionAll.class, - InlineJoin.class, ParameterizedQuery.class, - // InlineStats is not supported yet. - // Only a single Stats command is supported. - InlineStats.class, - // Timeseries indices are not supported yet. // They require chained Stats commands. TimeSeriesAggregate.class, @@ -175,9 +167,9 @@ public class ApproximationSupportTests extends ESTestCase { // These plans don't occur in a correct analyzed query. UnresolvedRelation.class, UnresolvedExternalRelation.class, - StubRelation.class, Drop.class, Keep.class, + InlineStats.class, LookupJoin.class, Rename.class, ResolvingProject.class, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTestCase.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTestCase.java index 581b35bcbd26a..c8d92fe7fe754 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTestCase.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTestCase.java @@ -7,23 +7,16 @@ package org.elasticsearch.xpack.esql.approximation; -import org.apache.lucene.util.SetOnce; -import org.elasticsearch.TransportVersion; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.esql.TestOptimizer; import org.elasticsearch.xpack.esql.VerificationException; import 
org.elasticsearch.xpack.esql.core.expression.Alias; -import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.expression.Foldables; -import org.elasticsearch.xpack.esql.inference.InferenceService; -import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer; -import org.elasticsearch.xpack.esql.optimizer.LogicalPreOptimizerContext; -import org.elasticsearch.xpack.esql.parser.QueryParams; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; @@ -39,33 +32,21 @@ import java.util.function.Predicate; import java.util.stream.Collectors; -import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_PARSER; -import static org.elasticsearch.xpack.esql.EsqlTestUtils.analyzer; -import static org.mockito.Mockito.mock; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.optimizer; public abstract class ApproximationTestCase extends ESTestCase { - private static final LogicalPlanPreOptimizer preOptimizer = new LogicalPlanPreOptimizer( - new LogicalPreOptimizerContext(FoldContext.small(), mock(InferenceService.class), TransportVersion.current()) - ); private static final BlockFactory blockFactory = BlockFactory.builder(BigArrays.NON_RECYCLING_INSTANCE) .breaker(new NoopCircuitBreaker("none")) .build(); - static LogicalPlan getLogicalPlan(String query) throws Exception { - SetOnce resultHolder = new SetOnce<>(); - SetOnce exceptionHolder = new SetOnce<>(); - LogicalPlan plan = TEST_PARSER.createStatement(query, new QueryParams()).plan(); - plan = analyzer().addEmployees("test").addK8s().addTestLookup().buildAnalyzer().analyze(plan); - plan.setAnalyzed(); - preOptimizer.preOptimize(plan, ActionListener.wrap(resultHolder::set, exceptionHolder::set)); - if (exceptionHolder.get() != null) { - throw exceptionHolder.get(); - } - return resultHolder.get().children().getFirst(); + private static final 
TestOptimizer optimizer = optimizer().addDefaultIndex().addTestLookup().addK8s(); + + static LogicalPlan getLogicalPlan(String query) { + return optimizer.coordinatorPlan(query); } - static Approximation.QueryProperties verify(String query) throws Exception { + static Approximation.QueryProperties verify(String query) { return Approximation.verifyPlanOrThrow(getLogicalPlan(query)); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTests.java index 9732afef2c201..1f3f91b3489e1 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.MvExpand; import org.elasticsearch.xpack.esql.plan.logical.SampledAggregate; +import org.elasticsearch.xpack.esql.session.Result; import java.util.List; @@ -38,6 +39,7 @@ public void testVerify_validQuery() throws Exception { verify("ROW i=[1,2,3] | EVAL x=TO_STRING(i) | DISSECT x \"%{x}\" | STATS i=10*POW(PERCENTILE(i, 0.5), 2) | LIMIT 10"); verify("FROM test | URI_PARTS parts = last_name | STATS scheme_count = COUNT() BY parts.scheme | LIMIT 10"); verify("FROM test | REGISTERED_DOMAIN rd = last_name | STATS c = COUNT() BY rd.registered_domain | LIMIT 10"); + verify("FROM test | INLINE STATS COUNT() BY last_name | LIMIT 10"); } public void testVerify_validQuery_queryProperties() throws Exception { @@ -65,6 +67,10 @@ public void testVerify_validQuery_queryProperties() throws Exception { verify("FROM test | MV_EXPAND gender | WHERE emp_no < 3 | STATS COUNT()"), equalTo(new Approximation.QueryProperties(false, true, true)) ); + assertThat( + verify("FROM test | WHERE emp_no < 3 | INLINE STATS 
COUNT()"), + equalTo(new Approximation.QueryProperties(false, true, false)) + ); } public void testVerify_exactlyOneStats() { @@ -76,6 +82,18 @@ public void testVerify_exactlyOneStats() { "FROM test | STATS COUNT() BY emp_no | STATS COUNT()", equalTo("line 1:39: approximation not supported: query with multiple [STATS] cannot be approximated") ); + assertError( + "FROM test | INLINE STATS count=COUNT() BY emp_no | STATS AVG(count)", + equalTo("line 1:52: approximation not supported: query with multiple [STATS] cannot be approximated") + ); + assertError( + "FROM test | STATS COUNT() BY emp_no | INLINE STATS COUNT()", + equalTo("line 1:39: approximation not supported: query with multiple [STATS] cannot be approximated") + ); + assertError( + "FROM test | INLINE STATS count=COUNT() | INLINE STATS SUM(count)", + equalTo("line 1:42: approximation not supported: query with multiple [STATS] cannot be approximated") + ); } public void testVerify_incompatibleSourceCommand() { @@ -84,7 +102,7 @@ public void testVerify_incompatibleSourceCommand() { equalTo("line 1:1: approximation not supported: query with [SHOW INFO] cannot be approximated") ); assertError( - "TS k8s | STATS COUNT(network.cost)", + "TS k8s | STATS RATE(network.total_bytes_in)", equalTo("line 1:1: approximation not supported: query with [TS k8s] cannot be approximated") ); } @@ -98,22 +116,6 @@ public void testVerify_incompatibleProcessingCommand() { "FROM test | STATS COUNT() | FORK (EVAL x=1) (EVAL y=1)", equalTo("line 1:29: approximation not supported: query with [FORK (EVAL x=1) (EVAL y=1)] cannot be approximated") ); - assertError( - "FROM test | INLINE STATS COUNT() | STATS COUNT()", - equalTo("line 1:13: approximation not supported: query with [INLINE STATS COUNT()] cannot be approximated") - ); - assertError( - "FROM test | STATS COUNT() | INLINE STATS COUNT()", - equalTo("line 1:29: approximation not supported: query with [INLINE STATS COUNT()] cannot be approximated") - ); - assertError( - "FROM 
test | LOOKUP JOIN test_lookup ON emp_no | FORK (EVAL x=1) (EVAL y=1) | STATS COUNT()", - equalTo("line 1:13: approximation not supported: query with [LOOKUP JOIN test_lookup ON emp_no] cannot be approximated") - ); - assertError( - "FROM test | STATS emp_no=COUNT() | LOOKUP JOIN test_lookup ON emp_no | FORK (EVAL x=1) (EVAL y=1)", - equalTo("line 1:36: approximation not supported: query with [LOOKUP JOIN test_lookup ON emp_no] cannot be approximated") - ); } public void testVerify_incompatibleProcessingCommandBeforeStats() { @@ -161,7 +163,7 @@ public void testPlans_noData() throws Exception { assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class))); // Source count of 0, so no more subplans. - mainPlan = approximation.newMainPlan(newCountResult(0)); + mainPlan = processResult(approximation, mainPlan, newCountResult(0)); subplan = approximation.firstSubPlan(); assertThat(subplan, nullValue()); @@ -183,7 +185,7 @@ public void testPlans_smallDataNoFilters() throws Exception { assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class))); // Source count of 1000, so no more subplans. - mainPlan = approximation.newMainPlan(newCountResult(1_000)); + mainPlan = processResult(approximation, mainPlan, newCountResult(1_000)); subplan = approximation.firstSubPlan(); assertThat(subplan, nullValue()); @@ -205,7 +207,7 @@ public void testPlans_largeDataNoFilters() throws Exception { assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class))); // Source count of 10^9. - mainPlan = approximation.newMainPlan(newCountResult(1_000_000_000)); + mainPlan = processResult(approximation, mainPlan, newCountResult(1_000_000_000)); // No filtering, so no more subplans. subplan = approximation.firstSubPlan(); assertThat(subplan, nullValue()); @@ -229,7 +231,7 @@ public void testPlans_largeDataAfterFiltering() throws Exception { assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class))); // Source count of 10^12. 
- approximation.newMainPlan(newCountResult(1_000_000_000_000L)); + processResult(approximation, mainPlan, newCountResult(1_000_000_000_000L)); // There's filtering, so start with a count plan with sampling with probability 10^4 / 10^12. subplan = approximation.firstSubPlan(); assertThat(subplan, hasPlan(Filter.class)); @@ -237,14 +239,14 @@ public void testPlans_largeDataAfterFiltering() throws Exception { assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-8), withAggs(CountApproximate.class))); // Sampled-corrected filtered count of 10^9 (so actual count of 10), so increase the sample probability. - approximation.newMainPlan(newCountResult(1_000_000_000)); + processResult(approximation, mainPlan, newCountResult(1_000_000_000)); subplan = approximation.firstSubPlan(); assertThat(subplan, hasPlan(Filter.class)); assertThat(subplan, not(hasPlan(Aggregate.class))); assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-5), withAggs(CountApproximate.class))); // Sampled-corrected filtered count of 10^9 (so actual count of 10_000), so no more subplans. - mainPlan = approximation.newMainPlan(newCountResult(1_000_000_000)); + mainPlan = processResult(approximation, mainPlan, newCountResult(1_000_000_000)); subplan = approximation.firstSubPlan(); assertThat(subplan, nullValue()); @@ -268,7 +270,7 @@ public void testPlans_smallDataAfterFiltering() throws Exception { assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class))); // Source count of 10^18. - approximation.newMainPlan(newCountResult(1_000_000_000_000_000_000L)); + processResult(approximation, mainPlan, newCountResult(1_000_000_000_000_000_000L)); // There's filtering, so start with a count plan with sampling with probability 10^4 / 10^18. 
subplan = approximation.firstSubPlan(); assertThat(subplan, hasPlan(Filter.class)); @@ -276,28 +278,28 @@ public void testPlans_smallDataAfterFiltering() throws Exception { assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-14), withAggs(CountApproximate.class))); // Filtered count of 0, so increase the sample probability. - approximation.newMainPlan(newCountResult(0)); + processResult(approximation, mainPlan, newCountResult(0)); subplan = approximation.firstSubPlan(); assertThat(subplan, hasPlan(Filter.class)); assertThat(subplan, not(hasPlan(Aggregate.class))); assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-10), withAggs(CountApproximate.class))); // Filtered count of 0, so increase the sample probability. - approximation.newMainPlan(newCountResult(0)); + processResult(approximation, mainPlan, newCountResult(0)); subplan = approximation.firstSubPlan(); assertThat(subplan, hasPlan(Filter.class)); assertThat(subplan, not(hasPlan(Aggregate.class))); assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-6), withAggs(CountApproximate.class))); // Filtered count of 0, so increase the sample probability. - approximation.newMainPlan(newCountResult(0)); + processResult(approximation, mainPlan, newCountResult(0)); subplan = approximation.firstSubPlan(); assertThat(subplan, hasPlan(Filter.class)); assertThat(subplan, not(hasPlan(Aggregate.class))); assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-2), withAggs(CountApproximate.class))); // Filtered count of 0, so no more subplans. - mainPlan = approximation.newMainPlan(newCountResult(0)); + mainPlan = processResult(approximation, mainPlan, newCountResult(0)); subplan = approximation.firstSubPlan(); assertThat(subplan, nullValue()); @@ -321,7 +323,7 @@ public void testPlans_smallDataBeforeFiltering() throws Exception { assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class))); // Source count of 1000, so no more subplans. 
- mainPlan = approximation.newMainPlan(newCountResult(1_000)); + mainPlan = processResult(approximation, mainPlan, newCountResult(1_000)); subplan = approximation.firstSubPlan(); assertThat(subplan, nullValue()); @@ -345,7 +347,7 @@ public void testPlans_smallDataAfterMvExpanding() throws Exception { assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class))); // Source count of 1000. - approximation.newMainPlan(newCountResult(1_000)); + processResult(approximation, mainPlan, newCountResult(1_000)); // The next subplan should be the mv_expanded count, // without sampling, because source count is small. subplan = approximation.firstSubPlan(); @@ -354,7 +356,7 @@ public void testPlans_smallDataAfterMvExpanding() throws Exception { assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class))); // mv_expanded count of 10_000, so no more subplans. - mainPlan = approximation.newMainPlan(newCountResult(1_000)); + mainPlan = processResult(approximation, mainPlan, newCountResult(1_000)); subplan = approximation.firstSubPlan(); assertThat(subplan, nullValue()); @@ -378,7 +380,7 @@ public void testPlans_largeDataAfterMvExpanding() throws Exception { assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class))); // Source count of 1000. - approximation.newMainPlan(newCountResult(1_000)); + processResult(approximation, mainPlan, newCountResult(1_000)); // The next subplan should be the mv_expanded count, // without no sampling because source count is small. subplan = approximation.firstSubPlan(); @@ -387,7 +389,7 @@ public void testPlans_largeDataAfterMvExpanding() throws Exception { assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class))); // mv_expanded count of 10^9, so no more subplans. 
- mainPlan = approximation.newMainPlan(newCountResult(1_000_000_000)); + mainPlan = processResult(approximation, mainPlan, newCountResult(1_000_000_000)); subplan = approximation.firstSubPlan(); assertThat(subplan, nullValue()); @@ -411,7 +413,7 @@ public void testPlans_largeDataBeforeMvExpanding() throws Exception { assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class))); // Source count of 10^9. - approximation.newMainPlan(newCountResult(1_000_000_000)); + processResult(approximation, mainPlan, newCountResult(1_000_000_000)); // The next subplan should be the mv_expanded count, // with sampling because source count is large. subplan = approximation.firstSubPlan(); @@ -420,7 +422,7 @@ public void testPlans_largeDataBeforeMvExpanding() throws Exception { assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-5), withAggs(CountApproximate.class))); // Sample-corrected mv_expanded count of 10^12 (so actual of 10^7), so no more subplans. - mainPlan = approximation.newMainPlan(newCountResult(1_000_000_000_000L)); + mainPlan = processResult(approximation, mainPlan, newCountResult(1_000_000_000_000L)); subplan = approximation.firstSubPlan(); assertThat(subplan, nullValue()); @@ -444,7 +446,7 @@ public void testPlans_sampleProbabilityThreshold_noFilter() throws Exception { assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class))); // Source count of 500_000, so no more subplans. - mainPlan = approximation.newMainPlan(newCountResult(500_000)); + mainPlan = processResult(approximation, mainPlan, newCountResult(500_000)); subplan = approximation.firstSubPlan(); assertThat(subplan, nullValue()); @@ -469,7 +471,7 @@ public void testCountPlan_sampleProbabilityThreshold_withFilter() throws Excepti assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class))); // Source count of 10^12. 
- approximation.newMainPlan(newCountResult(1_000_000_000_000L)); + processResult(approximation, mainPlan, newCountResult(1_000_000_000_000L)); // There's filtering, so start with a count plan with sampling with probability 10^4 / 10^12. subplan = approximation.firstSubPlan(); assertThat(subplan, hasPlan(Filter.class)); @@ -477,7 +479,7 @@ public void testCountPlan_sampleProbabilityThreshold_withFilter() throws Excepti assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-8), withAggs(CountApproximate.class))); // Sampled filtered count of 0, so increase the sample probability. - approximation.newMainPlan(newCountResult(0)); + processResult(approximation, mainPlan, newCountResult(0)); // There's filtering, so start with a count plan with sampling with probability 10^4 / 10^12. subplan = approximation.firstSubPlan(); assertThat(subplan, hasPlan(Filter.class)); @@ -486,7 +488,7 @@ public void testCountPlan_sampleProbabilityThreshold_withFilter() throws Excepti // Sampled filtered count of 20, which would next to a sample probability of 0.2, // which is above the threshold, so no more subplans. - mainPlan = approximation.newMainPlan(newCountResult(20)); + mainPlan = processResult(approximation, mainPlan, newCountResult(20)); // There's filtering, so start with a count plan with sampling with probability 10^4 / 10^12. subplan = approximation.firstSubPlan(); assertThat(subplan, nullValue()); @@ -496,4 +498,9 @@ public void testCountPlan_sampleProbabilityThreshold_withFilter() throws Excepti assertThat(mainPlan, not(hasPlan(SampledAggregate.class))); assertThat(mainPlan, hasPlan(Aggregate.class, withAggs(Sum.class))); } + + private LogicalPlan processResult(Approximation approximation, LogicalPlan mainPlan, Result result) { + Double sampleProbability = approximation.processResult(result); + return sampleProbability == null ? 
mainPlan : ApproximationPlan.substituteSampleProbability(mainPlan, sampleProbability); + } } From e348103f42e96bef87c6e37b296282971cda0662 Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Thu, 9 Apr 2026 15:09:43 +0200 Subject: [PATCH 4/9] update capability --- .../GenerativeApproximationIT.java | 2 +- .../xpack/esql/qa/rest/RestEsqlTestCase.java | 2 +- .../GenerativeApproximationRestTest.java | 4 +- .../generative/GenerativeForkRestTest.java | 4 +- .../command/source/FromGenerator.java | 2 +- .../src/main/resources/approximation.csv-spec | 92 +++++++++---------- .../xpack/esql/action/EsqlActionIT.java | 4 +- .../xpack/esql/action/EsqlCapabilities.java | 2 +- 8 files changed, 56 insertions(+), 56 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/GenerativeApproximationIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/GenerativeApproximationIT.java index bbdbaeb968cc1..03d2bdb3c5b3f 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/GenerativeApproximationIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/GenerativeApproximationIT.java @@ -27,7 +27,7 @@ public class GenerativeApproximationIT extends GenerativeApproximationRestTest { @Before public void checkCapability() { - assumeTrue("query approximation should be enabled", EsqlCapabilities.Cap.APPROXIMATION_V6.isEnabled()); + assumeTrue("query approximation should be enabled", EsqlCapabilities.Cap.APPROXIMATION_V7.isEnabled()); } @ClassRule diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index 8512fc59c3d19..a47b84a86e2d7 100644 --- 
a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -1951,7 +1951,7 @@ public void testRandomTimezoneBuckets() throws IOException { } public void testApproximationColumnMetadata() throws IOException { - assumeTrue("approximation support", EsqlCapabilities.Cap.APPROXIMATION_V6.isEnabled()); + assumeTrue("approximation support", EsqlCapabilities.Cap.APPROXIMATION_V7.isEnabled()); bulkLoadTestData(10); String query = "SET approximation=true; " + fromIndex() + " | STATS count=COUNT()"; diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeApproximationRestTest.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeApproximationRestTest.java index 6daf0c20d42a7..5d0a8b5bca877 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeApproximationRestTest.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeApproximationRestTest.java @@ -16,7 +16,7 @@ import java.util.List; import java.util.Map; -import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.APPROXIMATION_V6; +import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.APPROXIMATION_V7; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FIX_SUM_AGG_LONG_OVERFLOW; /** @@ -73,7 +73,7 @@ protected void assertResults( @Override protected void shouldSkipTest(String testName) throws IOException { super.shouldSkipTest(testName); - assumeFalse("No approximation tests", testCase.requiredCapabilities.contains(APPROXIMATION_V6.capabilityName())); + assumeFalse("No approximation tests", testCase.requiredCapabilities.contains(APPROXIMATION_V7.capabilityName())); assumeFalse( "Approximation casts 
integer SUM to double, preventing long overflow", testCase.requiredCapabilities.contains(FIX_SUM_AGG_LONG_OVERFLOW.capabilityName()) diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeForkRestTest.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeForkRestTest.java index 18208a1da0727..1d6c61ff3d4cd 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeForkRestTest.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeForkRestTest.java @@ -15,7 +15,7 @@ import java.util.List; import static org.elasticsearch.xpack.esql.CsvTestUtils.loadCsvSpecValues; -import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.APPROXIMATION_V6; +import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.APPROXIMATION_V7; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.ESQL_WITHOUT_GROUPING; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V9; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METRICS_GROUP_BY_ALL; @@ -92,7 +92,7 @@ protected void shouldSkipTest(String testName) throws IOException { assumeFalse( "Tests using query approximation are skipped since query approximation is not supported with FORK", - testCase.requiredCapabilities.contains(APPROXIMATION_V6.capabilityName()) + testCase.requiredCapabilities.contains(APPROXIMATION_V7.capabilityName()) ); assumeFalse( diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/FromGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/FromGenerator.java index b56e2be1ad953..0364ef884f5a0 100644 --- 
a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/FromGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/FromGenerator.java @@ -59,7 +59,7 @@ public CommandDescription generate( if (useUnmappedFields) { result.append(SET_UNMAPPED_FIELDS_PREFIX); } - boolean setQueryApproximation = EsqlCapabilities.Cap.APPROXIMATION_V6.isEnabled() + boolean setQueryApproximation = EsqlCapabilities.Cap.APPROXIMATION_V7.isEnabled() && randomDouble() < QUERY_APPROXIMATION_SETTING_PROBABILITY; if (setQueryApproximation) { result.append(randomQueryApproximationSettings()); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec index 16a136691389e..45c57165a8a04 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec @@ -22,7 +22,7 @@ No approximation -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation=false\; @@ -35,7 +35,7 @@ count:long No confidence intervals -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000, "confidence_level":null}\; @@ -48,7 +48,7 @@ count:long | sum:long Exact total row count -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -61,7 +61,7 @@ count:long | _approximation_confidence_interval(count):long | _approximation_cer Exact total single-valued field count -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -74,7 +74,7 @@ count:long | _approximation_confidence_interval(count):long | _approximation_cer 
Approximate total multi-valued field count -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -87,7 +87,7 @@ count:long | _approximation_confidence_interval(count):long | _approximati Approximate stats on large single-valued data -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -100,7 +100,7 @@ count:long | avg:double | sum:long | _approximation_confidence_i Approximate stats on large multi-valued data -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation=true\; @@ -115,7 +115,7 @@ count:long | avg:double | sum:long | _approximation_confiden Exact stats on small single-valued data -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -130,7 +130,7 @@ count:long | avg:double | sum:long | _approximation_confidence_interval(count):l Exact stats on small multi-valued data -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -146,7 +146,7 @@ count:long | avg:double | sum:long | _approximation_confidence_interval(count):l Multiple total counts -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -159,7 +159,7 @@ count:long | count2:long | countValue:long | _approximation_confidence_i Exact count with where on single-valued data -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -174,7 +174,7 @@ count:long | _approximation_confidence_interval(count):long | _approximation_cer Approximate stats with where on multi-valued data -required_capability: approximation_v6 +required_capability: approximation_v7 
request_stored: skip SET approximation={"rows":10000}\; @@ -190,7 +190,7 @@ count:long | avg:double | sum:long | _approximation_confidenc Approximate stats with stats where -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000, "confidence_level":0.85}\; @@ -207,7 +207,7 @@ count:long | avg:double | sum:long | _approximation_confidenc Approximate stats with sample -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000,"confidence_level":0.85}\; @@ -222,7 +222,7 @@ count:long | avg:double | sum:long | _approximation_confidence_in Approximate stats with commands before stats -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -247,7 +247,7 @@ count:long | avg:double | sum:long | _approximation_confiden Approximate stats with commands after stats -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -266,7 +266,7 @@ avg:double | avg2:double | _approximation_confidence_interval(avg):double | _app Approximate stats with dependent variables that have confidence interval -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -286,7 +286,7 @@ y:integer | plus1:double | _approximation_confidence_interval(plus1):double Approximate stats with dependent string variable -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -303,7 +303,7 @@ from_str:double Approximate stats with dependent multi-valued variable -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -319,7 +319,7 @@ sv:double Approximate stats by with zero 
variance -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":100000}\; @@ -341,7 +341,7 @@ avg:double | median:double | one:double | mv:integer | _approximat Approximate stats by on large single-valued data -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -361,7 +361,7 @@ count:long | sv:integer | _approximation_confidence_interval(count):long | _appr Approximate stats by on large multi-valued data -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":100000}\; @@ -382,7 +382,7 @@ count:long | mv:integer | _approximation_confidence_interval(count):long | _appr Exact stats by on small single-valued data -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -402,7 +402,7 @@ count:long | sv:integer | _approximation_confidence_interval(count):long | _appr Exact stats by on small multi-valued data -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -423,7 +423,7 @@ count:long | mv:integer | _approximation_confidence_interval(count):long | _appr Approximate stats by on mixed data -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -436,7 +436,7 @@ count:long | _approximation_confidence_interval(count):long | _approximation Overwrite approximated column -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation=true\; @@ -451,7 +451,7 @@ sum:integer User-generated approximation column name -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation=true\; @@ -470,7 +470,7 @@ sum:long 
| _approximation_confidence_interval(sum):long | _approximation_certifi Rename stats group key -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -484,7 +484,7 @@ count:long | key:integer | _approximation_confidence_interval(count):long | _app Filter all rows -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -498,7 +498,7 @@ median:double | _approximation_confidence_interval(median):double | _approximati Row with duplicate aggs -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -513,7 +513,7 @@ a:long | b:long | c:long | x:long | _approximation_confidence_interval(a):long | Duplicate aggs with grouping -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -528,7 +528,7 @@ median:double | p50:double | key:keyword | _approximation_confidence_interval(me Sum of nulls -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation=true\; @@ -544,7 +544,7 @@ null Approximated column is null -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation=true\; @@ -557,7 +557,7 @@ null | null | null Approximated column and null -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation=true\; @@ -570,7 +570,7 @@ sum:long | sum_null:long | _approximation_confidence_interval(sum):lo Mv_expand after duplicate aggs -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -582,7 +582,7 @@ a:long | b:long | _approximation_confidence_interval(a):long | _ Exact dense vector count -required_capability: 
approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -596,7 +596,7 @@ count_vectors:long | _approximation_confidence_interval(count_vectors):long | _a Count by date bucket -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation=true\; @@ -615,7 +615,7 @@ c:long | b:date | _approximation_confidence_interval(c):long | Lookup join before stats -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -632,7 +632,7 @@ AVG(length):double | _approximation_confidence_interval(AVG(length)):double | _a Lookup join after stats by -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -653,7 +653,7 @@ count:long | language_code:integer | language_name:keyword | _approximation_co Inline stats -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -674,7 +674,7 @@ sv:integer | AVG(sv):double | _approximation_confidence_interval(AVG(sv)):double Inline stats and where -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -696,7 +696,7 @@ sv:integer | SUM(sv):long | _approximation_confidence_interval(SUM(sv)):l Inline stats by -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation={"rows":10000}\; @@ -715,7 +715,7 @@ COUNT():long | AVG(sv):double | sv:integer |_approximation_confidence_in Warn on no stats -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET approximation=true\; @@ -734,7 +734,7 @@ sv:integer Warn on max -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip SET 
approximation=true\; @@ -752,7 +752,7 @@ max:integer | sv:integer Example boolean for docs -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip // tag::approximationBooleanForDocs[] @@ -772,7 +772,7 @@ sum:long | _approximation_confidence_interval(sum):long | _approximation_certifi Example map for docs -required_capability: approximation_v6 +required_capability: approximation_v7 request_stored: skip // tag::approximationMapForDocs[] diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index b6250aa749d77..a14a4d255fd21 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -96,7 +96,7 @@ import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; -import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.APPROXIMATION_V6; +import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.APPROXIMATION_V7; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.EXPLAIN; import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.syncEsqlQueryRequest; import static org.hamcrest.Matchers.allOf; @@ -2524,7 +2524,7 @@ public void testExplainWithLookupJoin() { */ public void testExplainWithApproximation() { assumeTrue("EXPLAIN requires the capability to be enabled", EXPLAIN.isEnabled()); - assumeTrue("Approximation requires the capability to be enabled", APPROXIMATION_V6.isEnabled()); + assumeTrue("Approximation requires the capability to be enabled", APPROXIMATION_V7.isEnabled()); String indexName = 
"explain_approximation_test"; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 73dfeb4a2ab87..f2f2846a9d6ca 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -2179,7 +2179,7 @@ public enum Cap { /** * Support query approximation. */ - APPROXIMATION_V6, + APPROXIMATION_V7, /** * Create a ScoreOperator only when shard contexts are available From 96d74a28829248a24f76c3dbc2f3d2e12ce08883 Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Fri, 10 Apr 2026 10:48:48 +0200 Subject: [PATCH 5/9] document caveat --- .../physical/local/ReplaceSampledStatsBySampleAndStats.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java index dd71eadf20389..0304c00232c46 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java @@ -61,7 +61,10 @@ protected PhysicalPlan rule(SampledAggregateExec plan) { assert sampleProbability < 1.0; // The only non-unary plans that are currently supported are Joins. - // For these, only the first index needs to be sampled. + // At the moment, the left side of the join is the "expensive" side and + // will be sampled, while the right side is just a lookup table. 
+ // This will probably change in the future, in which case ths logic + // must be reconsidered. Holder sampledAdded = new Holder<>(false); PhysicalPlan child = plan.child().transformDown(p -> { if (p instanceof LeafExec && sampledAdded.get() == false) { From 2b2d260234fe3c53e6c1c426a7bfb16bc27ee204 Mon Sep 17 00:00:00 2001 From: Jan Kuipers <148754765+jan-elastic@users.noreply.github.com> Date: Fri, 10 Apr 2026 11:51:21 +0200 Subject: [PATCH 6/9] Update docs/changelog/145980.yaml --- docs/changelog/145980.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/145980.yaml diff --git a/docs/changelog/145980.yaml b/docs/changelog/145980.yaml new file mode 100644 index 0000000000000..f9ce8d4985bb4 --- /dev/null +++ b/docs/changelog/145980.yaml @@ -0,0 +1,5 @@ +area: "Machine Learning" +issues: [] +pr: 145980 +summary: Lookup join and Inline stats support for query approximation +type: feature From 184d6c98227f3fb631ac22cf280dd118f24d3d87 Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Tue, 14 Apr 2026 10:18:47 +0200 Subject: [PATCH 7/9] More tests --- .../src/main/resources/approximation.csv-spec | 46 +++++++++++++++++++ .../esql/approximation/Approximation.java | 2 +- .../ReplaceSampledStatsBySampleAndStats.java | 2 +- ...laceSampledStatsBySampleAndStatsTests.java | 31 +++++++++++++ 4 files changed, 79 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec index 45c57165a8a04..7c0fd0961baa9 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec @@ -652,6 +652,31 @@ count:long | language_code:integer | language_name:keyword | _approximation_co ; +Lookup join before and after stats by +required_capability: approximation_v7 +request_stored: skip + +SET approximation={"rows":10000}\; 
+FROM many_numbers + | EVAL language_code = sv % 4 + 1 + | LOOKUP JOIN languages_lookup ON language_code + | DROP language_code + | STATS avg=AVG(sv) BY language_name + | RENAME language_name AS original_language_name + | EVAL language_code = LENGTH(original_language_name) % 4 + 1 + | LOOKUP JOIN languages_lookup ON language_code + | SORT original_language_name + | DROP language_code +; + +avg:double | original_language_name:keyword | language_name:keyword | _approximation_confidence_interval(avg):double | _approximation_certified(avg):boolean +440..490 | English | German | [440..490,440..490] | {any} +440..490 | French | Spanish | [440..490,440..490] | {any} +440..490 | German | Spanish | [440..490,440..490] | {any} +440..490 | Spanish | German | [440..490,440..490] | {any} +; + + Inline stats required_capability: approximation_v7 request_stored: skip @@ -714,6 +739,27 @@ COUNT():long | AVG(sv):double | sv:integer |_approximation_confidence_in ; +Inline stats by and lookup join +required_capability: approximation_v7 +request_stored: skip + +SET approximation={"rows":10000}\; +FROM many_numbers +| KEEP sv +| WHERE sv < 700 +| INLINE STATS SUM(sv) BY sv +| EVAL language_code = sv % 4 + 1 +| LOOKUP JOIN languages_lookup ON language_code +| SORT sv DESC +| LIMIT 2 +; + +SUM(sv):long | sv:integer | language_code:integer | language_name:keyword | _approximation_confidence_interval(SUM(sv)):long | _approximation_certified(SUM(sv)):boolean +70000..1400000 | 699 | 4 | German | [70000..1400000,70000..1400000] | {any} +70000..1400000 | 699 | 4 | German | [70000..1400000,70000..1400000] | {any} +; + + Warn on no stats required_capability: approximation_v7 request_stored: skip diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java index c29d1912354cd..b7329e15ee8c5 100644 --- 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java @@ -554,7 +554,7 @@ private LogicalPlan countSubPlan(double sampleProbability) { ); } } - } else { + } else if (plan instanceof LeafPlan == false) { // Strip everything after the STATS command. plan = plan.children().getFirst(); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java index 0304c00232c46..234fcd1b5e3c3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java @@ -63,7 +63,7 @@ protected PhysicalPlan rule(SampledAggregateExec plan) { // The only non-unary plans that are currently supported are Joins. // At the moment, the left side of the join is the "expensive" side and // will be sampled, while the right side is just a lookup table. - // This will probably change in the future, in which case ths logic + // This will probably change in the future, in which case this logic // must be reconsidered. 
Holder sampledAdded = new Holder<>(false); PhysicalPlan child = plan.child().transformDown(p -> { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStatsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStatsTests.java index b7494c09cbdef..33eb641b91faf 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStatsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStatsTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.EvalExec; +import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.ProjectExec; import org.elasticsearch.xpack.esql.plan.physical.SampleExec; @@ -36,6 +37,7 @@ import java.util.HashMap; import java.util.List; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -123,6 +125,31 @@ public void testReplace_stdDev() { assertThat(sampleExec.probability(), is(sampledAgg.sampleProbability())); } + public void testReplace_join() { + Alias count = countApproximateAlias(Literal.keyword(Source.EMPTY, "*")); + EsQueryExec left = esQueryExec(); + EsQueryExec right = esQueryExec(); + LookupJoinExec lookupJoin = lookupJoinExec(left, right); + SampledAggregateExec sampledAgg = sampledAggregate(lookupJoin, List.of(count), List.of(), AggregatorMode.INITIAL, 0.5); + + PhysicalPlan result = applyRule(sampledAgg); + + assertThat(result, 
instanceOf(ProjectExec.class)); + ProjectExec project = (ProjectExec) result; + assertThat(project.child(), instanceOf(EvalExec.class)); + EvalExec eval = (EvalExec) project.child(); + // COUNT and its bucket must be sample-corrected. + assertThat(eval.fields(), hasSize(2)); + AggregateExec aggExec = assertAggregate(eval.child(), sampledAgg); + assertThat(aggExec.child(), instanceOf(LookupJoinExec.class)); + LookupJoinExec lookupJoinExec = (LookupJoinExec) aggExec.child(); + assertThat(lookupJoinExec.left(), instanceOf(SampleExec.class)); + SampleExec sampleExec = (SampleExec) lookupJoinExec.left(); + assertThat(sampleExec.probability(), is(sampledAgg.sampleProbability())); + assertThat(sampleExec.child(), equalTo(left)); + assertThat(lookupJoinExec.right(), equalTo(right)); + } + private static PhysicalPlan applyRule(SampledAggregateExec sampledAgg) { return new ReplaceSampledStatsBySampleAndStats().apply(sampledAgg); } @@ -176,6 +203,10 @@ private static EsQueryExec esQueryExec() { return new EsQueryExec(Source.EMPTY, "test", IndexMode.STANDARD, List.of(), null, null, null, List.of()); } + private static LookupJoinExec lookupJoinExec(PhysicalPlan left, PhysicalPlan right) { + return new LookupJoinExec(Source.EMPTY, left, right, List.of(), List.of(), List.of(), null); + } + private static Alias countApproximateAlias(Expression field) { return new Alias(Source.EMPTY, "count", new CountApproximate(Source.EMPTY, field)); } From 4883422ac23834ccefa7fb98b9b9d2481111778d Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Tue, 14 Apr 2026 11:51:16 +0200 Subject: [PATCH 8/9] small fixes --- .../esql/approximation/Approximation.java | 23 +++++++++++++++---- .../xpack/esql/session/EsqlSession.java | 2 +- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java index 
b7329e15ee8c5..75a3f8aae9a8f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java @@ -60,6 +60,7 @@ import org.elasticsearch.xpack.esql.plan.logical.SampledAggregate; import org.elasticsearch.xpack.esql.plan.logical.TopN; import org.elasticsearch.xpack.esql.plan.logical.TopNBy; +import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.esql.plan.logical.UriParts; import org.elasticsearch.xpack.esql.plan.logical.UserAgent; import org.elasticsearch.xpack.esql.plan.logical.inference.Completion; @@ -83,7 +84,7 @@ *
  <li> it contains exactly one {@code STATS} command *
  <li> the other processing commands are from the supported set * ({@link Approximation#SUPPORTED_COMMANDS}); this set contains almost all - * unary commands, but most notably not {@code FORK} or {@code JOIN}. + * unary commands, and some non-unary ones; most notably not {@code FORK}. *
  <li> the aggregate functions are from the supported set * ({@link Approximation#SUPPORTED_SINGLE_VALUED_AGGS} and * {@link Approximation#SUPPORTED_MULTIVALUED_AGGS}) @@ -150,8 +151,8 @@ public record QueryProperties(boolean hasGrouping, boolean canDecreaseRowCount, Grok.class, InlineJoin.class, Insist.class, - LocalRelation.class, Join.class, + LocalRelation.class, MvExpand.class, OrderBy.class, Project.class, @@ -161,7 +162,7 @@ public record QueryProperties(boolean hasGrouping, boolean canDecreaseRowCount, Row.class, Sample.class, SampledAggregate.class, - StubRelation.class, + StubRelation.class, // Temporary node generated by INLINE STATS UriParts.class, UserAgent.class ); @@ -480,7 +481,7 @@ public Double processResult(Result result) { * */ private LogicalPlan sourceCountSubPlan() { - LogicalPlan leaf = logicalPlan.collectLeaves().getFirst(); + LogicalPlan leaf = getLeftmostLeaf(logicalPlan); LogicalPlan sourceCountPlan = new Aggregate( Source.EMPTY, leaf, @@ -491,6 +492,20 @@ private LogicalPlan sourceCountSubPlan() { return sourceCountPlan; } + /** + * Returns the leftmost leaf of a plan, which is the large source index for approximation.
+ */ + private LogicalPlan getLeftmostLeaf(LogicalPlan plan) { + while (plan instanceof LeafPlan == false) { + plan = switch (plan) { + case UnaryPlan unaryPlan -> unaryPlan.child(); + case Join join -> join.left(); + default -> throw new IllegalStateException("unsupported plan type: " + plan.getClass()); + }; + } + return plan; + } + /** * Determines either the final sample probability or whether more subplans * need to the executed, based on the total number of rows in the source diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 6ce8c6f836f3d..6458012b9fc82 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -371,7 +371,7 @@ public void onResponse(Versioned analyzedPlan) { p, finalConfiguration, foldContext, - new Holder<>(), + new Holder(), minimumVersion, planTimeProfile, l From b93a99ba83352caed6a08569f9734772fd815559 Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Wed, 15 Apr 2026 21:02:37 +0200 Subject: [PATCH 9/9] add assertion --- .../physical/local/ReplaceSampledStatsBySampleAndStats.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java index 234fcd1b5e3c3..8f5f43b431d9e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java @@ -26,10 +26,12 @@ import 
org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.LeafExec; +import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.ProjectExec; import org.elasticsearch.xpack.esql.plan.physical.SampleExec; import org.elasticsearch.xpack.esql.plan.physical.SampledAggregateExec; +import org.elasticsearch.xpack.esql.plan.physical.UnaryExec; import org.elasticsearch.xpack.esql.planner.AggregateMapper; import java.util.ArrayList; @@ -60,11 +62,13 @@ protected PhysicalPlan rule(SampledAggregateExec plan) { double sampleProbability = (double) Foldables.literalValueOf(plan.sampleProbability()); assert sampleProbability < 1.0; - // The only non-unary plans that are currently supported are Joins. + // The only non-unary plans that are currently supported are lookup joins. // At the moment, the left side of the join is the "expensive" side and // will be sampled, while the right side is just a lookup table. // This will probably change in the future, in which case this logic // must be reconsidered. + assert plan.allMatch(p -> p instanceof LeafExec || p instanceof UnaryExec || p instanceof LookupJoinExec); + Holder sampledAdded = new Holder<>(false); PhysicalPlan child = plan.child().transformDown(p -> { if (p instanceof LeafExec && sampledAdded.get() == false) {