Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,17 @@ public void testExplainWithReverse() throws IOException {
assertTrue(result.contains("dir0=[DESC]"));
}

@Test
public void noPushDownForAggOnWindow() throws IOException {
Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled());
String query =
"source=opensearch-sql_test_index_account | patterns address method=BRAIN | stats count()"
+ " by patterns_field";
var result = explainQueryToString(query);
String expected = loadFromFile("expectedOutput/calcite/explain_agg_on_window.json");
assertJsonEqualsIgnoreId(expected, result);
}

/**
* Executes the PPL query and returns the result as a string with windows-style line breaks
* replaced with Unix-style ones.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(count()=[$1], patterns_field=[$0])\n LogicalAggregate(group=[{0}], count()=[COUNT()])\n LogicalProject(patterns_field=[SAFE_CAST(ITEM(PATTERN_PARSER($2, pattern($2, 10, 100000) OVER ()), 'pattern'))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..1=[{inputs}], count()=[$t1], patterns_field=[$t0])\n EnumerableAggregate(group=[{0}], count()=[COUNT()])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[PATTERN_PARSER($t0, $t1)], expr#3=['pattern'], expr#4=[ITEM($t2, $t3)], expr#5=[SAFE_CAST($t4)], patterns_field=[$t5])\n EnumerableWindow(window#0=[window(aggs [pattern($0, $1, $2)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[address]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"address\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(age2=[$2])\n LogicalFilter(condition=[<=($3, 1)])\n LogicalProject(avg_age=[$0], state=[$1], age2=[$2], _row_number_=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY $2)])\n LogicalFilter(condition=[IS NOT NULL($2)])\n LogicalProject(avg_age=[$0], state=[$1], age2=[+($0, 2)])\n LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])\n LogicalProject(avg_age=[$2], state=[$0], city=[$1])\n LogicalAggregate(group=[{0, 1}], avg_age=[AVG($2)])\n LogicalProject(state=[$7], city=[$5], age=[$8])\n LogicalFilter(condition=[>($8, 30)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
    "physical": "EnumerableLimit(fetch=[10000])\n  EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[<=($t2, $t3)], age2=[$t1], $condition=[$t4])\n    EnumerableWindow(window#0=[window(partition {1} order by [1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])\n      EnumerableCalc(expr#0..2=[{inputs}], expr#3=[2], expr#4=[+($t2, $t3)], expr#5=[IS NOT NULL($t2)], state=[$t0], age2=[$t4], $condition=[$t5])\n        CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[city, state, age], FILTER->>($2, 30), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg_age=AVG($2)), SORT->[0 ASC FIRST]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"city\",\"state\",\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Regression test: an aggregate whose calls carry an inner FILTER (produced by the
# if()-based conditional sums below) must not be pushed down to OpenSearch.
setup:
  # Turn on the Calcite engine for the duration of this suite; reverted in teardown.
  - do:
      query.settings:
        body:
          transient:
            plugins.calcite.enabled : true

  # Single-shard index so result ordering/counts are deterministic.
  - do:
      indices.create:
        index: test
        body:
          settings:
            number_of_shards: 1
            number_of_replicas: 0
          mappings:
            properties:
              body:
                type: text

---
teardown:
  # Restore the default engine setting.
  - do:
      query.settings:
        body:
          transient:
            plugins.calcite.enabled : false

---
"Fix incorrectly push down aggregate with filter":
  - skip:
      features:
        - headers
        - allowed_warnings
  # Index one access-log style document; the parse command extracts the HTTP status from it.
  - do:
      bulk:
        index: test
        refresh: true
        body:
          - '{"index": {}}'
          - '{"body": "[2025-07-31T05:44:45.164Z] \"GET /api/data HTTP/1.1\" 200 - via_upstream - \"-\" 0 221 2 2 \"-\" \"python-requests/2.32.4\" \"80a2a234-bbb2-9bf3-bbc6-ba7554aee8b6\" \"frontend-proxy:8080\" \"172.18.0.25:8080\" frontend 172.18.0.27:46596 172.18.0.27:8080 172.18.0.26:53294 - -"}'

  # The sum(statusNxx) aggregates are conditional (filtered) aggregates; before the fix
  # they were pushed down and produced wrong results.
  - do:
      allowed_warnings:
        - 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled'
      headers:
        Content-Type: 'application/json'
      ppl:
        body:
          query: source=test | parse body 'HTTP/1.1\" (?<httpstatus>\\d+)' | eval status2xx=if(httpstatus>='200' and httpstatus<'300', 1, 0), status3xx=if(httpstatus>='300' and httpstatus<'400', 1, 0), status4xx=if(httpstatus>='400' and httpstatus<'500', 1, 0), status5xx=if(httpstatus>='500' and httpstatus<'600', 1, 0), statusOther=if(httpstatus>='600', 1, 0) | stats count() as `Request Count`, sum(status2xx) as `HTTP 2xx`, sum(status3xx) as `HTTP 3xx`, sum(status4xx) as `HTTP 4xx`, sum(status5xx) as `HTTP 5xx`, sum(statusOther) as `Other`
  # Exactly one request, classified as 2xx only.
  - match: {"total": 1}
  - match: {"schema": [{"name": "Request Count", "type": "bigint"}, {"name": "HTTP 2xx", "type": "bigint"}, {"name": "HTTP 3xx", "type": "bigint"}, {"name": "HTTP 4xx", "type": "bigint"}, {"name": "HTTP 5xx", "type": "bigint"}, {"name": "Other", "type": "bigint"}]}
  - match: {"datarows": [[1, 1, 0, 0, 0, 0]]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Regression test: the composite aggregation built for "stats ... by" must carry an
# explicit bucket size; with the default size only part of the groups were returned.
setup:
  # Turn on the Calcite engine for the duration of this suite; reverted in teardown.
  - do:
      query.settings:
        body:
          transient:
            plugins.calcite.enabled : true

  # Single-shard index so result ordering/counts are deterministic.
  - do:
      indices.create:
        index: test
        body:
          settings:
            number_of_shards: 1
            number_of_replicas: 0
          mappings:
            properties:
              body:
                type: text

---
teardown:
  # Restore the default engine setting.
  - do:
      query.settings:
        body:
          transient:
            plugins.calcite.enabled : false

---
"Fix bucket size missing":
  - skip:
      features:
        - headers
        - allowed_warnings
  # Eleven distinct titles -> eleven group-by buckets, more than the old default bucket size.
  - do:
      bulk:
        index: test
        refresh: true
        body:
          - '{"index": {}}'
          - '{ "title" : "document 1"}'
          - '{"index": {}}'
          - '{ "title" : "document 2"}'
          - '{"index": {}}'
          - '{ "title" : "document 3"}'
          - '{"index": {}}'
          - '{ "title" : "document 4"}'
          - '{"index": {}}'
          - '{ "title" : "document 5"}'
          - '{"index": {}}'
          - '{ "title" : "document 6"}'
          - '{"index": {}}'
          - '{ "title" : "document 7"}'
          - '{"index": {}}'
          - '{ "title" : "document 8"}'
          - '{"index": {}}'
          - '{ "title" : "document 9"}'
          - '{"index": {}}'
          - '{ "title" : "document 10"}'
          - '{"index": {}}'
          - '{ "title" : "document 11"}'

  - do:
      allowed_warnings:
        - 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled'
      headers:
        Content-Type: 'application/json'
      ppl:
        body:
          query: source=test | stats count() by title | sort title
  # All 11 buckets must be returned, not just the first page.
  - match: {"total": 11}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.function.Predicate;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.sql.SqlKind;
Expand Down Expand Up @@ -64,10 +65,17 @@ public interface Config extends RelRule.Config {
.withOperandSupplier(
b0 ->
b0.operand(LogicalAggregate.class)
.predicate(
agg ->
// Cannot push down aggregation with inner filter
agg.getAggCallList().stream().noneMatch(AggregateCall::hasFilter))
.oneInput(
b1 ->
b1.operand(LogicalProject.class)
.predicate(OpenSearchIndexScanRule::distinctProjectList)
.predicate(
// Don't push down aggregate on window function
Predicate.not(OpenSearchIndexScanRule::containsRexOver)
.and(OpenSearchIndexScanRule::distinctProjectList))
.oneInput(
b2 ->
b2.operand(CalciteLogicalIndexScan.class)
Expand All @@ -92,7 +100,8 @@ public interface Config extends RelRule.Config {
.allMatch(
call ->
call.getAggregation().kind == SqlKind.COUNT
&& call.getArgList().isEmpty()))
&& call.getArgList().isEmpty()
&& !call.hasFilter()))
.oneInput(
b1 ->
b1.operand(CalciteLogicalIndexScan.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexOver;
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan;

Expand Down Expand Up @@ -42,6 +43,10 @@ static boolean distinctProjectList(LogicalProject project) {
return project.getProjects().stream().allMatch(rexSet::add);
}

  /**
   * Returns {@code true} when any projection expression in the given project contains a window
   * function ({@link RexOver}), including nested occurrences. Used as a rule predicate to avoid
   * pushing down aggregates computed over window-function output.
   *
   * @param project the logical project whose expressions are inspected
   * @return whether at least one projected expression contains a {@code RexOver}
   */
  static boolean containsRexOver(LogicalProject project) {
    return project.getProjects().stream().anyMatch(RexOver::containsOver);
  }

