Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,50 +9,27 @@
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class StreamWindow extends UnresolvedPlan {

private final List<UnresolvedExpression> windowFunctionList;
private final List<UnresolvedExpression> groupList;
private final boolean current;
private final int window;
private final boolean global;
private final boolean bucketNullable;
private final UnresolvedExpression resetBefore;
private final UnresolvedExpression resetAfter;
@ToString.Exclude private UnresolvedPlan child;

/** StreamWindow Constructor. */
public StreamWindow(
List<UnresolvedExpression> windowFunctionList,
List<UnresolvedExpression> groupList,
boolean current,
int window,
boolean global,
UnresolvedExpression resetBefore,
UnresolvedExpression resetAfter) {
this.windowFunctionList = windowFunctionList;
this.groupList = groupList;
this.current = current;
this.window = window;
this.global = global;
this.resetBefore = resetBefore;
this.resetAfter = resetAfter;
}

public boolean isCurrent() {
return current;
}

public boolean isGlobal() {
return global;
}

@Override
public StreamWindow attach(UnresolvedPlan child) {
this.child = child;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1130,8 +1130,7 @@ private Pair<List<RexNode>, List<AggCall>> resolveAttributesForAggregation(
@Override
public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
Boolean bucketNullable =
(Boolean) statsArgs.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue();
Boolean bucketNullable = (Boolean) statsArgs.get(Argument.BUCKET_NULLABLE).getValue();
int nGroup = node.getGroupExprList().size() + (Objects.nonNull(node.getSpan()) ? 1 : 0);
BitSet nonNullGroupMask = new BitSet(nGroup);
if (!bucketNullable) {
Expand Down Expand Up @@ -1748,20 +1747,25 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
.as(ROW_NUMBER_COLUMN_FOR_STREAMSTATS);
context.relBuilder.projectPlus(streamSeq);

// construct groupNotNull predicate
List<RexNode> groupByList =
groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).collect(Collectors.toList());
List<RexNode> notNullList =
PlanUtils.getSelectColumns(groupByList).stream()
.map(context.relBuilder::field)
.map(context.relBuilder::isNotNull)
.collect(Collectors.toList());
RexNode groupNotNull = context.relBuilder.and(notNullList);
if (!node.isBucketNullable()) {
// construct groupNotNull predicate
List<RexNode> groupByList =
groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).collect(Collectors.toList());
List<RexNode> notNullList =
PlanUtils.getSelectColumns(groupByList).stream()
.map(context.relBuilder::field)
.map(context.relBuilder::isNotNull)
.collect(Collectors.toList());
RexNode groupNotNull = context.relBuilder.and(notNullList);

// wrap each expr: CASE WHEN groupNotNull THEN rawExpr ELSE CAST(NULL AS rawType) END
List<RexNode> wrappedOverExprs =
wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context);
context.relBuilder.projectPlus(wrappedOverExprs);
} else {
context.relBuilder.projectPlus(overExpressions);
}

// wrap each expr: CASE WHEN groupNotNull THEN rawExpr ELSE CAST(NULL AS rawType) END
List<RexNode> wrappedOverExprs =
wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context);
context.relBuilder.projectPlus(wrappedOverExprs);
// resort when there is by condition
context.relBuilder.sort(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS));
context.relBuilder.projectExcept(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS));
Expand Down Expand Up @@ -1821,11 +1825,11 @@ private RelNode buildStreamWindowJoinPlan(
RexNode segRight = context.relBuilder.field(segmentCol);
RexNode segOuter = context.relBuilder.field(v.get(), segmentCol);
RexNode frame = buildResetFrameFilter(context, node, outerSeq, rightSeq, segOuter, segRight);
RexNode group = buildGroupFilter(context, groupList, v.get());
RexNode group = buildGroupFilter(context, node, groupList, v.get());
filter = (group == null) ? frame : context.relBuilder.and(frame, group);
} else { // global + window + by condition
RexNode frame = buildFrameFilter(context, node, outerSeq, rightSeq);
RexNode group = buildGroupFilter(context, groupList, v.get());
RexNode group = buildGroupFilter(context, node, groupList, v.get());
filter = context.relBuilder.and(frame, group);
}
context.relBuilder.filter(filter);
Expand Down Expand Up @@ -1975,7 +1979,10 @@ private RexNode buildResetFrameFilter(
}

