From cf37634c92bc6652c3bfecd2a4444ccb06e76396 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 7 Aug 2024 15:38:13 -0400 Subject: [PATCH 1/9] ESQL: Support INLINESTATS grouped on expressions This adds support for grouping `INLINESTATS` on an expression: ``` | INLINESTATS MAX(avg_worked_seconds) BY SUBSTRING(last_name, 0, 1) ``` This functions *exactly* as thought you ran did: ``` | EVAL `SUBSTRING(last_name, 0, 1)` = SUBSTRING(last_name, 0, 1) | INLINESTATS MAX(avg_worked_seconds) BY `SUBSTRING(last_name, 0, 1)` ``` The calculated field is retained in the results. --- .../src/main/resources/inlinestats.csv-spec | 64 +++++++++++++++++-- .../xpack/esql/analysis/Analyzer.java | 4 +- .../esql/optimizer/LogicalPlanOptimizer.java | 1 + .../ReplaceStatsAggExpressionWithEval.java | 2 +- .../ReplaceStatsNestedExpressionWithEval.java | 21 ++++-- .../xpack/esql/plan/logical/Aggregate.java | 4 +- .../xpack/esql/plan/logical/InlineStats.java | 4 +- .../xpack/esql/plan/logical/Phased.java | 4 +- .../xpack/esql/plan/logical/Stats.java | 13 +++- .../xpack/esql/session/EsqlSession.java | 39 ++++++----- .../elasticsearch/xpack/esql/CsvTests.java | 4 +- .../xpack/esql/plan/logical/PhasedTests.java | 9 ++- 12 files changed, 127 insertions(+), 42 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec index e52f1e45cead8..9ffee0605899b 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec @@ -67,11 +67,70 @@ emp_no:integer | avg_worked_seconds:long | gender:keyword | max_avg_worked_secon 10030 | 394597613 | M | 394597613 ; -// TODO allow inline calculation like BY l = SUBSTRING( maxOfLongByCalculatedKeyword required_capability: inlinestats // tag::longest-tenured-by-first[] +FROM employees +| KEEP emp_no, avg_worked_seconds, last_name +| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY SUBSTRING(last_name, 0, 1) +| WHERE max_avg_worked_seconds == avg_worked_seconds +| SORT last_name ASC +| LIMIT 5 +// end::longest-tenured-by-first[] +; + +// tag::longest-tenured-by-first-result[] +emp_no:integer | avg_worked_seconds:long | last_name:keyword | SUBSTRING(last_name, 0, 1):keyword | max_avg_worked_seconds:long + 10065 | 372660279 | Awdeh | A | 372660279 + 10074 | 382397583 | Bernatsky | B | 382397583 + 10044 | 387408356 | Casley | C | 387408356 + 10030 | 394597613 | Demeyer | D | 394597613 + 10087 | 305782871 | Eugenio | E | 305782871 +// end::longest-tenured-by-first-result[] +; + +maxOfLongByCalculatedNamedKeyword +required_capability: inlinestats + +FROM employees +| KEEP emp_no, avg_worked_seconds, last_name +| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY l = SUBSTRING(last_name, 0, 1) +| WHERE max_avg_worked_seconds == avg_worked_seconds +| SORT last_name ASC +| LIMIT 5 +; + +emp_no:integer | avg_worked_seconds:long | last_name:keyword | l:keyword | max_avg_worked_seconds:long + 10065 | 372660279 | Awdeh | A | 372660279 + 10074 | 382397583 | Bernatsky | B | 382397583 + 10044 | 387408356 | Casley | C | 387408356 + 10030 | 394597613 | Demeyer | D | 394597613 + 10087 | 305782871 | Eugenio | E | 305782871 +; + +maxOfLongByCalculatedDroppedKeyword +required_capability: inlinestats + +FROM employees +| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY l = SUBSTRING(last_name, 0, 1) +| WHERE max_avg_worked_seconds == avg_worked_seconds +| KEEP emp_no, avg_worked_seconds, last_name, max_avg_worked_seconds +| SORT last_name ASC +| LIMIT 5 +; + +emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_seconds:long + 10065 | 372660279 | Awdeh | 372660279 + 10074 | 382397583 | Bernatsky | 382397583 + 10044 | 387408356 | Casley | 387408356 + 10030 | 394597613 | Demeyer | 394597613 + 10087 | 305782871 | Eugenio | 305782871 +; + +maxOfLongByEvaledKeyword +required_capability: inlinestats + FROM employees | EVAL l = SUBSTRING(last_name, 0, 1) | KEEP emp_no, avg_worked_seconds, l @@ -79,17 +138,14 @@ FROM employees | WHERE max_avg_worked_seconds == avg_worked_seconds | SORT l ASC | LIMIT 5 -// end::longest-tenured-by-first[] ; -// tag::longest-tenured-by-first-result[] emp_no:integer | avg_worked_seconds:long | l:keyword | max_avg_worked_seconds:long 10065 | 372660279 | A | 372660279 10074 | 382397583 | B | 382397583 10044 | 387408356 | C | 387408356 10030 | 394597613 | D | 394597613 10087 | 305782871 | E | 305782871 -// end::longest-tenured-by-first-result[] ; maxOfLongByInt diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index abc9bf258caae..5c281c496ea35 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -452,7 +452,7 @@ private LogicalPlan resolveStats(Stats stats, List childrenOutput) { } groupings = newGroupings; if (changed.get()) { - stats = stats.with(newGroupings, stats.aggregates()); + stats = stats.with(stats.child(), newGroupings, stats.aggregates()); changed.set(false); } } @@ -481,7 +481,7 @@ private LogicalPlan resolveStats(Stats stats, List childrenOutput) { newAggregates.add(agg); } - stats = changed.get() ? stats.with(groupings, newAggregates) : stats; + stats = changed.get() ? stats.with(stats.child(), groupings, newAggregates) : stats; } return (LogicalPlan) stats; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java index bad8080c3def4..e1da77556d4f2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java @@ -155,6 +155,7 @@ public LogicalPlan optimize(LogicalPlan verified) { if (failures.hasFailures()) { throw new VerificationException(failures); } + optimized.setOptimized(); return optimized; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsAggExpressionWithEval.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsAggExpressionWithEval.java index 6cd61d03f1ffa..77a66d2959df5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsAggExpressionWithEval.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsAggExpressionWithEval.java @@ -34,7 +34,7 @@ * becomes * stats a1 = sum(a), a2 = min(b) by x | eval a = a1 + a2 | keep a, x * The rule also considers expressions applied over groups: - * stats a = x + 1 by x becomes stats by x | eval a = x + 1 | keep a, x + * {@code STATS a = x + 1 BY x} becomes {@code STATS BY x | EVAL a = x + 1 | KEEP a, x} * And to combine the two: * stats a = x + count(*) by x * becomes diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsNestedExpressionWithEval.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsNestedExpressionWithEval.java index 5bae99e1204f3..e57d960c9e718 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsNestedExpressionWithEval.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsNestedExpressionWithEval.java @@ -15,9 +15,9 @@ import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction; import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; -import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.Stats; import java.util.ArrayList; import java.util.HashMap; @@ -26,14 +26,21 @@ /** * Replace nested expressions inside an aggregate with synthetic eval (which end up being projected away by the aggregate). - * stats sum(a + 1) by x % 2 + * {@code STAT SUM(a + 1) BY x % 2} * becomes - * eval `a + 1` = a + 1, `x % 2` = x % 2 | stats sum(`a+1`_ref) by `x % 2`_ref + * {@code EVAL `a + 1` = a + 1, `x % 2` = x % 2 | STATS SUM(`a+1`_ref) BY `x % 2`_ref} */ -public final class ReplaceStatsNestedExpressionWithEval extends OptimizerRules.OptimizerRule { +public final class ReplaceStatsNestedExpressionWithEval extends OptimizerRules.OptimizerRule { @Override - protected LogicalPlan rule(Aggregate aggregate) { + protected LogicalPlan rule(LogicalPlan aggregate) { + if (aggregate instanceof Stats stats) { + return rule(stats); + } + return aggregate; + } + + private LogicalPlan rule(Stats aggregate) { List evals = new ArrayList<>(); Map evalNames = new HashMap<>(); Map groupingAttributes = new HashMap<>(); @@ -134,10 +141,10 @@ protected LogicalPlan rule(Aggregate aggregate) { var aggregates = aggsChanged.get() ? newAggs : aggregate.aggregates(); var newEval = new Eval(aggregate.source(), aggregate.child(), evals); - aggregate = new Aggregate(aggregate.source(), newEval, aggregate.aggregateType(), groupings, aggregates); + aggregate = aggregate.with(newEval, groupings, aggregates); } - return aggregate; + return (LogicalPlan) aggregate; } static String syntheticName(Expression expression, AggregateFunction af, int counter) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java index 01132425df11f..5b6fe8c0112c6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java @@ -108,8 +108,8 @@ public Aggregate replaceChild(LogicalPlan newChild) { } @Override - public Aggregate with(List newGroupings, List newAggregates) { - return new Aggregate(source(), child(), aggregateType(), newGroupings, newAggregates); + public Aggregate with(LogicalPlan child, List newGroupings, List newAggregates) { + return new Aggregate(source(), child, aggregateType(), newGroupings, newAggregates); } public AggregateType aggregateType() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java index 0ad43f7029de0..0d4c8f6440717 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java @@ -98,8 +98,8 @@ public InlineStats replaceChild(LogicalPlan newChild) { } @Override - public InlineStats with(List newGroupings, List newAggregates) { - return new InlineStats(source(), child(), newGroupings, newAggregates); + public InlineStats with(LogicalPlan child, List newGroupings, List newAggregates) { + return new InlineStats(source(), child, newGroupings, newAggregates); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Phased.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Phased.java index ba0f97cdfa30b..6923f9e137eab 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Phased.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Phased.java @@ -91,8 +91,8 @@ public interface Phased { * Or {@code null} if there aren't any {@linkplain Phased} operations. */ static LogicalPlan extractFirstPhase(LogicalPlan plan) { - if (false == plan.analyzed()) { - throw new IllegalArgumentException("plan must be analyzed"); + if (false == plan.optimized()) { + throw new IllegalArgumentException("plan must be optimized"); } var firstPhase = new Holder(); plan.forEachUp(t -> { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Stats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Stats.java index 1dde8e9e95990..c46c735e7482e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Stats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Stats.java @@ -9,6 +9,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.core.tree.Source; import java.util.List; @@ -16,16 +17,26 @@ * STATS-like operations. Like {@link Aggregate} and {@link InlineStats}. */ public interface Stats { + /** + * The user supplied text in the query for this command. + */ + Source source(); + /** * Rebuild this plan with new groupings and new aggregates. */ - Stats with(List newGroupings, List newAggregates); + Stats with(LogicalPlan child, List newGroupings, List newAggregates); /** * Have all the expressions in this plan been resolved? */ boolean expressionsResolved(); + /** + * The operation directly before this one in the plan. + */ + LogicalPlan child(); + /** * List containing both the aggregate expressions and grouping expressions. */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index fc84a833923ba..ab772ec2a1a5b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -125,7 +125,9 @@ public void execute( LOGGER.debug("ESQL query:\n{}", request.query()); analyzedPlan( parse(request.query(), request.params()), - listener.delegateFailureAndWrap((next, analyzedPlan) -> executeAnalyzedPlan(request, runPhase, analyzedPlan, next)) + listener.delegateFailureAndWrap( + (next, analyzedPlan) -> executeOptimizedPlan(request, runPhase, optimizedPlan(analyzedPlan), next) + ) ); } @@ -133,17 +135,17 @@ public void execute( * Execute an analyzed plan. Most code should prefer calling {@link #execute} but * this is public for testing. See {@link Phased} for the sequence of operations. */ - public void executeAnalyzedPlan( + public void executeOptimizedPlan( EsqlQueryRequest request, BiConsumer> runPhase, - LogicalPlan analyzedPlan, + LogicalPlan optimizedPlan, ActionListener listener ) { - LogicalPlan firstPhase = Phased.extractFirstPhase(analyzedPlan); + LogicalPlan firstPhase = Phased.extractFirstPhase(optimizedPlan); if (firstPhase == null) { - runPhase.accept(logicalPlanToPhysicalPlan(analyzedPlan, request), listener); + runPhase.accept(logicalPlanToPhysicalPlan(optimizedPlan, request), listener); } else { - executePhased(new ArrayList<>(), analyzedPlan, request, firstPhase, runPhase, listener); + executePhased(new ArrayList<>(), optimizedPlan, request, firstPhase, runPhase, listener); } } @@ -155,11 +157,11 @@ private void executePhased( BiConsumer> runPhase, ActionListener listener ) { - PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(firstPhase, request); + PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan(firstPhase), request); runPhase.accept(physicalPlan, listener.delegateFailureAndWrap((next, result) -> { try { profileAccumulator.addAll(result.profiles()); - LogicalPlan newMainPlan = Phased.applyResultsFromFirstPhase(mainPlan, physicalPlan.output(), result.pages()); + LogicalPlan newMainPlan = optimizedPlan(Phased.applyResultsFromFirstPhase(mainPlan, physicalPlan.output(), result.pages())); LogicalPlan newFirstPhase = Phased.extractFirstPhase(newMainPlan); if (newFirstPhase == null) { PhysicalPlan finalPhysicalPlan = logicalPlanToPhysicalPlan(newMainPlan, request); @@ -235,7 +237,7 @@ private void preAnalyze(LogicalPlan parsed, BiFunction void preAnalyzeIndices(LogicalPlan parsed, ActionListener listener, Set enrichPolicyMatchFields) { + private void preAnalyzeIndices(LogicalPlan parsed, ActionListener listener, Set enrichPolicyMatchFields) { PreAnalyzer.PreAnalysis preAnalysis = new PreAnalyzer().preAnalyze(parsed); // TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one if (preAnalysis.indices.size() > 1) { @@ -352,8 +354,8 @@ private static Set subfields(Set names) { return names.stream().filter(name -> name.endsWith(WILDCARD) == false).map(name -> name + ".*").collect(Collectors.toSet()); } - private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan logicalPlan, EsqlQueryRequest request) { - PhysicalPlan physicalPlan = optimizedPhysicalPlan(logicalPlan); + private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQueryRequest request) { + PhysicalPlan physicalPlan = optimizedPhysicalPlan(optimizedPlan); physicalPlan = physicalPlan.transformUp(FragmentExec.class, f -> { QueryBuilder filter = request.filter(); if (filter != null) { @@ -371,20 +373,25 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan logicalPlan, EsqlQuer } public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) { - assert logicalPlan.analyzed(); + if (logicalPlan.analyzed() == false) { + throw new IllegalStateException("Expected analyzed plan"); + } var plan = logicalPlanOptimizer.optimize(logicalPlan); LOGGER.debug("Optimized logicalPlan plan:\n{}", plan); return plan; } - public PhysicalPlan physicalPlan(LogicalPlan logicalPlan) { - var plan = mapper.map(optimizedPlan(logicalPlan)); + public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) { + if (optimizedPlan.optimized() == false) { + throw new IllegalStateException("Expected optimized plan"); + } + var plan = mapper.map(optimizedPlan); LOGGER.debug("Physical plan:\n{}", plan); return plan; } - public PhysicalPlan optimizedPhysicalPlan(LogicalPlan logicalPlan) { - var plan = physicalPlanOptimizer.optimize(physicalPlan(logicalPlan)); + public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { + var plan = physicalPlanOptimizer.optimize(physicalPlan(optimizedPlan)); LOGGER.debug("Optimized physical plan:\n{}", plan); return plan; } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index c363af3d6cc17..9b4293ac5aeb0 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -417,10 +417,10 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { PlainActionFuture listener = new PlainActionFuture<>(); - session.executeAnalyzedPlan( + session.executeOptimizedPlan( new EsqlQueryRequest(), runPhase(bigArrays, physicalOperationProviders), - analyzed, + session.optimizedPlan(analyzed), listener.delegateFailureAndWrap( // Wrap so we can capture the warnings in the calling thread (next, result) -> next.onResponse( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/PhasedTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/PhasedTests.java index 9a0f1ba3efe1d..5e45de6c77c42 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/PhasedTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/PhasedTests.java @@ -31,14 +31,14 @@ public class PhasedTests extends ESTestCase { public void testZeroLayers() { EsRelation relation = new EsRelation(Source.synthetic("relation"), new EsIndex("foo", Map.of()), IndexMode.STANDARD, false); - relation.setAnalyzed(); + relation.setOptimized(); assertThat(Phased.extractFirstPhase(relation), nullValue()); } public void testOneLayer() { EsRelation relation = new EsRelation(Source.synthetic("relation"), new EsIndex("foo", Map.of()), IndexMode.STANDARD, false); LogicalPlan orig = new Dummy(Source.synthetic("orig"), relation); - orig.setAnalyzed(); + orig.setOptimized(); assertThat(Phased.extractFirstPhase(orig), sameInstance(relation)); LogicalPlan finalPhase = Phased.applyResultsFromFirstPhase( orig, @@ -49,6 +49,7 @@ public void testOneLayer() { finalPhase, equalTo(new Row(orig.source(), List.of(new Alias(orig.source(), "foo", new Literal(orig.source(), "foo", DataType.KEYWORD))))) ); + finalPhase.setOptimized(); assertThat(Phased.extractFirstPhase(finalPhase), nullValue()); } @@ -56,7 +57,7 @@ public void testTwoLayer() { EsRelation relation = new EsRelation(Source.synthetic("relation"), new EsIndex("foo", Map.of()), IndexMode.STANDARD, false); LogicalPlan inner = new Dummy(Source.synthetic("inner"), relation); LogicalPlan orig = new Dummy(Source.synthetic("outer"), inner); - orig.setAnalyzed(); + orig.setOptimized(); assertThat( "extractFirstPhase should call #firstPhase on the earliest child in the plan", Phased.extractFirstPhase(orig), @@ -67,6 +68,7 @@ public void testTwoLayer() { List.of(new ReferenceAttribute(Source.EMPTY, "foo", DataType.KEYWORD)), List.of() ); + secondPhase.setOptimized(); assertThat( "applyResultsFromFirstPhase should call #nextPhase one th earliest child in the plan", secondPhase, @@ -84,6 +86,7 @@ public void testTwoLayer() { List.of(new ReferenceAttribute(Source.EMPTY, "foo", DataType.KEYWORD)), List.of() ); + finalPhase.setOptimized(); assertThat( finalPhase, equalTo(new Row(orig.source(), List.of(new Alias(orig.source(), "foo", new Literal(orig.source(), "foo", DataType.KEYWORD))))) From 2132d9b1cfed07ed1a7fee60742156cd9557e197 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 7 Aug 2024 16:06:47 -0400 Subject: [PATCH 2/9] Test --- .../optimizer/LogicalPlanOptimizerTests.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index be5d72e622512..f41097ed9db87 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -127,6 +127,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Grok; +import org.elasticsearch.xpack.esql.plan.logical.InlineStats; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.MvExpand; @@ -4499,6 +4500,31 @@ public void testReplaceSortByExpressionsWithStats() { as(aggregate.child(), EsRelation.class); } + /** + * Expects + * Limit[1000[INTEGER]] + * \_InlineStats[[emp_no % 2{r}#6],[COUNT(salary{f}#12) AS c, emp_no % 2{r}#6]] + * \_Eval[[emp_no{f}#7 % 2[INTEGER] AS emp_no % 2]] + * \_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] + */ + public void testInlinestatsNestedExpressionsInGroups() { + var plan = optimizedPlan(""" + FROM test + | INLINESTATS c = COUNT(salary) by emp_no % 2 + """); + + var limit = as(plan, Limit.class); + var agg = as(limit.child(), InlineStats.class); + var groupings = agg.groupings(); + var aggs = agg.aggregates(); + var ref = as(groupings.get(0), ReferenceAttribute.class); + assertThat(aggs.get(1), is(ref)); + var eval = as(agg.child(), Eval.class); + assertThat(eval.fields(), hasSize(1)); + assertThat(eval.fields().get(0).toAttribute(), is(ref)); + assertThat(eval.fields().get(0).name(), is("emp_no % 2")); + } + /** * Expects * From f011f849b255952e646f6f3a6b7eb66ba93562fb Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 7 Aug 2024 16:08:08 -0400 Subject: [PATCH 3/9] Update docs/changelog/111690.yaml --- docs/changelog/111690.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/111690.yaml diff --git a/docs/changelog/111690.yaml b/docs/changelog/111690.yaml new file mode 100644 index 0000000000000..36e715744ad88 --- /dev/null +++ b/docs/changelog/111690.yaml @@ -0,0 +1,5 @@ +pr: 111690 +summary: "ESQL: Support INLINESTATS grouped on expressions" +area: ES|QL +type: enhancement +issues: [] From c49cacb5d07613a83c96862eabec9c030db7b009 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 7 Aug 2024 16:15:42 -0400 Subject: [PATCH 4/9] explain --- .../org/elasticsearch/xpack/esql/plan/logical/InlineStats.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java index 0d4c8f6440717..3a62b358729f5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java @@ -207,7 +207,7 @@ private LogicalPlan groupedNextPhase(List schema, List firstPha if (g instanceof Attribute a) { groupingAttributes.add(a); } else { - throw new UnsupportedOperationException("INLINESTATS doesn't support expressions in grouping position yet"); + throw new IllegalStateException("optimized plans should only have attributes in groups, but got [" + g + "]"); } } List leftFields = new ArrayList<>(groupingAttributes.size()); From d2f946230876fd0d5607804f2fbfcac5611e64ba Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 9 Aug 2024 07:58:50 -0400 Subject: [PATCH 5/9] More tests --- .../src/main/resources/inlinestats.csv-spec | 104 +++++++++++++++++- .../src/main/resources/stats.csv-spec | 31 ++++++ .../src/main/resources/union_types.csv-spec | 23 +++- .../xpack/esql/action/EsqlCapabilities.java | 5 + .../xpack/esql/analysis/Analyzer.java | 17 ++- .../xpack/esql/optimizer/OptimizerRules.java | 4 +- .../optimizer/rules/RemoveStatsOverride.java | 49 +++++---- .../ReplaceStatsNestedExpressionWithEval.java | 14 ++- .../xpack/esql/plan/logical/InlineStats.java | 8 +- 9 files changed, 217 insertions(+), 38 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec index 9ffee0605899b..3f2e14f74174b 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec @@ -68,7 +68,7 @@ emp_no:integer | avg_worked_seconds:long | gender:keyword | max_avg_worked_secon ; maxOfLongByCalculatedKeyword -required_capability: inlinestats +required_capability: inlinestats_v2 // tag::longest-tenured-by-first[] FROM employees @@ -91,7 +91,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | SUBSTRING(last_na ; maxOfLongByCalculatedNamedKeyword -required_capability: inlinestats +required_capability: inlinestats_v2 FROM employees | KEEP emp_no, avg_worked_seconds, last_name @@ -110,7 +110,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | l:keyword | max_a ; maxOfLongByCalculatedDroppedKeyword -required_capability: inlinestats +required_capability: inlinestats_v2 FROM employees | INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY l = SUBSTRING(last_name, 0, 1) @@ -555,3 +555,101 @@ emp_no:integer | salary:integer | ninety_fifth_salary:double 10029 | 74999 | 73584.95 10045 | 74970 | 73584.95 ; + +byTwoCalculated +required_capability: inlinestats_v2 + +FROM airports +| WHERE abbrev IS NOT NULL +| KEEP abbrev, scalerank, location +| INLINESTATS min_sl=MIN(scalerank) + BY lat_10 = ROUND(ST_Y(location), -1) + , lon_10 = ROUND(ST_X(location), -1) +| SORT abbrev DESC +| LIMIT 3 +; + +abbrev:keyword | scalerank:integer | location:geo_point | lat_10:double | lon_10:double | min_sl:integer + ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 50 | 10 | 2 + ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | -10 | 40 | 4 + ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | 20 | -100 | 2 +; + +byTwoCalculatedSecondOverwrites +required_capability: inlinestats_v2 + +FROM airports +| WHERE abbrev IS NOT NULL +| KEEP abbrev, scalerank, location +| INLINESTATS min_sl=MIN(scalerank) + BY x = ROUND(ST_Y(location), -1) + , x = ROUND(ST_X(location), -1) +| SORT abbrev DESC +| LIMIT 3 +; + +abbrev:keyword | scalerank:integer | location:geo_point | x:double | min_sl:integer + ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 10 | 2 + ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | 40 | 2 + ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | -100 | 2 +; + +byTwoCalculatedSecondOverwritesReferencingFirst +required_capability: inlinestats_v2 + +FROM airports +| WHERE abbrev IS NOT NULL +| KEEP abbrev, scalerank, location +| EVAL x = ST_X(location) +| INLINESTATS min_sl=MIN(scalerank) + BY x = ROUND(x, -1) + , x = ROUND(x, -1) +| SORT abbrev DESC +| LIMIT 3 +; + +abbrev:keyword | scalerank:integer | location:geo_point | x:double | min_sl:integer + ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 10 | 2 + ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | 40 | 2 + ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | -100 | 2 +; + + +groupShadowsAgg +required_capability: inlinestats_v2 + +FROM airports +| WHERE abbrev IS NOT NULL +| KEEP abbrev, scalerank, location +| INLINESTATS min_sl=MIN(scalerank) + , lat_10 = ROUND(ST_Y(location), -1) + BY lat_10 = ROUND(ST_Y(location), -1) + , lon_10 = ROUND(ST_X(location), -1) +| SORT abbrev DESC +| LIMIT 3 +; + +abbrev:keyword | scalerank:integer | location:geo_point | lat_10:double | lon_10:double | min_sl:integer + ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 50 | 10 | 2 + ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | -10 | 40 | 4 + ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | 20 | -100 | 2 +; + +groupShadowsField +required_capability: inlinestats_v2 + + FROM employees +| KEEP emp_no, salary, hire_date +| INLINESTATS avg_salary = AVG(salary) + BY hire_date = DATE_TRUNC(1 year, hire_date) +| WHERE salary > avg_salary +| SORT emp_no ASC +| LIMIT 4 +; + +emp_no:integer | salary:integer | hire_date:datetime | avg_salary:double + 10001 | 57305 | 1986-01-01T00:00:00Z | 43869.63636363636 + 10002 | 56371 | 1985-01-01T00:00:00Z | 51831.818181818184 + 10003 | 61805 | 1986-01-01T00:00:00Z | 43869.63636363636 + 10005 | 63528 | 1989-01-01T00:00:00Z | 53487.07692307692 +; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec index 7b0dda9eb3669..b978a07ce616e 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec @@ -1458,6 +1458,37 @@ m:i | o:i | l:i | s:i 1 | 39729 | 1 | 39729 ; +byTwoCalculatedSecondOverwrites +FROM employees +| STATS m = MAX(salary) by l = salary + 1, l = languages + 1 +| SORT m +| LIMIT 5 +; + + m:i | l:i +66817 | 6 +73578 | 3 +73717 | 2 +74572 | 5 +74970 | 4 +; + +byTwoCalculatedSecondOverwritesReferencingFirst +FROM employees +| EVAL l = languages +| STATS m = MAX(salary) by l = l + 1, l = l + 1 +| SORT m +| LIMIT 5 +; + + m:i | l:i +66817 | 6 +73578 | 3 +73717 | 2 +74572 | 5 +74970 | 4 +; + nestedAggsOverGroupingWithAliasAndProjection#[skip:-8.13.99,reason:supported in 8.14] FROM employees | STATS e = length(f) + 1, c = count(*) by f = first_name diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec index e1aa411702420..1b92d0e1036b2 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec @@ -981,6 +981,24 @@ inlineStatsUnionGroup required_capability: union_types required_capability: inlinestats +FROM sample_data, sample_data_ts_long +| INLINESTATS count = COUNT(*) + BY @timestamp = SUBSTRING(TO_STRING(@timestamp), 0, 7) +| SORT client_ip ASC, @timestamp ASC +| LIMIT 4 +; + +client_ip:ip | event_duration:long | message:keyword | @timestamp:keyword | count:long + 172.21.0.5 | 1232382 | Disconnected | 1698068 | 1 + 172.21.0.5 | 1232382 | Disconnected | 2023-10 | 7 +172.21.2.113 | 2764889 | Connected to 10.1.0.2 | 1698064 | 1 +172.21.2.113 | 2764889 | Connected to 10.1.0.2 | 2023-10 | 7 +; + +inlineStatsUnionGroupWithEval +required_capability: union_types +required_capability: inlinestats + FROM sample_data, sample_data_ts_long | EVAL @timestamp = SUBSTRING(TO_STRING(@timestamp), 0, 7) | INLINESTATS count = COUNT(*) BY @timestamp @@ -993,7 +1011,6 @@ client_ip:ip | event_duration:long | message:keyword | @timestamp:keyword 172.21.0.5 | 1232382 | Disconnected | 2023-10 | 7 172.21.2.113 | 2764889 | Connected to 10.1.0.2 | 1698064 | 1 172.21.2.113 | 2764889 | Connected to 10.1.0.2 | 2023-10 | 7 - ; inlineStatsUnionGroupTogether @@ -1001,8 +1018,8 @@ required_capability: union_types required_capability: inlinestats FROM sample_data, sample_data_ts_long -| EVAL @timestamp = TO_STRING(TO_DATETIME(@timestamp)) -| INLINESTATS count = COUNT(*) BY @timestamp +| INLINESTATS count = COUNT(*) + BY @timestamp = TO_STRING(TO_DATETIME(@timestamp)) | SORT client_ip ASC, @timestamp ASC | LIMIT 4 ; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 18d9ef8c81783..8d5538e55263d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -51,6 +51,11 @@ public enum Cap { */ INLINESTATS(EsqlPlugin.INLINESTATS_FEATURE_FLAG), + /** + * Support for the expressions in grouping in {@code INLINESTATS} syntax. + */ + INLINESTATS_V2(EsqlPlugin.INLINESTATS_FEATURE_FLAG), + /** * Support for aggregation function {@code TOP}. */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 5c281c496ea35..934a1aff0f5e9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -67,6 +67,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.xpack.esql.plan.logical.InlineStats; import org.elasticsearch.xpack.esql.plan.logical.Keep; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; @@ -1122,7 +1123,6 @@ private LogicalPlan doRule(LogicalPlan plan) { // In ResolveRefs the aggregates are resolved from the groupings, which might have an unresolved MultiTypeEsField. // Now that we have resolved those, we need to re-resolve the aggregates. if (plan instanceof Aggregate agg) { - // TODO once inlinestats supports expressions in groups we'll likely need the same sort of extraction here // If the union-types resolution occurred in a child of the aggregate, we need to check the groupings plan = agg.transformExpressionsOnly(FieldAttribute.class, UnionTypesCleanup::checkUnresolved); @@ -1136,6 +1136,21 @@ private LogicalPlan doRule(LogicalPlan plan) { } plan = plan.transformExpressionsOnly(UnresolvedAttribute.class, ua -> resolveAttribute(ua, resolved)); } + if (plan instanceof InlineStats stats) { + System.err.println("ADSFADFSAF " + stats); + // If the union-types resolution occurred in a child of the aggregate, we need to check the groupings + plan = stats.transformExpressionsOnly(FieldAttribute.class, UnionTypesCleanup::checkUnresolved); + + // Aggregates where the grouping key comes from a union-type field need to be resolved against the grouping key + Map resolved = new HashMap<>(); + for (Expression e : stats.groupings()) { + Attribute attr = Expressions.attribute(e); + if (attr != null && attr.resolved()) { + resolved.put(attr, e); + } + } + plan = plan.transformExpressionsOnly(UnresolvedAttribute.class, ua -> resolveAttribute(ua, resolved)); + } // And add generated fields to EsRelation, so these new attributes will appear in the OutputExec of the Fragment // and thereby get used in FieldExtractExec diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/OptimizerRules.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/OptimizerRules.java index 4d3134db34a0d..733fe2e8762bb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/OptimizerRules.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/OptimizerRules.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.InlineStats; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.MvExpand; import org.elasticsearch.xpack.esql.plan.logical.Row; @@ -99,7 +100,8 @@ protected AttributeSet generates(LogicalPlan logicalPlan) { if (logicalPlan instanceof EsRelation || logicalPlan instanceof LocalRelation || logicalPlan instanceof Row - || logicalPlan instanceof Aggregate) { + || logicalPlan instanceof Aggregate + || logicalPlan instanceof InlineStats) { return logicalPlan.outputSet(); } if (logicalPlan instanceof GeneratingPlan generating) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/RemoveStatsOverride.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/RemoveStatsOverride.java index 5592a04e2f813..0f8e0f450e585 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/RemoveStatsOverride.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/RemoveStatsOverride.java @@ -11,26 +11,30 @@ import org.elasticsearch.xpack.esql.analysis.AnalyzerRules; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; -import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.Stats; import java.util.ArrayList; import java.util.List; /** - * Rule that removes Aggregate overrides in grouping, aggregates and across them inside. - * The overrides appear when the same alias is used multiple times in aggregations and/or groupings: - * STATS x = COUNT(*), x = MIN(a) BY x = b + 1, x = c + 10 + * Removes {@link Stats} overrides in grouping, aggregates and across them inside. + * The overrides appear when the same alias is used multiple times in aggregations + * and/or groupings: + * {@code STATS x = COUNT(*), x = MIN(a) BY x = b + 1, x = c + 10} * becomes - * STATS BY x = c + 10 - * That is the last declaration for a given alias, overrides all the other declarations, with - * groups having priority vs aggregates. + * {@code STATS BY x = c + 10} + * and + * {@code INLINESTATS x = COUNT(*), x = MIN(a) BY x = b + 1, x = c + 10} + * becomes + * {@code INLINESTATS BY x = c + 10} + * This is "last one wins", with groups having priority over aggregates. * Separately, it replaces expressions used as group keys inside the aggregates with references: - * STATS max(a + b + 1) BY a + b + * {@code STATS max(a + b + 1) BY a + b} * becomes - * STATS max($x + 1) BY $x = a + b + * {@code STATS max($x + 1) BY $x = a + b} */ -public final class RemoveStatsOverride extends AnalyzerRules.AnalyzerRule { +public final class RemoveStatsOverride extends AnalyzerRules.AnalyzerRule { @Override protected boolean skipResolved() { @@ -38,19 +42,18 @@ protected boolean skipResolved() { } @Override - protected LogicalPlan rule(Aggregate agg) { - return agg.resolved() ? removeAggDuplicates(agg) : agg; - } - - private static Aggregate removeAggDuplicates(Aggregate agg) { - var groupings = agg.groupings(); - var aggregates = agg.aggregates(); - - groupings = removeDuplicateNames(groupings); - aggregates = removeDuplicateNames(aggregates); - - // replace EsqlAggregate with Aggregate - return new Aggregate(agg.source(), agg.child(), agg.aggregateType(), groupings, aggregates); + protected LogicalPlan rule(LogicalPlan p) { + if (p.resolved() == false) { + return p; + } + if (p instanceof Stats stats) { + return (LogicalPlan) stats.with( + stats.child(), + removeDuplicateNames(stats.groupings()), + removeDuplicateNames(stats.aggregates()) + ); + } + return p; } private static List removeDuplicateNames(List list) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsNestedExpressionWithEval.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsNestedExpressionWithEval.java index e57d960c9e718..0549501862d75 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsNestedExpressionWithEval.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/ReplaceStatsNestedExpressionWithEval.java @@ -25,19 +25,23 @@ import java.util.Map; /** - * Replace nested expressions inside an aggregate with synthetic eval (which end up being projected away by the aggregate). - * {@code STAT SUM(a + 1) BY x % 2} + * Replace nested expressions inside a {@link Stats} with synthetic eval. + * {@code STATS SUM(a + 1) BY x % 2} * becomes * {@code EVAL `a + 1` = a + 1, `x % 2` = x % 2 | STATS SUM(`a+1`_ref) BY `x % 2`_ref} + * and + * {@code INLINESTATS SUM(a + 1) BY x % 2} + * becomes + * {@code EVAL `a + 1` = a + 1, `x % 2` = x % 2 | INLINESTATS SUM(`a+1`_ref) BY `x % 2`_ref} */ public final class ReplaceStatsNestedExpressionWithEval extends OptimizerRules.OptimizerRule { @Override - protected LogicalPlan rule(LogicalPlan aggregate) { - if (aggregate instanceof Stats stats) { + protected LogicalPlan rule(LogicalPlan p) { + if (p instanceof Stats stats) { return rule(stats); } - return aggregate; + return p; } private LogicalPlan rule(Stats aggregate) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java index 3a62b358729f5..c22432bd867df 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java @@ -16,6 +16,7 @@ import org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.Releasables; +import org.elasticsearch.logging.LogManager; import org.elasticsearch.xpack.esql.core.capabilities.Resolvables; import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.Attribute; @@ -121,15 +122,18 @@ public boolean expressionsResolved() { public List output() { if (this.lazyOutput == null) { List addedFields = new ArrayList<>(); - AttributeSet childOutput = child().outputSet(); + AttributeSet set = child().outputSet(); for (NamedExpression agg : aggregates) { - if (childOutput.contains(agg) == false) { + Attribute att = agg.toAttribute(); + if (set.contains(att) == false) { addedFields.add(agg); + set.add(att); } } this.lazyOutput = mergeOutputAttributes(addedFields, child().output()); + LogManager.getLogger(InlineStats.class).error("ADF DADF {}", lazyOutput, new Exception()); } return lazyOutput; } From f0d58e6114cba2209acb686c8c4839cfbea55ee7 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 21 Aug 2024 13:16:54 -0400 Subject: [PATCH 6/9] spotless --- .../java/org/elasticsearch/xpack/esql/analysis/Analyzer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 521b443510e4a..5b59117ad356b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -65,7 +65,6 @@ import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Eval; -import org.elasticsearch.xpack.esql.plan.logical.InlineStats; import org.elasticsearch.xpack.esql.plan.logical.Keep; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; From 3ba7b016df3d56349f598c5357d2042b90e43f7c Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 21 Aug 2024 13:56:30 -0400 Subject: [PATCH 7/9] Skip tests --- .../qa/testFixtures/src/main/resources/union_types.csv-spec | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec index a305dd9cfdb8e..6819727be0131 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec @@ -977,7 +977,7 @@ event_duration:long | _index:keyword | ts:date | ts_str:k ; -inlineStatsUnionGroup +inlineStatsUnionGroup-Ignore required_capability: union_types required_capability: inlinestats @@ -995,7 +995,7 @@ client_ip:ip | event_duration:long | message:keyword | @timestamp:keyword 172.21.2.113 | 2764889 | Connected to 10.1.0.2 | 2023-10 | 7 ; -inlineStatsUnionGroupWithEval +inlineStatsUnionGroupWithEval-Ignore required_capability: union_types required_capability: inlinestats @@ -1013,7 +1013,7 @@ client_ip:ip | event_duration:long | message:keyword | @timestamp:keyword 172.21.2.113 | 2764889 | Connected to 10.1.0.2 | 2023-10 | 7 ; -inlineStatsUnionGroupTogether +inlineStatsUnionGroupTogether-Ignore required_capability: union_types required_capability: inlinestats From 9973bb0f8b6d4d1f3950d96cb3762e13bc8fa666 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 21 Aug 2024 15:00:54 -0400 Subject: [PATCH 8/9] Come on --- .../org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java | 1 + .../org/elasticsearch/xpack/esql/plan/logical/InlineStats.java | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index d6ab99f0b21ac..3e799730f7269 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -111,6 +111,7 @@ protected void shouldSkipTest(String testName) throws IOException { isEnabled(testName, instructions, Clusters.oldVersion()) ); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("inlinestats")); + assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("inlinestats_v2")); } private TestFeatureService remoteFeaturesService() throws IOException { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java index a3b2fa0d91ea9..38de550d95d05 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java @@ -133,7 +133,6 @@ public List output() { } this.lazyOutput = mergeOutputAttributes(addedFields, child().output()); - LogManager.getLogger(InlineStats.class).error("ADF DADF {}", lazyOutput, new Exception()); } return lazyOutput; } From 91872429b1601d6e04538e468fa3f33c580659c9 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 21 Aug 2024 16:09:50 -0400 Subject: [PATCH 9/9] Spotless --- .../org/elasticsearch/xpack/esql/plan/logical/InlineStats.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java index 38de550d95d05..b37976c00ad06 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java @@ -16,7 +16,6 @@ import org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.Releasables; -import org.elasticsearch.logging.LogManager; import org.elasticsearch.xpack.esql.core.capabilities.Resolvables; import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.Attribute;