Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5192730
Speed up aggregation pushdown for single group-by expression
LantaoJin Apr 15, 2025
8216f4e
Merge remote-tracking branch 'upstream/main' into issues/3528
LantaoJin Sep 3, 2025
3e6c23f
Merge remote-tracking branch 'upstream/main' into issues/3528
LantaoJin Sep 3, 2025
c3acc57
Add configs nullable_bucket
LantaoJin Sep 4, 2025
6bc425e
Merge remote-tracking branch 'upstream/main' into issues/3528
LantaoJin Sep 4, 2025
2858733
Fix IT
LantaoJin Sep 5, 2025
4e13427
revert typo
LantaoJin Sep 5, 2025
e0c5e95
Merge remote-tracking branch 'upstream/main' into issues/3528
LantaoJin Sep 5, 2025
e7e4bb5
Fix conflicts error
LantaoJin Sep 5, 2025
90449b6
fix unit tests
LantaoJin Sep 5, 2025
97e89d9
Fix order
LantaoJin Sep 5, 2025
1f0f82a
Fix UT
LantaoJin Sep 5, 2025
3896081
Merge remote-tracking branch 'upstream/main' into issues/3528
LantaoJin Sep 5, 2025
655ad3b
Fix UT in windows
LantaoJin Sep 5, 2025
656798c
Merge remote-tracking branch 'upstream/main' into issues/3528
LantaoJin Sep 10, 2025
c712498
fix compile error of conflicts
LantaoJin Sep 10, 2025
9be3579
Merge remote-tracking branch 'upstream/main' into issues/3528
LantaoJin Sep 10, 2025
9e73ebd
Add more ITs after merging push down limit to agg buckets
LantaoJin Sep 11, 2025
3af543b
fix IT
LantaoJin Sep 11, 2025
3938840
Merge remote-tracking branch 'upstream/main' into issues/3528
LantaoJin Sep 11, 2025
70934f6
address comments
LantaoJin Sep 11, 2025
ac72fc3
Clear sorts in source builder for aggregation pushdown
LantaoJin Sep 11, 2025
862cb20
Delete the TODO of v2, it's resolved now
LantaoJin Sep 11, 2025
121427c
fix doctest
LantaoJin Sep 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public enum Key {
PATTERN_MAX_SAMPLE_COUNT("plugins.ppl.pattern.max.sample.count"),
PATTERN_BUFFER_LIMIT("plugins.ppl.pattern.buffer.limit"),
PPL_REX_MAX_MATCH_LIMIT("plugins.ppl.rex.max_match.limit"),
PPL_SYNTAX_LEGACY_PREFERRED("plugins.ppl.syntax.legacy.preferred"),

/** Enable Calcite as execution engine */
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,11 @@ private LogicalAggregation analyzeAggregation(
groupBys.forEach(
group ->
newEnv.define(new Symbol(Namespace.FIELD_NAME, group.getNameOrAlias()), group.type()));
return new LogicalAggregation(child, aggregators, groupBys);

Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
boolean bucketNullable =
(Boolean) statsArgs.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue();
return new LogicalAggregation(child, aggregators, groupBys, bucketNullable);
}

private Aggregation analyzePatternsAgg(Patterns node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ public static List<Argument> defaultStatsArgs() {
argument("partitions", intLiteral(1)),
argument("allnum", booleanLiteral(false)),
argument("delim", stringLiteral(" ")),
argument(Argument.BUCKET_NULLABLE, booleanLiteral(true)),
argument("dedupsplit", booleanLiteral(false)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class Argument extends UnresolvedExpression {
public static final String BUCKET_NULLABLE = "bucket_nullable";

private final String argName;
private final Literal value;

Expand Down Expand Up @@ -66,5 +68,9 @@ public static ArgumentMap empty() {
public Literal get(String name) {
return map.get(name);
}

public Literal getOrDefault(String name, Literal literal) {
return map.getOrDefault(name, literal);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,6 @@ public String toString() {

public static Literal TRUE = new Literal(true, DataType.BOOLEAN);
public static Literal FALSE = new Literal(false, DataType.BOOLEAN);
public static Literal ZERO = new Literal(Integer.valueOf("0"), DataType.INTEGER);
public static Literal ZERO = new Literal(0, DataType.INTEGER);
public static Literal ONE = new Literal(1, DataType.INTEGER);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
Expand Down Expand Up @@ -880,6 +883,40 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
groupExprList.addAll(node.getGroupExprList());
Pair<List<RexNode>, List<AggCall>> aggregationAttributes =
aggregateWithTrimming(groupExprList, aggExprList, context);
// Add group by columns
List<RexNode> aliasedGroupByList =
aggregationAttributes.getLeft().stream()
.map(this::extractAliasLiteral)
.flatMap(Optional::stream)
.map(ref -> ((RexLiteral) ref).getValueAs(String.class))
.map(context.relBuilder::field)
.map(f -> (RexNode) f)
.toList();

// add stats hint to LogicalAggregation
Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
Boolean bucketNullable =
(Boolean) statsArgs.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue();
if (!bucketNullable && !aliasedGroupByList.isEmpty()) {
final RelHint statHits =
RelHint.builder("stats_args").hintOption(Argument.BUCKET_NULLABLE, "false").build();
assert context.relBuilder.peek() instanceof LogicalAggregate
: "Stats hits should be added to LogicalAggregate";
context.relBuilder.hints(statHits);
context
.relBuilder
.getCluster()
.setHintStrategies(
HintStrategyTable.builder()
.hintStrategy(
"stats_args",
(hint, rel) -> {
return rel instanceof LogicalAggregate;
})
.build());
context.relBuilder.filter(
aliasedGroupByList.stream().map(context.relBuilder::isNotNull).toList());
}

// schema reordering
// As an example, in command `stats count() by colA, colB`,
Expand All @@ -892,15 +929,6 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
List<RexNode> aggRexList =
outputFields.subList(numOfOutputFields - numOfAggList, numOfOutputFields);
reordered.addAll(aggRexList);
// Add group by columns
List<RexNode> aliasedGroupByList =
aggregationAttributes.getLeft().stream()
.map(this::extractAliasLiteral)
.flatMap(Optional::stream)
.map(ref -> ((RexLiteral) ref).getValueAs(String.class))
.map(context.relBuilder::field)
.map(f -> (RexNode) f)
.toList();
reordered.addAll(aliasedGroupByList);
context.relBuilder.project(reordered);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,27 @@
import org.opensearch.sql.expression.aggregation.NamedAggregator;

/** Logical Aggregation. */
@Getter
@ToString
@EqualsAndHashCode(callSuper = true)
public class LogicalAggregation extends LogicalPlan {

@Getter private final List<NamedAggregator> aggregatorList;
private final List<NamedAggregator> aggregatorList;

@Getter private final List<NamedExpression> groupByList;
private final List<NamedExpression> groupByList;

private final boolean bucketNullable;

/** Constructor of LogicalAggregation. */
public LogicalAggregation(
LogicalPlan child, List<NamedAggregator> aggregatorList, List<NamedExpression> groupByList) {
LogicalPlan child,
List<NamedAggregator> aggregatorList,
List<NamedExpression> groupByList,
boolean bucketNullable) {
super(Collections.singletonList(child));
this.aggregatorList = aggregatorList;
this.groupByList = groupByList;
this.bucketNullable = bucketNullable;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,19 @@ public static LogicalPlan write(LogicalPlan input, Table table, List<String> col
return new LogicalWrite(input, table, columns);
}

/** Build a logical aggregation with nullable bucket always true. */
public static LogicalPlan aggregation(
LogicalPlan input, List<NamedAggregator> aggregatorList, List<NamedExpression> groupByList) {
return new LogicalAggregation(input, aggregatorList, groupByList);
return new LogicalAggregation(input, aggregatorList, groupByList, true);
}

/** Build a logical aggregation with nullable bucket parameter */
public static LogicalPlan aggregation(
LogicalPlan input,
List<NamedAggregator> aggregatorList,
List<NamedExpression> groupByList,
boolean bucketNullable) {
return new LogicalAggregation(input, aggregatorList, groupByList, bucketNullable);
}

public static LogicalPlan filter(LogicalPlan input, Expression expression) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.opensearch.sql.ast.dsl.AstDSL.booleanLiteral;
import static org.opensearch.sql.ast.dsl.AstDSL.compare;
import static org.opensearch.sql.ast.dsl.AstDSL.computation;
import static org.opensearch.sql.ast.dsl.AstDSL.exprList;
import static org.opensearch.sql.ast.dsl.AstDSL.field;
import static org.opensearch.sql.ast.dsl.AstDSL.filter;
import static org.opensearch.sql.ast.dsl.AstDSL.filteredAggregate;
Expand Down Expand Up @@ -434,7 +435,7 @@ public void stats_source() {
ImmutableList.of(DSL.named("string_value", DSL.ref("string_value", STRING)))),
AstDSL.agg(
AstDSL.relation("schema"),
AstDSL.exprList(
exprList(
AstDSL.alias(
"avg(integer_value)", AstDSL.aggregate("avg", field("integer_value")))),
null,
Expand Down Expand Up @@ -486,7 +487,7 @@ public void rename_to_invalid_expression() {
AstDSL.rename(
AstDSL.agg(
AstDSL.relation("schema"),
AstDSL.exprList(
exprList(
AstDSL.alias(
"avg(integer_value)",
AstDSL.aggregate("avg", field("integer_value")))),
Expand Down Expand Up @@ -1956,4 +1957,28 @@ public void rex_command_throws_unsupported_operation_exception_in_legacy_engine(
.attach(relation("schema"))));
assertEquals("Rex is supported only when plugins.calcite.enabled=true", exception.getMessage());
}

@Test
public void stats_non_bucket_nullable_test() {
assertAnalyzeEqual(
LogicalPlanDSL.aggregation(
LogicalPlanDSL.relation("schema", table),
ImmutableList.of(
DSL.named("avg(integer_value)", DSL.avg(DSL.ref("integer_value", INTEGER)))),
ImmutableList.of(DSL.named("string_value", DSL.ref("string_value", STRING))),
false),
AstDSL.agg(
AstDSL.relation("schema"),
exprList(
AstDSL.alias(
"avg(integer_value)", AstDSL.aggregate("avg", field("integer_value")))),
null,
ImmutableList.of(AstDSL.alias("string_value", field("string_value"))),
exprList(
argument("partitions", intLiteral(1)),
argument("allnum", booleanLiteral(false)),
argument("delim", stringLiteral(" ")),
argument(Argument.BUCKET_NULLABLE, booleanLiteral(false)),
argument("dedupsplit", booleanLiteral(false)))));
}
}
37 changes: 37 additions & 0 deletions docs/user/ppl/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,40 @@ PPL query::
}
}
}

plugins.ppl.syntax.legacy.preferred
===================================

Description
-----------

This configuration is introduced since 3.3.0 which is used to switch some behaviours in PPL syntax. The current default value is ``true``.
The behaviours it controlled includes:

- The default value of argument ``bucket_nullable`` in ``stats`` command. Check `stats command <../cmd/stats.rst>`_ for details.

Example
-------

You can update the setting with a new value like this.

PPL query::

sh$ curl -sS -H 'Content-Type: application/json' \
... -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"transient" : {"plugins.ppl.syntax.legacy.preferred" : "false"}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"ppl": {
"syntax": {
"legacy": {
"preferred": "false"
}
}
}
}
}
}
28 changes: 26 additions & 2 deletions docs/user/ppl/cmd/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,16 @@ The following table dataSources the aggregation functions and also indicates how

Syntax
============
stats <aggregation>... [by-clause]
stats [bucket_nullable=bool] <aggregation>... [by-clause]


* aggregation: mandatory. A aggregation function. The argument of aggregation must be field.

* bucket_nullable: optional (since 3.3.0). Controls whether the stats command includes null buckets in group-by aggregations. When set to ``false``, the aggregation ignores records where the group-by field is null, resulting in faster performance by excluding null bucket. The default value of ``bucket_nullable`` is determined by ``plugins.ppl.syntax.legacy.preferred``:

* When ``plugins.ppl.syntax.legacy.preferred=true``, ``bucket_nullable`` defaults to ``true``
* When ``plugins.ppl.syntax.legacy.preferred=false``, ``bucket_nullable`` defaults to ``false``

* by-clause: optional.

* Syntax: by [span-expression,] [field,]...
Expand Down Expand Up @@ -796,7 +801,7 @@ PPL query::
+-----+----------+--------+

Example 14: Collect all values in a field using LIST
=====================================================
====================================================

The example shows how to collect all firstname values, preserving duplicates and order.

Expand All @@ -809,3 +814,22 @@ PPL query::
|-------------------------------------|
| ["Amber","Hattie","Nanette","Dale"] |
+-------------------------------------+


Example 15: Ignore null bucket
==============================

Note: This argument requires version 3.3.0 or above.

PPL query::

PPL> source=accounts | stats bucket_nullable=false count() as cnt by email;
fetched rows / total rows = 3/3
+-----+-----------------------+
| cnt | email |
|-----+-----------------------|
| 1 | [email protected] |
| 1 | [email protected] |
| 1 | [email protected] |
+-----+-----------------------+

2 changes: 1 addition & 1 deletion docs/user/ppl/interfaces/endpoint.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ The following PPL query demonstrated that where and stats command were pushed do
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[],\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}, searchDone=false)"
},
"children": []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
CalciteExplainIT.class,
CalciteArrayFunctionIT.class,
CalciteBinCommandIT.class,
CalciteConvertTZFunctionIT.class,
Expand All @@ -31,7 +32,6 @@
CalciteDedupCommandIT.class,
CalciteDescribeCommandIT.class,
CalciteExpandCommandIT.class,
CalciteExplainIT.class,
CalciteFieldsCommandIT.class,
CalciteFillNullCommandIT.class,
CalciteFlattenCommandIT.class,
Expand Down
Loading
Loading