diff --git a/docs/changelog/111690.yaml b/docs/changelog/111690.yaml new file mode 100644 index 0000000000000..36e715744ad88 --- /dev/null +++ b/docs/changelog/111690.yaml @@ -0,0 +1,5 @@ +pr: 111690 +summary: "ESQL: Support INLINESTATS grouped on expressions" +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index d6ab99f0b21ac..3e799730f7269 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -111,6 +111,7 @@ protected void shouldSkipTest(String testName) throws IOException { isEnabled(testName, instructions, Clusters.oldVersion()) ); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("inlinestats")); + assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("inlinestats_v2")); } private TestFeatureService remoteFeaturesService() throws IOException { diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec index e52f1e45cead8..3f2e14f74174b 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec @@ -67,11 +67,70 @@ emp_no:integer | avg_worked_seconds:long | gender:keyword | max_avg_worked_secon 10030 | 394597613 | M | 394597613 ; -// TODO allow inline calculation like BY l = SUBSTRING( maxOfLongByCalculatedKeyword -required_capability: inlinestats +required_capability: inlinestats_v2 // tag::longest-tenured-by-first[] +FROM employees +| KEEP emp_no, avg_worked_seconds, last_name +| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY SUBSTRING(last_name, 0, 1) +| WHERE max_avg_worked_seconds == avg_worked_seconds +| SORT last_name ASC +| LIMIT 5 +// end::longest-tenured-by-first[] +; + +// tag::longest-tenured-by-first-result[] +emp_no:integer | avg_worked_seconds:long | last_name:keyword | SUBSTRING(last_name, 0, 1):keyword | max_avg_worked_seconds:long + 10065 | 372660279 | Awdeh | A | 372660279 + 10074 | 382397583 | Bernatsky | B | 382397583 + 10044 | 387408356 | Casley | C | 387408356 + 10030 | 394597613 | Demeyer | D | 394597613 + 10087 | 305782871 | Eugenio | E | 305782871 +// end::longest-tenured-by-first-result[] +; + +maxOfLongByCalculatedNamedKeyword +required_capability: inlinestats_v2 + +FROM employees +| KEEP emp_no, avg_worked_seconds, last_name +| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY l = SUBSTRING(last_name, 0, 1) +| WHERE max_avg_worked_seconds == avg_worked_seconds +| SORT last_name ASC +| LIMIT 5 +; + +emp_no:integer | avg_worked_seconds:long | last_name:keyword | l:keyword | max_avg_worked_seconds:long + 10065 | 372660279 | Awdeh | A | 372660279 + 10074 | 382397583 | Bernatsky | B | 382397583 + 10044 | 387408356 | Casley | C | 387408356 + 10030 | 394597613 | Demeyer | D | 394597613 + 10087 | 305782871 | Eugenio | E | 305782871 +; + +maxOfLongByCalculatedDroppedKeyword +required_capability: inlinestats_v2 + +FROM employees +| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY l = SUBSTRING(last_name, 0, 1) +| WHERE max_avg_worked_seconds == avg_worked_seconds +| KEEP emp_no, avg_worked_seconds, last_name, max_avg_worked_seconds +| SORT last_name ASC +| LIMIT 5 +; + +emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_seconds:long + 10065 | 372660279 | Awdeh | 372660279 + 10074 | 382397583 | Bernatsky | 382397583 + 10044 | 387408356 | Casley | 387408356 + 10030 | 394597613 | Demeyer | 394597613 + 10087 | 305782871 | Eugenio | 305782871 +; + +maxOfLongByEvaledKeyword +required_capability: inlinestats + FROM employees | EVAL l = SUBSTRING(last_name, 0, 1) | KEEP emp_no, avg_worked_seconds, l @@ -79,17 +138,14 @@ FROM employees | WHERE max_avg_worked_seconds == avg_worked_seconds | SORT l ASC | LIMIT 5 -// end::longest-tenured-by-first[] ; -// tag::longest-tenured-by-first-result[] emp_no:integer | avg_worked_seconds:long | l:keyword | max_avg_worked_seconds:long 10065 | 372660279 | A | 372660279 10074 | 382397583 | B | 382397583 10044 | 387408356 | C | 387408356 10030 | 394597613 | D | 394597613 10087 | 305782871 | E | 305782871 -// end::longest-tenured-by-first-result[] ; maxOfLongByInt @@ -499,3 +555,101 @@ emp_no:integer | salary:integer | ninety_fifth_salary:double 10029 | 74999 | 73584.95 10045 | 74970 | 73584.95 ; + +byTwoCalculated +required_capability: inlinestats_v2 + +FROM airports +| WHERE abbrev IS NOT NULL +| KEEP abbrev, scalerank, location +| INLINESTATS min_sl=MIN(scalerank) + BY lat_10 = ROUND(ST_Y(location), -1) + , lon_10 = ROUND(ST_X(location), -1) +| SORT abbrev DESC +| LIMIT 3 +; + +abbrev:keyword | scalerank:integer | location:geo_point | lat_10:double | lon_10:double | min_sl:integer + ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 50 | 10 | 2 + ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | -10 | 40 | 4 + ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | 20 | -100 | 2 +; + +byTwoCalculatedSecondOverwrites +required_capability: inlinestats_v2 + +FROM airports +| WHERE abbrev IS NOT NULL +| KEEP abbrev, scalerank, location +| INLINESTATS min_sl=MIN(scalerank) + BY x = ROUND(ST_Y(location), -1) + , x = ROUND(ST_X(location), -1) +| SORT abbrev DESC +| LIMIT 3 +; + +abbrev:keyword | scalerank:integer | location:geo_point | x:double | min_sl:integer + ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 10 | 2 + ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | 40 | 2 + ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | -100 | 2 +; + +byTwoCalculatedSecondOverwritesReferencingFirst +required_capability: inlinestats_v2 + +FROM airports +| WHERE abbrev IS NOT NULL +| KEEP abbrev, scalerank, location +| EVAL x = ST_X(location) +| INLINESTATS min_sl=MIN(scalerank) + BY x = ROUND(x, -1) + , x = ROUND(x, -1) +| SORT abbrev DESC +| LIMIT 3 +; + +abbrev:keyword | scalerank:integer | location:geo_point | x:double | min_sl:integer + ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 10 | 2 + ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | 40 | 2 + ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | -100 | 2 +; + + +groupShadowsAgg +required_capability: inlinestats_v2 + +FROM airports +| WHERE abbrev IS NOT NULL +| KEEP abbrev, scalerank, location +| INLINESTATS min_sl=MIN(scalerank) + , lat_10 = ROUND(ST_Y(location), -1) + BY lat_10 = ROUND(ST_Y(location), -1) + , lon_10 = ROUND(ST_X(location), -1) +| SORT abbrev DESC +| LIMIT 3 +; + +abbrev:keyword | scalerank:integer | location:geo_point | lat_10:double | lon_10:double | min_sl:integer + ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 50 | 10 | 2 + ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | -10 | 40 | 4 + ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | 20 | -100 | 2 +; + +groupShadowsField +required_capability: inlinestats_v2 + + FROM employees +| KEEP emp_no, salary, hire_date +| INLINESTATS avg_salary = AVG(salary) + BY hire_date = DATE_TRUNC(1 year, hire_date) +| WHERE salary > avg_salary +| SORT emp_no ASC +| LIMIT 4 +; + +emp_no:integer | salary:integer | hire_date:datetime | avg_salary:double + 10001 | 57305 | 1986-01-01T00:00:00Z | 43869.63636363636 + 10002 | 56371 | 1985-01-01T00:00:00Z | 51831.818181818184 + 10003 | 61805 | 1986-01-01T00:00:00Z | 43869.63636363636 + 10005 | 63528 | 1989-01-01T00:00:00Z | 53487.07692307692 +; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec index fc607edf4d212..3be846630d5b8 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec @@ -1618,6 +1618,37 @@ m:i | o:i | l:i | s:i 1 | 39729 | 1 | 39729 ; +byTwoCalculatedSecondOverwrites +FROM employees +| STATS m = MAX(salary) by l = salary + 1, l = languages + 1 +| SORT m +| LIMIT 5 +; + + m:i | l:i +66817 | 6 +73578 | 3 +73717 | 2 +74572 | 5 +74970 | 4 +; + +byTwoCalculatedSecondOverwritesReferencingFirst +FROM employees +| EVAL l = languages +| STATS m = MAX(salary) by l = l + 1, l = l + 1 +| SORT m +| LIMIT 5 +; + + m:i | l:i +66817 | 6 +73578 | 3 +73717 | 2 +74572 | 5 +74970 | 4 +; + nestedAggsOverGroupingWithAliasAndProjection#[skip:-8.13.99,reason:supported in 8.14] FROM employees | STATS e = length(f) + 1, c = count(*) by f = first_name diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec index 6d1d4c7892886..6819727be0131 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec @@ -977,7 +977,25 @@ event_duration:long | _index:keyword | ts:date | ts_str:k ; -inlineStatsUnionGroup +inlineStatsUnionGroup-Ignore +required_capability: union_types +required_capability: inlinestats + +FROM sample_data, sample_data_ts_long +| INLINESTATS count = COUNT(*) + BY @timestamp = SUBSTRING(TO_STRING(@timestamp), 0, 7) +| SORT client_ip ASC, @timestamp ASC +| LIMIT 4 +; + +client_ip:ip | event_duration:long | message:keyword | @timestamp:keyword | count:long + 172.21.0.5 | 1232382 | Disconnected | 1698068 | 1 + 172.21.0.5 | 1232382 | Disconnected | 2023-10 | 7 +172.21.2.113 | 2764889 | Connected to 10.1.0.2 | 1698064 | 1 +172.21.2.113 | 2764889 | Connected to 10.1.0.2 | 2023-10 | 7 +; + +inlineStatsUnionGroupWithEval-Ignore required_capability: union_types required_capability: inlinestats @@ -993,16 +1011,15 @@ client_ip:ip | event_duration:long | message:keyword | @timestamp:keyword 172.21.0.5 | 1232382 | Disconnected | 2023-10 | 7 172.21.2.113 | 2764889 | Connected to 10.1.0.2 | 1698064 | 1 172.21.2.113 | 2764889 | Connected to 10.1.0.2 | 2023-10 | 7 - ; -inlineStatsUnionGroupTogether +inlineStatsUnionGroupTogether-Ignore required_capability: union_types required_capability: inlinestats FROM sample_data, sample_data_ts_long -| EVAL @timestamp = TO_STRING(TO_DATETIME(@timestamp)) -| INLINESTATS count = COUNT(*) BY @timestamp +| INLINESTATS count = COUNT(*) + BY @timestamp = TO_STRING(TO_DATETIME(@timestamp)) | SORT client_ip ASC, @timestamp ASC | LIMIT 4 ; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index b60701fe19365..8d478408e8781 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -57,6 +57,11 @@ public enum Cap { */ INLINESTATS(EsqlPlugin.INLINESTATS_FEATURE_FLAG), + /** + * Support for the expressions in grouping in {@code INLINESTATS} syntax. + */ + INLINESTATS_V2(EsqlPlugin.INLINESTATS_FEATURE_FLAG), + /** * Support for aggregation function {@code TOP}. */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 3ffb4acbe6455..5b59117ad356b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -454,7 +454,7 @@ private LogicalPlan resolveStats(Stats stats, List childrenOutput) { } groupings = newGroupings; if (changed.get()) { - stats = stats.with(newGroupings, stats.aggregates()); + stats = stats.with(stats.child(), newGroupings, stats.aggregates()); changed.set(false); } } @@ -483,7 +483,7 @@ private LogicalPlan resolveStats(Stats stats, List childrenOutput) { newAggregates.add(agg); } - stats = changed.get() ? stats.with(groupings, newAggregates) : stats; + stats = changed.get() ? stats.with(stats.child(), groupings, newAggregates) : stats; } return (LogicalPlan) stats; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java index e55b090bbb35f..282f46e0de7bb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java @@ -155,6 +155,7 @@ public LogicalPlan optimize(LogicalPlan verified) { if (failures.hasFailures()) { throw new VerificationException(failures); } + optimized.setOptimized(); return optimized; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/OptimizerRules.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/OptimizerRules.java index 4d3134db34a0d..733fe2e8762bb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/OptimizerRules.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/OptimizerRules.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.InlineStats; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.MvExpand; import org.elasticsearch.xpack.esql.plan.logical.Row; @@ -99,7 +100,8 @@ protected AttributeSet generates(LogicalPlan logicalPlan) { if (logicalPlan instanceof EsRelation || logicalPlan instanceof LocalRelation || logicalPlan instanceof Row - || logicalPlan instanceof Aggregate) { + || logicalPlan instanceof Aggregate + || logicalPlan instanceof InlineStats) { return logicalPlan.outputSet(); } if (logicalPlan instanceof GeneratingPlan generating) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/RemoveStatsOverride.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/RemoveStatsOverride.java index 5592a04e2f813..0f8e0f450e585 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/RemoveStatsOverride.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/RemoveStatsOverride.java @@ -11,26 +11,30 @@ import org.elasticsearch.xpack.esql.analysis.AnalyzerRules; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; -import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.Stats; import java.util.ArrayList; import java.util.List; /** - * Rule that removes Aggregate overrides in grouping, aggregates and across them inside. - * The overrides appear when the same alias is used multiple times in aggregations and/or groupings: - * STATS x = COUNT(*), x = MIN(a) BY x = b + 1, x = c + 10 + * Removes {@link Stats} overrides in grouping, aggregates and across them inside. + * The overrides appear when the same alias is used multiple times in aggregations + * and/or groupings: + * {@code STATS x = COUNT(*), x = MIN(a) BY x = b + 1, x = c + 10} * becomes - * STATS BY x = c + 10 - * That is the last declaration for a given alias, overrides all the other declarations, with - * groups having priority vs aggregates. + * {@code STATS BY x = c + 10} + * and + * {@code INLINESTATS x = COUNT(*), x = MIN(a) BY x = b + 1, x = c + 10} + * becomes + * {@code INLINESTATS BY x = c + 10} + * This is "last one wins", with groups having priority over aggregates. * Separately, it replaces expressions used as group keys inside the aggregates with references: - * STATS max(a + b + 1) BY a + b + * {@code STATS max(a + b + 1) BY a + b} * becomes - * STATS max($x + 1) BY $x = a + b + * {@code STATS max($x + 1) BY $x = a + b} */ -public final class RemoveStatsOverride extends AnalyzerRules.AnalyzerRule { +public final class RemoveStatsOverride extends AnalyzerRules.AnalyzerRule { @Override protected boolean skipResolved() { @@ -38,19 +42,18 @@ protected boolean skipResolved() { } @Override - protected LogicalPlan rule(Aggregate agg) { - return agg.resolved() ? removeAggDuplicates(agg) : agg; - } - - private static Aggregate removeAggDuplicates(Aggregate agg) { - var groupings = agg.groupings(); - var aggregates = agg.aggregates(); - - groupings = removeDuplicateNames(groupings); - aggregates = removeDuplicateNames(aggregates); - - // replace EsqlAggregate with Aggregate - return new Aggregate(agg.source(), agg.child(), agg.aggregateType(), groupings, aggregates); + protected LogicalPlan rule(LogicalPlan p) { + if (p.resolved() == false) { + return p; + } + if (p instanceof Stats stats) { + return (LogicalPlan) stats.with( + stats.child(), + removeDuplicateNames(stats.groupings()), + removeDuplicateNames(stats.aggregates()) + ); + } + return p; } private static List removeDuplicateNames(List list) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsAggExpressionWithEval.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsAggExpressionWithEval.java index 1746931f9a63e..ea0a302f7131d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsAggExpressionWithEval.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsAggExpressionWithEval.java @@ -34,7 +34,7 @@ * becomes * stats a1 = sum(a), a2 = min(b) by x | eval a = a1 + a2 | keep a, x * The rule also considers expressions applied over groups: - * stats a = x + 1 by x becomes stats by x | eval a = x + 1 | keep a, x + * {@code STATS a = x + 1 BY x} becomes {@code STATS BY x | EVAL a = x + 1 | KEEP a, x} * And to combine the two: * stats a = x + count(*) by x * becomes diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsNestedExpressionWithEval.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsNestedExpressionWithEval.java index 206bd6d3d1c76..02b39f6babef0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsNestedExpressionWithEval.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsNestedExpressionWithEval.java @@ -15,9 +15,9 @@ import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction; import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; -import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.Stats; import java.util.ArrayList; import java.util.HashMap; @@ -25,15 +25,26 @@ import java.util.Map; /** - * Replace nested expressions inside an aggregate with synthetic eval (which end up being projected away by the aggregate). - * stats sum(a + 1) by x % 2 + * Replace nested expressions inside a {@link Stats} with synthetic eval. + * {@code STATS SUM(a + 1) BY x % 2} * becomes - * eval `a + 1` = a + 1, `x % 2` = x % 2 | stats sum(`a+1`_ref) by `x % 2`_ref + * {@code EVAL `a + 1` = a + 1, `x % 2` = x % 2 | STATS SUM(`a+1`_ref) BY `x % 2`_ref} + * and + * {@code INLINESTATS SUM(a + 1) BY x % 2} + * becomes + * {@code EVAL `a + 1` = a + 1, `x % 2` = x % 2 | INLINESTATS SUM(`a+1`_ref) BY `x % 2`_ref} */ -public final class ReplaceStatsNestedExpressionWithEval extends OptimizerRules.OptimizerRule { +public final class ReplaceStatsNestedExpressionWithEval extends OptimizerRules.OptimizerRule { @Override - protected LogicalPlan rule(Aggregate aggregate) { + protected LogicalPlan rule(LogicalPlan p) { + if (p instanceof Stats stats) { + return rule(stats); + } + return p; + } + + private LogicalPlan rule(Stats aggregate) { List evals = new ArrayList<>(); Map evalNames = new HashMap<>(); Map groupingAttributes = new HashMap<>(); @@ -134,10 +145,10 @@ protected LogicalPlan rule(Aggregate aggregate) { var aggregates = aggsChanged.get() ? newAggs : aggregate.aggregates(); var newEval = new Eval(aggregate.source(), aggregate.child(), evals); - aggregate = new Aggregate(aggregate.source(), newEval, aggregate.aggregateType(), groupings, aggregates); + aggregate = aggregate.with(newEval, groupings, aggregates); } - return aggregate; + return (LogicalPlan) aggregate; } static String syntheticName(Expression expression, AggregateFunction af, int counter) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java index 01132425df11f..5b6fe8c0112c6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java @@ -108,8 +108,8 @@ public Aggregate replaceChild(LogicalPlan newChild) { } @Override - public Aggregate with(List newGroupings, List newAggregates) { - return new Aggregate(source(), child(), aggregateType(), newGroupings, newAggregates); + public Aggregate with(LogicalPlan child, List newGroupings, List newAggregates) { + return new Aggregate(source(), child, aggregateType(), newGroupings, newAggregates); } public AggregateType aggregateType() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java index 187b3542e0607..b37976c00ad06 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java @@ -98,8 +98,8 @@ public InlineStats replaceChild(LogicalPlan newChild) { } @Override - public InlineStats with(List newGroupings, List newAggregates) { - return new InlineStats(source(), child(), newGroupings, newAggregates); + public InlineStats with(LogicalPlan child, List newGroupings, List newAggregates) { + return new InlineStats(source(), child, newGroupings, newAggregates); } @Override @@ -121,11 +121,13 @@ public boolean expressionsResolved() { public List output() { if (this.lazyOutput == null) { List addedFields = new ArrayList<>(); - AttributeSet childOutput = child().outputSet(); + AttributeSet set = child().outputSet(); for (NamedExpression agg : aggregates) { - if (childOutput.contains(agg) == false) { + Attribute att = agg.toAttribute(); + if (set.contains(att) == false) { addedFields.add(agg); + set.add(att); } } @@ -207,7 +209,7 @@ private LogicalPlan groupedNextPhase(List schema, List firstPha if (g instanceof Attribute a) { groupingAttributes.add(a); } else { - throw new UnsupportedOperationException("INLINESTATS doesn't support expressions in grouping position yet"); + throw new IllegalStateException("optimized plans should only have attributes in groups, but got [" + g + "]"); } } List leftFields = new ArrayList<>(groupingAttributes.size()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Phased.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Phased.java index ba0f97cdfa30b..6923f9e137eab 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Phased.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Phased.java @@ -91,8 +91,8 @@ public interface Phased { * Or {@code null} if there aren't any {@linkplain Phased} operations. */ static LogicalPlan extractFirstPhase(LogicalPlan plan) { - if (false == plan.analyzed()) { - throw new IllegalArgumentException("plan must be analyzed"); + if (false == plan.optimized()) { + throw new IllegalArgumentException("plan must be optimized"); } var firstPhase = new Holder(); plan.forEachUp(t -> { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Stats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Stats.java index 35d5229d4e52f..c46c735e7482e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Stats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Stats.java @@ -9,6 +9,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.core.tree.Source; import java.util.List; @@ -16,10 +17,25 @@ * STATS-like operations. Like {@link Aggregate} and {@link InlineStats}. */ public interface Stats { + /** + * The user supplied text in the query for this command. + */ + Source source(); + /** * Rebuild this plan with new groupings and new aggregates. */ - Stats with(List newGroupings, List newAggregates); + Stats with(LogicalPlan child, List newGroupings, List newAggregates); + + /** + * Have all the expressions in this plan been resolved? + */ + boolean expressionsResolved(); + + /** + * The operation directly before this one in the plan. + */ + LogicalPlan child(); /** * List containing both the aggregate expressions and grouping expressions. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index a6bc7befccc80..25d155ccfde07 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -125,7 +125,9 @@ public void execute( LOGGER.debug("ESQL query:\n{}", request.query()); analyzedPlan( parse(request.query(), request.params()), - listener.delegateFailureAndWrap((next, analyzedPlan) -> executeAnalyzedPlan(request, runPhase, analyzedPlan, next)) + listener.delegateFailureAndWrap( + (next, analyzedPlan) -> executeOptimizedPlan(request, runPhase, optimizedPlan(analyzedPlan), next) + ) ); } @@ -133,17 +135,17 @@ public void execute( * Execute an analyzed plan. Most code should prefer calling {@link #execute} but * this is public for testing. See {@link Phased} for the sequence of operations. */ - public void executeAnalyzedPlan( + public void executeOptimizedPlan( EsqlQueryRequest request, BiConsumer> runPhase, - LogicalPlan analyzedPlan, + LogicalPlan optimizedPlan, ActionListener listener ) { - LogicalPlan firstPhase = Phased.extractFirstPhase(analyzedPlan); + LogicalPlan firstPhase = Phased.extractFirstPhase(optimizedPlan); if (firstPhase == null) { - runPhase.accept(logicalPlanToPhysicalPlan(analyzedPlan, request), listener); + runPhase.accept(logicalPlanToPhysicalPlan(optimizedPlan, request), listener); } else { - executePhased(new ArrayList<>(), analyzedPlan, request, firstPhase, runPhase, listener); + executePhased(new ArrayList<>(), optimizedPlan, request, firstPhase, runPhase, listener); } } @@ -155,11 +157,11 @@ private void executePhased( BiConsumer> runPhase, ActionListener listener ) { - PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(firstPhase, request); + PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan(firstPhase), request); runPhase.accept(physicalPlan, listener.delegateFailureAndWrap((next, result) -> { try { profileAccumulator.addAll(result.profiles()); - LogicalPlan newMainPlan = Phased.applyResultsFromFirstPhase(mainPlan, physicalPlan.output(), result.pages()); + LogicalPlan newMainPlan = optimizedPlan(Phased.applyResultsFromFirstPhase(mainPlan, physicalPlan.output(), result.pages())); LogicalPlan newFirstPhase = Phased.extractFirstPhase(newMainPlan); if (newFirstPhase == null) { PhysicalPlan finalPhysicalPlan = logicalPlanToPhysicalPlan(newMainPlan, request); @@ -235,7 +237,7 @@ private void preAnalyze(LogicalPlan parsed, BiFunction void preAnalyzeIndices(LogicalPlan parsed, ActionListener listener, Set enrichPolicyMatchFields) { + private void preAnalyzeIndices(LogicalPlan parsed, ActionListener listener, Set enrichPolicyMatchFields) { PreAnalyzer.PreAnalysis preAnalysis = new PreAnalyzer().preAnalyze(parsed); // TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one if (preAnalysis.indices.size() > 1) { @@ -352,8 +354,8 @@ private static Set subfields(Set names) { return names.stream().filter(name -> name.endsWith(WILDCARD) == false).map(name -> name + ".*").collect(Collectors.toSet()); } - private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan logicalPlan, EsqlQueryRequest request) { - PhysicalPlan physicalPlan = optimizedPhysicalPlan(logicalPlan); + private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQueryRequest request) { + PhysicalPlan physicalPlan = optimizedPhysicalPlan(optimizedPlan); physicalPlan = physicalPlan.transformUp(FragmentExec.class, f -> { QueryBuilder filter = request.filter(); if (filter != null) { @@ -371,20 +373,25 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan logicalPlan, EsqlQuer } public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) { - assert logicalPlan.analyzed(); + if (logicalPlan.analyzed() == false) { + throw new IllegalStateException("Expected analyzed plan"); + } var plan = logicalPlanOptimizer.optimize(logicalPlan); LOGGER.debug("Optimized logicalPlan plan:\n{}", plan); return plan; } - public PhysicalPlan physicalPlan(LogicalPlan logicalPlan) { - var plan = mapper.map(optimizedPlan(logicalPlan)); + public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) { + if (optimizedPlan.optimized() == false) { + throw new IllegalStateException("Expected optimized plan"); + } + var plan = mapper.map(optimizedPlan); LOGGER.debug("Physical plan:\n{}", plan); return plan; } - public PhysicalPlan optimizedPhysicalPlan(LogicalPlan logicalPlan) { - var plan = physicalPlanOptimizer.optimize(physicalPlan(logicalPlan)); + public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { + var plan = physicalPlanOptimizer.optimize(physicalPlan(optimizedPlan)); LOGGER.debug("Optimized physical plan:\n{}", plan); return plan; } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 76e0466af4da0..f30db1bf9bba2 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -415,10 +415,10 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { PlainActionFuture listener = new PlainActionFuture<>(); - session.executeAnalyzedPlan( + session.executeOptimizedPlan( new EsqlQueryRequest(), runPhase(bigArrays, physicalOperationProviders), - analyzed, + session.optimizedPlan(analyzed), listener.delegateFailureAndWrap( // Wrap so we can capture the warnings in the calling thread (next, result) -> next.onResponse( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index a294f33ece5c3..74f95e3defbd3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -127,6 +127,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Grok; +import org.elasticsearch.xpack.esql.plan.logical.InlineStats; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.MvExpand; @@ -4542,6 +4543,31 @@ public void testReplaceSortByExpressionsWithStats() { as(aggregate.child(), EsRelation.class); } + /** + * Expects + * Limit[1000[INTEGER]] + * \_InlineStats[[emp_no % 2{r}#6],[COUNT(salary{f}#12) AS c, emp_no % 2{r}#6]] + * \_Eval[[emp_no{f}#7 % 2[INTEGER] AS emp_no % 2]] + * \_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] + */ + public void testInlinestatsNestedExpressionsInGroups() { + var plan = optimizedPlan(""" + FROM test + | INLINESTATS c = COUNT(salary) by emp_no % 2 + """); + + var limit = as(plan, Limit.class); + var agg = as(limit.child(), InlineStats.class); + var groupings = agg.groupings(); + var aggs = agg.aggregates(); + var ref = as(groupings.get(0), ReferenceAttribute.class); + assertThat(aggs.get(1), is(ref)); + var eval = as(agg.child(), Eval.class); + assertThat(eval.fields(), hasSize(1)); + assertThat(eval.fields().get(0).toAttribute(), is(ref)); + assertThat(eval.fields().get(0).name(), is("emp_no % 2")); + } + /** * Expects * diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/PhasedTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/PhasedTests.java index 9a0f1ba3efe1d..5e45de6c77c42 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/PhasedTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/PhasedTests.java @@ -31,14 +31,14 @@ public class PhasedTests extends ESTestCase { public void testZeroLayers() { EsRelation relation = new EsRelation(Source.synthetic("relation"), new EsIndex("foo", Map.of()), IndexMode.STANDARD, false); - relation.setAnalyzed(); + relation.setOptimized(); assertThat(Phased.extractFirstPhase(relation), nullValue()); } public void testOneLayer() { EsRelation relation = new EsRelation(Source.synthetic("relation"), new EsIndex("foo", Map.of()), IndexMode.STANDARD, false); LogicalPlan orig = new Dummy(Source.synthetic("orig"), relation); - orig.setAnalyzed(); + orig.setOptimized(); assertThat(Phased.extractFirstPhase(orig), sameInstance(relation)); LogicalPlan finalPhase = Phased.applyResultsFromFirstPhase( orig, @@ -49,6 +49,7 @@ public void testOneLayer() { finalPhase, equalTo(new Row(orig.source(), List.of(new Alias(orig.source(), "foo", new Literal(orig.source(), "foo", DataType.KEYWORD))))) ); + finalPhase.setOptimized(); assertThat(Phased.extractFirstPhase(finalPhase), nullValue()); } @@ -56,7 +57,7 @@ public void testTwoLayer() { EsRelation relation = new EsRelation(Source.synthetic("relation"), new EsIndex("foo", Map.of()), IndexMode.STANDARD, false); LogicalPlan inner = new Dummy(Source.synthetic("inner"), relation); LogicalPlan orig = new Dummy(Source.synthetic("outer"), inner); - orig.setAnalyzed(); + orig.setOptimized(); assertThat( "extractFirstPhase should call #firstPhase on the earliest child in the plan", Phased.extractFirstPhase(orig), @@ -67,6 +68,7 @@ public void testTwoLayer() { List.of(new ReferenceAttribute(Source.EMPTY, "foo", DataType.KEYWORD)), List.of() ); + secondPhase.setOptimized(); assertThat( "applyResultsFromFirstPhase should call #nextPhase one th earliest child in the plan", secondPhase, @@ -84,6 +86,7 @@ public void testTwoLayer() { List.of(new ReferenceAttribute(Source.EMPTY, "foo", DataType.KEYWORD)), List.of() ); + finalPhase.setOptimized(); assertThat( finalPhase, equalTo(new Row(orig.source(), List.of(new Alias(orig.source(), "foo", new Literal(orig.source(), "foo", DataType.KEYWORD)))))