diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/StreamWindow.java b/core/src/main/java/org/opensearch/sql/ast/tree/StreamWindow.java index ed7bcf10289..c404ed26a38 100644 --- a/core/src/main/java/org/opensearch/sql/ast/tree/StreamWindow.java +++ b/core/src/main/java/org/opensearch/sql/ast/tree/StreamWindow.java @@ -9,6 +9,7 @@ import java.util.List; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.ToString; import org.opensearch.sql.ast.AbstractNodeVisitor; import org.opensearch.sql.ast.expression.UnresolvedExpression; @@ -16,6 +17,7 @@ @Getter @ToString @EqualsAndHashCode(callSuper = false) +@RequiredArgsConstructor public class StreamWindow extends UnresolvedPlan { private final List windowFunctionList; @@ -23,36 +25,11 @@ public class StreamWindow extends UnresolvedPlan { private final boolean current; private final int window; private final boolean global; + private final boolean bucketNullable; private final UnresolvedExpression resetBefore; private final UnresolvedExpression resetAfter; @ToString.Exclude private UnresolvedPlan child; - /** StreamWindow Constructor. 
*/ - public StreamWindow( - List windowFunctionList, - List groupList, - boolean current, - int window, - boolean global, - UnresolvedExpression resetBefore, - UnresolvedExpression resetAfter) { - this.windowFunctionList = windowFunctionList; - this.groupList = groupList; - this.current = current; - this.window = window; - this.global = global; - this.resetBefore = resetBefore; - this.resetAfter = resetAfter; - } - - public boolean isCurrent() { - return current; - } - - public boolean isGlobal() { - return global; - } - @Override public StreamWindow attach(UnresolvedPlan child) { this.child = child; diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index d5359ee938d..56b5cf53ae4 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -1130,8 +1130,7 @@ private Pair, List> resolveAttributesForAggregation( @Override public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) { Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList()); - Boolean bucketNullable = - (Boolean) statsArgs.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue(); + Boolean bucketNullable = (Boolean) statsArgs.get(Argument.BUCKET_NULLABLE).getValue(); int nGroup = node.getGroupExprList().size() + (Objects.nonNull(node.getSpan()) ? 
1 : 0); BitSet nonNullGroupMask = new BitSet(nGroup); if (!bucketNullable) { @@ -1748,20 +1747,25 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context) .as(ROW_NUMBER_COLUMN_FOR_STREAMSTATS); context.relBuilder.projectPlus(streamSeq); - // construct groupNotNull predicate - List groupByList = - groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).collect(Collectors.toList()); - List notNullList = - PlanUtils.getSelectColumns(groupByList).stream() - .map(context.relBuilder::field) - .map(context.relBuilder::isNotNull) - .collect(Collectors.toList()); - RexNode groupNotNull = context.relBuilder.and(notNullList); + if (!node.isBucketNullable()) { + // construct groupNotNull predicate + List groupByList = + groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).collect(Collectors.toList()); + List notNullList = + PlanUtils.getSelectColumns(groupByList).stream() + .map(context.relBuilder::field) + .map(context.relBuilder::isNotNull) + .collect(Collectors.toList()); + RexNode groupNotNull = context.relBuilder.and(notNullList); + + // wrap each expr: CASE WHEN groupNotNull THEN rawExpr ELSE CAST(NULL AS rawType) END + List wrappedOverExprs = + wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context); + context.relBuilder.projectPlus(wrappedOverExprs); + } else { + context.relBuilder.projectPlus(overExpressions); + } - // wrap each expr: CASE WHEN groupNotNull THEN rawExpr ELSE CAST(NULL AS rawType) END - List wrappedOverExprs = - wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context); - context.relBuilder.projectPlus(wrappedOverExprs); // resort when there is by condition context.relBuilder.sort(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS)); context.relBuilder.projectExcept(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS)); @@ -1821,11 +1825,11 @@ private RelNode buildStreamWindowJoinPlan( RexNode segRight = context.relBuilder.field(segmentCol); RexNode 
segOuter = context.relBuilder.field(v.get(), segmentCol); RexNode frame = buildResetFrameFilter(context, node, outerSeq, rightSeq, segOuter, segRight); - RexNode group = buildGroupFilter(context, groupList, v.get()); + RexNode group = buildGroupFilter(context, node, groupList, v.get()); filter = (group == null) ? frame : context.relBuilder.and(frame, group); } else { // global + window + by condition RexNode frame = buildFrameFilter(context, node, outerSeq, rightSeq); - RexNode group = buildGroupFilter(context, groupList, v.get()); + RexNode group = buildGroupFilter(context, node, groupList, v.get()); filter = context.relBuilder.and(frame, group); } context.relBuilder.filter(filter); @@ -1975,7 +1979,10 @@ private RexNode buildResetFrameFilter( } private RexNode buildGroupFilter( - CalcitePlanContext context, List groupList, RexCorrelVariable correl) { + CalcitePlanContext context, + StreamWindow node, + List groupList, + RexCorrelVariable correl) { // build conjunctive equality filters: right.g_i = outer.g_i if (groupList.isEmpty()) { return null; @@ -1987,7 +1994,17 @@ private RexNode buildGroupFilter( String groupName = extractGroupFieldName(expr); RexNode rightGroup = context.relBuilder.field(groupName); RexNode outerGroup = context.relBuilder.field(correl, groupName); - return context.relBuilder.equals(rightGroup, outerGroup); + RexNode equalCondition = context.relBuilder.equals(rightGroup, outerGroup); + // handle bucket_nullable case + if (!node.isBucketNullable()) { + return equalCondition; + } else { + RexNode bothNull = + context.relBuilder.and( + context.relBuilder.isNull(rightGroup), + context.relBuilder.isNull(outerGroup)); + return context.relBuilder.or(equalCondition, bothNull); + } }) .collect(Collectors.toList()); return context.relBuilder.and(equalsList); diff --git a/docs/user/ppl/cmd/streamstats.rst b/docs/user/ppl/cmd/streamstats.rst index e82053f748f..a18e2869a5e 100644 --- a/docs/user/ppl/cmd/streamstats.rst +++ 
b/docs/user/ppl/cmd/streamstats.rst @@ -43,9 +43,14 @@ All of these commands can be used to generate aggregations such as average, sum, Syntax ====== -streamstats [current=] [window=] [global=] [reset_before="("")"] [reset_after="("")"] ... [by-clause] +streamstats [bucket_nullable=bool] [current=] [window=] [global=] [reset_before="("")"] [reset_after="("")"] ... [by-clause] * function: mandatory. A aggregation function or window function. +* bucket_nullable: optional. Controls whether the streamstats command considers null buckets as a valid group in group-by aggregations. When set to ``false``, it will not treat null group-by values as a distinct group during aggregation. **Default:** Determined by ``plugins.ppl.syntax.legacy.preferred``. + + * When ``plugins.ppl.syntax.legacy.preferred=true``, ``bucket_nullable`` defaults to ``true`` + * When ``plugins.ppl.syntax.legacy.preferred=false``, ``bucket_nullable`` defaults to ``false`` + * current: optional. If true, the search includes the given, or current, event in the summary calculations. If false, the search uses the field value from the previous event. Syntax: current=. **Default:** true. * window: optional. Specifies the number of events to use when computing the statistics. Syntax: window=. **Default:** 0, which means that all previous and current events are used. * global: optional. Used only when the window argument is set. Defines whether to use a single window, global=true, or to use separate windows based on the by clause. If global=false and window is set to a non-zero value, a separate window is used for each group of values of the field specified in the by clause. Syntax: global=. **Default:** true. 
@@ -226,4 +231,34 @@ PPL query:: | Peter | Canada | B.C | 4 | 2023 | 57 | null | | Rick | Canada | B.C | 4 | 2023 | 70 | null | | David | USA | Washington | 4 | 2023 | 40 | null | - +-------+---------+------------+-------+------+-----+---------+ \ No newline at end of file + +-------+---------+------------+-------+------+-----+---------+ + + +Example 5: Null buckets handling +================================ + +PPL query:: + + os> source=accounts | streamstats bucket_nullable=false count() as cnt by employer | fields account_number, firstname, employer, cnt; + fetched rows / total rows = 4/4 + +----------------+-----------+----------+------+ + | account_number | firstname | employer | cnt | + |----------------+-----------+----------+------| + | 1 | Amber | Pyrami | 1 | + | 6 | Hattie | Netagy | 1 | + | 13 | Nanette | Quility | 1 | + | 18 | Dale | null | null | + +----------------+-----------+----------+------+ + +PPL query:: + + os> source=accounts | streamstats bucket_nullable=true count() as cnt by employer | fields account_number, firstname, employer, cnt; + fetched rows / total rows = 4/4 + +----------------+-----------+----------+-----+ + | account_number | firstname | employer | cnt | + |----------------+-----------+----------+-----| + | 1 | Amber | Pyrami | 1 | + | 6 | Hattie | Netagy | 1 | + | 13 | Nanette | Quility | 1 | + | 18 | Dale | null | 1 | + +----------------+-----------+----------+-----+ \ No newline at end of file diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 36ba4da6282..e6b5bfdb1e1 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -668,6 +668,36 @@ public void testStreamstatsResetExplain() throws IOException { assertYamlEqualsIgnoreId(expected, result); } + @Test + public void 
testStreamstatsNullBucketExplain() throws IOException { + String query = + "source=opensearch-sql_test_index_account | streamstats bucket_nullable=false avg(age) as" + + " avg_age by gender"; + var result = explainQueryYaml(query); + String expected = loadExpectedPlan("explain_streamstats_null_bucket.yaml"); + assertYamlEqualsIgnoreId(expected, result); + } + + @Test + public void testStreamstatsGlobalNullBucketExplain() throws IOException { + String query = + "source=opensearch-sql_test_index_account | streamstats bucket_nullable=false window=2" + + " global=true avg(age) as avg_age by gender"; + var result = explainQueryYaml(query); + String expected = loadExpectedPlan("explain_streamstats_global_null_bucket.yaml"); + assertYamlEqualsIgnoreId(expected, result); + } + + @Test + public void testStreamstatsResetNullBucketExplain() throws IOException { + String query = + "source=opensearch-sql_test_index_account | streamstats bucket_nullable=false current=false" + + " reset_before=age>34 reset_after=age<25 avg(age) as avg_age by gender"; + var result = explainQueryYaml(query); + String expected = loadExpectedPlan("explain_streamstats_reset_null_bucket.yaml"); + assertYamlEqualsIgnoreId(expected, result); + } + @Test public void testKeywordILikeFunctionExplain() throws IOException { // ilike is only supported in v3 diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java index d64e8c32324..364835b7af8 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java @@ -147,7 +147,7 @@ public void testStreamstatsByWithNull() throws IOException { rows("John", "Canada", "Ontario", 4, 2023, 25, 1, 25, 25, 25), rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2, 22.5, 20, 25), rows(null, "Canada", 
null, 4, 2023, 10, 3, 18.333333333333332, 10, 25), - rows("Kevin", null, null, 4, 2023, null, null, null, null, null)); + rows("Kevin", null, null, 4, 2023, null, 1, null, null, null)); actual = executeQuery( @@ -155,6 +155,53 @@ public void testStreamstatsByWithNull() throws IOException { "source=%s | streamstats count() as cnt, avg(age) as avg, min(age) as min, max(age)" + " as max by state | fields name, country, state, month, year, age, cnt, avg, min, max", TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + verifyDataRows( + actual, + rows("Jake", "USA", "California", 4, 2023, 70, 1, 70, 70, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 1, 30, 30, 30), + rows("John", "Canada", "Ontario", 4, 2023, 25, 1, 25, 25, 25), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 1, 20, 20, 20), + rows(null, "Canada", null, 4, 2023, 10, 1, 10, 10, 10), + rows("Kevin", null, null, 4, 2023, null, 2, 10, 10, 10)); + } + + @Test + public void testStreamstatsByWithNullBucket() throws IOException { + JSONObject actual = + executeQuery( + String.format( + "source=%s | streamstats bucket_nullable=false count() as cnt, avg(age) as avg," + + " min(age) as min, max(age) as max by country | fields name, country, state, month, year, age, cnt, avg, min, max", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifySchemaInOrder( + actual, + schema("name", "string"), + schema("country", "string"), + schema("state", "string"), + schema("month", "int"), + schema("year", "int"), + schema("age", "int"), + schema("cnt", "bigint"), + schema("avg", "double"), + schema("min", "int"), + schema("max", "int")); + + verifyDataRows( + actual, + rows("Jake", "USA", "California", 4, 2023, 70, 1, 70, 70, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 2, 50, 30, 70), + rows("John", "Canada", "Ontario", 4, 2023, 25, 1, 25, 25, 25), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2, 22.5, 20, 25), + rows(null, "Canada", null, 4, 2023, 10, 3, 18.333333333333332, 10, 25), + rows("Kevin", null, null, 4, 2023, null, 
null, null, null, null)); + + actual = + executeQuery( + String.format( + "source=%s | streamstats bucket_nullable=false count() as cnt, avg(age) as avg," + + " min(age) as min, max(age) as max by state | fields name, country, state, month, year, age, cnt, avg, min, max", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); verifyDataRows( actual, rows("Jake", "USA", "California", 4, 2023, 70, 1, 70, 70, 70), @@ -198,7 +245,7 @@ public void testStreamstatsBySpanWithNull() throws IOException { rows("John", "Canada", "Ontario", 4, 2023, 25, 1, 25, 25, 25), rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2, 22.5, 20, 25), rows(null, "Canada", null, 4, 2023, 10, 1, 10, 10, 10), - rows("Kevin", null, null, 4, 2023, null, null, null, null, null)); + rows("Kevin", null, null, 4, 2023, null, 1, null, null, null)); } @Test @@ -240,8 +287,9 @@ public void testStreamstatsByMultiplePartitionsWithNull1() throws IOException { JSONObject actual = executeQuery( String.format( - "source=%s | streamstats count() as cnt, avg(age) as avg, min(age) as min, max(age)" - + " as max by span(age, 10) as age_span, country | fields name, country, state, month, year, age, cnt, avg, min, max", + "source=%s | streamstats bucket_nullable=false count() as cnt, avg(age) as avg," + + " min(age) as min, max(age) as max by span(age, 10) as age_span, country |" + + " fields name, country, state, month, year, age, cnt, avg, min, max", TEST_INDEX_STATE_COUNTRY_WITH_NULL)); verifyDataRows( @@ -252,6 +300,23 @@ public void testStreamstatsByMultiplePartitionsWithNull1() throws IOException { rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2, 22.5, 20, 25), rows(null, "Canada", null, 4, 2023, 10, 1, 10, 10, 10), rows("Kevin", null, null, 4, 2023, null, null, null, null, null)); + + JSONObject actual2 = + executeQuery( + String.format( + "source=%s | streamstats bucket_nullable=true count() as cnt, avg(age) as avg," + + " min(age) as min, max(age) as max by span(age, 10) as age_span, country |" + + " fields name, country, 
state, month, year, age, cnt, avg, min, max", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifyDataRows( + actual2, + rows("Jake", "USA", "California", 4, 2023, 70, 1, 70, 70, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 1, 30, 30, 30), + rows("John", "Canada", "Ontario", 4, 2023, 25, 1, 25, 25, 25), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2, 22.5, 20, 25), + rows(null, "Canada", null, 4, 2023, 10, 1, 10, 10, 10), + rows("Kevin", null, null, 4, 2023, null, 1, null, null, null)); } @Test @@ -259,8 +324,9 @@ public void testStreamstatsByMultiplePartitionsWithNull2() throws IOException { JSONObject actual = executeQuery( String.format( - "source=%s | streamstats count() as cnt, avg(age) as avg, min(age) as min, max(age)" - + " as max by span(age, 10) as age_span, state | fields name, country, state, month, year, age, cnt, avg, min, max", + "source=%s | streamstats bucket_nullable=false count() as cnt, avg(age) as avg," + + " min(age) as min, max(age) as max by span(age, 10) as age_span, state |" + + " fields name, country, state, month, year, age, cnt, avg, min, max", TEST_INDEX_STATE_COUNTRY_WITH_NULL)); verifyDataRows( @@ -271,6 +337,23 @@ public void testStreamstatsByMultiplePartitionsWithNull2() throws IOException { rows("Jane", "Canada", "Quebec", 4, 2023, 20, 1, 20, 20, 20), rows(null, "Canada", null, 4, 2023, 10, null, null, null, null), rows("Kevin", null, null, 4, 2023, null, null, null, null, null)); + + JSONObject actual2 = + executeQuery( + String.format( + "source=%s | streamstats bucket_nullable=true count() as cnt, avg(age) as avg," + + " min(age) as min, max(age) as max by span(age, 10) as age_span, state |" + + " fields name, country, state, month, year, age, cnt, avg, min, max", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifyDataRows( + actual2, + rows("Jake", "USA", "California", 4, 2023, 70, 1, 70, 70, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 1, 30, 30, 30), + rows("John", "Canada", "Ontario", 4, 2023, 25, 1, 25, 25, 
25), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 1, 20, 20, 20), + rows(null, "Canada", null, 4, 2023, 10, 1, 10, 10, 10), + rows("Kevin", null, null, 4, 2023, null, 1, null, null, null)); } @Test @@ -502,6 +585,61 @@ public void testStreamstatsGlobalWithNull() throws IOException { } } + @Test + public void testStreamstatsGlobalWithNullBucket() throws IOException { + final int docId = 7; + Request insertRequest = + new Request( + "PUT", + String.format("/%s/_doc/%d?refresh=true", TEST_INDEX_STATE_COUNTRY_WITH_NULL, docId)); + insertRequest.setJsonEntity( + "{\"name\": \"Jay\",\"age\": 40,\"state\":" + + " \"Quebec\",\"country\": \"USA\",\"year\": 2023,\"month\":" + + " 4}\n"); + client().performRequest(insertRequest); + try { + JSONObject actual = + executeQuery( + String.format( + "source=%s | streamstats bucket_nullable=false window=2 global=true avg(age) as" + + " avg by state | fields name, country, state, month, year, age, avg", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifyDataRows( + actual, + rows("Jake", "USA", "California", 4, 2023, 70, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 30), + rows("John", "Canada", "Ontario", 4, 2023, 25, 25), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20), + rows(null, "Canada", null, 4, 2023, 10, null), + rows("Kevin", null, null, 4, 2023, null, null), + rows("Jay", "USA", "Quebec", 4, 2023, 40, 40)); + + JSONObject actual2 = + executeQuery( + String.format( + "source=%s | streamstats bucket_nullable=true window=2 global=true avg(age) as" + + " avg by state | fields name, country, state, month, year, age, avg", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifyDataRows( + actual2, + rows("Jake", "USA", "California", 4, 2023, 70, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 30), + rows("John", "Canada", "Ontario", 4, 2023, 25, 25), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20), + rows(null, "Canada", null, 4, 2023, 10, 10), + rows("Kevin", null, null, 4, 2023, null, 10), + rows("Jay", "USA", 
"Quebec", 4, 2023, 40, 40)); + } finally { + Request deleteRequest = + new Request( + "DELETE", + String.format("/%s/_doc/%d?refresh=true", TEST_INDEX_STATE_COUNTRY_WITH_NULL, docId)); + client().performRequest(deleteRequest); + } + } + @Test public void testStreamstatsReset() throws IOException { final int docId = 5; @@ -602,6 +740,61 @@ public void testStreamstatsResetWithNull() throws IOException { } } + @Test + public void testStreamstatsResetWithNullBucket() throws IOException { + final int docId = 7; + Request insertRequest = + new Request( + "PUT", + String.format("/%s/_doc/%d?refresh=true", TEST_INDEX_STATE_COUNTRY_WITH_NULL, docId)); + insertRequest.setJsonEntity( + "{\"name\": \"Jay\",\"age\": 28,\"state\":" + + " \"Quebec\",\"country\": \"USA\",\"year\": 2023,\"month\":" + + " 4}\n"); + client().performRequest(insertRequest); + try { + JSONObject actual = + executeQuery( + String.format( + "source=%s | streamstats bucket_nullable=true window=2 reset_before=age>29" + + " avg(age) as avg by state | fields name, country, state, month, year, age, avg", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifyDataRows( + actual, + rows("Jake", "USA", "California", 4, 2023, 70, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 30), + rows("John", "Canada", "Ontario", 4, 2023, 25, 25), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20), + rows(null, "Canada", null, 4, 2023, 10, 10), + rows("Kevin", null, null, 4, 2023, null, 10), + rows("Jay", "USA", "Quebec", 4, 2023, 28, 28)); + + JSONObject actual2 = + executeQuery( + String.format( + "source=%s | streamstats bucket_nullable=false window=2 reset_after=age>22" + + " avg(age) as avg by state | fields name, country, state, month, year, age, avg", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifyDataRows( + actual2, + rows("Jake", "USA", "California", 4, 2023, 70, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 30), + rows("John", "Canada", "Ontario", 4, 2023, 25, 25), + rows("Jane", "Canada", "Quebec", 4, 
2023, 20, 20), + rows(null, "Canada", null, 4, 2023, 10, null), + rows("Kevin", null, null, 4, 2023, null, null), + rows("Jay", "USA", "Quebec", 4, 2023, 28, 28)); + } finally { + Request deleteRequest = + new Request( + "DELETE", + String.format("/%s/_doc/%d?refresh=true", TEST_INDEX_STATE_COUNTRY_WITH_NULL, docId)); + client().performRequest(deleteRequest); + } + } + @Test public void testUnsupportedWindowFunctions() { List unsupported = List.of("PERCENTILE_APPROX", "PERCENTILE"); @@ -649,6 +842,23 @@ public void testMultipleStreamstatsWithNull1() throws IOException { rows("Hello", "USA", "New York", 4, 2023, 30, 30, 50), rows("John", "Canada", "Ontario", 4, 2023, 25, 25, 25), rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20, 22.5), + rows(null, "Canada", null, 4, 2023, 10, 10, 18.333333333333332), + rows("Kevin", null, null, 4, 2023, null, null, null)); + + JSONObject actual2 = + executeQuery( + String.format( + "source=%s | streamstats bucket_nullable=false avg(age) as avg_age by state," + + " country | streamstats bucket_nullable=false avg(avg_age) as avg_state_age" + + " by country | fields name, country, state, month, year, age, avg_age, avg_state_age", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifyDataRows( + actual2, + rows("Jake", "USA", "California", 4, 2023, 70, 70, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 30, 50), + rows("John", "Canada", "Ontario", 4, 2023, 25, 25, 25), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20, 22.5), rows(null, "Canada", null, 4, 2023, 10, null, 22.5), rows("Kevin", null, null, 4, 2023, null, null, null)); } @@ -669,7 +879,7 @@ public void testMultipleStreamstatsWithNull2() throws IOException { executeQuery( String.format( "source=%s | streamstats avg(age) as avg_age by state, country | streamstats" - + " avg(avg_age) as avg_state_age by country | fields name, country, state, month, year, age, avg_age, avg_state_age", + + " avg(avg_age) as avg_state_age by country | fields name, country, state, month, year, 
age, avg_age, avg_state_age", TEST_INDEX_STATE_COUNTRY)); verifyDataRows( @@ -678,6 +888,22 @@ public void testMultipleStreamstatsWithNull2() throws IOException { rows("Hello", "USA", "New York", 4, 2023, 30, 30, 50), rows("John", "Canada", "Ontario", 4, 2023, 25, 25, 25), rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20, 22.5), + rows("Jay", "USA", null, 4, 2023, 28, 28, 42.666666666666664)); + + JSONObject actual2 = + executeQuery( + String.format( + "source=%s | streamstats bucket_nullable=false avg(age) as avg_age by state," + + " country | streamstats bucket_nullable=false avg(avg_age) as avg_state_age" + + " by country | fields name, country, state, month, year, age, avg_age, avg_state_age", + TEST_INDEX_STATE_COUNTRY)); + + verifyDataRows( + actual2, + rows("Jake", "USA", "California", 4, 2023, 70, 70, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 30, 50), + rows("John", "Canada", "Ontario", 4, 2023, 25, 25, 25), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20, 22.5), rows("Jay", "USA", null, 4, 2023, 28, null, 50)); } finally { Request deleteRequest = @@ -726,7 +952,8 @@ public void testLeftJoinWithStreamstats() throws IOException { executeQuery( String.format( "source=%s as l | left join left=l right=r on l.country = r.country [ source=%s |" - + " streamstats window=2 avg(age) as avg_age] | fields l.name, l.country, l.state, l.month, l.year, l.age, r.name, r.country, r.state, r.month, r.year, r.age, avg_age", + + " streamstats window=2 avg(age) as avg_age] | fields l.name, l.country, l.state," + + " l.month, l.year, l.age, r.name, r.country, r.state, r.month, r.year, r.age, avg_age", TEST_INDEX_STATE_COUNTRY, TEST_INDEX_STATE_COUNTRY_WITH_NULL)); verifyDataRows( @@ -792,7 +1019,8 @@ public void testMultipleStreamstatsWithEval2() throws IOException { executeQuery( String.format( "source=%s | eval new_state=lower(state), new_country=lower(country) | streamstats" - + " avg(age) as avg_age by new_state, new_country | fields name, country, state, 
month, year, age, new_state, new_country, avg_age", + + " bucket_nullable=false avg(age) as avg_age by new_state, new_country |" + + " fields name, country, state, month, year, age, new_state, new_country, avg_age", TEST_INDEX_STATE_COUNTRY_WITH_NULL)); verifySchemaInOrder( @@ -815,6 +1043,23 @@ public void testMultipleStreamstatsWithEval2() throws IOException { rows("Jane", "Canada", "Quebec", 4, 2023, 20, "quebec", "canada", 20), rows(null, "Canada", null, 4, 2023, 10, null, "canada", null), rows("Kevin", null, null, 4, 2023, null, null, null, null)); + + JSONObject actual2 = + executeQuery( + String.format( + "source=%s | eval new_state=lower(state), new_country=lower(country) | streamstats" + + " bucket_nullable=true avg(age) as avg_age by new_state, new_country |" + + " fields name, country, state, month, year, age, new_state, new_country, avg_age", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifyDataRows( + actual2, + rows("Jake", "USA", "California", 4, 2023, 70, "california", "usa", 70), + rows("Hello", "USA", "New York", 4, 2023, 30, "new york", "usa", 30), + rows("John", "Canada", "Ontario", 4, 2023, 25, "ontario", "canada", 25), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, "quebec", "canada", 20), + rows(null, "Canada", null, 4, 2023, 10, null, "canada", 10), + rows("Kevin", null, null, 4, 2023, null, null, null, null)); } @Test diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_distinct_count.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_distinct_count.yaml index c9ef1ca9ebd..32538ab17df 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_distinct_count.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_distinct_count.yaml @@ -3,14 +3,13 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], 
employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[$18]) LogicalSort(sort0=[$17], dir0=[ASC]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], distinct_states=[CASE(IS NOT NULL($4), DISTINCT_COUNT_APPROX($7) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING), null:BIGINT)]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], distinct_states=[DISTINCT_COUNT_APPROX($7) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | - EnumerableCalc(expr#0..13=[{inputs}], expr#14=[null:BIGINT], expr#15=[CASE($t12, $t13, $t14)], proj#0..10=[{exprs}], distinct_states=[$t15]) + EnumerableCalc(expr#0..12=[{inputs}], proj#0..10=[{exprs}], distinct_states=[$t12]) EnumerableLimit(fetch=[10000]) EnumerableSort(sort0=[$11], dir0=[ASC]) EnumerableWindow(window#0=[window(partition {4} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [DISTINCT_COUNT_APPROX($7)])]) - EnumerableCalc(expr#0..11=[{inputs}], expr#12=[IS NOT NULL($t4)], proj#0..12=[{exprs}]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, 
opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest.yaml index aac0fab3748..cac21b929ee 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest.yaml @@ -3,14 +3,13 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[$12], latest_message=[$13]) LogicalSort(sort0=[$11], dir0=[ASC]) - LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[$11], 
earliest_message=[CASE(IS NOT NULL($1), ARG_MIN($3, $2) OVER (PARTITION BY $1 ROWS UNBOUNDED PRECEDING), null:VARCHAR)], latest_message=[CASE(IS NOT NULL($1), ARG_MAX($3, $2) OVER (PARTITION BY $1 ROWS UNBOUNDED PRECEDING), null:VARCHAR)]) + LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[$11], earliest_message=[ARG_MIN($3, $2) OVER (PARTITION BY $1 ROWS UNBOUNDED PRECEDING)], latest_message=[ARG_MAX($3, $2) OVER (PARTITION BY $1 ROWS UNBOUNDED PRECEDING)]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) physical: | - EnumerableCalc(expr#0..8=[{inputs}], expr#9=[null:VARCHAR], expr#10=[CASE($t6, $t7, $t9)], expr#11=[CASE($t6, $t8, $t9)], proj#0..4=[{exprs}], earliest_message=[$t10], latest_message=[$t11]) + EnumerableCalc(expr#0..7=[{inputs}], proj#0..4=[{exprs}], earliest_message=[$t6], latest_message=[$t7]) EnumerableLimit(fetch=[10000]) EnumerableSort(sort0=[$5], dir0=[ASC]) EnumerableWindow(window#0=[window(partition {1} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ARG_MIN($3, $2), ARG_MAX($3, $2)])]) - EnumerableCalc(expr#0..5=[{inputs}], expr#6=[IS NOT NULL($t1)], proj#0..6=[{exprs}]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[created_at, server, @timestamp, message, level]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["created_at","server","@timestamp","message","level"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end 
of file + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[created_at, server, @timestamp, message, level]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["created_at","server","@timestamp","message","level"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest_custom_time.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest_custom_time.yaml index e86cfb8236c..f19625d85e5 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest_custom_time.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest_custom_time.yaml @@ -3,14 +3,13 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[$12], latest_message=[$13]) LogicalSort(sort0=[$11], dir0=[ASC]) - LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[$11], earliest_message=[CASE(IS NOT NULL($4), ARG_MIN($3, $0) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING), null:VARCHAR)], latest_message=[CASE(IS NOT NULL($4), ARG_MAX($3, $0) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING), null:VARCHAR)]) + LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[$11], earliest_message=[ARG_MIN($3, $0) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)], 
latest_message=[ARG_MAX($3, $0) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) physical: | - EnumerableCalc(expr#0..8=[{inputs}], expr#9=[null:VARCHAR], expr#10=[CASE($t6, $t7, $t9)], expr#11=[CASE($t6, $t8, $t9)], proj#0..4=[{exprs}], earliest_message=[$t10], latest_message=[$t11]) + EnumerableCalc(expr#0..7=[{inputs}], proj#0..4=[{exprs}], earliest_message=[$t6], latest_message=[$t7]) EnumerableLimit(fetch=[10000]) EnumerableSort(sort0=[$5], dir0=[ASC]) EnumerableWindow(window#0=[window(partition {4} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ARG_MIN($3, $0), ARG_MAX($3, $0)])]) - EnumerableCalc(expr#0..5=[{inputs}], expr#6=[IS NOT NULL($t4)], proj#0..6=[{exprs}]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[created_at, server, @timestamp, message, level]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["created_at","server","@timestamp","message","level"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[created_at, server, @timestamp, message, level]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["created_at","server","@timestamp","message","level"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end 
of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml index 293dd785f96..a00d5b40cfa 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml @@ -7,23 +7,24 @@ calcite: LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) LogicalAggregate(group=[{}], avg_age=[AVG($8)]) - LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))]) + LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | - EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) + EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18]) EnumerableLimit(fetch=[10000]) - EnumerableHashJoin(condition=[AND(=($4, $13), =($11, $14), =($12, $15))], joinType=[left]) - EnumerableSort(sort0=[$11], dir0=[ASC]) - EnumerableCalc(expr#0..11=[{inputs}], expr#12=[1], expr#13=[-($t11, $t12)], proj#0..11=[{exprs}], $f12=[$t13]) + 
EnumerableMergeJoin(condition=[AND(=($11, $15), =($12, $16), =($13, $17), IS NOT DISTINCT FROM($4, $14))], joinType=[left]) + EnumerableSort(sort0=[$11], sort1=[$12], sort2=[$13], dir0=[ASC], dir1=[ASC], dir2=[ASC]) + EnumerableCalc(expr#0..11=[{inputs}], expr#12=[1], expr#13=[-($t11, $t12)], expr#14=[IS NULL($t4)], proj#0..11=[{exprs}], $f12=[$t13], $f13=[$t14]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0], expr#6=[=($t4, $t5)], expr#7=[null:BIGINT], expr#8=[CASE($t6, $t7, $t3)], expr#9=[CAST($t8):DOUBLE], expr#10=[/($t9, $t4)], proj#0..2=[{exprs}], avg_age=[$t10]) - EnumerableAggregate(group=[{0, 1, 2}], agg#0=[$SUM0($4)], agg#1=[COUNT($4)]) - EnumerableHashJoin(condition=[AND(=($0, $3), >=($5, $2), <=($5, $1))], joinType=[inner]) - EnumerableAggregate(group=[{0, 1, 2}]) - EnumerableCalc(expr#0..1=[{inputs}], expr#2=[1], expr#3=[-($t1, $t2)], proj#0..1=[{exprs}], $f2=[$t3]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs 
[ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file + EnumerableSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[ASC], dir1=[ASC], dir2=[ASC]) + EnumerableCalc(expr#0..5=[{inputs}], expr#6=[0], expr#7=[=($t5, $t6)], expr#8=[null:BIGINT], expr#9=[CASE($t7, $t8, $t4)], expr#10=[CAST($t9):DOUBLE], expr#11=[/($t10, $t5)], proj#0..3=[{exprs}], avg_age=[$t11]) + EnumerableAggregate(group=[{0, 1, 2, 3}], agg#0=[$SUM0($5)], agg#1=[COUNT($5)]) + EnumerableNestedLoopJoin(condition=[AND(>=($6, $2), <=($6, $1), OR(=($4, $0), AND(IS NULL($4), $3)))], joinType=[inner]) + EnumerableAggregate(group=[{0, 1, 2, 3}]) + EnumerableCalc(expr#0..1=[{inputs}], expr#2=[1], expr#3=[-($t1, $t2)], expr#4=[IS NULL($t0)], proj#0..1=[{exprs}], $f2=[$t3], $f3=[$t4]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git 
a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml new file mode 100644 index 00000000000..293dd785f96 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml @@ -0,0 +1,29 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$18]) + LogicalSort(sort0=[$17], dir0=[ASC]) + LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{}], avg_age=[AVG($8)]) + LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) + EnumerableLimit(fetch=[10000]) + EnumerableHashJoin(condition=[AND(=($4, $13), =($11, $14), =($12, $15))], joinType=[left]) + EnumerableSort(sort0=[$11], dir0=[ASC]) + 
EnumerableCalc(expr#0..11=[{inputs}], expr#12=[1], expr#13=[-($t11, $t12)], proj#0..11=[{exprs}], $f12=[$t13]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0], expr#6=[=($t4, $t5)], expr#7=[null:BIGINT], expr#8=[CASE($t6, $t7, $t3)], expr#9=[CAST($t8):DOUBLE], expr#10=[/($t9, $t4)], proj#0..2=[{exprs}], avg_age=[$t10]) + EnumerableAggregate(group=[{0, 1, 2}], agg#0=[$SUM0($4)], agg#1=[COUNT($4)]) + EnumerableHashJoin(condition=[AND(=($0, $3), >=($5, $2), <=($5, $1))], joinType=[inner]) + EnumerableAggregate(group=[{0, 1, 2}]) + EnumerableCalc(expr#0..1=[{inputs}], expr#2=[1], expr#3=[-($t1, $t2)], proj#0..1=[{exprs}], $f2=[$t3]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"],"excludes":[]}}, 
requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_null_bucket.yaml new file mode 100644 index 00000000000..fe79eb90faa --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_null_bucket.yaml @@ -0,0 +1,16 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$18]) + LogicalSort(sort0=[$17], dir0=[ASC]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], avg_age=[CASE(IS NOT NULL($4), /(SUM($8) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING), CAST(COUNT($8) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)):DOUBLE NOT NULL), null:DOUBLE)]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableCalc(expr#0..14=[{inputs}], expr#15=[CAST($t14):DOUBLE NOT NULL], expr#16=[/($t13, $t15)], expr#17=[null:DOUBLE], expr#18=[CASE($t12, $t16, $t17)], proj#0..10=[{exprs}], avg_age=[$t18]) + EnumerableLimit(fetch=[10000]) + EnumerableSort(sort0=[$11], dir0=[ASC]) + EnumerableWindow(window#0=[window(partition {4} rows between UNBOUNDED PRECEDING and CURRENT 
ROW aggs [$SUM0($8), COUNT($8)])]) + EnumerableCalc(expr#0..11=[{inputs}], expr#12=[IS NOT NULL($t4)], proj#0..12=[{exprs}]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset.yaml index 0e8ed3a3dde..fd739ac5cf5 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset.yaml @@ -8,31 +8,32 @@ calcite: LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) LogicalAggregate(group=[{}], avg_age=[AVG($8)]) - LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), =($4, $cor0.gender))]) + LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) LogicalProject(account_number=[$0], 
firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | - EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) + EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18]) EnumerableLimit(fetch=[10000]) - EnumerableHashJoin(condition=[AND(=($4, $13), =($11, $14), =($12, $15))], joinType=[left]) - EnumerableSort(sort0=[$11], dir0=[ASC]) - EnumerableCalc(expr#0..15=[{inputs}], expr#16=[0], expr#17=[COALESCE($t15, $t16)], expr#18=[+($t14, $t17)], proj#0..11=[{exprs}], __seg_id__=[$t18]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($12)])], window#1=[window(rows between UNBOUNDED PRECEDING and $14 PRECEDING aggs [$SUM0($13)])], constants=[[1]]) - EnumerableCalc(expr#0..11=[{inputs}], expr#12=[34], expr#13=[>($t8, $t12)], expr#14=[1], expr#15=[0], expr#16=[CASE($t13, $t14, $t15)], expr#17=[25], expr#18=[<($t8, $t17)], expr#19=[CASE($t18, $t14, $t15)], proj#0..11=[{exprs}], __reset_before_flag__=[$t16], __reset_after_flag__=[$t19]) + EnumerableMergeJoin(condition=[AND(=($11, $15), =($12, $16), =($13, $17), IS NOT DISTINCT FROM($4, $14))], 
joinType=[left]) + EnumerableSort(sort0=[$11], sort1=[$12], sort2=[$13], dir0=[ASC], dir1=[ASC], dir2=[ASC]) + EnumerableCalc(expr#0..16=[{inputs}], expr#17=[0], expr#18=[COALESCE($t16, $t17)], expr#19=[+($t15, $t18)], proj#0..11=[{exprs}], __seg_id__=[$t19], $f16=[$t14]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($12)])], window#1=[window(rows between UNBOUNDED PRECEDING and $15 PRECEDING aggs [$SUM0($13)])], constants=[[1]]) + EnumerableCalc(expr#0..11=[{inputs}], expr#12=[34], expr#13=[>($t8, $t12)], expr#14=[1], expr#15=[0], expr#16=[CASE($t13, $t14, $t15)], expr#17=[25], expr#18=[<($t8, $t17)], expr#19=[CASE($t18, $t14, $t15)], expr#20=[IS NULL($t4)], proj#0..11=[{exprs}], __reset_before_flag__=[$t16], __reset_after_flag__=[$t19], $14=[$t20]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0], expr#6=[=($t4, $t5)], expr#7=[null:BIGINT], expr#8=[CASE($t6, $t7, $t3)], expr#9=[CAST($t8):DOUBLE], expr#10=[/($t9, $t4)], proj#0..2=[{exprs}], avg_age=[$t10]) - EnumerableAggregate(group=[{0, 1, 2}], agg#0=[$SUM0($4)], agg#1=[COUNT($4)]) - EnumerableHashJoin(condition=[AND(=($2, $6), =($0, $3), <($5, $1))], joinType=[inner]) - EnumerableAggregate(group=[{0, 1, 2}]) - EnumerableCalc(expr#0..5=[{inputs}], expr#6=[0], expr#7=[COALESCE($t5, $t6)], expr#8=[+($t4, $t7)], proj#0..1=[{exprs}], __seg_id__=[$t8]) - 
EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($2)])], window#1=[window(rows between UNBOUNDED PRECEDING and $4 PRECEDING aggs [$SUM0($3)])], constants=[[1]]) - EnumerableCalc(expr#0..2=[{inputs}], expr#3=[34], expr#4=[>($t1, $t3)], expr#5=[1], expr#6=[0], expr#7=[CASE($t4, $t5, $t6)], expr#8=[25], expr#9=[<($t1, $t8)], expr#10=[CASE($t9, $t5, $t6)], gender=[$t0], __stream_seq__=[$t2], __reset_before_flag__=[$t7], __reset_after_flag__=[$t10]) + EnumerableSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[ASC], dir1=[ASC], dir2=[ASC]) + EnumerableCalc(expr#0..5=[{inputs}], expr#6=[0], expr#7=[=($t5, $t6)], expr#8=[null:BIGINT], expr#9=[CASE($t7, $t8, $t4)], expr#10=[CAST($t9):DOUBLE], expr#11=[/($t10, $t5)], proj#0..3=[{exprs}], avg_age=[$t11]) + EnumerableAggregate(group=[{0, 1, 2, 3}], agg#0=[$SUM0($5)], agg#1=[COUNT($5)]) + EnumerableHashJoin(condition=[AND(=($2, $7), <($6, $1), OR(=($4, $0), AND(IS NULL($4), $3)))], joinType=[inner]) + EnumerableAggregate(group=[{0, 1, 2, 3}]) + EnumerableCalc(expr#0..6=[{inputs}], expr#7=[0], expr#8=[COALESCE($t6, $t7)], expr#9=[+($t5, $t8)], proj#0..1=[{exprs}], __seg_id__=[$t9], $f16=[$t4]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($2)])], window#1=[window(rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($3)])], constants=[[1]]) + EnumerableCalc(expr#0..2=[{inputs}], expr#3=[34], expr#4=[>($t1, $t3)], expr#5=[1], expr#6=[0], expr#7=[CASE($t4, $t5, $t6)], expr#8=[25], expr#9=[<($t1, $t8)], expr#10=[CASE($t9, $t5, $t6)], expr#11=[IS NULL($t0)], gender=[$t0], __stream_seq__=[$t2], __reset_before_flag__=[$t7], __reset_after_flag__=[$t10], $4=[$t11]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], 
OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableCalc(expr#0..6=[{inputs}], expr#7=[0], expr#8=[COALESCE($t6, $t7)], expr#9=[+($t5, $t8)], proj#0..2=[{exprs}], __seg_id__=[$t9]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($3)])], window#1=[window(rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($4)])], constants=[[1]]) + EnumerableCalc(expr#0..2=[{inputs}], expr#3=[34], expr#4=[>($t1, $t3)], expr#5=[1], expr#6=[0], expr#7=[CASE($t4, $t5, $t6)], expr#8=[25], expr#9=[<($t1, $t8)], expr#10=[CASE($t9, $t5, $t6)], proj#0..2=[{exprs}], __reset_before_flag__=[$t7], __reset_after_flag__=[$t10]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - EnumerableCalc(expr#0..6=[{inputs}], expr#7=[0], expr#8=[COALESCE($t6, $t7)], expr#9=[+($t5, $t8)], proj#0..2=[{exprs}], __seg_id__=[$t9]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($3)])], window#1=[window(rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($4)])], constants=[[1]]) - EnumerableCalc(expr#0..2=[{inputs}], expr#3=[34], expr#4=[>($t1, $t3)], expr#5=[1], expr#6=[0], expr#7=[CASE($t4, $t5, $t6)], expr#8=[25], expr#9=[<($t1, $t8)], expr#10=[CASE($t9, $t5, $t6)], proj#0..2=[{exprs}], __reset_before_flag__=[$t7], __reset_after_flag__=[$t10]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, 
opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yaml new file mode 100644 index 00000000000..0e8ed3a3dde --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yaml @@ -0,0 +1,38 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$21]) + LogicalSort(sort0=[$17], dir0=[ASC]) + LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17, 20}]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], 
employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{}], avg_age=[AVG($8)]) + LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), =($4, $cor0.gender))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) + EnumerableLimit(fetch=[10000]) + EnumerableHashJoin(condition=[AND(=($4, $13), =($11, $14), =($12, $15))], joinType=[left]) + EnumerableSort(sort0=[$11], dir0=[ASC]) + EnumerableCalc(expr#0..15=[{inputs}], expr#16=[0], expr#17=[COALESCE($t15, $t16)], expr#18=[+($t14, $t17)], proj#0..11=[{exprs}], __seg_id__=[$t18]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($12)])], 
window#1=[window(rows between UNBOUNDED PRECEDING and $14 PRECEDING aggs [$SUM0($13)])], constants=[[1]]) + EnumerableCalc(expr#0..11=[{inputs}], expr#12=[34], expr#13=[>($t8, $t12)], expr#14=[1], expr#15=[0], expr#16=[CASE($t13, $t14, $t15)], expr#17=[25], expr#18=[<($t8, $t17)], expr#19=[CASE($t18, $t14, $t15)], proj#0..11=[{exprs}], __reset_before_flag__=[$t16], __reset_after_flag__=[$t19]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0], expr#6=[=($t4, $t5)], expr#7=[null:BIGINT], expr#8=[CASE($t6, $t7, $t3)], expr#9=[CAST($t8):DOUBLE], expr#10=[/($t9, $t4)], proj#0..2=[{exprs}], avg_age=[$t10]) + EnumerableAggregate(group=[{0, 1, 2}], agg#0=[$SUM0($4)], agg#1=[COUNT($4)]) + EnumerableHashJoin(condition=[AND(=($2, $6), =($0, $3), <($5, $1))], joinType=[inner]) + EnumerableAggregate(group=[{0, 1, 2}]) + EnumerableCalc(expr#0..5=[{inputs}], expr#6=[0], expr#7=[COALESCE($t5, $t6)], expr#8=[+($t4, $t7)], proj#0..1=[{exprs}], __seg_id__=[$t8]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($2)])], window#1=[window(rows between UNBOUNDED PRECEDING and $4 PRECEDING aggs [$SUM0($3)])], constants=[[1]]) + EnumerableCalc(expr#0..2=[{inputs}], expr#3=[34], expr#4=[>($t1, $t3)], expr#5=[1], expr#6=[0], expr#7=[CASE($t4, $t5, $t6)], expr#8=[25], expr#9=[<($t1, $t8)], expr#10=[CASE($t9, $t5, $t6)], gender=[$t0], __stream_seq__=[$t2], 
__reset_before_flag__=[$t7], __reset_after_flag__=[$t10]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableCalc(expr#0..6=[{inputs}], expr#7=[0], expr#8=[COALESCE($t6, $t7)], expr#9=[+($t5, $t8)], proj#0..2=[{exprs}], __seg_id__=[$t9]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($3)])], window#1=[window(rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($4)])], constants=[[1]]) + EnumerableCalc(expr#0..2=[{inputs}], expr#3=[34], expr#4=[>($t1, $t3)], expr#5=[1], expr#6=[0], expr#7=[CASE($t4, $t5, $t6)], expr#8=[25], expr#9=[<($t1, $t8)], expr#10=[CASE($t9, $t5, $t6)], proj#0..2=[{exprs}], __reset_before_flag__=[$t7], __reset_after_flag__=[$t10]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_distinct_count.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_distinct_count.yaml index f04a18c5f16..550cf0ea9cb 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_distinct_count.yaml +++ 
b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_distinct_count.yaml @@ -3,14 +3,13 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[$18]) LogicalSort(sort0=[$17], dir0=[ASC]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], distinct_states=[CASE(IS NOT NULL($4), DISTINCT_COUNT_APPROX($7) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING), null:BIGINT)]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], distinct_states=[DISTINCT_COUNT_APPROX($7) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | - EnumerableCalc(expr#0..13=[{inputs}], expr#14=[null:BIGINT], expr#15=[CASE($t12, $t13, $t14)], proj#0..10=[{exprs}], distinct_states=[$t15]) + EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], distinct_states=[$t18]) EnumerableLimit(fetch=[10000]) - EnumerableSort(sort0=[$11], dir0=[ASC]) + EnumerableSort(sort0=[$17], dir0=[ASC]) EnumerableWindow(window#0=[window(partition {4} rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [DISTINCT_COUNT_APPROX($7)])]) - EnumerableCalc(expr#0..17=[{inputs}], expr#18=[IS NOT NULL($t4)], proj#0..10=[{exprs}], __stream_seq__=[$t17], $12=[$t18]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest.yaml index af7d996dfb8..c37fae48771 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest.yaml @@ -3,14 +3,13 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[$12], latest_message=[$13]) LogicalSort(sort0=[$11], dir0=[ASC]) - LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[$11], earliest_message=[CASE(IS NOT NULL($1), ARG_MIN($3, $2) OVER (PARTITION BY $1 ROWS UNBOUNDED PRECEDING), null:VARCHAR)], latest_message=[CASE(IS NOT NULL($1), ARG_MAX($3, $2) OVER (PARTITION BY $1 ROWS UNBOUNDED PRECEDING), null:VARCHAR)]) + LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], 
__stream_seq__=[$11], earliest_message=[ARG_MIN($3, $2) OVER (PARTITION BY $1 ROWS UNBOUNDED PRECEDING)], latest_message=[ARG_MAX($3, $2) OVER (PARTITION BY $1 ROWS UNBOUNDED PRECEDING)]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) physical: | - EnumerableCalc(expr#0..8=[{inputs}], expr#9=[null:VARCHAR], expr#10=[CASE($t6, $t7, $t9)], expr#11=[CASE($t6, $t8, $t9)], proj#0..4=[{exprs}], earliest_message=[$t10], latest_message=[$t11]) + EnumerableCalc(expr#0..13=[{inputs}], proj#0..4=[{exprs}], earliest_message=[$t12], latest_message=[$t13]) EnumerableLimit(fetch=[10000]) - EnumerableSort(sort0=[$5], dir0=[ASC]) + EnumerableSort(sort0=[$11], dir0=[ASC]) EnumerableWindow(window#0=[window(partition {1} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ARG_MIN($3, $2), ARG_MAX($3, $2)])]) - EnumerableCalc(expr#0..11=[{inputs}], expr#12=[IS NOT NULL($t1)], proj#0..4=[{exprs}], __stream_seq__=[$t11], $6=[$t12]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) \ No newline at end of file + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest_custom_time.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest_custom_time.yaml index c5c4eec782f..b85e4b6b7bb 100644 --- 
a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest_custom_time.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest_custom_time.yaml @@ -3,14 +3,13 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[$12], latest_message=[$13]) LogicalSort(sort0=[$11], dir0=[ASC]) - LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[$11], earliest_message=[CASE(IS NOT NULL($4), ARG_MIN($3, $0) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING), null:VARCHAR)], latest_message=[CASE(IS NOT NULL($4), ARG_MAX($3, $0) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING), null:VARCHAR)]) + LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[$11], earliest_message=[ARG_MIN($3, $0) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)], latest_message=[ARG_MAX($3, $0) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) physical: | - EnumerableCalc(expr#0..8=[{inputs}], expr#9=[null:VARCHAR], expr#10=[CASE($t6, $t7, $t9)], expr#11=[CASE($t6, $t8, $t9)], proj#0..4=[{exprs}], earliest_message=[$t10], latest_message=[$t11]) + EnumerableCalc(expr#0..13=[{inputs}], proj#0..4=[{exprs}], earliest_message=[$t12], latest_message=[$t13]) EnumerableLimit(fetch=[10000]) - EnumerableSort(sort0=[$5], dir0=[ASC]) + EnumerableSort(sort0=[$11], 
dir0=[ASC]) EnumerableWindow(window#0=[window(partition {4} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ARG_MIN($3, $0), ARG_MAX($3, $0)])]) - EnumerableCalc(expr#0..11=[{inputs}], expr#12=[IS NOT NULL($t4)], proj#0..4=[{exprs}], __stream_seq__=[$t11], $6=[$t12]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) \ No newline at end of file + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml index 3ac52e02f55..191bd987a16 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml @@ -7,24 +7,25 @@ calcite: LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) LogicalAggregate(group=[{}], avg_age=[AVG($8)]) - LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))]) + LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], 
city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | - EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) + EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18]) EnumerableLimit(fetch=[10000]) - EnumerableHashJoin(condition=[AND(=($4, $13), =($11, $14), =($12, $15))], joinType=[left]) - EnumerableSort(sort0=[$11], dir0=[ASC]) - EnumerableCalc(expr#0..17=[{inputs}], expr#18=[1], expr#19=[-($t17, $t18)], proj#0..10=[{exprs}], __stream_seq__=[$t17], $f12=[$t19]) + EnumerableMergeJoin(condition=[AND(=($11, $15), =($12, $16), =($13, $17), IS NOT DISTINCT FROM($4, $14))], joinType=[left]) + EnumerableSort(sort0=[$11], sort1=[$12], sort2=[$13], dir0=[ASC], dir1=[ASC], dir2=[ASC]) + EnumerableCalc(expr#0..17=[{inputs}], expr#18=[1], expr#19=[-($t17, $t18)], expr#20=[IS NULL($t4)], proj#0..10=[{exprs}], __stream_seq__=[$t17], $f12=[$t19], $f15=[$t20]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0], expr#6=[=($t4, $t5)], expr#7=[null:BIGINT], expr#8=[CASE($t6, $t7, $t3)], expr#9=[CAST($t8):DOUBLE], expr#10=[/($t9, $t4)], proj#0..2=[{exprs}], avg_age=[$t10]) - EnumerableAggregate(group=[{0, 1, 2}], agg#0=[$SUM0($4)], agg#1=[COUNT($4)]) - EnumerableHashJoin(condition=[AND(=($0, $3), >=($5, $2), <=($5, $1))], joinType=[inner]) - EnumerableAggregate(group=[{0, 1, 2}]) - EnumerableCalc(expr#0..17=[{inputs}], expr#18=[1], expr#19=[-($t17, $t18)], gender=[$t4], __stream_seq__=[$t17], $f12=[$t19]) + EnumerableSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[ASC], dir1=[ASC], dir2=[ASC]) + 
EnumerableCalc(expr#0..5=[{inputs}], expr#6=[0], expr#7=[=($t5, $t6)], expr#8=[null:BIGINT], expr#9=[CASE($t7, $t8, $t4)], expr#10=[CAST($t9):DOUBLE], expr#11=[/($t10, $t5)], proj#0..3=[{exprs}], avg_age=[$t11]) + EnumerableAggregate(group=[{0, 1, 2, 3}], agg#0=[$SUM0($5)], agg#1=[COUNT($5)]) + EnumerableNestedLoopJoin(condition=[AND(>=($6, $2), <=($6, $1), OR(=($4, $0), AND(IS NULL($4), $3)))], joinType=[inner]) + EnumerableAggregate(group=[{0, 1, 2, 3}]) + EnumerableCalc(expr#0..17=[{inputs}], expr#18=[1], expr#19=[-($t17, $t18)], expr#20=[IS NULL($t4)], gender=[$t4], __stream_seq__=[$t17], $f12=[$t19], $f15=[$t20]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + EnumerableCalc(expr#0..17=[{inputs}], gender=[$t4], age=[$t8], $2=[$t17]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - EnumerableCalc(expr#0..17=[{inputs}], gender=[$t4], age=[$t8], $2=[$t17]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml new file mode 100644 index 00000000000..3ac52e02f55 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml @@ -0,0 +1,30 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], 
type=[QUERY_SIZE_LIMIT]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$18]) + LogicalSort(sort0=[$17], dir0=[ASC]) + LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{}], avg_age=[AVG($8)]) + LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) + EnumerableLimit(fetch=[10000]) + EnumerableHashJoin(condition=[AND(=($4, $13), =($11, $14), =($12, $15))], joinType=[left]) + EnumerableSort(sort0=[$11], dir0=[ASC]) + EnumerableCalc(expr#0..17=[{inputs}], expr#18=[1], expr#19=[-($t17, $t18)], proj#0..10=[{exprs}], __stream_seq__=[$t17], $f12=[$t19]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0], expr#6=[=($t4, $t5)], expr#7=[null:BIGINT], expr#8=[CASE($t6, $t7, $t3)], 
expr#9=[CAST($t8):DOUBLE], expr#10=[/($t9, $t4)], proj#0..2=[{exprs}], avg_age=[$t10]) + EnumerableAggregate(group=[{0, 1, 2}], agg#0=[$SUM0($4)], agg#1=[COUNT($4)]) + EnumerableHashJoin(condition=[AND(=($0, $3), >=($5, $2), <=($5, $1))], joinType=[inner]) + EnumerableAggregate(group=[{0, 1, 2}]) + EnumerableCalc(expr#0..17=[{inputs}], expr#18=[1], expr#19=[-($t17, $t18)], gender=[$t4], __stream_seq__=[$t17], $f12=[$t19]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + EnumerableCalc(expr#0..17=[{inputs}], gender=[$t4], age=[$t8], $2=[$t17]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_null_bucket.yaml new file mode 100644 index 00000000000..08876045225 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_null_bucket.yaml @@ -0,0 +1,16 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$18]) + LogicalSort(sort0=[$17], dir0=[ASC]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], avg_age=[CASE(IS NOT NULL($4), /(SUM($8) OVER (PARTITION BY $4 ROWS UNBOUNDED 
PRECEDING), CAST(COUNT($8) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)):DOUBLE NOT NULL), null:DOUBLE)]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableCalc(expr#0..14=[{inputs}], expr#15=[CAST($t14):DOUBLE NOT NULL], expr#16=[/($t13, $t15)], expr#17=[null:DOUBLE], expr#18=[CASE($t12, $t16, $t17)], proj#0..10=[{exprs}], avg_age=[$t18]) + EnumerableLimit(fetch=[10000]) + EnumerableSort(sort0=[$11], dir0=[ASC]) + EnumerableWindow(window#0=[window(partition {4} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($8), COUNT($8)])]) + EnumerableCalc(expr#0..17=[{inputs}], expr#18=[IS NOT NULL($t4)], proj#0..10=[{exprs}], __stream_seq__=[$t17], $12=[$t18]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset.yaml index be28e9b1d8c..7ca329dac6a 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset.yaml @@ -8,31 +8,32 @@ calcite: LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], 
__stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) LogicalAggregate(group=[{}], avg_age=[AVG($8)]) - LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), =($4, $cor0.gender))]) + LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | - EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) + EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18]) EnumerableLimit(fetch=[10000]) - EnumerableHashJoin(condition=[AND(=($4, $13), =($11, $14), =($12, $15))], joinType=[left]) - EnumerableSort(sort0=[$11], dir0=[ASC]) - EnumerableCalc(expr#0..15=[{inputs}], expr#16=[0], expr#17=[COALESCE($t15, $t16)], expr#18=[+($t14, $t17)], proj#0..11=[{exprs}], __seg_id__=[$t18]) - EnumerableWindow(window#0=[window(rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($12)])], window#1=[window(rows between UNBOUNDED PRECEDING and $14 PRECEDING aggs [$SUM0($13)])], constants=[[1]]) - EnumerableCalc(expr#0..17=[{inputs}], expr#18=[34], expr#19=[>($t8, $t18)], expr#20=[1], expr#21=[0], expr#22=[CASE($t19, $t20, $t21)], expr#23=[25], expr#24=[<($t8, $t23)], expr#25=[CASE($t24, $t20, $t21)], proj#0..10=[{exprs}], __stream_seq__=[$t17], __reset_before_flag__=[$t22], __reset_after_flag__=[$t25]) + EnumerableMergeJoin(condition=[AND(=($11, $15), =($12, $16), =($13, $17), IS NOT DISTINCT FROM($4, $14))], joinType=[left]) + EnumerableSort(sort0=[$11], sort1=[$12], sort2=[$13], dir0=[ASC], dir1=[ASC], dir2=[ASC]) + EnumerableCalc(expr#0..16=[{inputs}], expr#17=[0], expr#18=[COALESCE($t16, $t17)], expr#19=[+($t15, $t18)], proj#0..11=[{exprs}], __seg_id__=[$t19], $f16=[$t14]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($12)])], window#1=[window(rows between UNBOUNDED PRECEDING and $15 PRECEDING aggs [$SUM0($13)])], constants=[[1]]) + EnumerableCalc(expr#0..17=[{inputs}], expr#18=[34], expr#19=[>($t8, $t18)], expr#20=[1], expr#21=[0], expr#22=[CASE($t19, $t20, $t21)], expr#23=[25], expr#24=[<($t8, $t23)], expr#25=[CASE($t24, $t20, $t21)], expr#26=[IS NULL($t4)], proj#0..10=[{exprs}], __stream_seq__=[$t17], __reset_before_flag__=[$t22], __reset_after_flag__=[$t25], $14=[$t26]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0], expr#6=[=($t4, $t5)], expr#7=[null:BIGINT], expr#8=[CASE($t6, $t7, $t3)], expr#9=[CAST($t8):DOUBLE], expr#10=[/($t9, $t4)], proj#0..2=[{exprs}], avg_age=[$t10]) - EnumerableAggregate(group=[{0, 1, 2}], agg#0=[$SUM0($4)], agg#1=[COUNT($4)]) - EnumerableHashJoin(condition=[AND(=($2, $6), =($0, $3), <($5, $1))], joinType=[inner]) - 
EnumerableAggregate(group=[{0, 1, 2}]) - EnumerableCalc(expr#0..5=[{inputs}], expr#6=[0], expr#7=[COALESCE($t5, $t6)], expr#8=[+($t4, $t7)], proj#0..1=[{exprs}], __seg_id__=[$t8]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($2)])], window#1=[window(rows between UNBOUNDED PRECEDING and $4 PRECEDING aggs [$SUM0($3)])], constants=[[1]]) - EnumerableCalc(expr#0..17=[{inputs}], expr#18=[34], expr#19=[>($t8, $t18)], expr#20=[1], expr#21=[0], expr#22=[CASE($t19, $t20, $t21)], expr#23=[25], expr#24=[<($t8, $t23)], expr#25=[CASE($t24, $t20, $t21)], gender=[$t4], __stream_seq__=[$t17], __reset_before_flag__=[$t22], __reset_after_flag__=[$t25]) + EnumerableSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[ASC], dir1=[ASC], dir2=[ASC]) + EnumerableCalc(expr#0..5=[{inputs}], expr#6=[0], expr#7=[=($t5, $t6)], expr#8=[null:BIGINT], expr#9=[CASE($t7, $t8, $t4)], expr#10=[CAST($t9):DOUBLE], expr#11=[/($t10, $t5)], proj#0..3=[{exprs}], avg_age=[$t11]) + EnumerableAggregate(group=[{0, 1, 2, 3}], agg#0=[$SUM0($5)], agg#1=[COUNT($5)]) + EnumerableHashJoin(condition=[AND(=($2, $7), <($6, $1), OR(=($4, $0), AND(IS NULL($4), $3)))], joinType=[inner]) + EnumerableAggregate(group=[{0, 1, 2, 3}]) + EnumerableCalc(expr#0..6=[{inputs}], expr#7=[0], expr#8=[COALESCE($t6, $t7)], expr#9=[+($t5, $t8)], proj#0..1=[{exprs}], __seg_id__=[$t9], $f16=[$t4]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($2)])], window#1=[window(rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($3)])], constants=[[1]]) + EnumerableCalc(expr#0..17=[{inputs}], expr#18=[34], expr#19=[>($t8, $t18)], expr#20=[1], expr#21=[0], expr#22=[CASE($t19, $t20, $t21)], expr#23=[25], expr#24=[<($t8, $t23)], expr#25=[CASE($t24, $t20, $t21)], expr#26=[IS NULL($t4)], gender=[$t4], __stream_seq__=[$t17], __reset_before_flag__=[$t22], __reset_after_flag__=[$t25], $4=[$t26]) + EnumerableWindow(window#0=[window(rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + EnumerableCalc(expr#0..6=[{inputs}], expr#7=[0], expr#8=[COALESCE($t6, $t7)], expr#9=[+($t5, $t8)], proj#0..2=[{exprs}], __seg_id__=[$t9]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($3)])], window#1=[window(rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($4)])], constants=[[1]]) + EnumerableCalc(expr#0..17=[{inputs}], expr#18=[34], expr#19=[>($t8, $t18)], expr#20=[1], expr#21=[0], expr#22=[CASE($t19, $t20, $t21)], expr#23=[25], expr#24=[<($t8, $t23)], expr#25=[CASE($t24, $t20, $t21)], gender=[$t4], age=[$t8], __stream_seq__=[$t17], __reset_before_flag__=[$t22], __reset_after_flag__=[$t25]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - EnumerableCalc(expr#0..6=[{inputs}], expr#7=[0], expr#8=[COALESCE($t6, $t7)], expr#9=[+($t5, $t8)], proj#0..2=[{exprs}], __seg_id__=[$t9]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($3)])], window#1=[window(rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($4)])], constants=[[1]]) - EnumerableCalc(expr#0..17=[{inputs}], expr#18=[34], expr#19=[>($t8, $t18)], expr#20=[1], expr#21=[0], expr#22=[CASE($t19, $t20, $t21)], expr#23=[25], expr#24=[<($t8, $t23)], expr#25=[CASE($t24, $t20, $t21)], gender=[$t4], age=[$t8], __stream_seq__=[$t17], __reset_before_flag__=[$t22], __reset_after_flag__=[$t25]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end 
of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset_null_bucket.yaml new file mode 100644 index 00000000000..be28e9b1d8c --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset_null_bucket.yaml @@ -0,0 +1,38 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$21]) + LogicalSort(sort0=[$17], dir0=[ASC]) + LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17, 20}]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{}], avg_age=[AVG($8)]) + LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), =($4, $cor0.gender))]) + LogicalProject(account_number=[$0], firstname=[$1], 
address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) + EnumerableLimit(fetch=[10000]) + EnumerableHashJoin(condition=[AND(=($4, $13), =($11, $14), =($12, $15))], joinType=[left]) + EnumerableSort(sort0=[$11], dir0=[ASC]) + EnumerableCalc(expr#0..15=[{inputs}], expr#16=[0], expr#17=[COALESCE($t15, $t16)], expr#18=[+($t14, $t17)], proj#0..11=[{exprs}], __seg_id__=[$t18]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($12)])], window#1=[window(rows between UNBOUNDED PRECEDING and $14 PRECEDING aggs [$SUM0($13)])], constants=[[1]]) + EnumerableCalc(expr#0..17=[{inputs}], expr#18=[34], expr#19=[>($t8, $t18)], expr#20=[1], expr#21=[0], expr#22=[CASE($t19, $t20, $t21)], expr#23=[25], expr#24=[<($t8, $t23)], expr#25=[CASE($t24, $t20, $t21)], proj#0..10=[{exprs}], __stream_seq__=[$t17], __reset_before_flag__=[$t22], __reset_after_flag__=[$t25]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, 
opensearch-sql_test_index_account]]) + EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0], expr#6=[=($t4, $t5)], expr#7=[null:BIGINT], expr#8=[CASE($t6, $t7, $t3)], expr#9=[CAST($t8):DOUBLE], expr#10=[/($t9, $t4)], proj#0..2=[{exprs}], avg_age=[$t10]) + EnumerableAggregate(group=[{0, 1, 2}], agg#0=[$SUM0($4)], agg#1=[COUNT($4)]) + EnumerableHashJoin(condition=[AND(=($2, $6), =($0, $3), <($5, $1))], joinType=[inner]) + EnumerableAggregate(group=[{0, 1, 2}]) + EnumerableCalc(expr#0..5=[{inputs}], expr#6=[0], expr#7=[COALESCE($t5, $t6)], expr#8=[+($t4, $t7)], proj#0..1=[{exprs}], __seg_id__=[$t8]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($2)])], window#1=[window(rows between UNBOUNDED PRECEDING and $4 PRECEDING aggs [$SUM0($3)])], constants=[[1]]) + EnumerableCalc(expr#0..17=[{inputs}], expr#18=[34], expr#19=[>($t8, $t18)], expr#20=[1], expr#21=[0], expr#22=[CASE($t19, $t20, $t21)], expr#23=[25], expr#24=[<($t8, $t23)], expr#25=[CASE($t24, $t20, $t21)], gender=[$t4], __stream_seq__=[$t17], __reset_before_flag__=[$t22], __reset_after_flag__=[$t25]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + EnumerableCalc(expr#0..6=[{inputs}], expr#7=[0], expr#8=[COALESCE($t6, $t7)], expr#9=[+($t5, $t8)], proj#0..2=[{exprs}], __seg_id__=[$t9]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($3)])], window#1=[window(rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($4)])], constants=[[1]]) + EnumerableCalc(expr#0..17=[{inputs}], expr#18=[34], expr#19=[>($t8, $t18)], expr#20=[1], expr#21=[0], expr#22=[CASE($t19, $t20, $t21)], expr#23=[25], expr#24=[<($t8, $t23)], expr#25=[CASE($t24, $t20, $t21)], gender=[$t4], age=[$t8], __stream_seq__=[$t17], __reset_before_flag__=[$t22], __reset_after_flag__=[$t25]) + 
EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index d9482ee4cb4..6524adf87a6 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -267,7 +267,7 @@ streamstatsCommand ; streamstatsArgs - : (currentArg | windowArg | globalArg | resetBeforeArg | resetAfterArg)* + : (currentArg | windowArg | globalArg | resetBeforeArg | resetAfterArg | bucketNullableArg)* ; currentArg diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 7767ba852a3..bcdb30a9fc3 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -485,8 +485,7 @@ public UnresolvedPlan visitEventstatsCommand(OpenSearchPPLParser.EventstatsComma ArgumentMap arguments = ArgumentMap.of(argExprList); // bucket_nullable - boolean bucketNullable = - (Boolean) arguments.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue(); + boolean bucketNullable = (Boolean) arguments.get(Argument.BUCKET_NULLABLE).getValue(); // 2. Build groupList List groupList = getPartitionExprList(ctx.statsByClause()); @@ -513,13 +512,14 @@ public UnresolvedPlan visitEventstatsCommand(OpenSearchPPLParser.EventstatsComma /** Streamstats command. */ public UnresolvedPlan visitStreamstatsCommand(OpenSearchPPLParser.StreamstatsCommandContext ctx) { // 1. 
Parse arguments from the streamstats command - List argExprList = ArgumentFactory.getArgumentList(ctx); + List argExprList = ArgumentFactory.getArgumentList(ctx, settings); ArgumentMap arguments = ArgumentMap.of(argExprList); - // current, window and global from ArgumentFactory + // current, window, global and bucket_nullable from ArgumentFactory boolean current = (Boolean) arguments.get("current").getValue(); int window = (Integer) arguments.get("window").getValue(); boolean global = (Boolean) arguments.get("global").getValue(); + boolean bucketNullable = (Boolean) arguments.get(Argument.BUCKET_NULLABLE).getValue(); if (window < 0) { throw new IllegalArgumentException("Window size must be >= 0, but got: " + window); @@ -571,6 +571,7 @@ public UnresolvedPlan visitStreamstatsCommand(OpenSearchPPLParser.StreamstatsCom current, window, global, + bucketNullable, resetBeforeExpr, resetAfterExpr); } diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java index a102565ec0f..ed76b29b77a 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java @@ -96,7 +96,7 @@ public static List getArgumentList( * @param ctx StreamstatsCommandContext instance * @return the list of arguments fetched from the streamstats command */ - public static List getArgumentList(StreamstatsCommandContext ctx) { + public static List getArgumentList(StreamstatsCommandContext ctx, Settings settings) { return Arrays.asList( ctx.streamstatsArgs().currentArg() != null && !ctx.streamstatsArgs().currentArg().isEmpty() ? 
new Argument("current", getArgumentValue(ctx.streamstatsArgs().currentArg(0).current)) @@ -106,7 +106,15 @@ public static List getArgumentList(StreamstatsCommandContext ctx) { : new Argument("window", new Literal(0, DataType.INTEGER)), ctx.streamstatsArgs().globalArg() != null && !ctx.streamstatsArgs().globalArg().isEmpty() ? new Argument("global", getArgumentValue(ctx.streamstatsArgs().globalArg(0).global)) - : new Argument("global", new Literal(true, DataType.BOOLEAN))); + : new Argument("global", new Literal(true, DataType.BOOLEAN)), + ctx.streamstatsArgs().bucketNullableArg() != null + && !ctx.streamstatsArgs().bucketNullableArg().isEmpty() + ? new Argument( + Argument.BUCKET_NULLABLE, + getArgumentValue(ctx.streamstatsArgs().bucketNullableArg(0).bucket_nullable)) + : new Argument( + Argument.BUCKET_NULLABLE, + UnresolvedPlanHelper.legacyPreferred(settings) ? Literal.TRUE : Literal.FALSE)); } /** diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java index a5853ecba5d..b073453ecbc 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java @@ -19,6 +19,33 @@ public CalcitePPLStreamstatsTest() { public void testStreamstatsBy() { String ppl = "source=EMP | streamstats max(SAL) by DEPTNO"; RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5]," + + " COMM=[$6], DEPTNO=[$7], max(SAL)=[$9])\n" + + " LogicalSort(sort0=[$8], dir0=[ASC])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[$8], max(SAL)=[MAX($5) OVER" + + " (PARTITION BY $7 ROWS UNBOUNDED PRECEDING)])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " 
SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER ()])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`, MAX(`SAL`)" + + " OVER (PARTITION BY `DEPTNO` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)" + + " `max(SAL)`\n" + + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," + + " ROW_NUMBER() OVER () `__stream_seq__`\n" + + "FROM `scott`.`EMP`) `t`\n" + + "ORDER BY `__stream_seq__` NULLS LAST"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testStreamstatsByNullBucket() { + String ppl = "source=EMP | streamstats bucket_nullable=false max(SAL) by DEPTNO"; + RelNode root = getRelNode(ppl); String expectedLogical = "LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5]," + " COMM=[$6], DEPTNO=[$7], max(SAL)=[$9])\n" @@ -76,7 +103,8 @@ public void testStreamstatsWindow() { + " LogicalTableScan(table=[[scott, EMP]])\n" + " LogicalAggregate(group=[{}], max(SAL)=[MAX($5)])\n" + " LogicalFilter(condition=[AND(>=($8, -($cor0.__stream_seq__, 4)), <=($8," - + " $cor0.__stream_seq__), =($7, $cor0.DEPTNO))])\n" + + " $cor0.__stream_seq__), OR(=($7, $cor0.DEPTNO), AND(IS NULL($7), IS" + + " NULL($cor0.DEPTNO))))])\n" + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER ()])\n" + " LogicalTableScan(table=[[scott, EMP]])\n"; @@ -93,7 +121,8 @@ public void testStreamstatsWindow() { + " ROW_NUMBER() OVER () `__stream_seq__`\n" + "FROM `scott`.`EMP`) `t0`\n" + "WHERE `__stream_seq__` >= `$cor0`.`__stream_seq__` - 4 AND `__stream_seq__` <=" - + " `$cor0`.`__stream_seq__` AND `DEPTNO` = `$cor0`.`DEPTNO`) `t2`\n" + + " `$cor0`.`__stream_seq__` AND (`DEPTNO` = `$cor0`.`DEPTNO` OR `DEPTNO` IS NULL AND" + + " `$cor0`.`DEPTNO` IS NULL)) 
`t2`\n" + "ORDER BY `$cor0`.`__stream_seq__` NULLS LAST"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -107,17 +136,16 @@ public void testStreamstatsGlobal() { + " COMM=[$6], DEPTNO=[$7], max(SAL)=[$9])\n" + " LogicalSort(sort0=[$8], dir0=[ASC])\n" + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," - + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[$8], max(SAL)=[CASE(IS NOT" - + " NULL($7), MAX($5) OVER (PARTITION BY $7 ROWS 4 PRECEDING), null:DECIMAL(7, 2))])\n" + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[$8], max(SAL)=[MAX($5) OVER" + + " (PARTITION BY $7 ROWS 4 PRECEDING)])\n" + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER ()])\n" + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = - "SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`, CASE WHEN" - + " `DEPTNO` IS NOT NULL THEN MAX(`SAL`) OVER (PARTITION BY `DEPTNO` ROWS BETWEEN 4" - + " PRECEDING AND CURRENT ROW) ELSE NULL END `max(SAL)`\n" + "SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`, MAX(`SAL`)" + + " OVER (PARTITION BY `DEPTNO` ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) `max(SAL)`\n" + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," + " ROW_NUMBER() OVER () `__stream_seq__`\n" + "FROM `scott`.`EMP`) `t`\n" @@ -147,7 +175,8 @@ public void testStreamstatsReset() { + " LogicalTableScan(table=[[scott, EMP]])\n" + " LogicalAggregate(group=[{}], avg(SAL)=[AVG($5)])\n" + " LogicalFilter(condition=[AND(<=($8, $cor0.__stream_seq__), =($11," - + " $cor0.__seg_id__), =($7, $cor0.DEPTNO))])\n" + + " $cor0.__seg_id__), OR(=($7, $cor0.DEPTNO), AND(IS NULL($7), IS" + + " NULL($cor0.DEPTNO))))])\n" + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + " SAL=[$5], COMM=[$6], DEPTNO=[$7], 
__stream_seq__=[$8], __reset_before_flag__=[$9]," + " __reset_after_flag__=[$10], __seg_id__=[+(SUM($9) OVER (ROWS UNBOUNDED PRECEDING)," @@ -184,7 +213,8 @@ public void testStreamstatsReset() { + " `__reset_after_flag__`\n" + "FROM `scott`.`EMP`) `t1`) `t2`\n" + "WHERE `__stream_seq__` <= `$cor0`.`__stream_seq__` AND `__seg_id__` =" - + " `$cor0`.`__seg_id__` AND `DEPTNO` = `$cor0`.`DEPTNO`) `t4`\n" + + " `$cor0`.`__seg_id__` AND (`DEPTNO` = `$cor0`.`DEPTNO` OR `DEPTNO` IS NULL AND" + + " `$cor0`.`DEPTNO` IS NULL)) `t4`\n" + "ORDER BY `$cor0`.`__stream_seq__` NULLS LAST"; verifyPPLToSparkSQL(root, expectedSparkSql); }