Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public enum Key {
PATTERN_MAX_SAMPLE_COUNT("plugins.ppl.pattern.max.sample.count"),
PATTERN_BUFFER_LIMIT("plugins.ppl.pattern.buffer.limit"),
PPL_REX_MAX_MATCH_LIMIT("plugins.ppl.rex.max_match.limit"),
PPL_SYNTAX_LEGACY_PREFERRED("plugins.ppl.syntax.legacy.preferred"),

/** Enable Calcite as execution engine */
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,11 @@ private LogicalAggregation analyzeAggregation(
groupBys.forEach(
group ->
newEnv.define(new Symbol(Namespace.FIELD_NAME, group.getNameOrAlias()), group.type()));
return new LogicalAggregation(child, aggregators, groupBys);

Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
boolean bucketNullable =
(Boolean) statsArgs.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue();
return new LogicalAggregation(child, aggregators, groupBys, bucketNullable);
}

private Aggregation analyzePatternsAgg(Patterns node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ public static List<Argument> defaultStatsArgs() {
argument("partitions", intLiteral(1)),
argument("allnum", booleanLiteral(false)),
argument("delim", stringLiteral(" ")),
argument(Argument.BUCKET_NULLABLE, booleanLiteral(true)),
argument("dedupsplit", booleanLiteral(false)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class Argument extends UnresolvedExpression {
public static final String BUCKET_NULLABLE = "bucket_nullable";

private final String argName;
private final Literal value;

Expand Down Expand Up @@ -66,5 +68,9 @@ public static ArgumentMap empty() {
public Literal get(String name) {
return map.get(name);
}

public Literal getOrDefault(String name, Literal literal) {
return map.getOrDefault(name, literal);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,6 @@ public String toString() {

public static Literal TRUE = new Literal(true, DataType.BOOLEAN);
public static Literal FALSE = new Literal(false, DataType.BOOLEAN);
public static Literal ZERO = new Literal(Integer.valueOf("0"), DataType.INTEGER);
public static Literal ZERO = new Literal(0, DataType.INTEGER);
public static Literal ONE = new Literal(1, DataType.INTEGER);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
Expand Down Expand Up @@ -847,6 +850,41 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
groupExprList.addAll(node.getGroupExprList());
Pair<List<RexNode>, List<AggCall>> aggregationAttributes =
aggregateWithTrimming(groupExprList, aggExprList, context);
// Add group by columns
List<RexNode> aliasedGroupByList =
aggregationAttributes.getLeft().stream()
.map(this::extractAliasLiteral)
.flatMap(Optional::stream)
.map(ref -> ((RexLiteral) ref).getValueAs(String.class))
.map(context.relBuilder::field)
.map(f -> (RexNode) f)
.collect(Collectors.toList());

// add stats hint to LogicalAggregation
Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
Boolean bucketNullable =
(Boolean) statsArgs.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue();
if (!bucketNullable && !aliasedGroupByList.isEmpty()) {
final RelHint statHits =
RelHint.builder("stats_args").hintOption(Argument.BUCKET_NULLABLE, "false").build();
assert context.relBuilder.peek() instanceof LogicalAggregate
: "Stats hits should be added to LogicalAggregate";
context.relBuilder.hints(statHits);
context
.relBuilder
.getCluster()
.setHintStrategies(
HintStrategyTable.builder()
.hintStrategy(
"stats_args",
(hint, rel) -> {
return rel instanceof LogicalAggregate;
})
.build());
context.relBuilder.filter(
aliasedGroupByList.stream().map(context.relBuilder::isNotNull)
.collect(Collectors.toList()));
}

// schema reordering
// As an example, in command `stats count() by colA, colB`,
Expand All @@ -859,15 +897,6 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
List<RexNode> aggRexList =
outputFields.subList(numOfOutputFields - numOfAggList, numOfOutputFields);
reordered.addAll(aggRexList);
// Add group by columns
List<RexNode> aliasedGroupByList =
aggregationAttributes.getLeft().stream()
.map(this::extractAliasLiteral)
.flatMap(Optional::stream)
.map(ref -> ((RexLiteral) ref).getValueAs(String.class))
.map(context.relBuilder::field)
.map(f -> (RexNode) f)
.collect(Collectors.toList());
reordered.addAll(aliasedGroupByList);
context.relBuilder.project(reordered);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,27 @@
import org.opensearch.sql.expression.aggregation.NamedAggregator;

/** Logical Aggregation. */
@Getter
@ToString
@EqualsAndHashCode(callSuper = true)
public class LogicalAggregation extends LogicalPlan {

@Getter private final List<NamedAggregator> aggregatorList;
private final List<NamedAggregator> aggregatorList;

@Getter private final List<NamedExpression> groupByList;
private final List<NamedExpression> groupByList;

private final boolean bucketNullable;

/** Constructor of LogicalAggregation. */
public LogicalAggregation(
LogicalPlan child, List<NamedAggregator> aggregatorList, List<NamedExpression> groupByList) {
LogicalPlan child,
List<NamedAggregator> aggregatorList,
List<NamedExpression> groupByList,
boolean bucketNullable) {
super(Collections.singletonList(child));
this.aggregatorList = aggregatorList;
this.groupByList = groupByList;
this.bucketNullable = bucketNullable;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,19 @@ public static LogicalPlan write(LogicalPlan input, Table table, List<String> col
return new LogicalWrite(input, table, columns);
}

/** Build a logical aggregation with nullable bucket always true. */
public static LogicalPlan aggregation(
LogicalPlan input, List<NamedAggregator> aggregatorList, List<NamedExpression> groupByList) {
return new LogicalAggregation(input, aggregatorList, groupByList);
return new LogicalAggregation(input, aggregatorList, groupByList, true);
}

/** Build a logical aggregation with nullable bucket parameter */
public static LogicalPlan aggregation(
LogicalPlan input,
List<NamedAggregator> aggregatorList,
List<NamedExpression> groupByList,
boolean bucketNullable) {
return new LogicalAggregation(input, aggregatorList, groupByList, bucketNullable);
}

public static LogicalPlan filter(LogicalPlan input, Expression expression) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.opensearch.sql.ast.dsl.AstDSL.booleanLiteral;
import static org.opensearch.sql.ast.dsl.AstDSL.compare;
import static org.opensearch.sql.ast.dsl.AstDSL.computation;
import static org.opensearch.sql.ast.dsl.AstDSL.exprList;
import static org.opensearch.sql.ast.dsl.AstDSL.field;
import static org.opensearch.sql.ast.dsl.AstDSL.filter;
import static org.opensearch.sql.ast.dsl.AstDSL.filteredAggregate;
Expand Down Expand Up @@ -434,7 +435,7 @@ public void stats_source() {
ImmutableList.of(DSL.named("string_value", DSL.ref("string_value", STRING)))),
AstDSL.agg(
AstDSL.relation("schema"),
AstDSL.exprList(
exprList(
AstDSL.alias(
"avg(integer_value)", AstDSL.aggregate("avg", field("integer_value")))),
null,
Expand Down Expand Up @@ -486,7 +487,7 @@ public void rename_to_invalid_expression() {
AstDSL.rename(
AstDSL.agg(
AstDSL.relation("schema"),
AstDSL.exprList(
exprList(
AstDSL.alias(
"avg(integer_value)",
AstDSL.aggregate("avg", field("integer_value")))),
Expand Down Expand Up @@ -1956,4 +1957,28 @@ public void rex_command_throws_unsupported_operation_exception_in_legacy_engine(
.attach(relation("schema"))));
assertEquals("Rex is supported only when plugins.calcite.enabled=true", exception.getMessage());
}

@Test
public void stats_non_bucket_nullable_test() {
assertAnalyzeEqual(
LogicalPlanDSL.aggregation(
LogicalPlanDSL.relation("schema", table),
ImmutableList.of(
DSL.named("avg(integer_value)", DSL.avg(DSL.ref("integer_value", INTEGER)))),
ImmutableList.of(DSL.named("string_value", DSL.ref("string_value", STRING))),
false),
AstDSL.agg(
AstDSL.relation("schema"),
exprList(
AstDSL.alias(
"avg(integer_value)", AstDSL.aggregate("avg", field("integer_value")))),
null,
ImmutableList.of(AstDSL.alias("string_value", field("string_value"))),
exprList(
argument("partitions", intLiteral(1)),
argument("allnum", booleanLiteral(false)),
argument("delim", stringLiteral(" ")),
argument(Argument.BUCKET_NULLABLE, booleanLiteral(false)),
argument("dedupsplit", booleanLiteral(false)))));
}
}
37 changes: 37 additions & 0 deletions docs/user/ppl/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,40 @@ PPL query::
}
}
}

plugins.ppl.syntax.legacy.preferred
===================================

Description
-----------

This configuration is introduced since 3.3.0 which is used to switch some behaviours in PPL syntax. The current default value is ``true``.
The behaviours it controlled includes:

- The default value of argument ``bucket_nullable`` in ``stats`` command. Check `stats command <../cmd/stats.rst>`_ for details.

Example
-------

You can update the setting with a new value like this.

PPL query::

sh$ curl -sS -H 'Content-Type: application/json' \
... -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"transient" : {"plugins.ppl.syntax.legacy.preferred" : "false"}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"ppl": {
"syntax": {
"legacy": {
"preferred": "false"
}
}
}
}
}
}
28 changes: 26 additions & 2 deletions docs/user/ppl/cmd/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,16 @@ The following table dataSources the aggregation functions and also indicates how

Syntax
============
stats <aggregation>... [by-clause]
stats [bucket_nullable=bool] <aggregation>... [by-clause]


* aggregation: mandatory. A aggregation function. The argument of aggregation must be field.

* bucket_nullable: optional (since 3.3.0). Controls whether the stats command includes null buckets in group-by aggregations. When set to ``false``, the aggregation ignores records where the group-by field is null, resulting in faster performance by excluding null bucket. The default value of ``bucket_nullable`` is determined by ``plugins.ppl.syntax.legacy.preferred``:

* When ``plugins.ppl.syntax.legacy.preferred=true``, ``bucket_nullable`` defaults to ``true``
* When ``plugins.ppl.syntax.legacy.preferred=false``, ``bucket_nullable`` defaults to ``false``

* by-clause: optional.

* Syntax: by [span-expression,] [field,]...
Expand Down Expand Up @@ -793,7 +798,7 @@ PPL query::
+-----+----------+--------+

Example 14: Collect all values in a field using LIST
=====================================================
====================================================

The example shows how to collect all firstname values, preserving duplicates and order.

Expand All @@ -806,3 +811,22 @@ PPL query::
|-------------------------------------|
| ["Amber","Hattie","Nanette","Dale"] |
+-------------------------------------+


Example 15: Ignore null bucket
==============================

Note: This argument requires version 3.3.0 or above.

PPL query::

PPL> source=accounts | stats bucket_nullable=false count() as cnt by email;
fetched rows / total rows = 3/3
+-----+-----------------------+
| cnt | email |
|-----+-----------------------|
| 1 | [email protected] |
| 1 | [email protected] |
| 1 | [email protected] |
+-----+-----------------------+

2 changes: 1 addition & 1 deletion docs/user/ppl/interfaces/endpoint.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ The following PPL query demonstrated that where and stats command were pushed do
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[],\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}, searchDone=false)"
},
"children": []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
CalciteExplainIT.class,
CalciteArrayFunctionIT.class,
CalciteBinCommandIT.class,
CalciteConvertTZFunctionIT.class,
Expand All @@ -31,7 +32,6 @@
CalciteDedupCommandIT.class,
CalciteDescribeCommandIT.class,
CalciteExpandCommandIT.class,
CalciteExplainIT.class,
CalciteFieldsCommandIT.class,
CalciteFillNullCommandIT.class,
CalciteFlattenCommandIT.class,
Expand Down
Loading
Loading