Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/141056.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
area: ES|QL
issues:
- 140757
- 139359
pr: 141056
summary: Remove incorrect inline stats pruning
type: bug
Original file line number Diff line number Diff line change
Expand Up @@ -5313,3 +5313,164 @@ c:long | n:null | emp_no:integer
1 | null | 10002
1 | null | 10003
;


inlineStatsDroppingGroupingField
required_capability: inline_stats_drop_groupings_fix

FROM employees
| KEEP languages, salary
| INLINE STATS avg_sal = ROUND(AVG(salary)) BY languages
| DROP languages
| SORT salary
| LIMIT 10
;

salary:i | avg_sal:d
25324 |41681.0
25945 |41681.0
25976 |50577.0
26436 |52419.0
27215 |47733.0
28035 |50577.0
28336 |52520.0
28941 |52419.0
29175 |48179.0
30404 |52419.0
;

inlineStatsDroppingMultipleCommandsGroupingFields
required_capability: inline_stats_drop_groupings_fix

FROM employees
| KEEP languages, salary, gender, emp_no, *rehired
| INLINE STATS avg_sal_by_gender = ROUND(AVG(salary)) BY gender
| INLINE STATS avg_sal_by_languages = ROUND(AVG(avg_sal_by_gender)) BY languages
| DROP languages, gender
| EVAL languages = emp_no
| INLINE STATS avg_sal_by_emp_no = ROUND(AVG(salary)) BY no = emp_no % 10, rehired = MV_MIN(is_rehired)
| RENAME no AS NO
| DROP NO, languages
| SORT salary
| LIMIT 10
;

salary:i | emp_no:i | is_rehired:boolean | avg_sal_by_gender:d | avg_sal_by_languages:d | avg_sal_by_emp_no:d | rehired:boolean
25324 |10015 |[false, false, false, true]|48761.0 |48779.0 |46801.0 |false
25945 |10035 |false |46861.0 |48779.0 |46801.0 |false
25976 |10092 |[false, false, true, true] |50491.0 |48082.0 |51831.0 |false
26436 |10048 |[true, true] |46861.0 |48142.0 |41230.0 |true
27215 |10057 |null |50491.0 |48177.0 |36905.0 |null
28035 |10084 |false |46861.0 |48082.0 |45508.0 |false
28336 |10026 |[false, true] |46861.0 |47950.0 |51094.0 |false
28941 |10068 |true |46861.0 |48142.0 |41230.0 |true
29175 |10060 |[false, false, false, true]|46861.0 |48116.0 |47571.0 |false
30404 |10042 |null |50491.0 |48142.0 |30404.0 |null
;

inlineStatsGroupingByMVField
required_capability: inline_stats_drop_groupings_fix

FROM employees
| KEEP salary_change, salary
| INLINE STATS avg_salary_change = MV_AVG(salary_change), avg = AVG(salary) BY salary_change
| KEEP avg_salary_change
| SORT avg_salary_change
| LIMIT 3
;

avg_salary_change:d
[-9.81, -1.47, 14.44]
[-9.28, 9.42]
[-9.23, 5.19, 5.85, 7.5]
;

chainedInlineStats_BothGroupingByMVField
required_capability: inline_stats_drop_groupings_fix

FROM employees
| KEEP salary_change, salary
| INLINE STATS avg_salary_change = MV_AVG(salary_change), avg = AVG(salary) BY salary_change
| INLINE STATS sum_avg_salary_change = ROUND(SUM(avg_salary_change), 1), avg = AVG(salary) BY salary_change
| KEEP sum_avg_salary_change
| SORT sum_avg_salary_change DESC NULLS LAST
| LIMIT 3
;

sum_avg_salary_change:d
[27.5, 27.5]
[26.8, 26.8, 26.8]
[26.7, 26.7]
;

chainedInlineStats_OneGroupingByMVField
required_capability: inline_stats_drop_groupings_fix

FROM employees
| KEEP salary_change, salary
| INLINE STATS avg_salary_change = MV_AVG(salary_change), avg = AVG(salary) BY salary_change
| INLINE STATS sum_avg_salary_change = SUM(avg_salary_change), avg = AVG(salary)
| KEEP sum_avg_salary_change
| LIMIT 3
;

sum_avg_salary_change:d
347.99
347.99
347.99
;

inlineStatsGroupByNull_PruneDifferentInlineStats
required_capability: inline_stats_drop_groupings_fix
required_capability: fix_inline_stats_group_by_null