/**
* The LogicalSort is a LIMIT that should be pushed down when its fetch field is not null and its
* collation is empty. For example: <code>sort name | head 5</code> should not be pushed down
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static java.util.Objects.requireNonNull;
import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_PUSHDOWN_ROWCOUNT_ESTIMATION_FACTOR;
import static org.opensearch.sql.opensearch.request.AggregateAnalyzer.AGGREGATION_BUCKET_SIZE;

import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -293,8 +294,6 @@ public AbstractCalciteIndexScan pushDownSort(List<RelFieldCollation> collations)
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot pushdown the sort {}", getCollationNames(collations), e);
} else {
LOG.info("Cannot pushdown the sort {}, ", getCollationNames(collations));
}
}
return null;
Expand Down Expand Up @@ -389,7 +388,8 @@ public void pushDownSortIntoAggBucket(List<RelFieldCollation> collations) {
Pair.of(
Collections.singletonList(
AggregationBuilders.composite("composite_buckets", newBuckets)
.subAggregations(newAggBuilder)),
.subAggregations(newAggBuilder)
.size(AGGREGATION_BUCKET_SIZE)),
aggregationBuilder.getRight());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ public AbstractRelNode pushDownFilter(Filter filter) {
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot pushdown the filter condition.", e);
} else {
LOG.info("Cannot pushdown the filter condition.");
}
}
return null;
Expand Down Expand Up @@ -248,8 +246,6 @@ public CalciteLogicalIndexScan pushDownAggregate(Aggregate aggregate, Project pr
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot pushdown the aggregate {}", aggregate, e);
} else {
LOG.info("Cannot pushdown the aggregate {}, ", aggregate);
}
}
return null;
Expand All @@ -267,8 +263,6 @@ public CalciteLogicalIndexScan pushDownLimit(Integer limit, Integer offset) {
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot pushdown limit {} with offset {}", limit, offset, e);
} else {
LOG.info("Cannot pushdown limit {} with offset {}", limit, offset);
}
}
return null;
Expand Down
Loading