Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.rules.SubstitutionRule;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
Expand All @@ -27,6 +28,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.immutables.value.Value;
import org.opensearch.sql.calcite.utils.CalciteUtils;
import org.opensearch.sql.calcite.utils.PlanUtils;

/**
* Planner rule that merge multiple agg group fields into a single one, on which all other group
Expand All @@ -41,7 +43,8 @@
* these UDFs' output must have equivalent cardinality as `a`.
*/
@Value.Enclosing
public class PPLAggGroupMergeRule extends RelRule<PPLAggGroupMergeRule.Config> {
public class PPLAggGroupMergeRule extends RelRule<PPLAggGroupMergeRule.Config>
implements SubstitutionRule {
Comment on lines +46 to +47
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be wrapped in InterruptibleRelRule?

public class PPLAggGroupMergeRule extends InterruptibleRelRule<PPLAggGroupMergeRule.Config> implements SubstitutionRule

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InterruptibleRelRule is in package opensearch and has dependency on OpenSearchTimeoutException while this rule is package core.

Therefore, we cannot make this extends InterruptibleRelRule unless move that from package opensearch to core and add library opensearch in core gradle.

On the other hand, if there is interrupt triggered in planning process, it should be detected in our push down rules in package opensearch.


/** Creates a OpenSearchAggregateConvertRule. */
protected PPLAggGroupMergeRule(Config config) {
Expand Down Expand Up @@ -101,6 +104,7 @@ public void apply(RelOptRuleCall call, LogicalAggregate aggregate, LogicalProjec
parentProjections.addAll(aggCallRefs);
relBuilder.project(parentProjections);
call.transformTo(relBuilder.build());
PlanUtils.tryPruneRelNodes(call);
}

/** Rule configuration. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.rules.SubstitutionRule;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
Expand All @@ -28,6 +29,7 @@
import org.apache.calcite.tools.RelBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.immutables.value.Value;
import org.opensearch.sql.calcite.utils.PlanUtils;

/**
* Planner rule that converts specific aggCall to a more efficient expressions, which includes:
Expand All @@ -45,8 +47,8 @@
* <p>- AVG/MAX/MIN(FIELD [+|-|*|+|/] NUMBER) -> AVG/MAX/MIN(FIELD) [+|-|*|+|/] NUMBER
*/
@Value.Enclosing
public class PPLAggregateConvertRule extends RelRule<PPLAggregateConvertRule.Config> {

public class PPLAggregateConvertRule extends RelRule<PPLAggregateConvertRule.Config>
implements SubstitutionRule {
/** Creates a OpenSearchAggregateConvertRule. */
protected PPLAggregateConvertRule(Config config) {
super(config);
Expand Down Expand Up @@ -97,6 +99,8 @@ public void apply(RelOptRuleCall call, LogicalAggregate aggregate, LogicalProjec
return ref;
})
.toList();
// Stop processing if there is no converted agg call args
if (convertedAggCallArgs.isEmpty()) return;
relBuilder.project(newChildProjects);
RelNode newInput = relBuilder.peek();

Expand Down Expand Up @@ -193,6 +197,7 @@ public void apply(RelOptRuleCall call, LogicalAggregate aggregate, LogicalProjec
aliasMaybe(relBuilder, constructor.apply(relBuilder.peek()), name)));
relBuilder.project(parentProjects);
call.transformTo(relBuilder.build());
PlanUtils.tryPruneRelNodes(call);
}

interface OperatorConstructor {
Expand Down
101 changes: 101 additions & 0 deletions core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,28 @@
import static org.apache.calcite.rex.RexWindowBounds.UNBOUNDED_PRECEDING;
import static org.apache.calcite.rex.RexWindowBounds.following;
import static org.apache.calcite.rex.RexWindowBounds.preceding;
import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.isTimeBasedType;

import com.google.common.collect.ImmutableList;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Sort;
Expand All @@ -41,6 +48,7 @@
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexOver;
import org.apache.calcite.rex.RexSlot;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.rex.RexWindow;
import org.apache.calcite.rex.RexWindowBound;
Expand Down Expand Up @@ -646,4 +654,97 @@ private static boolean isNotNullOnRef(RexNode rex) {
&& rexCall.isA(SqlKind.IS_NOT_NULL)
&& rexCall.getOperands().get(0) instanceof RexInputRef;
}

Predicate<Aggregate> aggIgnoreNullBucket =
agg ->
agg.getHints().stream()
.anyMatch(
hint ->
hint.hintName.equals("stats_args")
&& hint.kvOptions.get(Argument.BUCKET_NULLABLE).equals("false"));

Predicate<Aggregate> maybeTimeSpanAgg =
agg ->
agg.getGroupSet().stream()
.allMatch(
group ->
isTimeBasedType(
agg.getInput().getRowType().getFieldList().get(group).getType()));

static boolean isTimeSpan(RexNode rex) {
return rex instanceof RexCall rexCall
&& rexCall.getKind() == SqlKind.OTHER_FUNCTION
&& rexCall.getOperator().getName().equalsIgnoreCase(BuiltinFunctionName.SPAN.name())
&& rexCall.getOperands().size() == 3
&& rexCall.getOperands().get(2) instanceof RexLiteral unitLiteral
&& unitLiteral.getTypeName() != SqlTypeName.NULL;
}

/**
* Check if the condition is NOT NULL derived from an aggregate.
*
* @param condition the condition to check, composite of single or multiple NOT NULL conditions
* @param aggregate the aggregate where the condition is derived from
* @param project the project between the aggregate and the filter
* @param otherMapping the other mapping generated from ProjectIndexScanRule when applied on the
* above project with non-ref expressions.
* @return true if the condition is single or multiple NOT NULL derived from an aggregate, false
* otherwise
*/
static boolean isNotNullDerivedFromAgg(
RexNode condition,
Aggregate aggregate,
@Nullable Project project,
@Nullable List<Integer> otherMapping) {
boolean ignoreNullBucket = aggIgnoreNullBucket.test(aggregate);
if (!ignoreNullBucket && project == null) return false;
List<Integer> groupRefList = aggregate.getGroupSet().asList();
if (project != null) {
groupRefList =
groupRefList.stream()
.map(project.getProjects()::get)
.filter(rex -> ignoreNullBucket || isTimeSpan(rex))
.flatMap(expr -> PlanUtils.getInputRefs(expr).stream())
.map(RexSlot::getIndex)
.toList();
}
if (otherMapping != null) {
groupRefList = groupRefList.stream().map(otherMapping::get).toList();
}
List<Integer> finalGroupRefList = groupRefList;
Function<RexNode, Boolean> isNotNullFromAgg =
rex ->
rex instanceof RexCall rexCall
&& rexCall.isA(SqlKind.IS_NOT_NULL)
&& rexCall.getOperands().get(0) instanceof RexInputRef ref
&& finalGroupRefList.contains(ref.getIndex());
return isNotNullFromAgg.apply(condition)
|| (condition instanceof RexCall rexCall
&& rexCall.getOperator() == SqlStdOperatorTable.AND
&& rexCall.getOperands().stream().allMatch(isNotNullFromAgg::apply));
}

/**
* Try to prune all RelNodes in the RuleCall from top to down. We can prune a RelNode if:
*
* <p>1. It's the root RelNode of the current RuleCall. Or,
*
* <p>2. It's logical RelNode and it only has one parent which is pruned. TODO: To be more
* precisely, we can prun a RelNode whose parents are all pruned, but `prunedNodes` in
* VolcanoPlanner is not available.
*
* @param call the RuleCall to prune
*/
static void tryPruneRelNodes(RelOptRuleCall call) {
if (call.getPlanner() instanceof VolcanoPlanner volcanoPlanner) {
Arrays.stream(call.rels)
.takeWhile(
rel ->
// Don't prune the physical RelNode as it may prevent sort expr push down
rel.getConvention() == Convention.NONE
&& (rel == call.rels[0]
|| volcanoPlanner.getSubsetNonNull(rel).getParentRels().size() == 1))
.forEach(volcanoPlanner::prune);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ calcite:
LogicalProject(@timestamp=[$0], category=[$1], value=[$2], timestamp=[$3], _id=[$4], _index=[$5], _score=[$6], _maxscore=[$7], _sort=[$8], _routing=[$9], value_range=[CASE(<($2, 7000), 'small':VARCHAR, 'large':VARCHAR)])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
physical: |
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},avg(value)=AVG($1)), PROJECT->[avg(value), span(@timestamp,1h), value_range], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"@timestamp","boost":1.0}},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"span(@timestamp,1h)":{"date_histogram":{"field":"@timestamp","missing_bucket":false,"order":"asc","fixed_interval":"1h"}}}]},"aggregations":{"value_range":{"range":{"field":"value","ranges":[{"key":"small","to":7000.0},{"key":"large","from":7000.0}],"keyed":true},"aggregations":{"avg(value)":{"avg":{"field":"value"}}}}}}}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},avg(value)=AVG($1)), PROJECT->[avg(value), span(@timestamp,1h), value_range], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"span(@timestamp,1h)":{"date_histogram":{"field":"@timestamp","missing_bucket":false,"order":"asc","fixed_interval":"1h"}}}]},"aggregations":{"value_range":{"range":{"field":"value","ranges":[{"key":"small","to":7000.0},{"key":"large","from":7000.0}],"keyed":true},"aggregations":{"avg(value)":{"avg":{"field":"value"}}}}}}}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ calcite:
LogicalFilter(condition=[IS NOT NULL($3)])
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
physical: |
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[FILTER->IS NOT NULL($3), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},dc(`agent.name`)=COUNT(DISTINCT $0)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"agent.name","boost":1.0}},"aggregations":{"dc(`agent.name`)":{"cardinality":{"field":"agent.name"}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},dc(`agent.name`)=COUNT(DISTINCT $0)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"agent.name","boost":1.0}},"aggregations":{"dc(`agent.name`)":{"cardinality":{"field":"agent.name"}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ calcite:
LogicalFilter(condition=[IS NOT NULL($37)])
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
physical: |
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[FILTER->IS NOT NULL($37), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},dc(`event.id`)=COUNT(DISTINCT $0)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"event.id","boost":1.0}},"aggregations":{"dc(`event.id`)":{"cardinality":{"field":"event.id"}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},dc(`event.id`)=COUNT(DISTINCT $0)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"event.id","boost":1.0}},"aggregations":{"dc(`event.id`)":{"cardinality":{"field":"event.id"}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ calcite:
LogicalFilter(condition=[IS NOT NULL($14)])
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
physical: |
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[FILTER->IS NOT NULL($14), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},dc(`cloud.region`)=COUNT(DISTINCT $0)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"cloud.region","boost":1.0}},"aggregations":{"dc(`cloud.region`)":{"cardinality":{"field":"cloud.region"}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},dc(`cloud.region`)=COUNT(DISTINCT $0)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"cloud.region","boost":1.0}},"aggregations":{"dc(`cloud.region`)":{"cardinality":{"field":"cloud.region"}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ calcite:
LogicalFilter(condition=[AND(>=($17, TIMESTAMP('2022-12-30 00:00:00':VARCHAR)), <($17, TIMESTAMP('2023-01-07 12:00:00':VARCHAR)))])
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
physical: |
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[PROJECT->[@timestamp], FILTER->SEARCH($0, Sarg[['2022-12-30 00:00:00':VARCHAR..'2023-01-07 12:00:00':VARCHAR)]:VARCHAR), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), span(`@timestamp`,1d)], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"range":{"@timestamp":{"from":"2022-12-30T00:00:00.000Z","to":"2023-01-07T12:00:00.000Z","include_lower":true,"include_upper":false,"format":"date_time","boost":1.0}}},"_source":{"includes":["@timestamp"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10,"sources":[{"span(`@timestamp`,1d)":{"date_histogram":{"field":"@timestamp","missing_bucket":false,"order":"asc","fixed_interval":"1d"}}}]}}}}, requestedTotalSize=10, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[FILTER->SEARCH($0, Sarg[['2022-12-30 00:00:00':VARCHAR..'2023-01-07 12:00:00':VARCHAR)]:VARCHAR), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), span(`@timestamp`,1d)], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"range":{"@timestamp":{"from":"2022-12-30T00:00:00.000Z","to":"2023-01-07T12:00:00.000Z","include_lower":true,"include_upper":false,"format":"date_time","boost":1.0}}},"aggregations":{"composite_buckets":{"composite":{"size":10,"sources":[{"span(`@timestamp`,1d)":{"date_histogram":{"field":"@timestamp","missing_bucket":false,"order":"asc","fixed_interval":"1d"}}}]}}}}, requestedTotalSize=10, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ calcite:
LogicalFilter(condition=[AND(>=($17, TIMESTAMP('2023-01-02 00:00:00':VARCHAR)), <($17, TIMESTAMP('2023-01-02 10:00:00':VARCHAR)), IS NOT NULL($7), IS NOT NULL($14))])
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
physical: |
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[PROJECT->[process.name, cloud.region, @timestamp], FILTER->AND(SEARCH($2, Sarg[['2023-01-02 00:00:00':VARCHAR..'2023-01-02 10:00:00':VARCHAR)]:VARCHAR), IS NOT NULL($0), IS NOT NULL($1)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count()=COUNT()), PROJECT->[count(), process.name, cloud.region], SORT->[1 DESC LAST, 2 ASC FIRST], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"range":{"@timestamp":{"from":"2023-01-02T00:00:00.000Z","to":"2023-01-02T10:00:00.000Z","include_lower":true,"include_upper":false,"format":"date_time","boost":1.0}}},{"exists":{"field":"process.name","boost":1.0}},{"exists":{"field":"cloud.region","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["process.name","cloud.region","@timestamp"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10,"sources":[{"process.name":{"terms":{"field":"process.name","missing_bucket":false,"order":"desc"}}},{"cloud.region":{"terms":{"field":"cloud.region","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=10, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[FILTER->AND(SEARCH($2, Sarg[['2023-01-02 00:00:00':VARCHAR..'2023-01-02 10:00:00':VARCHAR)]:VARCHAR), IS NOT NULL($0), IS NOT NULL($1)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count()=COUNT()), PROJECT->[count(), process.name, cloud.region], SORT->[1 DESC LAST, 2 ASC FIRST], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"range":{"@timestamp":{"from":"2023-01-02T00:00:00.000Z","to":"2023-01-02T10:00:00.000Z","include_lower":true,"include_upper":false,"format":"date_time","boost":1.0}}},{"exists":{"field":"process.name","boost":1.0}},{"exists":{"field":"cloud.region","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"aggregations":{"composite_buckets":{"composite":{"size":10,"sources":[{"process.name":{"terms":{"field":"process.name","missing_bucket":false,"order":"desc"}}},{"cloud.region":{"terms":{"field":"cloud.region","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=10, pageSize=null, startFrom=0)])
Loading
Loading