diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 2c9ba3107f7..92b30686294 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -134,6 +134,17 @@ public void testExplainWithReverse() throws IOException { assertTrue(result.contains("dir0=[DESC]")); } + @Test + public void noPushDownForAggOnWindow() throws IOException { + Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled()); + String query = + "source=opensearch-sql_test_index_account | patterns address method=BRAIN | stats count()" + + " by patterns_field"; + var result = explainQueryToString(query); + String expected = loadFromFile("expectedOutput/calcite/explain_agg_on_window.json"); + assertJsonEqualsIgnoreId(expected, result); + } + /** * Executes the PPL query and returns the result as a string with windows-style line breaks * replaced with Unix-style ones. diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_on_window.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_on_window.json new file mode 100644 index 00000000000..e5c93f8aa41 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_on_window.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(count()=[$1], patterns_field=[$0])\n LogicalAggregate(group=[{0}], count()=[COUNT()])\n LogicalProject(patterns_field=[SAFE_CAST(ITEM(PATTERN_PARSER($2, pattern($2, 10, 100000) OVER ()), 'pattern'))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..1=[{inputs}], count()=[$t1], patterns_field=[$t0])\n EnumerableAggregate(group=[{0}], count()=[COUNT()])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[PATTERN_PARSER($t0, $t1)], expr#3=['pattern'], expr#4=[ITEM($t2, $t3)], expr#5=[SAFE_CAST($t4)], patterns_field=[$t5])\n EnumerableWindow(window#0=[window(aggs [pattern($0, $1, $2)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[address]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"address\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_output.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_output.json index 874d675de35..9474e4d1e31 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_output.json +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_output.json @@ -1,6 +1,6 @@ { "calcite": { "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(age2=[$2])\n LogicalFilter(condition=[<=($3, 1)])\n LogicalProject(avg_age=[$0], state=[$1], age2=[$2], _row_number_=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY $2)])\n LogicalFilter(condition=[IS NOT NULL($2)])\n LogicalProject(avg_age=[$0], state=[$1], age2=[+($0, 2)])\n LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])\n LogicalProject(avg_age=[$2], state=[$0], city=[$1])\n LogicalAggregate(group=[{0, 1}], avg_age=[AVG($2)])\n LogicalProject(state=[$7], city=[$5], age=[$8])\n LogicalFilter(condition=[>($8, 30)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", - "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[<=($t2, $t3)], age2=[$t1], $condition=[$t4])\n EnumerableWindow(window#0=[window(partition {1} order by [1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])\n EnumerableCalc(expr#0..2=[{inputs}], expr#3=[2], expr#4=[+($t2, $t3)], expr#5=[IS NOT NULL($t2)], state=[$t0], age2=[$t4], $condition=[$t5])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[city, state, age], FILTER->>($2, 30), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg_age=AVG($2)), SORT->[0 ASC FIRST]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"city\",\"state\",\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":10,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[<=($t2, $t3)], age2=[$t1], $condition=[$t4])\n EnumerableWindow(window#0=[window(partition {1} order by [1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])\n EnumerableCalc(expr#0..2=[{inputs}], expr#3=[2], expr#4=[+($t2, $t3)], expr#5=[IS NOT NULL($t2)], state=[$t0], age2=[$t4], $condition=[$t5])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[city, state, age], FILTER->>($2, 30), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg_age=AVG($2)), SORT->[0 ASC FIRST]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"city\",\"state\",\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" } } diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3996.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3996.yml new file mode 100644 index 00000000000..0b2ac29bc42 --- /dev/null +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3996.yml @@ -0,0 +1,52 @@ +setup: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : true + + - do: + indices.create: + index: test + body: + settings: + number_of_shards: 1 + number_of_replicas: 0 + mappings: + properties: + body: + type: text + +--- +teardown: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : false + +--- +"Fix incorrectly push down aggregate with filter": + - skip: + features: + - headers + - allowed_warnings + - do: + bulk: + index: test + refresh: true + body: + - '{"index": {}}' + - '{"body": "[2025-07-31T05:44:45.164Z] \"GET /api/data HTTP/1.1\" 200 - via_upstream - \"-\" 0 221 2 2 \"-\" \"python-requests/2.32.4\" \"80a2a234-bbb2-9bf3-bbc6-ba7554aee8b6\" \"frontend-proxy:8080\" \"172.18.0.25:8080\" frontend 172.18.0.27:46596 172.18.0.27:8080 172.18.0.26:53294 - -"}' + + - do: + allowed_warnings: + - 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled' + headers: + Content-Type: 'application/json' + ppl: + body: + query: source=test | parse body 'HTTP/1.1\" (?\\d+)' | eval status2xx=if(httpstatus>='200' and httpstatus<'300', 1, 0), status3xx=if(httpstatus>='300' and httpstatus<'400', 1, 0), status4xx=if(httpstatus>='400' and httpstatus<'500', 1, 0), status5xx=if(httpstatus>='500' and httpstatus<'600', 1, 0), statusOther=if(httpstatus>='600', 1, 0) | stats count() as `Request Count`, sum(status2xx) as `HTTP 2xx`, sum(status3xx) as `HTTP 3xx`, sum(status4xx) as `HTTP 4xx`, sum(status5xx) as `HTTP 5xx`, sum(statusOther) as `Other` + - match: {"total": 1} + - match: {"schema": [{"name": "Request Count", "type": "bigint"}, {"name": "HTTP 2xx", "type": "bigint"}, {"name": "HTTP 3xx", "type": "bigint"}, {"name": "HTTP 4xx", "type": "bigint"}, {"name": "HTTP 5xx", "type": "bigint"}, {"name": "Other", "type": "bigint"}]} + - match: {"datarows": [[1, 1, 0, 0, 0, 0]]} diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4009.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4009.yml new file mode 100644 index 00000000000..ed53edbdb09 --- /dev/null +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4009.yml @@ -0,0 +1,70 @@ +setup: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : true + + - do: + indices.create: + index: test + body: + settings: + number_of_shards: 1 + number_of_replicas: 0 + mappings: + properties: + body: + type: text + +--- +teardown: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : false + +--- +"Fix bucket size missing": + - skip: + features: + - headers + - allowed_warnings + - do: + bulk: + index: test + refresh: true + body: + - '{"index": {}}' + - '{ "title" : "document 1"}' + - '{"index": {}}' + - '{ "title" : "document 2"}' + - '{"index": {}}' + - '{ "title" : "document 3"}' + - '{"index": {}}' + - '{ "title" : "document 4"}' + - '{"index": {}}' + - '{ "title" : "document 5"}' + - '{"index": {}}' + - '{ "title" : "document 6"}' + - '{"index": {}}' + - '{ "title" : "document 7"}' + - '{"index": {}}' + - '{ "title" : "document 8"}' + - '{"index": {}}' + - '{ "title" : "document 9"}' + - '{"index": {}}' + - '{ "title" : "document 10"}' + - '{"index": {}}' + - '{ "title" : "document 11"}' + + - do: + allowed_warnings: + - 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled' + headers: + Content-Type: 'application/json' + ppl: + body: + query: source=test | stats count() by title | sort title + - match: {"total": 11} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java index 35c171ab0f7..a93fecd2df1 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java @@ -7,6 +7,7 @@ import java.util.function.Predicate; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.sql.SqlKind; @@ -64,10 +65,17 @@ public interface Config extends RelRule.Config { .withOperandSupplier( b0 -> b0.operand(LogicalAggregate.class) + .predicate( + agg -> + // Cannot push down aggregation with inner filter + agg.getAggCallList().stream().noneMatch(AggregateCall::hasFilter)) .oneInput( b1 -> b1.operand(LogicalProject.class) - .predicate(OpenSearchIndexScanRule::distinctProjectList) + .predicate( + // Don't push down aggregate on window function + Predicate.not(OpenSearchIndexScanRule::containsRexOver) + .and(OpenSearchIndexScanRule::distinctProjectList)) .oneInput( b2 -> b2.operand(CalciteLogicalIndexScan.class) @@ -92,7 +100,8 @@ public interface Config extends RelRule.Config { .allMatch( call -> call.getAggregation().kind == SqlKind.COUNT - && call.getArgList().isEmpty())) + && call.getArgList().isEmpty() + && !call.hasFilter())) .oneInput( b1 -> b1.operand(CalciteLogicalIndexScan.class) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexScanRule.java index 010f7ee47ca..ebedac0284c 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexScanRule.java @@ -12,6 +12,7 @@ import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalSort; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexOver; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan; @@ -42,6 +43,10 @@ static boolean distinctProjectList(LogicalProject project) { return project.getProjects().stream().allMatch(rexSet::add); } + static boolean containsRexOver(LogicalProject project) { + return project.getProjects().stream().anyMatch(RexOver::containsOver); + } + /** * The LogicalSort is a LIMIT that should be pushed down when its fetch field is not null and its * collation is empty. For example: sort name | head 5 should not be pushed down diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java index 46fd7fa7196..61df5b0282e 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java @@ -7,6 +7,7 @@ import static java.util.Objects.requireNonNull; import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_PUSHDOWN_ROWCOUNT_ESTIMATION_FACTOR; +import static org.opensearch.sql.opensearch.request.AggregateAnalyzer.AGGREGATION_BUCKET_SIZE; import java.util.ArrayDeque; import java.util.ArrayList; @@ -293,8 +294,6 @@ public AbstractCalciteIndexScan pushDownSort(List collations) } catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug("Cannot pushdown the sort {}", getCollationNames(collations), e); - } else { - LOG.info("Cannot pushdown the sort {}, ", getCollationNames(collations)); } } return null; @@ -389,7 +388,8 @@ public void pushDownSortIntoAggBucket(List collations) { Pair.of( Collections.singletonList( AggregationBuilders.composite("composite_buckets", newBuckets) - .subAggregations(newAggBuilder)), + .subAggregations(newAggBuilder) + .size(AGGREGATION_BUCKET_SIZE)), aggregationBuilder.getRight()); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java index 16c4b36ba97..c5d3aa03072 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java @@ -142,8 +142,6 @@ public AbstractRelNode pushDownFilter(Filter filter) { } catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug("Cannot pushdown the filter condition.", e); - } else { - LOG.info("Cannot pushdown the filter condition."); } } return null; @@ -248,8 +246,6 @@ public CalciteLogicalIndexScan pushDownAggregate(Aggregate aggregate, Project pr } catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug("Cannot pushdown the aggregate {}", aggregate, e); - } else { - LOG.info("Cannot pushdown the aggregate {}, ", aggregate); } } return null; @@ -267,8 +263,6 @@ public CalciteLogicalIndexScan pushDownLimit(Integer limit, Integer offset) { } catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug("Cannot pushdown limit {} with offset {}", limit, offset, e); - } else { - LOG.info("Cannot pushdown limit {} with offset {}", limit, offset); } } return null;