Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,19 @@ public LogicalPlan visitAggregation(Aggregation node, AnalysisContext context) {
aggregatorBuilder
.add(new NamedAggregator(aggExpr.getNameOrAlias(), (Aggregator) aggExpr.getDelegated()));
}
ImmutableList<NamedAggregator> aggregators = aggregatorBuilder.build();

ImmutableList.Builder<NamedExpression> groupbyBuilder = new ImmutableList.Builder<>();
// Span should be first expression if exist.
if (node.getSpan() != null) {
groupbyBuilder.add(namedExpressionAnalyzer.analyze(node.getSpan(), context));
}

for (UnresolvedExpression expr : node.getGroupExprList()) {
groupbyBuilder.add(namedExpressionAnalyzer.analyze(expr, context));
}
ImmutableList<NamedExpression> groupBys = groupbyBuilder.build();

ImmutableList<NamedAggregator> aggregators = aggregatorBuilder.build();
// new context
context.push();
TypeEnvironment newEnv = context.peek();
Expand Down
12 changes: 11 additions & 1 deletion core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,17 @@ public static UnresolvedPlan agg(
List<UnresolvedExpression> sortList,
List<UnresolvedExpression> groupList,
List<Argument> argList) {
return new Aggregation(aggList, sortList, groupList, argList).attach(input);
return new Aggregation(aggList, sortList, groupList, null, argList).attach(input);
}

public static UnresolvedPlan agg(
UnresolvedPlan input,
List<UnresolvedExpression> aggList,
List<UnresolvedExpression> sortList,
List<UnresolvedExpression> groupList,
UnresolvedExpression span,
List<Argument> argList) {
return new Aggregation(aggList, sortList, groupList, span, argList).attach(input);
}

public static UnresolvedPlan rename(UnresolvedPlan input, Map... maps) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@ public class Aggregation extends UnresolvedPlan {
private List<UnresolvedExpression> aggExprList;
private List<UnresolvedExpression> sortExprList;
private List<UnresolvedExpression> groupExprList;
private UnresolvedExpression span;
private List<Argument> argExprList;
private UnresolvedPlan child;

/**
* Aggregation Constructor without argument.
* Aggregation Constructor without span and argument.
*/
public Aggregation(List<UnresolvedExpression> aggExprList,
List<UnresolvedExpression> sortExprList,
List<UnresolvedExpression> groupExprList) {
this(aggExprList, sortExprList, groupExprList, Collections.emptyList());
this(aggExprList, sortExprList, groupExprList, null, Collections.emptyList());
}

/**
Expand All @@ -46,10 +47,12 @@ public Aggregation(List<UnresolvedExpression> aggExprList,
public Aggregation(List<UnresolvedExpression> aggExprList,
List<UnresolvedExpression> sortExprList,
List<UnresolvedExpression> groupExprList,
UnresolvedExpression span,
List<Argument> argExprList) {
this.aggExprList = aggExprList;
this.sortExprList = sortExprList;
this.groupExprList = groupExprList;
this.span = span;
this.argExprList = argExprList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
import org.opensearch.sql.expression.aggregation.Aggregator;
import org.opensearch.sql.expression.aggregation.NamedAggregator;
import org.opensearch.sql.expression.span.SpanExpression;
import org.opensearch.sql.planner.physical.bucket.Group;
import org.opensearch.sql.planner.physical.bucket.SpanBucket;
import org.opensearch.sql.planner.physical.collector.Collector;
import org.opensearch.sql.storage.bindingtuple.BindingTuple;

/**
Expand All @@ -35,8 +34,13 @@ public class AggregationOperator extends PhysicalPlan {
private final List<NamedAggregator> aggregatorList;
@Getter
private final List<NamedExpression> groupByExprList;
@Getter
private final NamedExpression span;
/**
* {@link BindingTuple} Collector.
*/
@EqualsAndHashCode.Exclude
private final Group group;
private final Collector collector;
@EqualsAndHashCode.Exclude
private Iterator<ExprValue> iterator;

Expand All @@ -51,9 +55,14 @@ public AggregationOperator(PhysicalPlan input, List<NamedAggregator> aggregatorL
List<NamedExpression> groupByExprList) {
this.input = input;
this.aggregatorList = aggregatorList;
this.groupByExprList = groupByExprList;
this.group = groupBySpan(groupByExprList) ? new SpanBucket(aggregatorList, groupByExprList)
: new Group(aggregatorList, groupByExprList);
if (hasSpan(groupByExprList)) {
this.span = groupByExprList.get(0);
this.groupByExprList = groupByExprList.subList(1, groupByExprList.size());
} else {
this.span = null;
this.groupByExprList = groupByExprList;
}
this.collector = Collector.Builder.build(this.span, this.groupByExprList, this.aggregatorList);
}

@Override
Expand Down Expand Up @@ -81,14 +90,13 @@ public ExprValue next() {
public void open() {
super.open();
while (input.hasNext()) {
group.push(input.next());
collector.collect(input.next().bindingTuples());
}
iterator = group.result().iterator();
iterator = collector.results().iterator();
}

private boolean groupBySpan(List<NamedExpression> namedExpressionList) {
return namedExpressionList.size() == 1
private boolean hasSpan(List<NamedExpression> namedExpressionList) {
return !namedExpressionList.isEmpty()
&& namedExpressionList.get(0).getDelegated() instanceof SpanExpression;
}

}

This file was deleted.

This file was deleted.

Loading