Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/141056.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
area: ES|QL
issues:
- 140757
- 139359
pr: 141056
summary: Remove incorrect inline stats pruning
type: bug
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding here the reproducer from the duplicated PR, as it comes from a different machanism (constant false WHERE in agg, ReplaceStatsFilteredOrNullAggWithEval):

FROM employees
| INLINE STATS x = MAX(salary) WHERE false, c = COUNT(*) BY emp_no
| KEEP x, c
| SORT x, c
| LIMIT 3
;

x:integer | c:long
null      | 1
null      | 1
null      | 1
;

Original file line number Diff line number Diff line change
Expand Up @@ -5313,3 +5313,164 @@ c:long | n:null | emp_no:integer
1 | null | 10002
1 | null | 10003
;


inlineStatsDroppingGroupingField
required_capability: inline_stats_drop_groupings_fix

FROM employees
| KEEP languages, salary
| INLINE STATS avg_sal = ROUND(AVG(salary)) BY languages
| DROP languages
| SORT salary
| LIMIT 10
;

salary:i | avg_sal:d
25324 |41681.0
25945 |41681.0
25976 |50577.0
26436 |52419.0
27215 |47733.0
28035 |50577.0
28336 |52520.0
28941 |52419.0
29175 |48179.0
30404 |52419.0
;

inlineStatsDroppingMultipleCommandsGroupingFields
required_capability: inline_stats_drop_groupings_fix

FROM employees
| KEEP languages, salary, gender, emp_no, *rehired
| INLINE STATS avg_sal_by_gender = ROUND(AVG(salary)) BY gender
| INLINE STATS avg_sal_by_languages = ROUND(AVG(avg_sal_by_gender)) BY languages
| DROP languages, gender
| EVAL languages = emp_no
| INLINE STATS avg_sal_by_emp_no = ROUND(AVG(salary)) BY no = emp_no % 10, rehired = MV_MIN(is_rehired)
| RENAME no AS NO
| DROP NO, languages
| SORT salary
| LIMIT 10
;

salary:i | emp_no:i | is_rehired:boolean | avg_sal_by_gender:d | avg_sal_by_languages:d | avg_sal_by_emp_no:d | rehired:boolean
25324 |10015 |[false, false, false, true]|48761.0 |48779.0 |46801.0 |false
25945 |10035 |false |46861.0 |48779.0 |46801.0 |false
25976 |10092 |[false, false, true, true] |50491.0 |48082.0 |51831.0 |false
26436 |10048 |[true, true] |46861.0 |48142.0 |41230.0 |true
27215 |10057 |null |50491.0 |48177.0 |36905.0 |null
28035 |10084 |false |46861.0 |48082.0 |45508.0 |false
28336 |10026 |[false, true] |46861.0 |47950.0 |51094.0 |false
28941 |10068 |true |46861.0 |48142.0 |41230.0 |true
29175 |10060 |[false, false, false, true]|46861.0 |48116.0 |47571.0 |false
30404 |10042 |null |50491.0 |48142.0 |30404.0 |null
;

inlineStatsGroupingByMVField
required_capability: inline_stats_drop_groupings_fix

FROM employees
| KEEP salary_change, salary
| INLINE STATS avg_salary_change = MV_AVG(salary_change), avg = AVG(salary) BY salary_change
| KEEP avg_salary_change
| SORT avg_salary_change
| LIMIT 3
;

avg_salary_change:d
[-9.81, -1.47, 14.44]
[-9.28, 9.42]
[-9.23, 5.19, 5.85, 7.5]
;

chainedInlineStats_BothGroupingByMVField
required_capability: inline_stats_drop_groupings_fix

FROM employees
| KEEP salary_change, salary
| INLINE STATS avg_salary_change = MV_AVG(salary_change), avg = AVG(salary) BY salary_change
| INLINE STATS sum_avg_salary_change = ROUND(SUM(avg_salary_change), 1), avg = AVG(salary) BY salary_change
| KEEP sum_avg_salary_change
| SORT sum_avg_salary_change DESC NULLS LAST
| LIMIT 3
;

sum_avg_salary_change:d
[27.5, 27.5]
[26.8, 26.8, 26.8]
[26.7, 26.7]
;

