Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
setup:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled : true
- do:
bulk:
index: test
refresh: true
body:
- '{"index": {}}'
- '{"category":"A","has_flag":true,"value":10}'
- '{"index": {}}'
- '{"category":"B","has_flag":true,"value":20}'
- '{"index": {}}'
- '{"category":"C","has_flag":true,"value":30}'
- '{"index": {}}'
- '{"category":"D","has_flag":false,"value":40}'
- '{"index": {}}'
- '{"category":"E","has_flag":false,"value":50}'
- '{"index": {}}'
- '{"category":"F","has_flag":false,"value":60}'

---
teardown:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled : false

---
"Join with fields":
- skip:
features:
- headers
- allowed_warnings
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=test | stats COUNT() as cnt by category, has_flag | fields category, has_flag, cnt | join left=L right=R ON L.has_flag = R.has_flag [source=test | stats COUNT() as overall_cnt by has_flag]

- match: { total: 6 }
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ protected OpenSearchSortIndexScanRule(Config config) {
public void onMatch(RelOptRuleCall call) {
final Sort sort = call.rel(0);
final AbstractCalciteIndexScan scan = call.rel(1);
if (sort.getConvention() != scan.getConvention()) {
return;
}

var collations = sort.collation.getFieldCollations();
AbstractCalciteIndexScan newScan = scan.pushDownSort(collations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ private static Pair<AggregationBuilder, MetricParser> createRegularAggregation(
case AVG -> Pair.of(
helper.build(args.getFirst(), AggregationBuilders.avg(aggFieldName)),
new SingleValueParser(aggFieldName));
// 1. Only case SUM, skip SUM0 / COUNT since calling avg() in DSL should be faster.
// 2. To align with databases, SUM0 is not preferred now.
case SUM -> Pair.of(
helper.build(args.getFirst(), AggregationBuilders.sum(aggFieldName)),
new SingleValueParser(aggFieldName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,20 +258,8 @@ public AbstractCalciteIndexScan pushDownSort(List<RelFieldCollation> collations)
// aggregators.
return null;
}

// Propagate the sort to the new scan
RelTraitSet traitsWithCollations = getTraitSet().plus(RelCollations.of(collations));
AbstractCalciteIndexScan newScan =
buildScan(
getCluster(),
traitsWithCollations,
hints,
table,
osIndex,
getRowType(),
// Existing collations are overridden (discarded) by the new collations,
pushDownContext.cloneWithoutSort());

PushDownContext pushDownContextWithoutSort = this.pushDownContext.cloneWithoutSort();
AbstractAction<?> action;
Object digest;
if (pushDownContext.isAggregatePushed()) {
Expand All @@ -281,7 +269,27 @@ public AbstractCalciteIndexScan pushDownSort(List<RelFieldCollation> collations)
aggAction ->
aggAction.pushDownSortIntoAggBucket(collations, getRowType().getFieldNames());
digest = collations;
pushDownContextWithoutSort.add(PushDownType.SORT, digest, action);
return buildScan(
getCluster(),
traitsWithCollations,
hints,
table,
osIndex,
getRowType(),
pushDownContextWithoutSort.clone());
} else {
// Propagate the sort to the new scan
AbstractCalciteIndexScan newScan =
buildScan(
getCluster(),
traitsWithCollations,
hints,
table,
osIndex,
getRowType(),
// Existing collations are overridden (discarded) by the new collations,
pushDownContextWithoutSort);
List<SortBuilder<?>> builders = new ArrayList<>();
for (RelFieldCollation collation : collations) {
int index = collation.getFieldIndex();
Expand Down Expand Up @@ -310,9 +318,9 @@ public AbstractCalciteIndexScan pushDownSort(List<RelFieldCollation> collations)
}
action = (OSRequestBuilderAction) requestBuilder -> requestBuilder.pushDownSort(builders);
digest = builders.toString();
newScan.pushDownContext.add(PushDownType.SORT, digest, action);
return newScan;
}
newScan.pushDownContext.add(PushDownType.SORT, digest, action);
return newScan;
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot pushdown the sort {}", getCollationNames(collations), e);
Expand Down
Loading