diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 3ab6dcb420..664f5212ce 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -185,14 +185,19 @@ public LogicalPlan visitAggregation(Aggregation node, AnalysisContext context) { aggregatorBuilder .add(new NamedAggregator(aggExpr.getNameOrAlias(), (Aggregator) aggExpr.getDelegated())); } - ImmutableList aggregators = aggregatorBuilder.build(); ImmutableList.Builder groupbyBuilder = new ImmutableList.Builder<>(); + // Span should be first expression if exist. + if (node.getSpan() != null) { + groupbyBuilder.add(namedExpressionAnalyzer.analyze(node.getSpan(), context)); + } + for (UnresolvedExpression expr : node.getGroupExprList()) { groupbyBuilder.add(namedExpressionAnalyzer.analyze(expr, context)); } ImmutableList groupBys = groupbyBuilder.build(); + ImmutableList aggregators = aggregatorBuilder.build(); // new context context.push(); TypeEnvironment newEnv = context.peek(); diff --git a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java index 1266eae73f..a9249f82cc 100644 --- a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java +++ b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java @@ -92,7 +92,17 @@ public static UnresolvedPlan agg( List sortList, List groupList, List argList) { - return new Aggregation(aggList, sortList, groupList, argList).attach(input); + return new Aggregation(aggList, sortList, groupList, null, argList).attach(input); + } + + public static UnresolvedPlan agg( + UnresolvedPlan input, + List aggList, + List sortList, + List groupList, + UnresolvedExpression span, + List argList) { + return new Aggregation(aggList, sortList, groupList, span, argList).attach(input); } public static UnresolvedPlan rename(UnresolvedPlan input, Map... maps) { diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Aggregation.java b/core/src/main/java/org/opensearch/sql/ast/tree/Aggregation.java index 6223ebc2ca..e9fa26e981 100644 --- a/core/src/main/java/org/opensearch/sql/ast/tree/Aggregation.java +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Aggregation.java @@ -28,16 +28,17 @@ public class Aggregation extends UnresolvedPlan { private List aggExprList; private List sortExprList; private List groupExprList; + private UnresolvedExpression span; private List argExprList; private UnresolvedPlan child; /** - * Aggregation Constructor without argument. + * Aggregation Constructor without span and argument. */ public Aggregation(List aggExprList, List sortExprList, List groupExprList) { - this(aggExprList, sortExprList, groupExprList, Collections.emptyList()); + this(aggExprList, sortExprList, groupExprList, null, Collections.emptyList()); } /** @@ -46,10 +47,12 @@ public Aggregation(List aggExprList, public Aggregation(List aggExprList, List sortExprList, List groupExprList, + UnresolvedExpression span, List argExprList) { this.aggExprList = aggExprList; this.sortExprList = sortExprList; this.groupExprList = groupExprList; + this.span = span; this.argExprList = argExprList; } diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/AggregationOperator.java b/core/src/main/java/org/opensearch/sql/planner/physical/AggregationOperator.java index d6c5c5d082..5e05286bbc 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/AggregationOperator.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/AggregationOperator.java @@ -18,8 +18,7 @@ import org.opensearch.sql.expression.aggregation.Aggregator; import org.opensearch.sql.expression.aggregation.NamedAggregator; import org.opensearch.sql.expression.span.SpanExpression; -import org.opensearch.sql.planner.physical.bucket.Group; -import org.opensearch.sql.planner.physical.bucket.SpanBucket; +import org.opensearch.sql.planner.physical.collector.Collector; import org.opensearch.sql.storage.bindingtuple.BindingTuple; /** @@ -35,8 +34,13 @@ public class AggregationOperator extends PhysicalPlan { private final List aggregatorList; @Getter private final List groupByExprList; + @Getter + private final NamedExpression span; + /** + * {@link BindingTuple} Collector. + */ @EqualsAndHashCode.Exclude - private final Group group; + private final Collector collector; @EqualsAndHashCode.Exclude private Iterator iterator; @@ -51,9 +55,14 @@ public AggregationOperator(PhysicalPlan input, List aggregatorL List groupByExprList) { this.input = input; this.aggregatorList = aggregatorList; - this.groupByExprList = groupByExprList; - this.group = groupBySpan(groupByExprList) ? new SpanBucket(aggregatorList, groupByExprList) - : new Group(aggregatorList, groupByExprList); + if (hasSpan(groupByExprList)) { + this.span = groupByExprList.get(0); + this.groupByExprList = groupByExprList.subList(1, groupByExprList.size()); + } else { + this.span = null; + this.groupByExprList = groupByExprList; + } + this.collector = Collector.Builder.build(this.span, this.groupByExprList, this.aggregatorList); } @Override @@ -81,14 +90,13 @@ public ExprValue next() { public void open() { super.open(); while (input.hasNext()) { - group.push(input.next()); + collector.collect(input.next().bindingTuples()); } - iterator = group.result().iterator(); + iterator = collector.results().iterator(); } - private boolean groupBySpan(List namedExpressionList) { - return namedExpressionList.size() == 1 + private boolean hasSpan(List namedExpressionList) { + return !namedExpressionList.isEmpty() && namedExpressionList.get(0).getDelegated() instanceof SpanExpression; } - } diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/bucket/Group.java b/core/src/main/java/org/opensearch/sql/planner/physical/bucket/Group.java deleted file mode 100644 index 4e1b47661e..0000000000 --- a/core/src/main/java/org/opensearch/sql/planner/physical/bucket/Group.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.planner.physical.bucket; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import org.opensearch.sql.data.model.ExprTupleValue; -import org.opensearch.sql.data.model.ExprValue; -import org.opensearch.sql.expression.Expression; -import org.opensearch.sql.expression.NamedExpression; -import org.opensearch.sql.expression.aggregation.AggregationState; -import org.opensearch.sql.expression.aggregation.NamedAggregator; -import org.opensearch.sql.storage.bindingtuple.BindingTuple; - -@VisibleForTesting -@RequiredArgsConstructor -public class Group { - @Getter - private final List aggregatorList; - @Getter - private final List groupByExprList; - - protected final Map>> groupListMap = - new HashMap<>(); - - /** - * Push the BindingTuple to Group. Two functions will be applied to each BindingTuple to - * generate the {@link Key} and {@link AggregationState} - * Key = GroupKey(bindingTuple), State = Aggregator(bindingTuple) - */ - public void push(ExprValue inputValue) { - Key groupKey = new Key(inputValue, groupByExprList); - groupListMap.computeIfAbsent(groupKey, k -> - aggregatorList.stream() - .map(aggregator -> new AbstractMap.SimpleEntry<>(aggregator, - aggregator.create())) - .collect(Collectors.toList()) - ); - groupListMap.computeIfPresent(groupKey, (key, aggregatorList) -> { - aggregatorList - .forEach(entry -> entry.getKey().iterate(inputValue.bindingTuples(), entry.getValue())); - return aggregatorList; - }); - } - - /** - * Get the list of {@link BindingTuple} for each group. - */ - public List result() { - ImmutableList.Builder resultBuilder = new ImmutableList.Builder<>(); - for (Map.Entry>> - entry : groupListMap.entrySet()) { - LinkedHashMap map = new LinkedHashMap<>(entry.getKey().groupKeyMap()); - for (Map.Entry stateEntry : entry.getValue()) { - map.put(stateEntry.getKey().getName(), stateEntry.getValue().result()); - } - resultBuilder.add(ExprTupleValue.fromExprValueMap(map)); - } - return resultBuilder.build(); - } - - /** - * Group Key. - */ - @EqualsAndHashCode - @VisibleForTesting - public static class Key { - private final List groupByValueList; - private final List groupByExprList; - - /** - * GroupKey constructor. - */ - public Key(ExprValue value, List groupByExprList) { - this.groupByValueList = new ArrayList<>(); - this.groupByExprList = groupByExprList; - for (Expression groupExpr : groupByExprList) { - this.groupByValueList.add(groupExpr.valueOf(value.bindingTuples())); - } - } - - /** - * Return the Map of group field and group field value. - */ - public LinkedHashMap groupKeyMap() { - LinkedHashMap map = new LinkedHashMap<>(); - for (int i = 0; i < groupByExprList.size(); i++) { - map.put(groupByExprList.get(i).getNameOrAlias(), groupByValueList.get(i)); - } - return map; - } - } -} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/bucket/SpanBucket.java b/core/src/main/java/org/opensearch/sql/planner/physical/bucket/SpanBucket.java deleted file mode 100644 index 42977830ac..0000000000 --- a/core/src/main/java/org/opensearch/sql/planner/physical/bucket/SpanBucket.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.planner.physical.bucket; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; -import java.util.AbstractMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import org.opensearch.sql.ast.expression.SpanUnit; -import org.opensearch.sql.data.model.ExprTupleValue; -import org.opensearch.sql.data.model.ExprValue; -import org.opensearch.sql.data.model.ExprValueUtils; -import org.opensearch.sql.expression.NamedExpression; -import org.opensearch.sql.expression.aggregation.AggregationState; -import org.opensearch.sql.expression.aggregation.NamedAggregator; -import org.opensearch.sql.expression.span.SpanExpression; - -public class SpanBucket extends Group { - private final List aggregatorList; - private final List groupByExprList; - private final Rounding rounding; - - /** - * SpanBucket Constructor. - */ - public SpanBucket(List aggregatorList, List groupByExprList) { - super(aggregatorList, groupByExprList); - this.aggregatorList = aggregatorList; - this.groupByExprList = groupByExprList; - rounding = Rounding.createRounding(((SpanExpression) groupByExprList.get(0).getDelegated())); - } - - @Override - public void push(ExprValue inputValue) { - Key spanKey = new Key(inputValue, groupByExprList, rounding); - groupListMap.computeIfAbsent(spanKey, k -> - aggregatorList.stream() - .map(aggregator -> new AbstractMap.SimpleEntry<>(aggregator, - aggregator.create())) - .collect(Collectors.toList()) - ); - groupListMap.computeIfPresent(spanKey, (key, aggregatorList) -> { - aggregatorList - .forEach(entry -> entry.getKey().iterate(inputValue.bindingTuples(), entry.getValue())); - return aggregatorList; - }); - } - - @Override - public List result() { - ExprValue[] buckets = rounding.createBuckets(); - LinkedHashMap emptyBucketTuple = new LinkedHashMap<>(); - String spanKey = null; - for (Map.Entry>> - entry : groupListMap.entrySet()) { - LinkedHashMap tupleMap = new LinkedHashMap<>(entry.getKey().groupKeyMap()); - if (spanKey == null) { - spanKey = ((Key) entry.getKey()).namedSpan.getNameOrAlias(); - } - for (Map.Entry stateEntry : entry.getValue()) { - tupleMap.put(stateEntry.getKey().getName(), stateEntry.getValue().result()); - if (emptyBucketTuple.isEmpty()) { - entry.getKey().groupKeyMap().keySet().forEach(key -> emptyBucketTuple.put(key, null)); - emptyBucketTuple.put(stateEntry.getKey().getName(), ExprValueUtils.fromObjectValue(0)); - } - } - int index = rounding.locate(((SpanBucket.Key) entry.getKey()).getRoundedValue()); - buckets[index] = ExprTupleValue.fromExprValueMap(tupleMap); - } - return ImmutableList.copyOf(rounding.fillBuckets(buckets, emptyBucketTuple, spanKey)); - } - - @EqualsAndHashCode(callSuper = false) - @VisibleForTesting - public static class Key extends Group.Key { - @Getter - private final ExprValue roundedValue; - private final NamedExpression namedSpan; - - /** - * SpanBucket.Key Constructor. - */ - public Key(ExprValue value, List groupByExprList, Rounding rounding) { - super(value, groupByExprList); - namedSpan = groupByExprList.get(0); - ExprValue actualValue = ((SpanExpression) namedSpan.getDelegated()).getField() - .valueOf(value.bindingTuples()); - roundedValue = rounding.round(actualValue); - } - - /** - * Return the Map of span key and its actual value. - */ - @Override - public LinkedHashMap groupKeyMap() { - LinkedHashMap map = new LinkedHashMap<>(); - map.put(namedSpan.getNameOrAlias(), roundedValue); - return map; - } - } - -} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/collector/BucketCollector.java b/core/src/main/java/org/opensearch/sql/planner/physical/collector/BucketCollector.java new file mode 100644 index 0000000000..c3a803ca9e --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/physical/collector/BucketCollector.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.physical.collector; + +import com.google.common.collect.ImmutableList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.data.model.ExprCollectionValue; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.expression.NamedExpression; +import org.opensearch.sql.storage.bindingtuple.BindingTuple; + +/** + * Collect Bucket from {@link BindingTuple}. + */ +@RequiredArgsConstructor +public class BucketCollector implements Collector { + + /** + * Bucket Expression. + */ + private final NamedExpression bucketExpr; + + /** + * Collector Constructor. + */ + private final Supplier supplier; + + /** + * Map between bucketKey and collector in the bucket. + */ + private final Map collectorMap = new HashMap<>(); + + /** + * Bucket Index. + */ + private int bucketIndex = 0; + + /** + * Collect Bucket from {@link BindingTuple}. + * If bucket not exist, create new bucket and {@link Collector}. + * If bucket exist, let {@link Collector} in the bucket collect from {@link BindingTuple}. + * + * @param input {@link BindingTuple}. + */ + @Override + public void collect(BindingTuple input) { + ExprValue bucketKey = bucketKey(input); + collectorMap.putIfAbsent(bucketKey, supplier.get()); + collectorMap.get(bucketKey).collect(input); + } + + /** + * Bucket Key. + * @param tuple {@link BindingTuple}. + * @return Bucket Key. + */ + protected ExprValue bucketKey(BindingTuple tuple) { + return bucketExpr.valueOf(tuple); + } + + /** + * Get result from all the buckets. + * + * @return list of {@link ExprValue}. + */ + @Override + public List results() { + ExprValue[] buckets = allocateBuckets(); + for (Map.Entry entry : collectorMap.entrySet()) { + ImmutableList.Builder builder = new ImmutableList.Builder<>(); + for (ExprValue tuple : entry.getValue().results()) { + LinkedHashMap tmp = new LinkedHashMap<>(); + tmp.put(bucketExpr.getNameOrAlias(), entry.getKey()); + tmp.putAll(tuple.tupleValue()); + builder.add(ExprTupleValue.fromExprValueMap(tmp)); + } + buckets[locateBucket(entry.getKey())] = new ExprCollectionValue(builder.build()); + } + return Arrays.stream(buckets) + .filter(Objects::nonNull) + .flatMap(v -> v.collectionValue().stream()) + .collect(Collectors.toList()); + } + + /** + * Allocates Buckets for building results. + * + * @return buckets. + */ + protected ExprValue[] allocateBuckets() { + return new ExprValue[collectorMap.size()]; + } + + /** + * Current Bucket index in allocated buckets. + * + * @param value bucket key. + * @return index. + */ + protected int locateBucket(ExprValue value) { + return bucketIndex++; + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/collector/Collector.java b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Collector.java new file mode 100644 index 0000000000..66eba7440b --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Collector.java @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.physical.collector; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.experimental.UtilityClass; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.expression.NamedExpression; +import org.opensearch.sql.expression.aggregation.NamedAggregator; +import org.opensearch.sql.storage.bindingtuple.BindingTuple; + +/** + * Interface of {@link BindingTuple} Collector. + */ +public interface Collector { + + /** + * Collect from {@link BindingTuple}. + * + * @param tuple {@link BindingTuple}. + */ + void collect(BindingTuple tuple); + + /** + * Get Result from Collector. + * + * @return list of {@link ExprValue}. + */ + List results(); + + /** + * {@link Collector} tree builder. + */ + @UtilityClass + class Builder { + /** + * build {@link Collector}. + */ + public static Collector build( + NamedExpression span, List buckets, List aggregators) { + if (span == null && buckets.isEmpty()) { + return new MetricCollector(aggregators); + } else if (span != null) { + return new SpanCollector(span, () -> build(null, buckets, aggregators)); + } else { + return new BucketCollector( + buckets.get(0), + () -> + build(null, ImmutableList.copyOf(buckets.subList(1, buckets.size())), aggregators)); + } + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/collector/MetricCollector.java b/core/src/main/java/org/opensearch/sql/planner/physical/collector/MetricCollector.java new file mode 100644 index 0000000000..c804c7bc9b --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/physical/collector/MetricCollector.java @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.physical.collector; + +import java.util.AbstractMap; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.expression.aggregation.AggregationState; +import org.opensearch.sql.expression.aggregation.NamedAggregator; +import org.opensearch.sql.storage.bindingtuple.BindingTuple; + +/** + * Each {@link NamedAggregator} defined in aggregators collect metrics from {@link BindingTuple}. + */ +public class MetricCollector implements Collector { + + /** + * List of {@link NamedAggregator}. + */ + private final List> aggregators; + + /** + * Constructor of {@link MetricCollector}. + * + * @param aggregators aggregators. + */ + public MetricCollector(List aggregators) { + this.aggregators = + aggregators.stream() + .map(aggregator -> new AbstractMap.SimpleEntry<>(aggregator, aggregator.create())) + .collect(Collectors.toList()); + } + + /** + * Collect Metrics from BindingTuple. + * + * @param input {@link BindingTuple}. + */ + public void collect(BindingTuple input) { + aggregators.forEach( + agg -> { + agg.getKey().iterate(input, agg.getValue()); + }); + } + + /** + * Get aggregation result from aggregators. + * + * @return List of {@link ExprValue}. + */ + public List results() { + LinkedHashMap map = new LinkedHashMap<>(); + aggregators.forEach(agg -> map.put(agg.getKey().getName(), agg.getValue().result())); + return Collections.singletonList(ExprTupleValue.fromExprValueMap(map)); + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/bucket/Rounding.java b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java similarity index 77% rename from core/src/main/java/org/opensearch/sql/planner/physical/bucket/Rounding.java rename to core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java index 366392e910..2934b44e95 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/bucket/Rounding.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.sql.planner.physical.bucket; +package org.opensearch.sql.planner.physical.collector; import static org.opensearch.sql.data.type.ExprCoreType.DATE; import static org.opensearch.sql.data.type.ExprCoreType.DATETIME; @@ -20,7 +20,6 @@ import java.time.ZonedDateTime; import java.time.temporal.ChronoField; import java.util.Arrays; -import java.util.Map; import java.util.concurrent.TimeUnit; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -29,7 +28,6 @@ import org.opensearch.sql.data.model.ExprDatetimeValue; import org.opensearch.sql.data.model.ExprTimeValue; import org.opensearch.sql.data.model.ExprTimestampValue; -import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.model.ExprValueUtils; import org.opensearch.sql.data.type.ExprType; @@ -81,9 +79,6 @@ public static Rounding createRounding(SpanExpression span) { public abstract ExprValue[] createBuckets(); - public abstract ExprValue[] fillBuckets(ExprValue[] buckets, Map map, - String key); - static class TimestampRounding extends Rounding { private final ExprValue interval; @@ -118,28 +113,6 @@ public ExprValue[] createBuckets() { } } - @Override - public ExprValue[] fillBuckets(ExprValue[] buckets, - Map map, - String key) { - for (int id = 0; id < buckets.length; id++) { - ExprValue tuple = buckets[id]; - if (tuple == null) { - long placeHolder; - if (dateTimeUnit.isMillisBased) { - placeHolder = minRounded.toEpochMilli() + dateTimeUnit.ratio * interval - .integerValue() * id; - } else { - placeHolder = minRounded.atZone(ZoneId.of("UTC")).plusMonths(dateTimeUnit - .ratio * interval.integerValue()).toInstant().toEpochMilli(); - } - map.replace(key, new ExprTimestampValue(Instant.ofEpochMilli(placeHolder))); - buckets[id] = ExprTupleValue.fromExprValueMap(map); - } - } - return buckets; - } - @Override public Integer locate(ExprValue value) { if (dateTimeUnit.isMillisBased) { @@ -201,29 +174,6 @@ public ExprValue[] createBuckets() { } } - @Override - public ExprValue[] fillBuckets(ExprValue[] buckets, - Map map, - String key) { - for (int id = 0; id < buckets.length; id++) { - ExprValue tuple = buckets[id]; - if (tuple == null) { - long placeHolder; - if (dateTimeUnit.isMillisBased) { - placeHolder = minRounded.atZone(ZoneId.of("UTC")).toInstant() - .toEpochMilli() + dateTimeUnit.ratio * interval.integerValue() * id; - } else { - placeHolder = minRounded.atZone(ZoneId.of("UTC")).plusMonths(dateTimeUnit - .ratio * interval.integerValue() * id).toInstant().toEpochMilli(); - } - map.replace(key, new ExprDatetimeValue(Instant.ofEpochMilli(placeHolder) - .atZone(ZoneId.of("UTC")).toLocalDateTime())); - buckets[id] = ExprTupleValue.fromExprValueMap(map); - } - } - return buckets; - } - @Override public Integer locate(ExprValue value) { if (dateTimeUnit.isMillisBased) { @@ -286,30 +236,6 @@ public ExprValue[] createBuckets() { } } - @Override - public ExprValue[] fillBuckets(ExprValue[] buckets, - Map map, - String key) { - for (int id = 0; id < buckets.length; id++) { - ExprValue tuple = buckets[id]; - if (tuple == null) { - long placeHolder; - if (dateTimeUnit.isMillisBased) { - placeHolder = minRounded.atStartOfDay().atZone(ZoneId.of("UTC")).toInstant() - .toEpochMilli() + dateTimeUnit.ratio * interval.integerValue() * id; - } else { - placeHolder = minRounded.atStartOfDay().atZone(ZoneId.of("UTC")) - .plusMonths(dateTimeUnit.ratio * interval.integerValue()).toInstant() - .toEpochMilli(); - } - map.replace(key, new ExprDateValue(Instant.ofEpochMilli(placeHolder) - .atZone(ZoneId.of("UTC")).toLocalDate())); - buckets[id] = ExprTupleValue.fromExprValueMap(map); - } - } - return buckets; - } - @Override public Integer locate(ExprValue value) { if (dateTimeUnit.isMillisBased) { @@ -369,24 +295,6 @@ public ExprValue[] createBuckets() { return new ExprValue[size]; } - @Override - public ExprValue[] fillBuckets(ExprValue[] buckets, - Map map, - String key) { - for (int id = 0; id < buckets.length; id++) { - ExprValue tuple = buckets[id]; - if (tuple == null) { - long placeHolder = minRounded.atDate(LocalDate.of(1970, 1, 1)) - .atZone(ZoneId.of("UTC")) - .toInstant().toEpochMilli() + dateTimeUnit.ratio * interval.integerValue() * id; - map.replace(key, new ExprTimeValue(Instant.ofEpochMilli(placeHolder) - .atZone(ZoneId.of("UTC")).toLocalTime())); - buckets[id] = ExprTupleValue.fromExprValueMap(map); - } - } - return buckets; - } - @Override public Integer locate(ExprValue value) { long intervalInEpochMillis = dateTimeUnit.ratio; @@ -434,20 +342,6 @@ public ExprValue[] createBuckets() { return new ExprValue[size]; } - @Override - public ExprValue[] fillBuckets(ExprValue[] buckets, - Map map, - String key) { - for (int id = 0; id < buckets.length; id++) { - ExprValue tuple = buckets[id]; - if (tuple == null) { - map.replace(key, ExprValueUtils.longValue(minRounded + longInterval * id)); - buckets[id] = ExprTupleValue.fromExprValueMap(map); - } - } - return buckets; - } - private void updateRounded(Long value) { if (maxRounded == null || value > maxRounded) { maxRounded = value; @@ -485,19 +379,6 @@ public ExprValue[] createBuckets() { return new ExprValue[size]; } - @Override - public ExprValue[] fillBuckets(ExprValue[] buckets, Map map, - String key) { - for (int id = 0; id < buckets.length; id++) { - ExprValue tuple = buckets[id]; - if (tuple == null) { - map.replace(key, ExprValueUtils.doubleValue(minRounded + doubleInterval * id)); - buckets[id] = ExprTupleValue.fromExprValueMap(map); - } - } - return buckets; - } - private void updateRounded(Double value) { if (maxRounded == null || value > maxRounded) { maxRounded = value; @@ -525,12 +406,6 @@ public Integer locate(ExprValue value) { public ExprValue[] createBuckets() { return new ExprValue[0]; } - - @Override - public ExprValue[] fillBuckets(ExprValue[] buckets, Map map, - String key) { - return new ExprValue[0]; - } } diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/collector/SpanCollector.java b/core/src/main/java/org/opensearch/sql/planner/physical/collector/SpanCollector.java new file mode 100644 index 0000000000..092d79bd81 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/physical/collector/SpanCollector.java @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.physical.collector; + +import java.util.function.Supplier; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.expression.NamedExpression; +import org.opensearch.sql.expression.span.SpanExpression; +import org.opensearch.sql.storage.bindingtuple.BindingTuple; + +/** + * Span Collector. + */ +public class SpanCollector extends BucketCollector { + + /** + * Span Expression. + */ + private final SpanExpression spanExpr; + + /** + * Rounding. + */ + private final Rounding rounding; + + /** + * Constructor. + */ + public SpanCollector(NamedExpression bucketExpr, Supplier supplier) { + super(bucketExpr, supplier); + this.spanExpr = (SpanExpression) bucketExpr.getDelegated(); + this.rounding = Rounding.createRounding(spanExpr); + } + + /** + * Rounding bucket value. + * + * @param tuple {@link BindingTuple}. + * @return {@link ExprValue}. + */ + @Override + protected ExprValue bucketKey(BindingTuple tuple) { + return rounding.round(spanExpr.getField().valueOf(tuple)); + } + + /** + * Allocates Buckets for building results. + * + * @return buckets. + */ + @Override + protected ExprValue[] allocateBuckets() { + return rounding.createBuckets(); + } + + /** + * Current Bucket index in allocated buckets. + * + * @param value bucket key. + * @return index. + */ + @Override + protected int locateBucket(ExprValue value) { + return rounding.locate(value); + } +} diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java index 0908a8bc8a..8b733a78de 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java @@ -21,6 +21,7 @@ import static org.opensearch.sql.ast.dsl.AstDSL.intLiteral; import static org.opensearch.sql.ast.dsl.AstDSL.qualifiedName; import static org.opensearch.sql.ast.dsl.AstDSL.relation; +import static org.opensearch.sql.ast.dsl.AstDSL.span; import static org.opensearch.sql.ast.tree.Sort.NullOrder; import static org.opensearch.sql.ast.tree.Sort.SortOption; import static org.opensearch.sql.ast.tree.Sort.SortOption.DEFAULT_ASC; @@ -41,6 +42,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.opensearch.sql.ast.dsl.AstDSL; import org.opensearch.sql.ast.expression.Argument; +import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.ast.tree.RareTopN.CommandType; import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.DSL; @@ -644,4 +646,27 @@ public void named_aggregator_with_condition() { ) ); } + + /** + * stats avg(integer_value) by string_value span(long_value, 10). + */ + @Test + public void ppl_stats_by_fieldAndSpan() { + assertAnalyzeEqual( + LogicalPlanDSL.aggregation( + LogicalPlanDSL.relation("schema"), + ImmutableList.of( + DSL.named("AVG(integer_value)", dsl.avg(DSL.ref("integer_value", INTEGER)))), + ImmutableList.of( + DSL.named("span", DSL.span(DSL.ref("long_value", LONG), DSL.literal(10), "")), + DSL.named("string_value", DSL.ref("string_value", STRING)))), + AstDSL.agg( + AstDSL.relation("schema"), + ImmutableList.of( + alias("AVG(integer_value)", aggregate("AVG", qualifiedName("integer_value")))), + emptyList(), + ImmutableList.of(alias("string_value", qualifiedName("string_value"))), + alias("span", span(field("long_value"), intLiteral(10), SpanUnit.NONE)), + emptyList())); + } } diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/AggregationOperatorTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/AggregationOperatorTest.java index 01e8c9ef33..3b45a11c6c 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/AggregationOperatorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/AggregationOperatorTest.java @@ -10,6 +10,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsInRelativeOrder; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.opensearch.sql.data.model.ExprValueUtils.LITERAL_MISSING; import static org.opensearch.sql.data.type.ExprCoreType.DATE; import static org.opensearch.sql.data.type.ExprCoreType.DATETIME; import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE; @@ -34,6 +35,20 @@ import org.opensearch.sql.expression.DSL; class AggregationOperatorTest extends PhysicalPlanTestBase { + + @Test + public void sum_without_groups() { + PhysicalPlan plan = new AggregationOperator(new TestScan(), + Collections + .singletonList(DSL.named("sum(response)", dsl.sum(DSL.ref("response", INTEGER)))), + Collections.emptyList()); + List result = execute(plan); + assertEquals(1, result.size()); + assertThat(result, containsInAnyOrder( + ExprValueUtils.tupleValue(ImmutableMap.of("sum(response)", 1504d)) + )); + } + @Test public void avg_with_one_groups() { PhysicalPlan plan = new AggregationOperator(new TestScan(), @@ -83,18 +98,16 @@ public void sum_with_one_groups() { @Test public void millisecond_span() { - PhysicalPlan plan = new AggregationOperator(new DateTimeTestScan(), + PhysicalPlan plan = new AggregationOperator(testScan(datetimeInputs), Collections.singletonList(DSL .named("count", dsl.count(DSL.ref("second", TIMESTAMP)))), Collections.singletonList(DSL .named("span", DSL.span(DSL.ref("second", TIMESTAMP), DSL.literal(6 * 1000), "ms")))); List result = execute(plan); - assertEquals(3, result.size()); + assertEquals(2, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprTimestampValue("2021-01-01 00:00:00"), "count", 2)), - ExprValueUtils.tupleValue(ImmutableMap - .of("span", new ExprTimestampValue("2021-01-01 00:00:06"), "count", 0)), ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprTimestampValue("2021-01-01 00:00:12"), "count", 3)) )); @@ -102,18 +115,16 @@ public void millisecond_span() { @Test public void second_span() { - PhysicalPlan plan = new AggregationOperator(new DateTimeTestScan(), + PhysicalPlan plan = new AggregationOperator(testScan(datetimeInputs), Collections.singletonList(DSL .named("count", dsl.count(DSL.ref("second", TIMESTAMP)))), Collections.singletonList(DSL .named("span", DSL.span(DSL.ref("second", TIMESTAMP), DSL.literal(6), "s")))); List result = execute(plan); - assertEquals(3, result.size()); + assertEquals(2, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprTimestampValue("2021-01-01 00:00:00"), "count", 2)), - ExprValueUtils.tupleValue(ImmutableMap - .of("span", new ExprTimestampValue("2021-01-01 00:00:06"), "count", 0)), ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprTimestampValue("2021-01-01 00:00:12"), "count", 3)) )); @@ -121,36 +132,32 @@ public void second_span() { @Test public void minute_span() { - PhysicalPlan plan = new AggregationOperator(new DateTimeTestScan(), + PhysicalPlan plan = new AggregationOperator(testScan(datetimeInputs), Collections.singletonList(DSL .named("count", dsl.count(DSL.ref("minute", DATETIME)))), Collections.singletonList(DSL .named("span", DSL.span(DSL.ref("minute", DATETIME), DSL.literal(5), "m")))); List result = execute(plan); - assertEquals(4, result.size()); + assertEquals(3, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprDatetimeValue("2020-12-31 23:50:00"), "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap - .of("span", new ExprDatetimeValue("2020-12-31 23:55:00"), "count", 0)), ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprDatetimeValue("2021-01-01 00:00:00"), "count", 3)), ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprDatetimeValue("2021-01-01 00:05:00"), "count", 1)) )); - plan = new AggregationOperator(new DateTimeTestScan(), + plan = new AggregationOperator(testScan(datetimeInputs), Collections.singletonList(DSL .named("count", dsl.count(DSL.ref("hour", TIME)))), Collections.singletonList(DSL .named("span", DSL.span(DSL.ref("hour", TIME), DSL.literal(30), "m")))); result = execute(plan); - assertEquals(5, result.size()); + assertEquals(4, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprTimeValue("17:00:00"), "count", 2)), - ExprValueUtils.tupleValue(ImmutableMap - .of("span", new ExprTimeValue("17:30:00"), "count", 0)), ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprTimeValue("18:00:00"), "count", 1)), ExprValueUtils.tupleValue(ImmutableMap @@ -162,7 +169,7 @@ public void minute_span() { @Test public void hour_span() { - PhysicalPlan plan = new AggregationOperator(new DateTimeTestScan(), + PhysicalPlan plan = new AggregationOperator(testScan(datetimeInputs), Collections.singletonList(DSL .named("count", dsl.count(DSL.ref("hour", TIME)))), Collections.singletonList(DSL @@ -181,7 +188,7 @@ public void hour_span() { @Test public void day_span() { - PhysicalPlan plan = new AggregationOperator(new DateTestScan(), + PhysicalPlan plan = new AggregationOperator(testScan(dateInputs), Collections.singletonList(DSL .named("count(day)", dsl.count(DSL.ref("day", DATE)))), Collections.singletonList(DSL @@ -199,18 +206,16 @@ public void day_span() { .of("span", new ExprDateValue("2021-01-04"), "count(day)", 1)) )); - plan = new AggregationOperator(new DateTestScan(), + plan = new AggregationOperator(testScan(dateInputs), Collections.singletonList(DSL .named("count", dsl.count(DSL.ref("month", DATE)))), Collections.singletonList(DSL .named("span", DSL.span(DSL.ref("month", DATE), DSL.literal(30), "d")))); result = execute(plan); - assertEquals(4, result.size()); + assertEquals(3, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprDateValue("2020-12-04"), "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap - .of("span", new ExprDateValue("2021-01-03"), "count", 0)), ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprDateValue("2021-02-02"), "count", 3)), ExprValueUtils.tupleValue(ImmutableMap @@ -220,18 +225,16 @@ public void day_span() { @Test public void week_span() { - PhysicalPlan plan = new AggregationOperator(new DateTestScan(), + PhysicalPlan plan = new AggregationOperator(testScan(dateInputs), Collections.singletonList(DSL .named("count", dsl.count(DSL.ref("month", DATE)))), Collections.singletonList(DSL .named("span", DSL.span(DSL.ref("month", DATE), DSL.literal(5), "w")))); List result = execute(plan); - assertEquals(4, result.size()); + assertEquals(3, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprDateValue("2020-11-16"), "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap - .of("span", new ExprDateValue("2020-12-21"), "count", 0)), ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprDateValue("2021-01-25"), "count", 3)), ExprValueUtils.tupleValue(ImmutableMap @@ -241,31 +244,29 @@ public void week_span() { @Test public void month_span() { - PhysicalPlan plan = new AggregationOperator(new DateTestScan(), + PhysicalPlan plan = new AggregationOperator(testScan(dateInputs), Collections.singletonList(DSL .named("count", dsl.count(DSL.ref("month", DATE)))), Collections.singletonList(DSL .named("span", DSL.span(DSL.ref("month", DATE), DSL.literal(1), "M")))); List result = execute(plan); - assertEquals(4, result.size()); + assertEquals(3, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprDateValue("2020-12-01"), "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap - .of("span", new ExprDateValue("2021-01-01"), "count", 0)), ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprDateValue("2021-02-01"), "count", 3)), ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprDateValue("2021-03-01"), "count", 1)) )); - plan = new AggregationOperator(new DateTestScan(), + plan = new AggregationOperator(testScan(dateInputs), Collections.singletonList(DSL .named("count", dsl.count(DSL.ref("quarter", DATETIME)))), Collections.singletonList(DSL .named("span", DSL.span(DSL.ref("quarter", DATETIME), DSL.literal(2), "M")))); result = execute(plan); - assertEquals(5, result.size()); + assertEquals(4, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprDatetimeValue("2020-09-01 00:00:00"), "count", 1)), @@ -273,24 +274,20 @@ public void month_span() { .of("span", new ExprDatetimeValue("2020-11-01 00:00:00"), "count", 1)), ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprDatetimeValue("2021-01-01 00:00:00"), "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap - .of("span", new ExprDatetimeValue("2021-03-01 00:00:00"), "count", 0)), ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprDatetimeValue("2021-05-01 00:00:00"), "count", 2)) )); - plan = new AggregationOperator(new DateTestScan(), + plan = new AggregationOperator(testScan(dateInputs), Collections.singletonList(DSL .named("count", dsl.count(DSL.ref("year", TIMESTAMP)))), Collections.singletonList(DSL .named("span", DSL.span(DSL.ref("year", TIMESTAMP), DSL.literal(10 * 12), "M")))); result = execute(plan); - assertEquals(4, result.size()); + assertEquals(3, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprTimestampValue("1990-01-01 00:00:00"), "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap - .of("span", new ExprTimestampValue("2000-01-01 00:00:00"), "count", 0)), ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprTimestampValue("2010-01-01 00:00:00"), "count", 3)), ExprValueUtils.tupleValue(ImmutableMap @@ -301,7 +298,7 @@ public void month_span() { @Test public void quarter_span() { - PhysicalPlan plan = new AggregationOperator(new DateTestScan(), + PhysicalPlan plan = new AggregationOperator(testScan(dateInputs), Collections.singletonList(DSL .named("count", dsl.count(DSL.ref("quarter", DATETIME)))), Collections.singletonList(DSL @@ -315,18 +312,16 @@ public void quarter_span() { .of("span", new ExprDatetimeValue("2021-01-01 00:00:00"), "count", 3)) )); - plan = new AggregationOperator(new DateTestScan(), + plan = new AggregationOperator(testScan(dateInputs), Collections.singletonList(DSL .named("count", dsl.count(DSL.ref("year", TIMESTAMP)))), Collections.singletonList(DSL .named("span", DSL.span(DSL.ref("year", TIMESTAMP), DSL.literal(10 * 4), "q")))); result = execute(plan); - assertEquals(4, result.size()); + assertEquals(3, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprTimestampValue("1990-01-01 00:00:00"), "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap - .of("span", new ExprTimestampValue("2000-01-01 00:00:00"), "count", 0)), ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprTimestampValue("2010-01-01 00:00:00"), "count", 3)), ExprValueUtils.tupleValue(ImmutableMap @@ -336,18 +331,16 @@ public void quarter_span() { @Test public void year_span() { - PhysicalPlan plan = new AggregationOperator(new DateTestScan(), + PhysicalPlan plan = new AggregationOperator(testScan(dateInputs), Collections.singletonList(DSL .named("count", dsl.count(DSL.ref("year", TIMESTAMP)))), Collections.singletonList(DSL .named("span", DSL.span(DSL.ref("year", TIMESTAMP), DSL.literal(10), "y")))); List result = execute(plan); - assertEquals(4, result.size()); + assertEquals(3, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprTimestampValue("1990-01-01 00:00:00"), "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap - .of("span", new ExprTimestampValue("2000-01-01 00:00:00"), "count", 0)), ExprValueUtils.tupleValue(ImmutableMap .of("span", new ExprTimestampValue("2010-01-01 00:00:00"), "count", 3)), ExprValueUtils.tupleValue(ImmutableMap @@ -357,114 +350,149 @@ public void year_span() { @Test public void integer_field() { - PhysicalPlan plan = new AggregationOperator(new NumericTestScan(), + PhysicalPlan plan = new AggregationOperator(testScan(numericInputs), Collections.singletonList(DSL.named("count", dsl.count(DSL.ref("integer", INTEGER)))), Collections.singletonList(DSL.named("span", DSL.span(DSL.ref("integer", INTEGER), DSL .literal(1), "")))); List result = execute(plan); - assertEquals(5, result.size()); + assertEquals(3, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap.of("span", 1, "count", 1)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 2, "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap.of("span", 3, "count", 0)), - ExprValueUtils.tupleValue(ImmutableMap.of("span", 4, "count", 0)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 5, "count", 1)))); - plan = new AggregationOperator(new NumericTestScan(), + plan = new AggregationOperator(testScan(numericInputs), Collections.singletonList(DSL.named("count", dsl.count(DSL.ref("integer", INTEGER)))), Collections.singletonList(DSL.named("span", DSL.span(DSL.ref("integer", INTEGER), DSL .literal(1.5), "")))); result = execute(plan); - assertEquals(4, result.size()); + assertEquals(3, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap.of("span", 0D, "count", 1)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 1.5D, "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap.of("span", 3.0D, "count", 0)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 4.5D, "count", 1)))); } @Test public void long_field() { - PhysicalPlan plan = new AggregationOperator(new NumericTestScan(), + PhysicalPlan plan = new AggregationOperator(testScan(numericInputs), Collections.singletonList(DSL.named("count", dsl.count(DSL.ref("long", LONG)))), Collections.singletonList(DSL.named("span", DSL.span(DSL.ref("long", LONG), DSL .literal(1), "")))); List result = execute(plan); - assertEquals(5, result.size()); + assertEquals(3, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap.of("span", 1L, "count", 1)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 2L, "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap.of("span", 3L, "count", 0)), - ExprValueUtils.tupleValue(ImmutableMap.of("span", 4L, "count", 0)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 5L, "count", 1)))); - plan = new AggregationOperator(new NumericTestScan(), + plan = new AggregationOperator(testScan(numericInputs), Collections.singletonList(DSL.named("count", dsl.count(DSL.ref("long", LONG)))), Collections.singletonList(DSL.named("span", DSL.span(DSL.ref("long", LONG), DSL .literal(1.5), "")))); result = execute(plan); - assertEquals(4, result.size()); + assertEquals(3, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap.of("span", 0D, "count", 1)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 1.5D, "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap.of("span", 3.0D, "count", 0)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 4.5D, "count", 1)))); } @Test public void float_field() { - PhysicalPlan plan = new AggregationOperator(new NumericTestScan(), + PhysicalPlan plan = new AggregationOperator(testScan(numericInputs), Collections.singletonList(DSL.named("count", dsl.count(DSL.ref("float", FLOAT)))), Collections.singletonList(DSL.named("span", DSL.span(DSL.ref("float", FLOAT), DSL .literal(1), "")))); List result = execute(plan); - assertEquals(5, result.size()); + assertEquals(3, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap.of("span", 1F, "count", 1)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 2F, "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap.of("span", 3F, "count", 0)), - ExprValueUtils.tupleValue(ImmutableMap.of("span", 4F, "count", 0)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 5F, "count", 1)))); - plan = new AggregationOperator(new NumericTestScan(), + plan = new AggregationOperator(testScan(numericInputs), Collections.singletonList(DSL.named("count", dsl.count(DSL.ref("float", FLOAT)))), Collections.singletonList(DSL.named("span", DSL.span(DSL.ref("float", FLOAT), DSL .literal(1.5), "")))); result = execute(plan); - assertEquals(4, result.size()); + assertEquals(3, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap.of("span", 0D, "count", 1)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 1.5D, "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap.of("span", 3.0D, "count", 0)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 4.5D, "count", 1)))); } @Test public void double_field() { - PhysicalPlan plan = new AggregationOperator(new NumericTestScan(), + PhysicalPlan plan = new AggregationOperator(testScan(numericInputs), Collections.singletonList(DSL.named("count", dsl.count(DSL.ref("double", DOUBLE)))), Collections.singletonList(DSL.named("span", DSL.span(DSL.ref("double", DOUBLE), DSL .literal(1), "")))); List result = execute(plan); - assertEquals(5, result.size()); + assertEquals(3, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap.of("span", 1D, "count", 1)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 2D, "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap.of("span", 3D, "count", 0)), - ExprValueUtils.tupleValue(ImmutableMap.of("span", 4D, "count", 0)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 5D, "count", 1)))); - plan = new AggregationOperator(new NumericTestScan(), + plan = new AggregationOperator(testScan(numericInputs), Collections.singletonList(DSL.named("count", dsl.count(DSL.ref("double", DOUBLE)))), Collections.singletonList(DSL.named("span", DSL.span(DSL.ref("double", DOUBLE), DSL .literal(1.5), "")))); result = execute(plan); - assertEquals(4, result.size()); + assertEquals(3, result.size()); assertThat(result, containsInRelativeOrder( ExprValueUtils.tupleValue(ImmutableMap.of("span", 0D, "count", 1)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 1.5D, "count", 1)), - ExprValueUtils.tupleValue(ImmutableMap.of("span", 3.0D, "count", 0)), ExprValueUtils.tupleValue(ImmutableMap.of("span", 4.5D, "count", 1)))); } + @Test + public void twoBucketsSpanAndLong() { + PhysicalPlan plan = new AggregationOperator(testScan(compoundInputs), + Collections.singletonList(DSL.named("max", dsl.max(DSL.ref("errors", INTEGER)))), + Arrays.asList( + DSL.named("span", DSL.span(DSL.ref("day", DATE), DSL.literal(1), "d")), + DSL.named("region", DSL.ref("region", STRING))) + ); + List result = execute(plan); + assertEquals(4, result.size()); + assertThat(result, containsInRelativeOrder( + ExprValueUtils.tupleValue(ImmutableMap.of( + "span", new ExprDateValue("2021-01-03"), "region","iad", "max", 3)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "span", new ExprDateValue("2021-01-04"), "region","iad", "max", 10)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "span", new ExprDateValue("2021-01-06"), "region","iad", "max", 1)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "span", new ExprDateValue("2021-01-07"), "region","iad", "max", 8)) + )); + + plan = new AggregationOperator(testScan(compoundInputs), + Collections.singletonList(DSL.named("max", dsl.max(DSL.ref("errors", INTEGER)))), + Arrays.asList( + DSL.named("span", DSL.span(DSL.ref("day", DATE), DSL.literal(1), "d")), + DSL.named("region", DSL.ref("region", STRING)), + DSL.named("host", DSL.ref("host", STRING))) + ); + result = execute(plan); + assertEquals(7, result.size()); + assertThat(result, containsInRelativeOrder( + ExprValueUtils.tupleValue(ImmutableMap.of( + "span", new ExprDateValue("2021-01-03"), "region","iad", "host", "h1", "max", 2)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "span", new ExprDateValue("2021-01-03"), "region","iad", "host", "h2", "max", 3)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "span", new ExprDateValue("2021-01-04"), "region","iad", "host", "h1", "max", 1)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "span", new ExprDateValue("2021-01-04"), "region","iad", "host", "h2", "max", 10)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "span", new ExprDateValue("2021-01-06"), "region","iad", "host", "h1", "max", 1)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "span", new ExprDateValue("2021-01-07"), "region","iad", "host", "h1", "max", 6)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "span", new ExprDateValue("2021-01-07"), "region","iad", "host", "h2", "max", 8)) + )); + } } diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanTestBase.java b/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanTestBase.java index 4f70e2b923..73af929fcd 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanTestBase.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanTestBase.java @@ -157,6 +157,44 @@ public class PhysicalPlanTestBase { "double", 5D))) .build(); + protected static final List compoundInputs = new ImmutableList.Builder() + .add(ExprValueUtils.tupleValue(ImmutableMap.of( + "day", new ExprDateValue("2021-01-03"), + "region", "iad", + "host", "h1", + "errors", 2))) + .add(ExprValueUtils.tupleValue(ImmutableMap.of( + "day", new ExprDateValue("2021-01-03"), + "region", "iad", + "host", "h2", + "errors", 3))) + .add(ExprValueUtils.tupleValue(ImmutableMap.of( + "day", new ExprDateValue("2021-01-04"), + "region", "iad", + "host", "h1", + "errors", 1))) + .add(ExprValueUtils.tupleValue(ImmutableMap.of( + "day", new ExprDateValue("2021-01-04"), + "region", "iad", + "host", "h2", + "errors", 10))) + .add(ExprValueUtils.tupleValue(ImmutableMap.of( + "day", new ExprDateValue("2021-01-06"), + "region", "iad", + "host", "h1", + "errors", 1))) + .add(ExprValueUtils.tupleValue(ImmutableMap.of( + "day", new ExprDateValue("2021-01-07"), + "region", "iad", + "host", "h1", + "errors", 6))) + .add(ExprValueUtils.tupleValue(ImmutableMap.of( + "day", new ExprDateValue("2021-01-07"), + "region", "iad", + "host", "h2", + "errors", 8))) + .build(); + @Bean protected Environment typeEnv() { return var -> { @@ -180,6 +218,10 @@ protected List execute(PhysicalPlan plan) { return builder.build(); } + protected static PhysicalPlan testScan(List inputs) { + return new TestScan(inputs); + } + protected static class TestScan extends PhysicalPlan { private final Iterator iterator; @@ -187,116 +229,8 @@ public TestScan() { iterator = inputs.iterator(); } - @Override - public R accept(PhysicalPlanNodeVisitor visitor, C context) { - return null; - } - - @Override - public List getChild() { - return ImmutableList.of(); - } - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public ExprValue next() { - return iterator.next(); - } - } - - protected static class CountTestScan extends PhysicalPlan { - private final Iterator iterator; - - public CountTestScan() { - iterator = countTestInputs.iterator(); - } - - @Override - public R accept(PhysicalPlanNodeVisitor visitor, C context) { - return null; - } - - @Override - public List getChild() { - return ImmutableList.of(); - } - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public ExprValue next() { - return iterator.next(); - } - } - - protected static class DateTimeTestScan extends PhysicalPlan { - private final Iterator iterator; - - public DateTimeTestScan() { - iterator = datetimeInputs.iterator(); - } - - @Override - public R accept(PhysicalPlanNodeVisitor visitor, C context) { - return null; - } - - @Override - public List getChild() { - return ImmutableList.of(); - } - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public ExprValue next() { - return iterator.next(); - } - } - - protected static class DateTestScan extends PhysicalPlan { - private final Iterator iterator; - - public DateTestScan() { - iterator = dateInputs.iterator(); - } - - @Override - public R accept(PhysicalPlanNodeVisitor visitor, C context) { - return null; - } - - @Override - public List getChild() { - return ImmutableList.of(); - } - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public ExprValue next() { - return iterator.next(); - } - } - - protected static class NumericTestScan extends PhysicalPlan { - private final Iterator iterator; - - public NumericTestScan() { - iterator = numericInputs.iterator(); + public TestScan(List inputs) { + iterator = inputs.iterator(); } @Override diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/bucket/RoundingTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/collector/RoundingTest.java similarity index 92% rename from core/src/test/java/org/opensearch/sql/planner/physical/bucket/RoundingTest.java rename to core/src/test/java/org/opensearch/sql/planner/physical/collector/RoundingTest.java index 5a66f358b3..41b3ea5d6b 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/bucket/RoundingTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/collector/RoundingTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.sql.planner.physical.bucket; +package org.opensearch.sql.planner.physical.collector; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -36,7 +36,6 @@ void round_unknown_type() { assertNull(rounding.round(ExprValueUtils.integerValue(1))); assertNull(rounding.locate(ExprValueUtils.integerValue(1))); assertEquals(0, rounding.createBuckets().length); - assertEquals(0, rounding.fillBuckets(new ExprValue[0], ImmutableMap.of(), "span").length); } @Test diff --git a/docs/user/ppl/cmd/stats.rst b/docs/user/ppl/cmd/stats.rst index 4f88bb75d6..5312cf29fc 100644 --- a/docs/user/ppl/cmd/stats.rst +++ b/docs/user/ppl/cmd/stats.rst @@ -32,12 +32,44 @@ The following table catalogs the aggregation functions and also indicates how th Syntax ============ -stats ... [by-clause]... +stats ... [by-clause] * aggregation: mandatory. A aggregation function. The argument of aggregation must be field. -* by-clause: optional. The one or more fields to group the results by. **Default**: If no is specified, the stats command returns only one row, which is the aggregation over the entire result set. +* by-clause: optional. + + * Syntax: by [field]... [span-expression]. + * Description: The by clause could be the fields and expressions like scalar functions and aggregation functions. Besides, the span clause can be used to split specific field into buckets in the same interval, the stats then does the aggregation by these span buckets. + * Default: If no is specified, the stats command returns only one row, which is the aggregation over the entire result set. + +* span-expression: optional. + + * Syntax: span(field_expr, interval_expr) + * Description: The unit of the interval expression is the natural unit by default. If the field is a date and time type field, and the interval is in date/time units, you will need to specify the unit in the interval expression. For example, to split the field ``age`` into buckets by 10 years, it looks like ``span(age, 10)``. And here is another example of time span, the span to split a ``timestamp`` field into hourly intervals, it looks like ``span(timestamp, 1h)``. + +* Available time unit: ++----------------------------+ +| Span Interval Units | ++============================+ +| millisecond (ms) | ++----------------------------+ +| second (s) | ++----------------------------+ +| minute (m, case sensitive) | ++----------------------------+ +| hour (h) | ++----------------------------+ +| day (d) | ++----------------------------+ +| week (w) | ++----------------------------+ +| month (M, case sensitive) | ++----------------------------+ +| quarter (q) | ++----------------------------+ +| year (y) | ++----------------------------+ Aggregation Functions ===================== @@ -206,39 +238,6 @@ Example:: | 2.8613807855648994 | +--------------------+ - -By Clause -========= - -The by clause could be the fields and expressions like scalar functions and aggregation functions. Besides, the span clause can also be used in the by clause to split specific field into buckets in the same interval, the stats then does the aggregation by these span buckets. - -The span syntax is ``span(field_expr, interval_expr)``, the unit of the interval expression is the natural unit by default. If the field is a date and time type field, and the interval is in date/time units, you will need to specify the unit in the interval expression. For example, to split the field ``age`` into buckets by 10 years, it looks like ``span(age, 10)``. And here is another example of time span, the span to split a ``timestamp`` field into hourly intervals, it looks like ``span(timestamp, 1h)``. - -Available time unit: - -+----------------------------+ -| Span Interval Units | -+============================+ -| millisecond (ms) | -+----------------------------+ -| second (s) | -+----------------------------+ -| minute (m, case sensitive) | -+----------------------------+ -| hour (h) | -+----------------------------+ -| day (d) | -+----------------------------+ -| week (w) | -+----------------------------+ -| month (M, case sensitive) | -+----------------------------+ -| quarter (q) | -+----------------------------+ -| year (y) | -+----------------------------+ - - Example 1: Calculate the count of events ======================================== @@ -366,3 +365,19 @@ PPL query:: | 3 | 30 | +--------------+------------+ +Example 9: Calculate the count by a gender and span +=================================================== + +The example gets the count of age by the interval of 10 years and group by gender. + +PPL query:: + + os> source=accounts | stats count() as cnt by gender span(age, 5) as age_span + fetched rows / total rows = 3/3 + +-------+------------+----------+ + | cnt | age_span | gender | + |-------+------------+----------| + | 1 | 25 | F | + | 2 | 30 | M | + | 1 | 35 | M | + +-------+------------+----------+ diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index ce76cf9348..cb839eb9ec 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -46,7 +46,7 @@ statsCommand (ALLNUM EQUAL allnum=booleanLiteral)? (DELIM EQUAL delim=stringLiteral)? statsAggTerm (COMMA statsAggTerm)* - (byClause | bySpanClause)? + (statsByClause)? (DEDUP_SPLITVALUES EQUAL dedupsplit=booleanLiteral)? ; @@ -98,8 +98,14 @@ byClause : BY fieldList ; +statsByClause + : BY fieldList + | BY bySpanClause + | BY fieldList bySpanClause + ; + bySpanClause - : BY spanClause (AS alias=qualifiedName)? + : spanClause (AS alias=qualifiedName)? ; spanClause diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 04dc82829e..918c8836a6 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import org.antlr.v4.runtime.ParserRuleContext; @@ -146,21 +147,27 @@ public UnresolvedPlan visitStatsCommand(StatsCommandContext ctx) { aggListBuilder.add(alias); } - List groupList = ctx.byClause() != null - ? ctx.byClause() - .fieldList() - .fieldExpression() - .stream() - .map(groupCtx -> new Alias(getTextInQuery(groupCtx), visitExpression(groupCtx))).collect( - Collectors.toList()) - : ctx.bySpanClause() != null - ? Collections.singletonList(visitExpression(ctx.bySpanClause())) - : Collections.emptyList(); + List groupList = + Optional.ofNullable(ctx.statsByClause()) + .map(OpenSearchPPLParser.StatsByClauseContext::fieldList) + .map(expr -> expr.fieldExpression().stream() + .map(groupCtx -> + (UnresolvedExpression) new Alias(getTextInQuery(groupCtx), + visitExpression(groupCtx))) + .collect(Collectors.toList())) + .orElse(Collections.emptyList()); + + UnresolvedExpression span = + Optional.ofNullable(ctx.statsByClause()) + .map(OpenSearchPPLParser.StatsByClauseContext::bySpanClause) + .map(this::visitExpression) + .orElse(null); Aggregation aggregation = new Aggregation( aggListBuilder.build(), Collections.emptyList(), groupList, + span, ArgumentFactory.getArgumentList(ctx) ); return aggregation; diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index b874862c65..d5738bbed4 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -277,9 +277,8 @@ public void testStatsCommandWithSpan() { alias("avg(price)", aggregate("avg", field("price"))) ), emptyList(), - exprList( - alias("span(timestamp,1h)", span(field("timestamp"), intLiteral(1), SpanUnit.H)) - ), + emptyList(), + alias("span(timestamp,1h)", span(field("timestamp"), intLiteral(1), SpanUnit.H)), defaultStatsArgs() )); @@ -290,11 +289,44 @@ public void testStatsCommandWithSpan() { alias("count(a)", aggregate("count", field("a"))) ), emptyList(), + emptyList(), + alias("span(age,10)", span(field("age"), intLiteral(10), SpanUnit.NONE)), + defaultStatsArgs() + )); + + assertEqual("source=t | stats avg(price) by b span(timestamp, 1h)", + agg( + relation("t"), exprList( - alias("span(age,10)", span(field("age"), intLiteral(10), SpanUnit.NONE)) + alias("avg(price)", aggregate("avg", field("price"))) ), + emptyList(), + exprList(alias("b", field("b"))), + alias("span(timestamp,1h)", span(field("timestamp"), intLiteral(1), SpanUnit.H)), defaultStatsArgs() )); + + assertEqual("source=t | stats avg(price) by f1, f2 span(timestamp, 1h)", + agg( + relation("t"), + exprList( + alias("avg(price)", aggregate("avg", field("price"))) + ), + emptyList(), + exprList(alias("f1", field("f1")), alias("f2", field("f2"))), + alias("span(timestamp,1h)", span(field("timestamp"), intLiteral(1), SpanUnit.H)), + defaultStatsArgs() + )); + } + + @Test(expected = org.opensearch.sql.common.antlr.SyntaxCheckException.class) + public void throwExceptionIfSpanInGroupByList() { + plan("source=t | stats avg(price) by f1, f2, span(timestamp, 1h)"); + } + + @Test(expected = org.opensearch.sql.common.antlr.SyntaxCheckException.class) + public void throwExceptionWithEmptyGroupByList() { + plan("source=t | stats avg(price) by)"); } @Test @@ -306,10 +338,9 @@ public void testStatsSpanWithAlias() { alias("avg(price)", aggregate("avg", field("price"))) ), emptyList(), - exprList( - alias("span(timestamp,1h)", span( - field("timestamp"), intLiteral(1), SpanUnit.H), "time_span") - ), + emptyList(), + alias("span(timestamp,1h)", span( + field("timestamp"), intLiteral(1), SpanUnit.H), "time_span"), defaultStatsArgs() )); @@ -320,9 +351,9 @@ public void testStatsSpanWithAlias() { alias("count(a)", aggregate("count", field("a"))) ), emptyList(), - exprList(alias("span(age,10)", span( - field("age"), intLiteral(10), SpanUnit.NONE), "numeric_span") - ), + emptyList(), + alias("span(age,10)", span( + field("age"), intLiteral(10), SpanUnit.NONE), "numeric_span"), defaultStatsArgs() )); }