chainedInlineStats_OneGroupingByMVField
required_capability: inline_stats_drop_groupings_fix

FROM employees
| KEEP salary_change, salary
| INLINE STATS avg_salary_change = MV_AVG(salary_change), avg = AVG(salary) BY salary_change
| INLINE STATS sum_avg_salary_change = SUM(avg_salary_change), avg = AVG(salary)
| KEEP sum_avg_salary_change
| LIMIT 3
;

sum_avg_salary_change:d
347.99
347.99
347.99
;

inlineStatsGroupByNull_PruneDifferentInlineStats
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test covers both the bug fix here but also it's extended following #140027 merge

required_capability: inline_stats_drop_groupings_fix
required_capability: fix_inline_stats_group_by_null

FROM employees
| INLINE STATS x = AVG(salary), a = COUNT_DISTINCT(languages) BY emp_no = null
| INLINE STATS y = AVG(salary), b = COUNT_DISTINCT(languages) BY emp_no = null
| EVAL x = emp_no
| INLINE STATS z = AVG(salary), c = COUNT_DISTINCT(languages), d = AVG(languages) BY last_name
| KEEP x, a, emp_no
| LIMIT 1
;

x:null | a:l | emp_no:null
null |5 |null
;

doubleInlineStatsPrunning_MVFunctionsAndDroppingGroupings
required_capability: inline_stats_drop_groupings_fix

FROM employees
| KEEP languages, salary
| INLINE STATS languages1 = MV_AVG(languages), avg1 = ROUND(AVG(salary), languages) BY languages
| INLINE STATS languages2 = SUM(languages1), avg2 = ROUND(AVG(salary), languages) BY languages
| DROP languages
| SORT salary
| LIMIT 7
;

salary:i | languages1:d | avg1:d | languages2:d | avg2:d
25324 |5.0 |41680.7619 |105.0 |41680.7619
25945 |5.0 |41680.7619 |105.0 |41680.7619
25976 |1.0 |50576.7 |15.0 |50576.7
26436 |3.0 |52418.882 |51.0 |52418.882
27215 |4.0 |47733.0 |72.0 |47733.0
28035 |1.0 |50576.7 |15.0 |50576.7
28336 |null |null |null |null
;

removeGroupingByKeepingWhereFalse
required_capability: inline_stats_drop_groupings_fix

FROM employees
| INLINE STATS x = MAX(salary) WHERE false, c = COUNT(*) BY emp_no
| KEEP x, c
| SORT x, c
| LIMIT 3
;