FROM employees
| INLINE STATS x = AVG(salary), a = COUNT_DISTINCT(languages) BY emp_no = null
| INLINE STATS y = AVG(salary), b = COUNT_DISTINCT(languages) BY emp_no = null
| EVAL x = emp_no
| INLINE STATS z = AVG(salary), c = COUNT_DISTINCT(languages), d = AVG(languages) BY last_name
| KEEP x, a, emp_no
| LIMIT 1
;

x:null | a:l | emp_no:null
null |5 |null
;

doubleInlineStatsPrunning_MVFunctionsAndDroppingGroupings
required_capability: inline_stats_drop_groupings_fix

FROM employees
| KEEP languages, salary
| INLINE STATS languages1 = MV_AVG(languages), avg1 = ROUND(AVG(salary), languages) BY languages
| INLINE STATS languages2 = SUM(languages1), avg2 = ROUND(AVG(salary), languages) BY languages
| DROP languages
| SORT salary
| LIMIT 7
;

salary:i | languages1:d | avg1:d | languages2:d | avg2:d
25324 |5.0 |41680.7619 |105.0 |41680.7619
25945 |5.0 |41680.7619 |105.0 |41680.7619
25976 |1.0 |50576.7 |15.0 |50576.7
26436 |3.0 |52418.882 |51.0 |52418.882
27215 |4.0 |47733.0 |72.0 |47733.0
28035 |1.0 |50576.7 |15.0 |50576.7
28336 |null |null |null |null
;

removeGroupingByKeepingWhereFalse
required_capability: inline_stats_drop_groupings_fix

FROM employees
| INLINE STATS x = MAX(salary) WHERE false, c = COUNT(*) BY emp_no
| KEEP x, c
| SORT x, c
| LIMIT 3
;

