diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 99da0ff9658..bbd4dd3c6d6 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -1820,7 +1820,6 @@ private RelNode buildStreamWindowJoinPlan( context.relBuilder.push(leftWithHelpers); context.relBuilder.variable(v::set); - context.relBuilder.push(leftWithHelpers); RexNode rightSeq = context.relBuilder.field(seqCol); RexNode outerSeq = context.relBuilder.field(v.get(), seqCol); @@ -1839,8 +1838,7 @@ private RelNode buildStreamWindowJoinPlan( context.relBuilder.filter(filter); // aggregate all window functions on right side - List aggCalls = buildAggCallsForWindowFunctions(node.getWindowFunctionList(), context); - context.relBuilder.aggregate(context.relBuilder.groupKey(), aggCalls); + aggregateWithTrimming(List.of(), node.getWindowFunctionList(), context, false); RelNode rightAgg = context.relBuilder.build(); // correlate LEFT with RIGHT using seq + group fields @@ -2029,33 +2027,6 @@ private String extractGroupFieldName(UnresolvedExpression groupExpr) { "Unsupported group expression: only field or alias(field) is supported"); } - private List buildAggCallsForWindowFunctions( - List windowExprs, CalcitePlanContext context) { - List aggCalls = new ArrayList<>(); - for (UnresolvedExpression expr : windowExprs) { - if (expr instanceof Alias) { - Alias a = (Alias) expr; - if (a.getDelegated() instanceof WindowFunction) { - WindowFunction wf = (WindowFunction) a.getDelegated(); - Function func = (Function) wf.getFunction(); - List args = func.getFuncArgs(); - // first argument is the input field, others are function params - UnresolvedExpression field = args.isEmpty() ? null : args.get(0); - List rest = - args.size() <= 1 ? List.of() : args.subList(1, args.size()); - AggregateFunction aggFunc = new AggregateFunction(func.getFuncName(), field, rest); - AggCall call = aggVisitor.analyze(new Alias(a.getName(), aggFunc), context); - aggCalls.add(call); - } else { - throw new IllegalArgumentException("Unsupported window function in streamstats"); - } - } else { - throw new IllegalArgumentException("Unsupported window function in streamstats"); - } - } - return aggCalls; - } - private List buildRequiredLeft( CalcitePlanContext context, String seqCol, List groupList) { List requiredLeft = new ArrayList<>(); diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml index a00d5b40cfa..97703d849a7 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml @@ -6,10 +6,11 @@ calcite: LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($8)]) - LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{}], avg_age=[AVG($0)]) + LogicalProject(age=[$8]) + LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18]) EnumerableLimit(fetch=[10000]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml index 293dd785f96..f615f0633b2 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml @@ -6,10 +6,11 @@ calcite: LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($8)]) - LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{}], avg_age=[AVG($0)]) + LogicalProject(age=[$8]) + LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) EnumerableLimit(fetch=[10000]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset.yaml index fd739ac5cf5..fa6654252f4 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset.yaml @@ -7,11 +7,12 @@ calcite: LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($8)]) - LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{}], avg_age=[AVG($0)]) + LogicalProject(age=[$8]) + LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18]) EnumerableLimit(fetch=[10000]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yaml index 0e8ed3a3dde..d2f0db97af9 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yaml @@ -7,11 +7,12 @@ calcite: LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($8)]) - LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), =($4, $cor0.gender))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{}], avg_age=[AVG($0)]) + LogicalProject(age=[$8]) + LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), =($4, $cor0.gender))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) EnumerableLimit(fetch=[10000]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml index 191bd987a16..522e7922e68 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml @@ -6,10 +6,11 @@ calcite: LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($8)]) - LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{}], avg_age=[AVG($0)]) + LogicalProject(age=[$8]) + LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18]) EnumerableLimit(fetch=[10000]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml index 3ac52e02f55..a0634448b5e 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml @@ -6,10 +6,11 @@ calcite: LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($8)]) - LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{}], avg_age=[AVG($0)]) + LogicalProject(age=[$8]) + LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) EnumerableLimit(fetch=[10000]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset.yaml index 7ca329dac6a..5664cc6aa87 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset.yaml @@ -7,11 +7,12 @@ calcite: LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($8)]) - LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{}], avg_age=[AVG($0)]) + LogicalProject(age=[$8]) + LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18]) EnumerableLimit(fetch=[10000]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset_null_bucket.yaml index be28e9b1d8c..40fb4087001 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset_null_bucket.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset_null_bucket.yaml @@ -7,11 +7,12 @@ calcite: LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($8)]) - LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), =($4, $cor0.gender))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{}], avg_age=[AVG($0)]) + LogicalProject(age=[$8]) + LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), =($4, $cor0.gender))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) EnumerableLimit(fetch=[10000]) diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java index b073453ecbc..48c0e5cfa62 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java @@ -101,18 +101,20 @@ public void testStreamstatsWindow() { + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER ()])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalAggregate(group=[{}], max(SAL)=[MAX($5)])\n" - + " LogicalFilter(condition=[AND(>=($8, -($cor0.__stream_seq__, 4)), <=($8," + + " LogicalAggregate(group=[{}], max(SAL)=[MAX($0)])\n" + + " LogicalProject(SAL=[$5])\n" + + " LogicalFilter(condition=[AND(>=($8, -($cor0.__stream_seq__, 4)), <=($8," + " $cor0.__stream_seq__), OR(=($7, $cor0.DEPTNO), AND(IS NULL($7), IS" + " NULL($cor0.DEPTNO))))])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," - + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER ()])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n"; + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3]," + + " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER" + + " ()])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = "SELECT `$cor0`.`EMPNO`, `$cor0`.`ENAME`, `$cor0`.`JOB`, `$cor0`.`MGR`, `$cor0`.`HIREDATE`," - + " `$cor0`.`SAL`, `$cor0`.`COMM`, `$cor0`.`DEPTNO`, `t2`.`max(SAL)`\n" + + " `$cor0`.`SAL`, `$cor0`.`COMM`, `$cor0`.`DEPTNO`, `t3`.`max(SAL)`\n" + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," + " ROW_NUMBER() OVER () `__stream_seq__`\n" + "FROM `scott`.`EMP`) `$cor0`,\n" @@ -122,7 +124,7 @@ public void testStreamstatsWindow() { + "FROM `scott`.`EMP`) `t0`\n" + "WHERE `__stream_seq__` >= `$cor0`.`__stream_seq__` - 4 AND `__stream_seq__` <=" + " `$cor0`.`__stream_seq__` AND (`DEPTNO` = `$cor0`.`DEPTNO` OR `DEPTNO` IS NULL AND" - + " `$cor0`.`DEPTNO` IS NULL)) `t2`\n" + + " `$cor0`.`DEPTNO` IS NULL)) `t3`\n" + "ORDER BY `$cor0`.`__stream_seq__` NULLS LAST"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -173,24 +175,26 @@ public void testStreamstatsReset() { + " __reset_before_flag__=[CASE(>($5, 100), 1, 0)], __reset_after_flag__=[CASE(<($5," + " 50), 1, 0)])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalAggregate(group=[{}], avg(SAL)=[AVG($5)])\n" - + " LogicalFilter(condition=[AND(<=($8, $cor0.__stream_seq__), =($11," + + " LogicalAggregate(group=[{}], avg(SAL)=[AVG($0)])\n" + + " LogicalProject(SAL=[$5])\n" + + " LogicalFilter(condition=[AND(<=($8, $cor0.__stream_seq__), =($11," + " $cor0.__seg_id__), OR(=($7, $cor0.DEPTNO), AND(IS NULL($7), IS" + " NULL($cor0.DEPTNO))))])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," - + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[$8], __reset_before_flag__=[$9]," - + " __reset_after_flag__=[$10], __seg_id__=[+(SUM($9) OVER (ROWS UNBOUNDED PRECEDING)," - + " COALESCE(SUM($10) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))])\n" + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3]," + + " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[$8]," + + " __reset_before_flag__=[$9], __reset_after_flag__=[$10], __seg_id__=[+(SUM($9) OVER" + + " (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($10) OVER (ROWS BETWEEN UNBOUNDED" + + " PRECEDING AND 1 PRECEDING), 0))])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3]," + " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER" + " ()], __reset_before_flag__=[CASE(>($5, 100), 1, 0)]," + " __reset_after_flag__=[CASE(<($5, 50), 1, 0)])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n"; + + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = "SELECT `$cor0`.`EMPNO`, `$cor0`.`ENAME`, `$cor0`.`JOB`, `$cor0`.`MGR`, `$cor0`.`HIREDATE`," - + " `$cor0`.`SAL`, `$cor0`.`COMM`, `$cor0`.`DEPTNO`, `t4`.`avg(SAL)`\n" + + " `$cor0`.`SAL`, `$cor0`.`COMM`, `$cor0`.`DEPTNO`, `t5`.`avg(SAL)`\n" + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," + " `__stream_seq__`, `__reset_before_flag__`, `__reset_after_flag__`," + " (SUM(`__reset_before_flag__`) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT" @@ -214,7 +218,7 @@ public void testStreamstatsReset() { + "FROM `scott`.`EMP`) `t1`) `t2`\n" + "WHERE `__stream_seq__` <= `$cor0`.`__stream_seq__` AND `__seg_id__` =" + " `$cor0`.`__seg_id__` AND (`DEPTNO` = `$cor0`.`DEPTNO` OR `DEPTNO` IS NULL AND" - + " `$cor0`.`DEPTNO` IS NULL)) `t4`\n" + + " `$cor0`.`DEPTNO` IS NULL)) `t5`\n" + "ORDER BY `$cor0`.`__stream_seq__` NULLS LAST"; verifyPPLToSparkSQL(root, expectedSparkSql); }