private RexNode buildGroupFilter(
CalcitePlanContext context, List<UnresolvedExpression> groupList, RexCorrelVariable correl) {
CalcitePlanContext context,
StreamWindow node,
List<UnresolvedExpression> groupList,
RexCorrelVariable correl) {
// build conjunctive equality filters: right.g_i = outer.g_i
if (groupList.isEmpty()) {
return null;
Expand All @@ -1987,7 +1994,17 @@ private RexNode buildGroupFilter(
String groupName = extractGroupFieldName(expr);
RexNode rightGroup = context.relBuilder.field(groupName);
RexNode outerGroup = context.relBuilder.field(correl, groupName);
return context.relBuilder.equals(rightGroup, outerGroup);
RexNode equalCondition = context.relBuilder.equals(rightGroup, outerGroup);
// handle bucket_nullable case
if (!node.isBucketNullable()) {
return equalCondition;
} else {
RexNode bothNull =
context.relBuilder.and(
context.relBuilder.isNull(rightGroup),
context.relBuilder.isNull(outerGroup));
return context.relBuilder.or(equalCondition, bothNull);
}
})
.collect(Collectors.toList());
return context.relBuilder.and(equalsList);
Expand Down
39 changes: 37 additions & 2 deletions docs/user/ppl/cmd/streamstats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,14 @@ All of these commands can be used to generate aggregations such as average, sum,

Syntax
======
streamstats [current=<bool>] [window=<int>] [global=<bool>] [reset_before="("<eval-expression>")"] [reset_after="("<eval-expression>")"] <function>... [by-clause]
streamstats [bucket_nullable=<bool>] [current=<bool>] [window=<int>] [global=<bool>] [reset_before="("<eval-expression>")"] [reset_after="("<eval-expression>")"] <function>... [by-clause]

* function: mandatory. An aggregation function or window function.
* bucket_nullable: optional. Controls whether the streamstats command considers null buckets a valid group in group-by aggregations. When set to ``false``, null group-by values are not treated as a distinct group during aggregation. **Default:** Determined by ``plugins.ppl.syntax.legacy.preferred``.

* When ``plugins.ppl.syntax.legacy.preferred=true``, ``bucket_nullable`` defaults to ``true``
* When ``plugins.ppl.syntax.legacy.preferred=false``, ``bucket_nullable`` defaults to ``false``

* current: optional. If true, the search includes the given, or current, event in the summary calculations. If false, the search uses the field value from the previous event. Syntax: current=<boolean>. **Default:** true.
* window: optional. Specifies the number of events to use when computing the statistics. Syntax: window=<integer>. **Default:** 0, which means that all previous and current events are used.
* global: optional. Used only when the window argument is set. Defines whether to use a single window, global=true, or to use separate windows based on the by clause. If global=false and window is set to a non-zero value, a separate window is used for each group of values of the field specified in the by clause. Syntax: global=<boolean>. **Default:** true.
Expand Down Expand Up @@ -226,4 +231,34 @@ PPL query::
| Peter | Canada | B.C | 4 | 2023 | 57 | null |
| Rick | Canada | B.C | 4 | 2023 | 70 | null |
| David | USA | Washington | 4 | 2023 | 40 | null |
+-------+---------+------------+-------+------+-----+---------+
+-------+---------+------------+-------+------+-----+---------+


Example 5: Null buckets handling
================================

PPL query::

os> source=accounts | streamstats bucket_nullable=false count() as cnt by employer | fields account_number, firstname, employer, cnt;
fetched rows / total rows = 4/4
+----------------+-----------+----------+------+
| account_number | firstname | employer | cnt |
|----------------+-----------+----------+------|
| 1 | Amber | Pyrami | 1 |
| 6 | Hattie | Netagy | 1 |
| 13 | Nanette | Quility | 1 |
| 18 | Dale | null | null |
+----------------+-----------+----------+------+

PPL query::

os> source=accounts | streamstats bucket_nullable=true count() as cnt by employer | fields account_number, firstname, employer, cnt;
fetched rows / total rows = 4/4
+----------------+-----------+----------+-----+
| account_number | firstname | employer | cnt |
|----------------+-----------+----------+-----|
| 1 | Amber | Pyrami | 1 |
| 6 | Hattie | Netagy | 1 |
| 13 | Nanette | Quility | 1 |
| 18 | Dale | null | 1 |
+----------------+-----------+----------+-----+
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,36 @@ public void testStreamstatsResetExplain() throws IOException {
assertYamlEqualsIgnoreId(expected, result);
}

/**
 * Explains a grouped {@code streamstats} query that sets {@code bucket_nullable=false} and
 * compares the resulting YAML plan with {@code explain_streamstats_null_bucket.yaml}.
 */
@Test
public void testStreamstatsNullBucketExplain() throws IOException {
  String ppl =
      "source=opensearch-sql_test_index_account | streamstats bucket_nullable=false avg(age) as"
          + " avg_age by gender";
  // Run the explain request first, then load the stored plan to compare against.
  var actualPlan = explainQueryYaml(ppl);
  assertYamlEqualsIgnoreId(
      loadExpectedPlan("explain_streamstats_null_bucket.yaml"), actualPlan);
}

/**
 * Explains a {@code streamstats} query combining {@code bucket_nullable=false} with
 * {@code window=2 global=true} and compares the resulting YAML plan with
 * {@code explain_streamstats_global_null_bucket.yaml}.
 */
@Test
public void testStreamstatsGlobalNullBucketExplain() throws IOException {
  String ppl =
      "source=opensearch-sql_test_index_account | streamstats bucket_nullable=false window=2"
          + " global=true avg(age) as avg_age by gender";
  // Run the explain request first, then load the stored plan to compare against.
  var actualPlan = explainQueryYaml(ppl);
  assertYamlEqualsIgnoreId(
      loadExpectedPlan("explain_streamstats_global_null_bucket.yaml"), actualPlan);
}

/**
 * Explains a {@code streamstats} query combining {@code bucket_nullable=false} with
 * {@code current=false}, {@code reset_before} and {@code reset_after}, and compares the
 * resulting YAML plan with {@code explain_streamstats_reset_null_bucket.yaml}.
 */
@Test
public void testStreamstatsResetNullBucketExplain() throws IOException {
  String ppl =
      "source=opensearch-sql_test_index_account | streamstats bucket_nullable=false current=false"
          + " reset_before=age>34 reset_after=age<25 avg(age) as avg_age by gender";
  // Run the explain request first, then load the stored plan to compare against.
  var actualPlan = explainQueryYaml(ppl);
  assertYamlEqualsIgnoreId(
      loadExpectedPlan("explain_streamstats_reset_null_bucket.yaml"), actualPlan);
}

@Test
public void testKeywordILikeFunctionExplain() throws IOException {
// ilike is only supported in v3
Expand Down
Loading
Loading