x:integer | c:long
null | 1
null | 1
null | 1
;
Original file line number Diff line number Diff line change
Expand Up @@ -1881,6 +1881,10 @@ public enum Cap {
*/
TS_COMMAND_GROUP_ON_ALIASES,

/**
* Fixes https://github.com/elastic/elasticsearch/issues/139359
*/
INLINE_STATS_DROP_GROUPINGS_FIX(INLINE_STATS.enabled),
// Last capability should still have a comma for fewer merge conflicts when adding new ones :)
// This comment prevents the semicolon from being on the previous capability when Spotless formats the file.
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.UnionAll;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
Expand All @@ -39,7 +38,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputExpressions;
import static org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneEmptyPlans.skipPlan;

/**
Expand Down Expand Up @@ -80,9 +78,9 @@ private static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder u
recheck.set(false);
p = switch (p) {
case Aggregate agg -> pruneColumnsInAggregate(agg, used, inlineJoin);
case InlineJoin inj -> pruneColumnsInInlineJoinRight(inj, used, recheck);
case InlineJoin inj -> pruneColumnsInInlineJoin(inj, used, recheck);
case Eval eval -> pruneColumnsInEval(eval, used, recheck);
case Project project -> inlineJoin ? pruneColumnsInProject(project, used, recheck) : p;
case Project project -> pruneColumnsInProject(project, used, recheck);
case EsRelation esr -> pruneColumnsInEsRelation(esr, used);
case Fork fork -> {
forkPresent.set(true);
Expand Down Expand Up @@ -139,32 +137,16 @@ private static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, Attribut
return p;
}

private static LogicalPlan pruneColumnsInInlineJoinRight(InlineJoin ij, AttributeSet.Builder used, Holder<Boolean> recheck) {
private static LogicalPlan pruneColumnsInInlineJoin(InlineJoin ij, AttributeSet.Builder used, Holder<Boolean> recheck) {
LogicalPlan p = ij;

used.addAll(ij.references());
var right = pruneColumns(ij.right(), used, true);
if (right.output().isEmpty() || isLocalEmptyRelation(right)) {

if (right.outputSet().subtract(ij.references()).isEmpty() || isLocalEmptyRelation(right)) {
// ij.references() are the join keys and if the output of the inline join doesn't contain anything else except the join keys,
// then the inline join doesn't add any new columns. Since it preserves rows, it doesn't do anything and can be pruned.
p = pruneRightSideAndProject(ij);
recheck.set(true);
} else if (right != ij.right()) {
if (right.anyMatch(plan -> plan instanceof Aggregate) == false) {// there is no aggregation on the right side anymore
if (right instanceof StubRelation) {// right is just a StubRelation, meaning nothing is needed from the right side
p = pruneRightSideAndProject(ij);
} else {
// if the right has no aggregation anymore, but it still has some other plans (evals, projects),
// we keep those and integrate them into the main plan. The InlineJoin is also replaced entirely.
p = InlineJoin.replaceStub(ij.left(), right);
p = new Project(ij.source(), p, mergeOutputExpressions(p.output(), ij.left().output()));
}
} else {
// if the right side has been updated, replace it
p = ij.replaceRight(right);
}
recheck.set(true);
}

if (recheck.get() == false) {
used.addAll(p.references());
}

return p;
Expand Down Expand Up @@ -198,13 +180,12 @@ private static LogicalPlan pruneColumnsInEval(Eval eval, AttributeSet.Builder us
return p;
}

// Note: only run when the Project is a descendent of an InlineJoin.
private static LogicalPlan pruneColumnsInProject(Project project, AttributeSet.Builder used, Holder<Boolean> recheck) {
LogicalPlan p = project;

var remaining = pruneUnusedAndAddReferences(project.projections(), used);
if (remaining != null) {
p = remaining.isEmpty() ? emptyLocalRelation(project) : new Project(project.source(), project.child(), remaining);
p = new Project(project.source(), project.child(), remaining);
recheck.set(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ protected LogicalPlan localPlan(String query) {
return localPlan(plan(query), TEST_SEARCH_STATS);
}

protected LogicalPlan localPlan(String query, Analyzer analyzer) {
public LogicalPlan localPlan(String query, Analyzer analyzer) {
return localPlan(plan(query, analyzer), TEST_SEARCH_STATS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
import org.elasticsearch.xpack.esql.analysis.MutableAnalyzerContext;
import org.elasticsearch.xpack.esql.core.type.EsField;
import org.elasticsearch.xpack.esql.core.type.InvalidMappedField;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.index.EsIndexGenerator;
Expand All @@ -26,6 +27,7 @@
import org.junit.BeforeClass;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -64,6 +66,7 @@ public abstract class AbstractLogicalPlanOptimizerTests extends ESTestCase {
protected static Map<String, EsField> metricMapping;
protected static Analyzer metricsAnalyzer;
protected static Analyzer multiIndexAnalyzer;
protected static Analyzer unionIndexAnalyzer;
protected static Analyzer sampleDataIndexAnalyzer;
protected static Analyzer subqueryAnalyzer;
protected static Map<String, EsField> mappingBaseConversion;
Expand Down Expand Up @@ -217,6 +220,48 @@ public static void init() {
TEST_VERIFIER
);

// Create a union index with conflicting types (keyword vs integer) for field 'id'
var typesToIndices_languages = new LinkedHashMap<String, Set<String>>();
typesToIndices_languages.put("byte", Set.of("union_types_index"));
typesToIndices_languages.put("integer", Set.of("union_types_index_incompatible"));
EsField languages = new InvalidMappedField("languages", typesToIndices_languages);

var typesToIndices_lastName = new LinkedHashMap<String, Set<String>>();
typesToIndices_lastName.put("text", Set.of("union_types_index"));
typesToIndices_lastName.put("keyword", Set.of("union_types_index_incompatible"));
EsField lastName = new InvalidMappedField("last_name", typesToIndices_lastName);

var typesToIndices_salaryChange = new LinkedHashMap<String, Set<String>>();
typesToIndices_salaryChange.put("float", Set.of("union_types_index"));
typesToIndices_salaryChange.put("double", Set.of("union_types_index_incompatible"));
EsField salaryChange = new InvalidMappedField("salary_change", typesToIndices_salaryChange);

var typesToIndices_firstName = new LinkedHashMap<String, Set<String>>();
typesToIndices_firstName.put("text", Set.of("union_types_index"));
typesToIndices_firstName.put("keyword", Set.of("union_types_index_incompatible"));
EsField firstName = new InvalidMappedField("first_name", typesToIndices_firstName);

EsField idField = new EsField("id", KEYWORD, emptyMap(), true, EsField.TimeSeriesFieldType.NONE);
var unionIndex = new EsIndex(
"union_types_index*",
Map.of("languages", languages, "last_name", lastName, "salary_change", salaryChange, "first_name", firstName, "id", idField),
Map.of("union_types_index", IndexMode.STANDARD, "union_types_index_incompatible", IndexMode.STANDARD),
Map.of("", List.of("union_types_index*")),
Map.of("", List.of("union_types_index_incompatible", "union_types_index")),
Set.of()
);
unionIndexAnalyzer = new Analyzer(
testAnalyzerContext(
EsqlTestUtils.TEST_CFG,
new EsqlFunctionRegistry(),
indexResolutions(unionIndex),
defaultLookupResolution(),
enrichResolution,
emptyInferenceResolution()
),
TEST_VERIFIER
);

var sampleDataMapping = loadMapping("mapping-sample_data.json");
var sampleDataIndex = new EsIndex(
"sample_data",
Expand Down Expand Up @@ -313,6 +358,10 @@ protected LogicalPlan planMultiIndex(String query) {
return logicalOptimizer.optimize(multiIndexAnalyzer.analyze(parser.parseQuery(query)));
}

protected LogicalPlan planUnionIndex(String query) {
return logicalOptimizer.optimize(unionIndexAnalyzer.analyze(parser.parseQuery(query)));
}

protected LogicalPlan planSample(String query) {
var analyzed = sampleDataIndexAnalyzer.analyze(parser.parseQuery(query));
return logicalOptimizer.optimize(analyzed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ public void testInlineStatsAfterSortDropped() {
* Project[[salary{r}#7, emp_no{f}#9]]
* \_TopN[[Order[$$salary$temp_name$20{r}#21,ASC,LAST]],1000[INTEGER],false]
* \_InlineJoin[LEFT,[emp_no{f}#9],[emp_no{r}#9]]
* |_EsqlProject[[salary{f}#14, emp_no{f}#9, $$salary$temp_name$20{r}#21]]
* |_Project[[salary{f}#14, emp_no{f}#9, $$salary$temp_name$20{r}#21]]
* | \_Eval[[salary{f}#14 AS $$salary$temp_name$20#21]]
* | \_EsRelation[employees][_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, g..]
* \_Aggregate[[emp_no{f}#9],[COUNT(*[KEYWORD],true[BOOLEAN],PT0S[TIME_DURATION]) AS salary#7, emp_no{f}#9]]
Expand Down Expand Up @@ -570,7 +570,7 @@ public void testShadowingInlineStatsAfterSort() {
assertThat(Expressions.names(inlineJoin.config().rightFields()), is(List.of("emp_no")));

// Left side of the join
var leftProject = as(inlineJoin.left(), EsqlProject.class);
var leftProject = as(inlineJoin.left(), Project.class);
var leftEval = as(leftProject.child(), Eval.class);
assertThat(Expressions.names(leftEval.fields()), contains(startsWith("$$salary$temp_name$")));
var relation = as(leftEval.child(), EsRelation.class);
Expand All @@ -590,7 +590,7 @@ public void testShadowingInlineStatsAfterSort() {
* Project[[salary{r}#8, emp_no{f}#10]]
* \_TopN[[Order[$$salary$temp_name$21{r}#22,ASC,LAST], Order[emp_no{f}#10,ASC,LAST]],1000[INTEGER],false]
* \_InlineJoin[LEFT,[emp_no{f}#10],[emp_no{r}#10]]
* |_EsqlProject[[salary{f}#15, emp_no{f}#10, $$salary$temp_name$21{r}#22]]
* |_Project[[salary{f}#15, emp_no{f}#10, $$salary$temp_name$21{r}#22]]
* | \_Eval[[salary{f}#15 AS $$salary$temp_name$21#22]]
* | \_EsRelation[employees][_meta_field{f}#16, emp_no{f}#10, first_name{f}#11, ..]
* \_Aggregate[[emp_no{f}#10],[COUNT(*[KEYWORD],true[BOOLEAN],PT0S[TIME_DURATION]) AS salary#8, emp_no{f}#10]]
Expand Down Expand Up @@ -631,7 +631,7 @@ public void testMixedShadowingInlineStatsAfterSort() {
assertThat(Expressions.names(inlineJoin.config().rightFields()), is(List.of("emp_no")));

// Left side of the join
var leftProject = as(inlineJoin.left(), EsqlProject.class);
var leftProject = as(inlineJoin.left(), Project.class);
var leftEval = as(leftProject.child(), Eval.class);
assertThat(Expressions.names(leftEval.fields()), contains(startsWith("$$salary$temp_name$")));
var relation = as(leftEval.child(), EsRelation.class);
Expand Down
Loading