diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteBinCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteBinCommandIT.java index 2aa4455fb72..3f0adb08432 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteBinCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteBinCommandIT.java @@ -941,4 +941,54 @@ public void testStatsWithBinsOnTimeField_Avg() throws IOException { rows(41.8, "2024-07-01 00:04:00"), rows(50.0, "2024-07-01 00:05:00")); } + + @Test + public void testStatsWithBinsOnTimeAndTermField_Count() throws IOException { + // TODO: Remove this after addressing https://github.com/opensearch-project/sql/issues/4317 + enabledOnlyWhenPushdownIsEnabled(); + + JSONObject result = + executeQuery( + "source=events_null | bin @timestamp bins=3 | stats bucket_nullable=false count() by" + + " region, @timestamp"); + // TODO: @timestamp should keep date as its type, to be addressed by this issue: + // https://github.com/opensearch-project/sql/issues/4317 + verifySchema( + result, + schema("count()", null, "bigint"), + schema("region", null, "string"), + schema("@timestamp", null, "string")); + // auto_date_histogram will choose span=5m for bins=3 + verifyDataRows( + result, + rows(1, "eu-west", "2024-07-01 00:03:00"), + rows(2, "us-east", "2024-07-01 00:00:00"), + rows(1, "us-east", "2024-07-01 00:05:00"), + rows(2, "us-west", "2024-07-01 00:01:00")); + } + + @Test + public void testStatsWithBinsOnTimeAndTermField_Avg() throws IOException { + // TODO: Remove this after addressing https://github.com/opensearch-project/sql/issues/4317 + enabledOnlyWhenPushdownIsEnabled(); + + JSONObject result = + executeQuery( + "source=events_null | bin @timestamp bins=3 | stats bucket_nullable=false " + + " avg(cpu_usage) by region, @timestamp"); + // TODO: @timestamp should keep date as its type, to be addressed by this issue: + // 
https://github.com/opensearch-project/sql/issues/4317 + verifySchema( + result, + schema("avg(cpu_usage)", null, "double"), + schema("region", null, "string"), + schema("@timestamp", null, "string")); + // auto_date_histogram will choose span=5m for bins=3 + verifyDataRows( + result, + rows(42.1, "eu-west", "2024-07-01 00:03:00"), + rows(50.25, "us-east", "2024-07-01 00:00:00"), + rows(50, "us-east", "2024-07-01 00:05:00"), + rows(40.25, "us-west", "2024-07-01 00:01:00")); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 0e22507852e..d755c7acc8f 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -358,6 +358,40 @@ public void testExplainStatsWithBinsOnTimeField() throws IOException { "source=events | bin @timestamp bins=3 | stats avg(cpu_usage) by @timestamp")); } + @Test + public void testExplainStatsWithSubAggregation() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + String expected = loadExpectedPlan("explain_stats_bins_on_time_and_term.yaml"); + assertYamlEqualsJsonIgnoreId( + expected, + explainQueryToString( + "source=events | bin @timestamp bins=3 | stats bucket_nullable=false count() by" + + " @timestamp, region")); + + expected = loadExpectedPlan("explain_stats_bins_on_time_and_term2.yaml"); + assertYamlEqualsJsonIgnoreId( + expected, + explainQueryToString( + "source=events | bin @timestamp bins=3 | stats bucket_nullable=false avg(cpu_usage) by" + + " @timestamp, region")); + } + + @Test + public void bucketNullableNotSupportSubAggregation() throws IOException { + // TODO: Don't throw exception after addressing + // https://github.com/opensearch-project/sql/issues/4317 + // When bucketNullable is true, sub aggregation is not supported. 
Hence we cannot push down the + // aggregation in this query. Caused by issue + // https://github.com/opensearch-project/sql/issues/4317, + // bin aggregation on timestamp field won't work if it is not pushed down. + enabledOnlyWhenPushdownIsEnabled(); + assertThrows( + Exception.class, + () -> + explainQueryToString( + "source=events | bin @timestamp bins=3 | stats count() by @timestamp, region")); + } + @Test public void testExplainBinWithSpan() throws IOException { String expected = loadExpectedPlan("explain_bin_span.yaml"); @@ -673,15 +707,15 @@ public void testPushdownLimitIntoAggregation() throws IOException { "source=opensearch-sql_test_index_account | stats count() by state | sort state | head" + " 100 | head 10 from 10 ")); - expected = loadExpectedPlan("explain_limit_agg_pushdown_bucket_nullable1.json"); - assertJsonEqualsIgnoreId( + expected = loadExpectedPlan("explain_limit_agg_pushdown_bucket_nullable1.yaml"); + assertYamlEqualsJsonIgnoreId( expected, explainQueryToString( "source=opensearch-sql_test_index_account | stats bucket_nullable=false count() by" + " state | head 100 | head 10 from 10 ")); - expected = loadExpectedPlan("explain_limit_agg_pushdown_bucket_nullable2.json"); - assertJsonEqualsIgnoreId( + expected = loadExpectedPlan("explain_limit_agg_pushdown_bucket_nullable2.yaml"); + assertYamlEqualsJsonIgnoreId( expected, explainQueryToString( "source=opensearch-sql_test_index_account | stats bucket_nullable=false count() by" @@ -853,15 +887,15 @@ public void testExplainCountsByAgg() throws IOException { public void testExplainSortOnMetricsNoBucketNullable() throws IOException { // TODO enhancement later: https://github.com/opensearch-project/sql/issues/4282 enabledOnlyWhenPushdownIsEnabled(); - String expected = loadExpectedPlan("explain_agg_sort_on_metrics1.json"); - assertJsonEqualsIgnoreId( + String expected = loadExpectedPlan("explain_agg_sort_on_metrics1.yaml"); + assertYamlEqualsJsonIgnoreId( expected, explainQueryToString( 
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() by" + " state | sort `count()`")); - expected = loadExpectedPlan("explain_agg_sort_on_metrics2.json"); - assertJsonEqualsIgnoreId( + expected = loadExpectedPlan("explain_agg_sort_on_metrics2.yaml"); + assertYamlEqualsJsonIgnoreId( expected, explainQueryToString( "source=opensearch-sql_test_index_account | stats bucket_nullable=false count() by" diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics1.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics1.yaml new file mode 100644 index 00000000000..81082ac86e7 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics1.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$0], dir0=[ASC-nulls-first]) + LogicalProject(count()=[$1], state=[$0]) + LogicalAggregate(group=[{0}], count()=[COUNT()]) + LogicalProject(state=[$7]) + LogicalFilter(condition=[IS NOT NULL($7)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), state]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics2.json 
b/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics2.json deleted file mode 100644 index a40a5e51c16..00000000000 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics2.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "calcite": { - "logical": "LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalSort(sort0=[$0], dir0=[ASC-nulls-first])\n LogicalProject(count()=[$2], gender=[$0], state=[$1])\n LogicalAggregate(group=[{0, 1}], count()=[COUNT()])\n LogicalProject(gender=[$4], state=[$7])\n LogicalFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($7))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", - "physical": "EnumerableLimit(fetch=[10000])\n EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[FILTER->AND(IS NOT NULL($4), IS NOT NULL($7)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count()=COUNT()), PROJECT->[count(), gender, state]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"bool\":{\"must\":[{\"exists\":{\"field\":\"gender\",\"boost\":1.0}},{\"exists\":{\"field\":\"state\",\"boost\":1.0}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":false,\"order\":\"asc\"}}},{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":false,\"order\":\"asc\"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" - } -} diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics2.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics2.yaml new file mode 100644 index 00000000000..8a45ecc2f92 --- /dev/null 
+++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics2.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$0], dir0=[ASC-nulls-first]) + LogicalProject(count()=[$2], gender=[$0], state=[$1]) + LogicalAggregate(group=[{0, 1}], count()=[COUNT()]) + LogicalProject(gender=[$4], state=[$7]) + LogicalFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($7))]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count()=COUNT()), PROJECT->[count(), gender, state]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":false,"order":"asc"}}},{"state":{"terms":{"field":"state.keyword","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown_bucket_nullable1.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown_bucket_nullable1.json deleted file mode 100644 index c4346b4134b..00000000000 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown_bucket_nullable1.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "calcite": { - "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalSort(offset=[10], fetch=[10])\n LogicalSort(fetch=[100])\n LogicalProject(count()=[$1], state=[$0])\n LogicalAggregate(group=[{0}], count()=[COUNT()])\n LogicalProject(state=[$7])\n 
LogicalFilter(condition=[IS NOT NULL($7)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", - "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..1=[{inputs}], count()=[$t1], state=[$t0])\n EnumerableLimit(offset=[10], fetch=[10])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[FILTER->IS NOT NULL($7), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), LIMIT->100, LIMIT->[10 from 10]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"exists\":{\"field\":\"state\",\"boost\":1.0}},\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":20,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":false,\"order\":\"asc\"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" - } -} diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown_bucket_nullable1.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown_bucket_nullable1.yaml new file mode 100644 index 00000000000..b4117a5a84c --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown_bucket_nullable1.yaml @@ -0,0 +1,15 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(offset=[10], fetch=[10]) + LogicalSort(fetch=[100]) + LogicalProject(count()=[$1], state=[$0]) + LogicalAggregate(group=[{0}], count()=[COUNT()]) + LogicalProject(state=[$7]) + LogicalFilter(condition=[IS NOT NULL($7)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..1=[{inputs}], count()=[$t1], state=[$t0]) + EnumerableLimit(offset=[10], fetch=[10]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], 
PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), LIMIT->100, LIMIT->[10 from 10]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":20,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown_bucket_nullable2.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown_bucket_nullable2.json deleted file mode 100644 index e391db7e53e..00000000000 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown_bucket_nullable2.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "calcite": { - "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalSort(offset=[10], fetch=[10])\n LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[100])\n LogicalProject(count()=[$1], state=[$0])\n LogicalAggregate(group=[{0}], count()=[COUNT()])\n LogicalProject(state=[$7])\n LogicalFilter(condition=[IS NOT NULL($7)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", - "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..1=[{inputs}], count()=[$t1], state=[$t0])\n EnumerableLimit(offset=[10], fetch=[10])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[FILTER->IS NOT NULL($7), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), SORT->[0 ASC FIRST], LIMIT->100, LIMIT->[10 from 10]], 
OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"exists\":{\"field\":\"state\",\"boost\":1.0}},\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":20,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":false,\"order\":\"asc\"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" - } -} diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown_bucket_nullable2.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown_bucket_nullable2.yaml new file mode 100644 index 00000000000..78278fe1618 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown_bucket_nullable2.yaml @@ -0,0 +1,15 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(offset=[10], fetch=[10]) + LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[100]) + LogicalProject(count()=[$1], state=[$0]) + LogicalAggregate(group=[{0}], count()=[COUNT()]) + LogicalProject(state=[$7]) + LogicalFilter(condition=[IS NOT NULL($7)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..1=[{inputs}], count()=[$t1], state=[$t0]) + EnumerableLimit(offset=[10], fetch=[10]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), SORT->[0 ASC FIRST], LIMIT->100, LIMIT->[10 from 10]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":20,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git 
a/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time.yaml index cb3428897ac..b3f3f5aed9b 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time.yaml @@ -8,4 +8,4 @@ calcite: physical: | EnumerableLimit(fetch=[10000]) EnumerableCalc(expr#0..1=[{inputs}], expr#2=[0], expr#3=[>($t1, $t2)], count()=[$t1], @timestamp=[$t0], $condition=[$t3]) - CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT())], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"@timestamp":{"auto_date_histogram":{"field":"@timestamp","buckets":3,"minimum_interval":null},"aggregations":{"count()":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT())], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"@timestamp":{"auto_date_histogram":{"field":"@timestamp","buckets":3,"minimum_interval":null}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time_and_term.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time_and_term.yaml new file mode 100644 index 00000000000..a285a731ab9 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time_and_term.yaml @@ -0,0 +1,11 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(count()=[$2], @timestamp=[$0], 
region=[$1]) + LogicalAggregate(group=[{0, 1}], count()=[COUNT()]) + LogicalProject(@timestamp=[$15], region=[$7]) + LogicalFilter(condition=[IS NOT NULL($7)]) + LogicalProject(environment=[$0], status_code=[$2], service=[$3], host=[$4], memory_usage=[$5], response_time=[$6], cpu_usage=[$7], region=[$8], bytes_sent=[$9], _id=[$10], _index=[$11], _score=[$12], _maxscore=[$13], _sort=[$14], _routing=[$15], @timestamp=[WIDTH_BUCKET($1, 3, -(MAX($1) OVER (), MIN($1) OVER ()), MAX($1) OVER ())]) + CalciteLogicalIndexScan(table=[[OpenSearch, events]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count()=COUNT()), PROJECT->[count(), @timestamp, region], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"region":{"terms":{"field":"region","size":1000,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":{"_key":"asc"}},"aggregations":{"@timestamp":{"auto_date_histogram":{"field":"@timestamp","buckets":3,"minimum_interval":null}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time_and_term2.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time_and_term2.yaml new file mode 100644 index 00000000000..147902bdf0d --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time_and_term2.yaml @@ -0,0 +1,11 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(avg(cpu_usage)=[$2], @timestamp=[$0], region=[$1]) + LogicalAggregate(group=[{0, 1}], avg(cpu_usage)=[AVG($2)]) + LogicalProject(@timestamp=[$15], region=[$7], cpu_usage=[$6]) + LogicalFilter(condition=[IS NOT NULL($7)]) + LogicalProject(environment=[$0], status_code=[$2], service=[$3], host=[$4], 
memory_usage=[$5], response_time=[$6], cpu_usage=[$7], region=[$8], bytes_sent=[$9], _id=[$10], _index=[$11], _score=[$12], _maxscore=[$13], _sort=[$14], _routing=[$15], @timestamp=[WIDTH_BUCKET($1, 3, -(MAX($1) OVER (), MIN($1) OVER ()), MAX($1) OVER ())]) + CalciteLogicalIndexScan(table=[[OpenSearch, events]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1, 2},avg(cpu_usage)=AVG($0)), PROJECT->[avg(cpu_usage), @timestamp, region], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"region":{"terms":{"field":"region","size":1000,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":{"_key":"asc"}},"aggregations":{"@timestamp":{"auto_date_histogram":{"field":"@timestamp","buckets":3,"minimum_interval":null},"aggregations":{"avg(cpu_usage)":{"avg":{"field":"cpu_usage"}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java index f5f9969b8fc..0e9f68dfc3d 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java @@ -8,16 +8,22 @@ import static org.opensearch.sql.expression.function.PPLBuiltinOperators.WIDTH_BUCKET; import java.util.List; +import java.util.function.Function; import java.util.function.Predicate; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.AbstractRelNode; import org.apache.calcite.rel.logical.LogicalAggregate; +import org.apache.calcite.rel.logical.LogicalFilter; 
import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.immutables.value.Value; +import org.opensearch.sql.ast.expression.Argument; import org.opensearch.sql.calcite.type.ExprSqlType; import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.ExprUDT; import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan; @@ -34,17 +40,31 @@ protected OpenSearchAggregateIndexScanRule(Config config) { @Override public void onMatch(RelOptRuleCall call) { - if (call.rels.length == 3) { + if (call.rels.length == 4) { + final LogicalAggregate aggregate = call.rel(0); + final LogicalFilter filter = call.rel(1); + final LogicalProject project = call.rel(2); + final CalciteLogicalIndexScan scan = call.rel(3); + List groupSet = aggregate.getGroupSet().asList(); + RexNode condition = filter.getCondition(); + Function isNotNullFromAgg = + rex -> + rex instanceof RexCall rexCall + && rexCall.getOperator() == SqlStdOperatorTable.IS_NOT_NULL + && rexCall.getOperands().get(0) instanceof RexInputRef ref + && groupSet.contains(ref.getIndex()); + if (isNotNullFromAgg.apply(condition) + || (condition instanceof RexCall rexCall + && rexCall.getOperator() == SqlStdOperatorTable.AND + && rexCall.getOperands().stream().allMatch(isNotNullFromAgg::apply))) { + // Try to do the aggregate push down and ignore the filter if the filter sources from the + // aggregate's hint. 
See {@link CalciteRelNodeVisitor::visitAggregation} + apply(call, aggregate, project, scan); + } + } else if (call.rels.length == 3) { final LogicalAggregate aggregate = call.rel(0); final LogicalProject project = call.rel(1); final CalciteLogicalIndexScan scan = call.rel(2); - - // For multiple group-by, we currently have to use CompositeAggregationBuilder while it - // doesn't support auto_date_histogram referring to bin command with parameter bins - if (aggregate.getGroupSet().length() > 1 && Config.containsWidthBucketFuncOnDate(project)) { - return; - } - apply(call, aggregate, project, scan); } else if (call.rels.length == 2) { // case of count() without group-by @@ -123,12 +143,77 @@ public interface Config extends RelRule.Config { Predicate.not(OpenSearchIndexScanRule::isLimitPushed) .and(OpenSearchIndexScanRule::noAggregatePushed)) .noInputs())); + // TODO: No need for this rule once https://github.com/opensearch-project/sql/issues/4403 is + // addressed + Config BUCKET_NON_NULL_AGG = + ImmutableOpenSearchAggregateIndexScanRule.Config.builder() + .build() + .withDescription("Agg-Filter-Project-TableScan") + .withOperandSupplier( + b0 -> + b0.operand(LogicalAggregate.class) + .predicate( + agg -> + agg.getHints().stream() + .anyMatch( + hint -> + hint.hintName.equals("stats_args") + && hint.kvOptions + .get(Argument.BUCKET_NULLABLE) + .equals("false"))) + .oneInput( + b1 -> + b1.operand(LogicalFilter.class) + .predicate(Config::mayBeFilterFromBucketNonNull) + .oneInput( + b2 -> + b2.operand(LogicalProject.class) + .predicate( + // Support push down aggregate with project + // that: + // 1. No RexOver and no duplicate projection + // 2. 
Contains width_bucket function on date + // field referring + // to bin command with parameter bins + Predicate.not( + OpenSearchIndexScanRule + ::containsRexOver) + .and( + OpenSearchIndexScanRule + ::distinctProjectList) + .or(Config::containsWidthBucketFuncOnDate)) + .oneInput( + b3 -> + b3.operand(CalciteLogicalIndexScan.class) + .predicate( + Predicate.not( + OpenSearchIndexScanRule + ::isLimitPushed) + .and( + OpenSearchIndexScanRule + ::noAggregatePushed)) + .noInputs())))); @Override default OpenSearchAggregateIndexScanRule toRule() { return new OpenSearchAggregateIndexScanRule(this); } + static boolean mayBeFilterFromBucketNonNull(LogicalFilter filter) { + RexNode condition = filter.getCondition(); + return isNotNullOnRef(condition) + || (condition instanceof RexCall rexCall + && rexCall.getOperator().equals(SqlStdOperatorTable.AND) + && rexCall.getOperands().stream() + .allMatch(OpenSearchAggregateIndexScanRule.Config::isNotNullOnRef)); + } + + private static boolean isNotNullOnRef(RexNode rex) { + return rex instanceof RexCall rexCall + && rexCall.getOperator().equals(SqlStdOperatorTable.IS_NOT_NULL) + && rexCall.getOperands().get(0) instanceof RexInputRef; + } + static boolean containsWidthBucketFuncOnDate(LogicalProject project) { return project.getProjects().stream() .anyMatch( diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexRules.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexRules.java index 6d349aa452d..0e947126314 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexRules.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexRules.java @@ -18,6 +18,9 @@ public class OpenSearchIndexRules { OpenSearchAggregateIndexScanRule.Config.DEFAULT.toRule(); private static final OpenSearchAggregateIndexScanRule COUNT_STAR_INDEX_SCAN = 
OpenSearchAggregateIndexScanRule.Config.COUNT_STAR.toRule(); + // TODO: No need for this rule once https://github.com/opensearch-project/sql/issues/4403 is addressed + private static final OpenSearchAggregateIndexScanRule BUCKET_NON_NULL_AGG_INDEX_SCAN = + OpenSearchAggregateIndexScanRule.Config.BUCKET_NON_NULL_AGG.toRule(); private static final OpenSearchLimitIndexScanRule LIMIT_INDEX_SCAN = OpenSearchLimitIndexScanRule.Config.DEFAULT.toRule(); private static final OpenSearchSortIndexScanRule SORT_INDEX_SCAN = @@ -39,6 +42,7 @@ public class OpenSearchIndexRules { FILTER_INDEX_SCAN, AGGREGATE_INDEX_SCAN, COUNT_STAR_INDEX_SCAN, + BUCKET_NON_NULL_AGG_INDEX_SCAN, LIMIT_INDEX_SCAN, SORT_INDEX_SCAN, // TODO enable if https://github.com/opensearch-project/OpenSearch/issues/3725 resolved diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java index 065375dd230..350b9e37926 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java @@ -127,6 +127,12 @@ public static class ExpressionNotAnalyzableException extends Exception { } } + public static class CompositeAggUnSupportedException extends RuntimeException { + CompositeAggUnSupportedException(String message) { + super(message); + } + } + private AggregateAnalyzer() {} @RequiredArgsConstructor @@ -206,74 +212,53 @@ public static Pair, OpenSearchAggregationResponseParser // both count() and count(FIELD) can apply doc_count optimization in non-bucket aggregation, // but only count() can apply doc_count optimization in bucket aggregation. 
boolean countAllOnly = !aggregate.getGroupSet().isEmpty(); - Pair, Builder> pair = + Pair, Builder> countAggNameAndBuilderPair = removeCountAggregationBuilders(metricBuilder, countAllOnly); - List removedCountAggBuilders = pair.getLeft(); - Builder newMetricBuilder = pair.getRight(); - - boolean removedCountAggBuildersHaveSomeField = - removedCountAggBuilders.stream() - .map(ValuesSourceAggregationBuilder::fieldName) - .distinct() - .count() - == 1; - boolean allCountAggRemoved = - removedCountAggBuilders.size() == metricBuilder.getAggregatorFactories().size(); + Builder newMetricBuilder = countAggNameAndBuilderPair.getRight(); + List countAggNames = countAggNameAndBuilderPair.getLeft(); + if (aggregate.getGroupSet().isEmpty()) { - if (allCountAggRemoved && removedCountAggBuildersHaveSomeField) { + if (newMetricBuilder == null) { // The optimization must require all count aggregations are removed, // and they have only one field name - List countAggNameList = - removedCountAggBuilders.stream() - .map(ValuesSourceAggregationBuilder::getName) - .toList(); - return Pair.of( - ImmutableList.copyOf(newMetricBuilder.getAggregatorFactories()), - new CountAsTotalHitsParser(countAggNameList)); + return Pair.of(List.of(), new CountAsTotalHitsParser(countAggNames)); } else { return Pair.of( - ImmutableList.copyOf(metricBuilder.getAggregatorFactories()), + ImmutableList.copyOf(newMetricBuilder.getAggregatorFactories()), new NoBucketAggregationParser(metricParserList)); } } else if (aggregate.getGroupSet().length() == 1 && isAutoDateSpan(project.getProjects().get(groupList.getFirst()))) { - RexCall rexCall = (RexCall) project.getProjects().get(groupList.getFirst()); - String bucketName = project.getRowType().getFieldList().get(groupList.getFirst()).getName(); - RexInputRef rexInputRef = (RexInputRef) rexCall.getOperands().getFirst(); - RexLiteral valueLiteral = (RexLiteral) rexCall.getOperands().get(1); - ValuesSourceAggregationBuilder bucketBuilder = - new 
AutoDateHistogramAggregationBuilder(bucketName) - .field(helper.inferNamedField(rexInputRef).getRootName()) - .setNumBuckets(requireNonNull(valueLiteral.getValueAs(Integer.class))); + ValuesSourceAggregationBuilder bucketBuilder = createBucket(0, project, helper); + if (newMetricBuilder != null) { + bucketBuilder.subAggregations(newMetricBuilder); + } return Pair.of( - Collections.singletonList(bucketBuilder.subAggregations(metricBuilder)), - new BucketAggregationParser(metricParserList)); + Collections.singletonList(bucketBuilder), + new BucketAggregationParser(metricParserList, countAggNames)); } else { - List> buckets = - createCompositeBuckets(groupList, project, helper); - AggregationBuilder aggregationBuilder = - AggregationBuilders.composite("composite_buckets", buckets) - .size(AGGREGATION_BUCKET_SIZE); - - // For bucket aggregation, no count() aggregator or not all aggregators are count(), - // fallback to original ValueCountAggregation. - if (removedCountAggBuilders.isEmpty() - || removedCountAggBuilders.size() != metricBuilder.getAggregatorFactories().size()) { - aggregationBuilder.subAggregations(metricBuilder); + AggregationBuilder aggregationBuilder; + try { + List> buckets = + createCompositeBuckets(groupList, project, helper); + aggregationBuilder = + AggregationBuilders.composite("composite_buckets", buckets) + .size(AGGREGATION_BUCKET_SIZE); + if (newMetricBuilder != null) { + aggregationBuilder.subAggregations(newMetricBuilder); + } return Pair.of( Collections.singletonList(aggregationBuilder), - new CompositeAggregationParser(metricParserList)); - } - // No need to register sub-factories if no aggregator factories left after removing all - // ValueCountAggregationBuilder. 
- if (!newMetricBuilder.getAggregatorFactories().isEmpty()) { - aggregationBuilder.subAggregations(newMetricBuilder); + new CompositeAggregationParser(metricParserList, countAggNames)); + } catch (CompositeAggUnSupportedException e) { + if (bucketNullable) { + throw new UnsupportedOperationException(e.getMessage()); + } + aggregationBuilder = createNestedBuckets(groupList, project, newMetricBuilder, helper); + return Pair.of( + Collections.singletonList(aggregationBuilder), + new BucketAggregationParser(metricParserList, countAggNames)); } - List countAggNameList = - removedCountAggBuilders.stream().map(ValuesSourceAggregationBuilder::getName).toList(); - return Pair.of( - Collections.singletonList(aggregationBuilder), - new CompositeAggregationParser(metricParserList, countAggNameList)); } } catch (Throwable e) { Throwables.throwIfInstanceOf(e, UnsupportedOperationException.class); @@ -282,14 +267,16 @@ && isAutoDateSpan(project.getProjects().get(groupList.getFirst()))) { } /** - * Remove all ValueCountAggregationBuilder from metric builder, and return the removed - * ValueCountAggregationBuilder list. + * Remove all ValueCountAggregationBuilder from metric builder, and return the name list for the + * removed count aggs with the updated metric builder. * * @param metricBuilder metrics builder * @param countAllOnly remove count() only, or count(FIELD) will be removed. - * @return a pair of removed ValueCountAggregationBuilder and updated metric builder + * @return a pair of name list for the removed count aggs and updated metric builder. If the count + * aggregations cannot satisfy the requirement to remove, it will return an empty name list + * with the original metric builder. 
*/ - private static Pair, Builder> removeCountAggregationBuilders( + private static Pair, Builder> removeCountAggregationBuilders( Builder metricBuilder, boolean countAllOnly) { List countAggregatorFactories = metricBuilder.getAggregatorFactories().stream() @@ -302,7 +289,26 @@ private static Pair, Builder> removeCountAggr copy.removeAll(countAggregatorFactories); Builder newMetricBuilder = new AggregatorFactories.Builder(); copy.forEach(newMetricBuilder::addAggregator); - return Pair.of(countAggregatorFactories, newMetricBuilder); + + if (countAllOnly || supportCountField(countAggregatorFactories, metricBuilder)) { + List countAggNameList = + countAggregatorFactories.stream().map(ValuesSourceAggregationBuilder::getName).toList(); + if (newMetricBuilder.getAggregatorFactories().isEmpty()) { + newMetricBuilder = null; + } + return Pair.of(countAggNameList, newMetricBuilder); + } + return Pair.of(List.of(), metricBuilder); + } + + private static boolean supportCountField( + List countAggBuilderList, Builder metricBuilder) { + return countAggBuilderList.size() == metricBuilder.getAggregatorFactories().size() + && countAggBuilderList.stream() + .map(ValuesSourceAggregationBuilder::fieldName) + .distinct() + .count() + == 1; } private static Pair> processAggregateCalls( @@ -339,7 +345,7 @@ private static Pair createAggregationBuilderAn AggregateCall aggCall, List args, String aggFieldName, - AggregateBuilderHelper helper) { + AggregateAnalyzer.AggregateBuilderHelper helper) { if (aggCall.isDistinct()) { return createDistinctAggregation(aggCall, args, aggFieldName, helper); } else { @@ -512,11 +518,6 @@ private static boolean supportsMaxMinAggregation(ExprType fieldType) { || coreType == ExprCoreType.TIMESTAMP; } - private static ValuesSourceAggregationBuilder createBucketAggregation( - Integer group, Project project, AggregateAnalyzer.AggregateBuilderHelper helper) { - return createBucket(group, project, helper); - } - private static List> createCompositeBuckets( List 
groupList, Project project, AggregateAnalyzer.AggregateBuilderHelper helper) { ImmutableList.Builder> resultBuilder = ImmutableList.builder(); @@ -525,6 +526,24 @@ private static List> createCompositeBuckets( return resultBuilder.build(); } + private static ValuesSourceAggregationBuilder createNestedBuckets( + List groupList, + Project project, + Builder metricBuilder, + AggregateAnalyzer.AggregateBuilderHelper helper) { + ValuesSourceAggregationBuilder rootAgg = createBucket(groupList.get(0), project, helper); + ValuesSourceAggregationBuilder currentAgg = rootAgg; + for (int i = 1; i < groupList.size(); i++) { + ValuesSourceAggregationBuilder nextAgg = createBucket(groupList.get(i), project, helper); + currentAgg.subAggregations(new AggregatorFactories.Builder().addAggregator(nextAgg)); + currentAgg = nextAgg; + } + if (metricBuilder != null) { + currentAgg.subAggregations(metricBuilder); + } + return rootAgg; + } + private static boolean isAutoDateSpan(RexNode rex) { return rex instanceof RexCall rexCall && rexCall.getKind() == SqlKind.OTHER_FUNCTION @@ -547,13 +566,21 @@ private static ValuesSourceAggregationBuilder createBucket( helper.inferNamedField(rexInputRef).getRootName(), valueLiteral.getValueAs(Double.class), SpanUnit.of(unitLiteral.getValueAs(String.class))); + } else if (isAutoDateSpan(rex)) { + RexCall rexCall = (RexCall) rex; + RexInputRef rexInputRef = (RexInputRef) rexCall.getOperands().getFirst(); + RexLiteral valueLiteral = (RexLiteral) rexCall.getOperands().get(1); + return new AutoDateHistogramAggregationBuilder(bucketName) + .field(helper.inferNamedField(rexInputRef).getRootName()) + .setNumBuckets(requireNonNull(valueLiteral.getValueAs(Integer.class))); } else { return createTermsAggregationBuilder(bucketName, rex, helper); } } private static CompositeValuesSourceBuilder createCompositeBucket( - Integer groupIndex, Project project, AggregateAnalyzer.AggregateBuilderHelper helper) { + Integer groupIndex, Project project, 
AggregateAnalyzer.AggregateBuilderHelper helper) + throws CompositeAggUnSupportedException { RexNode rex = project.getProjects().get(groupIndex); String bucketName = project.getRowType().getFieldList().get(groupIndex).getName(); if (rex instanceof RexCall rexCall @@ -571,8 +598,7 @@ private static CompositeValuesSourceBuilder createCompositeBucket( MissingOrder.FIRST, helper.bucketNullable); } else if (isAutoDateSpan(rex)) { - // Defense check. We've already prevented this case in OpenSearchAggregateIndexScanRule. - throw new UnsupportedOperationException( + throw new CompositeAggUnSupportedException( "auto_date_histogram is not supported in composite agg."); } else { return createTermsSourceBuilder(bucketName, rex, helper); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/BucketAggregationParser.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/BucketAggregationParser.java index f43743d2e28..f9395976625 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/BucketAggregationParser.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/BucketAggregationParser.java @@ -6,10 +6,8 @@ package org.opensearch.sql.opensearch.response.agg; import java.util.Arrays; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import lombok.EqualsAndHashCode; import org.opensearch.search.SearchHits; import org.opensearch.search.aggregations.Aggregation; @@ -23,6 +21,8 @@ @EqualsAndHashCode public class BucketAggregationParser implements OpenSearchAggregationResponseParser { private final MetricParserHelper metricsParser; + // countAggNameList denotes the list of count aggregations which are filled by doc_count + private List countAggNameList = List.of(); public BucketAggregationParser(MetricParser... 
metricParserList) { metricsParser = new MetricParserHelper(Arrays.asList(metricParserList)); @@ -32,18 +32,44 @@ public BucketAggregationParser(List metricParserList) { metricsParser = new MetricParserHelper(metricParserList); } + public BucketAggregationParser( + List metricParserList, List countAggNameList) { + metricsParser = new MetricParserHelper(metricParserList, countAggNameList); + this.countAggNameList = countAggNameList; + } + @Override public List> parse(Aggregations aggregations) { Aggregation agg = aggregations.asList().getFirst(); return ((MultiBucketsAggregation) agg) - .getBuckets().stream().map(b -> parse(b, agg.getName())).collect(Collectors.toList()); + .getBuckets().stream() + .map(b -> parseBucket(b, agg.getName())) + .flatMap(List::stream) + .toList(); + } + + private List> parseBucket( + MultiBucketsAggregation.Bucket bucket, String name) { + Aggregations aggregations = bucket.getAggregations(); + List> results = + isLeafAgg(aggregations) + ? parseLeafAgg(aggregations, bucket.getDocCount()) + : parse(aggregations); + for (Map r : results) { + r.put(name, bucket.getKey()); + } + return results; + } + + private boolean isLeafAgg(Aggregations aggregations) { + return !(aggregations.asList().size() == 1 + && aggregations.asList().get(0) instanceof MultiBucketsAggregation); } - private Map parse(MultiBucketsAggregation.Bucket bucket, String keyName) { - Map resultMap = new LinkedHashMap<>(); - resultMap.put(keyName, bucket.getKey()); - resultMap.putAll(metricsParser.parse(bucket.getAggregations())); - return resultMap; + private List> parseLeafAgg(Aggregations aggregations, long docCount) { + Map resultMap = metricsParser.parse(aggregations); + countAggNameList.forEach(countAggName -> resultMap.put(countAggName, docCount)); + return List.of(resultMap); } @Override diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java 
b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java index ee9f6be144c..412a75a794d 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; import lombok.Getter; import org.apache.calcite.plan.Convention; @@ -309,10 +310,10 @@ public AbstractRelNode pushDownAggregate(Aggregate aggregate, Project project) { instanceof AutoDateHistogramAggregationBuilder autoDateHistogram) { // If it's auto_date_histogram, filter the empty bucket by using the first aggregate metrics RexBuilder rexBuilder = getCluster().getRexBuilder(); - AggregationBuilder aggregationBuilders = - autoDateHistogram.getSubAggregations().stream().toList().getFirst(); + Optional aggBuilderOpt = + autoDateHistogram.getSubAggregations().stream().toList().stream().findFirst(); RexNode condition = - aggregationBuilders instanceof ValueCountAggregationBuilder + aggBuilderOpt.isEmpty() || aggBuilderOpt.get() instanceof ValueCountAggregationBuilder ? rexBuilder.makeCall( SqlStdOperatorTable.GREATER_THAN, rexBuilder.makeInputRef(newScan, 1),