x:integer | c:long
null | 1
null | 1
null | 1
;
Original file line number Diff line number Diff line change
Expand Up @@ -2099,6 +2099,10 @@ public enum Cap {
*/
TS_IMPLICIT_TIMESTAMP_SORT,

/**
* Fixes https://github.com/elastic/elasticsearch/issues/139359
*/
INLINE_STATS_DROP_GROUPINGS_FIX(INLINE_STATS.enabled),
// Last capability should still have a comma for fewer merge conflicts when adding new ones :)
// This comment prevents the semicolon from being on the previous capability when Spotless formats the file.
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.UnionAll;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
Expand All @@ -39,7 +38,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputExpressions;
import static org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneEmptyPlans.skipPlan;

/**
Expand Down Expand Up @@ -80,9 +78,9 @@ private static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder u
recheck.set(false);
p = switch (p) {
case Aggregate agg -> pruneColumnsInAggregate(agg, used, inlineJoin);
case InlineJoin inj -> pruneColumnsInInlineJoinRight(inj, used, recheck);
case InlineJoin inj -> pruneColumnsInInlineJoin(inj, used, recheck);
case Eval eval -> pruneColumnsInEval(eval, used, recheck);
case Project project -> inlineJoin ? pruneColumnsInProject(project, used, recheck) : p;
case Project project -> pruneColumnsInProject(project, used, recheck);
case EsRelation esr -> pruneColumnsInEsRelation(esr, used);
case Fork fork -> {
forkPresent.set(true);
Expand Down Expand Up @@ -139,32 +137,16 @@ private static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, Attribut
return p;
}

private static LogicalPlan pruneColumnsInInlineJoinRight(InlineJoin ij, AttributeSet.Builder used, Holder<Boolean> recheck) {
private static LogicalPlan pruneColumnsInInlineJoin(InlineJoin ij, AttributeSet.Builder used, Holder<Boolean> recheck) {
LogicalPlan p = ij;

used.addAll(ij.references());
var right = pruneColumns(ij.right(), used, true);
if (right.output().isEmpty() || isLocalEmptyRelation(right)) {

if (right.outputSet().subtract(ij.references()).isEmpty() || isLocalEmptyRelation(right)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd add a comment: "ij.references() are the join keys - the inline join doesn't add new columns. Since it preserves rows, it doesn't do anything and can be pruned."

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this only happen when we are shadowing variables in the inline stats outside?

Example:

FROM employees
| KEEP emp_no, salary, birth_date
| EVAL orig_emp_no = emp_no
| INLINE STATS emp_no = MAX(birth_date) BY salary
| EVAL emp_no = birth_date

Or are there more cases?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shadowing and DROP'ping. Maybe @astefan is aware of more cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is about all use cases where what inline stats creates (meaning the aggs themselves) are "missing" after the inline stats command. This can be drop, keep [other columns and not the ones created by inline stats], eval, another stats (maybe others).

The other scenario (|| isLocalEmptyRelation(right)) comes from scenarios where the output of the right hand side is completely empty, meaning we don't need anything from what inline stats is using. In your specific query, for example, if I would do drop emp_no, salary then the inline stats itself can be pruned entirely. And I think that PruneEmptyPlans does that by creating a LocalRelation with an EMPTY supplier and the inline stats command itself gets to be pruned here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only subtlety is that even a pruned inline stats affects the column order if there is a BY clause and the grouping does not get dropped:

FROM employees
| KEEP emp_no, languages, salary
| INLINE STATS s = max(salary) BY languages
| DROP s

-> output:
emp_no | salary | languages

The INLINE STATS moves the groupings to the right hand side, compared to just

FROM employees
| KEEP emp_no, languages, salary

// ij.references() are the join keys and if the output of the inline join doesn't contain anything else except the join keys,
// then the inline join doesn't add any new columns. Since it preserves rows, it doesn't do anything and can be pruned.
p = pruneRightSideAndProject(ij);
recheck.set(true);
} else if (right != ij.right()) {
if (right.anyMatch(plan -> plan instanceof Aggregate) == false) {// there is no aggregation on the right side anymore
if (right instanceof StubRelation) {// right is just a StubRelation, meaning nothing is needed from the right side
p = pruneRightSideAndProject(ij);
} else {
// if the right has no aggregation anymore, but it still has some other plans (evals, projects),
// we keep those and integrate them into the main plan. The InlineJoin is also replaced entirely.
p = InlineJoin.replaceStub(ij.left(), right);
p = new Project(ij.source(), p, mergeOutputExpressions(p.output(), ij.left().output()));
}
} else {
// if the right side has been updated, replace it
p = ij.replaceRight(right);
}
recheck.set(true);
}

if (recheck.get() == false) {
used.addAll(p.references());
}

return p;
Expand Down Expand Up @@ -198,13 +180,12 @@ private static LogicalPlan pruneColumnsInEval(Eval eval, AttributeSet.Builder us
return p;
}

// Note: only run when the Project is a descendent of an InlineJoin.
private static LogicalPlan pruneColumnsInProject(Project project, AttributeSet.Builder used, Holder<Boolean> recheck) {
LogicalPlan p = project;

var remaining = pruneUnusedAndAddReferences(project.projections(), used);
if (remaining != null) {
p = remaining.isEmpty() ? emptyLocalRelation(project) : new Project(project.source(), project.child(), remaining);
p = new Project(project.source(), project.child(), remaining);
recheck.set(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ protected LogicalPlan localPlan(String query) {
return localPlan(plan(query), TEST_SEARCH_STATS);
}

protected LogicalPlan localPlan(String query, Analyzer analyzer) {
public LogicalPlan localPlan(String query, Analyzer analyzer) {
return localPlan(plan(query, analyzer), TEST_SEARCH_STATS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,33 @@ public static void init() {
);

// Create a union index with conflicting types (keyword vs integer) for field 'id'
LinkedHashMap<String, Set<String>> typesToIndices = new LinkedHashMap<>();
typesToIndices.put("keyword", Set.of("test1"));
typesToIndices.put("integer", Set.of("test2"));
EsField idField = new InvalidMappedField("id", typesToIndices);
EsField fooField = new EsField("foo", KEYWORD, emptyMap(), true, EsField.TimeSeriesFieldType.NONE);
var typesToIndices_languages = new LinkedHashMap<String, Set<String>>();
typesToIndices_languages.put("byte", Set.of("union_types_index"));
typesToIndices_languages.put("integer", Set.of("union_types_index_incompatible"));
EsField languages = new InvalidMappedField("languages", typesToIndices_languages);

var typesToIndices_lastName = new LinkedHashMap<String, Set<String>>();
typesToIndices_lastName.put("text", Set.of("union_types_index"));
typesToIndices_lastName.put("keyword", Set.of("union_types_index_incompatible"));
EsField lastName = new InvalidMappedField("last_name", typesToIndices_lastName);

var typesToIndices_salaryChange = new LinkedHashMap<String, Set<String>>();
typesToIndices_salaryChange.put("float", Set.of("union_types_index"));
typesToIndices_salaryChange.put("double", Set.of("union_types_index_incompatible"));
EsField salaryChange = new InvalidMappedField("salary_change", typesToIndices_salaryChange);

var typesToIndices_firstName = new LinkedHashMap<String, Set<String>>();
typesToIndices_firstName.put("text", Set.of("union_types_index"));
typesToIndices_firstName.put("keyword", Set.of("union_types_index_incompatible"));
EsField firstName = new InvalidMappedField("first_name", typesToIndices_firstName);

EsField idField = new EsField("id", KEYWORD, emptyMap(), true, EsField.TimeSeriesFieldType.NONE);
var unionIndex = new EsIndex(
"union_index*",
Map.of("id", idField, "foo", fooField),
Map.of("test1", IndexMode.STANDARD, "test2", IndexMode.STANDARD),
Map.of(),
Map.of(),
"union_types_index*",
Map.of("languages", languages, "last_name", lastName, "salary_change", salaryChange, "first_name", firstName, "id", idField),
Map.of("union_types_index", IndexMode.STANDARD, "union_types_index_incompatible", IndexMode.STANDARD),
Map.of("", List.of("union_types_index*")),
Map.of("", List.of("union_types_index_incompatible", "union_types_index")),
Set.of()
);
unionIndexAnalyzer = new Analyzer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,8 @@ public void testMissingFieldInNewCommand() {

// Expects
// MockFieldAttributeCommand[last_name{f}#7]
// \_Project[[_meta_field{f}#9, emp_no{f}#3, first_name{f}#4, gender{f}#5, hire_date{f}#10, job{f}#11, job.raw{f}#12, langu
// ages{f}#6, last_name{r}#7, long_noidx{f}#13, salary{f}#8]]
// \_Eval[[null[KEYWORD] AS last_name]]
// \_Project[[last_name{r}#7]]
// \_Eval[[null[KEYWORD] AS last_name#7]]
// \_Limit[1000[INTEGER],false]
// \_EsRelation[test][_meta_field{f}#9, emp_no{f}#3, first_name{f}#4, gen..]
LogicalPlan localPlan = localPlan(new MockFieldAttributeCommand(EMPTY, plan, lastName), testStats);
Expand All @@ -400,7 +399,7 @@ public void testMissingFieldInNewCommand() {
assertEquals(literal.child(), new Literal(EMPTY, null, KEYWORD));
assertThat(Expressions.names(relation.output()), not(contains("last_name")));

assertEquals(Expressions.names(initialRelation.output()), Expressions.names(project.output()));
assertThat(Expressions.names(project.output()), contains("last_name"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -932,22 +932,21 @@ public void testMissingFieldsPurgesTheJoinLocallyThroughCommands() {

/*
* LimitExec[1000[INTEGER],12]
* \_AggregateExec[[language_code{r}#12],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#12],FINAL,[language_code{r}#12
* , $$c$count{r}#32, $$c$seen{r}#33],12]
* \_ExchangeExec[[language_code{r}#12, $$c$count{r}#32, $$c$seen{r}#33],true]
* \_AggregateExec[[language_code{r}#12],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#12],INITIAL,[language_code{r}#
* 12, $$c$count{r}#34, $$c$seen{r}#35],12]
* \_LookupJoinExec[[language_code{r}#12],[language_code{f}#29],[]]
* |_GrokExec[first_name{f}#19,Parser[pattern=%{NUMBER:language_code:int}, grok=org.elasticsearch.grok.Grok@177d8fd5],[languag
* e_code{r}#12]]
* | \_MvExpandExec[emp_no{f}#18,emp_no{r}#31]
* | \_ProjectExec[[emp_no{f}#18, languages{r}#21 AS language_code#7, first_name{f}#19]]
* | \_FieldExtractExec[emp_no{f}#18, first_name{f}#19]<[],[]>
* | \_EvalExec[[null[INTEGER] AS languages#21]]
* | \_EsQueryExec[test], indexMode[standard], [_doc{f}#36], limit[], sort[] estimatedRowSize[66]
* queryBuilderAndTags [[QueryBuilderAndTags{queryBuilder=[null], tags=[]}]]
* \_AggregateExec[[language_code{r}#13],[COUNT(emp_no{r}#32,true[BOOLEAN],PT0S[TIME_DURATION]) AS c#18, language_code{r}#13],FINAL,
* [language_code{r}#13, $$c$count{r}#33, $$c$seen{r}#34],12]
* \_ExchangeExec[[language_code{r}#13, $$c$count{r}#33, $$c$seen{r}#34],true]
* \_AggregateExec[[language_code{r}#13],[COUNT(emp_no{r}#32,true[BOOLEAN],PT0S[TIME_DURATION]) AS c#18, language_code{r}#13],INITI
* AL,[language_code{r}#13, $$c$count{r}#35, $$c$seen{r}#36],12]
* \_LookupJoinExec[[language_code{r}#13],[language_code{f}#30],[],null]
* |_GrokExec[first_name{f}#20,Parser[pattern=%{NUMBER:language_code:int}, grok=org.elasticsearch.grok.Grok@33b1c803],[languag
* e_code{r}#13]]
* | \_MvExpandExec[emp_no{f}#19,emp_no{r}#32]
* | \_ProjectExec[[emp_no{f}#19, first_name{f}#20]]
* | \_FieldExtractExec[emp_no{f}#19, first_name{f}#20]<[],[]>
* | \_EsQueryExec[test], indexMode[standard], [_doc{f}#37], limit[], sort[] estimatedRowSize[62] queryBuilderAndTags
* [[QueryBuilderAndTags[query=null, tags=[]]]]
* \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>
* EsRelation[languages_lookup][LOOKUP][language_code{f}#29]<>]]
* EsRelation[languages_lookup][LOOKUP][language_code{f}#30]<>]]
*/
public void testMissingFieldsNotPurgingTheJoinLocally() {
var stats = EsqlTestUtils.statsForMissingField("languages");
Expand All @@ -973,8 +972,7 @@ public void testMissingFieldsNotPurgingTheJoinLocally() {
var mvexpand = as(grok.child(), MvExpandExec.class);
var project = as(mvexpand.child(), ProjectExec.class);
var extract = as(project.child(), FieldExtractExec.class);
var eval = as(extract.child(), EvalExec.class);
var source = as(eval.child(), EsQueryExec.class);
var source = as(extract.child(), EsQueryExec.class);
Comment on lines -976 to +975
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why has this changed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically I don't understand why the INLINE STATS prunning or the Project prunning affect this test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because _EvalExec[[null[INTEGER] AS languages#21]] is not needed anymore, since it is not used afterwards. languages#21 is projected with \_ProjectExec[[emp_no{f}#18, languages{r}#21 AS language_code#7, first_name{f}#19]] but afterwards, languages#21 and/or language_code#7 is not re-used anywhere else. It is overiden by language_code{r}#12 that grok creates.

var right = as(join.right(), FragmentExec.class);
var relation = as(right.fragment(), EsRelation.class);
}
Expand Down
Loading