Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private Settings defaultSettings() {
.put(Key.FIELD_TYPE_TOLERANCE, true)
.put(Key.CALCITE_ENGINE_ENABLED, true)
.put(Key.CALCITE_FALLBACK_ALLOWED, false)
.put(Key.CALCITE_PUSHDOWN_ENABLED, false)
.put(Key.CALCITE_PUSHDOWN_ENABLED, true)
.build();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,17 @@ public Void visitInputRef(RexInputRef inputRef) {
}
};
visitor.visitEach(project.getProjects());
// Only do push down when an actual projection happens
if (!selectedColumns.isEmpty() && selectedColumns.size() != scan.getRowType().getFieldCount()) {
Mapping mapping = Mappings.target(selectedColumns, scan.getRowType().getFieldCount());
CalciteOpenSearchIndexScan newScan = scan.pushDownProject(selectedColumns);
final List<RexNode> newProjectRexNodes = RexUtil.apply(mapping, project.getProjects());

Mapping mapping = Mappings.target(selectedColumns, scan.getRowType().getFieldCount());
CalciteOpenSearchIndexScan newScan = scan.pushDownProject(selectedColumns);
final List<RexNode> newProjectRexNodes = RexUtil.apply(mapping, project.getProjects());

if (RexUtil.isIdentity(newProjectRexNodes, newScan.getRowType())) {
call.transformTo(newScan);
} else {
call.transformTo(call.builder().push(newScan).project(newProjectRexNodes).build());
if (RexUtil.isIdentity(newProjectRexNodes, newScan.getRowType())) {
call.transformTo(newScan);
} else {
call.transformTo(call.builder().push(newScan).project(newProjectRexNodes).build());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public class OpenSearchSettings extends Settings {
public static final Setting<?> CALCITE_PUSHDOWN_ENABLED_SETTING =
Setting.boolSetting(
Key.CALCITE_PUSHDOWN_ENABLED.getKeyValue(),
false,
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
Expand Down Expand Up @@ -96,6 +97,12 @@ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new CalciteOpenSearchIndexScan(getCluster(), table, osIndex);
}

@Override
public RelWriter explainTerms(RelWriter pw) {
return super.explainTerms(pw)
.itemIf("PushDownContext", pushDownContext, !pushDownContext.isEmpty());
}

@Override
public void register(RelOptPlanner planner) {
super.register(planner);
Expand Down Expand Up @@ -147,7 +154,12 @@ public CalciteOpenSearchIndexScan pushDownFilter(Filter filter) {
CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(filter.getRowType());
List<String> schema = this.getRowType().getFieldNames();
QueryBuilder filterBuilder = PredicateAnalyzer.analyze(filter.getCondition(), schema);
newScan.pushDownContext.add(requestBuilder -> requestBuilder.pushDownFilter(filterBuilder));
newScan.pushDownContext.add(
PushDownAction.of(
PushDownType.FILTER,
filter.getCondition(),
requestBuilder -> requestBuilder.pushDownFilter(filterBuilder)));

// TODO: handle the case where condition contains a score function
return newScan;
} catch (Exception e) {
Expand All @@ -169,18 +181,47 @@ public CalciteOpenSearchIndexScan pushDownProject(List<Integer> selectedColumns)
RelDataType newSchema = builder.build();
CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(newSchema);
newScan.pushDownContext.add(
requestBuilder -> requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream()));
PushDownAction.of(
PushDownType.PROJECT,
newSchema.getFieldNames(),
requestBuilder ->
requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream())));
return newScan;
}

// TODO: should we consider equivalent among PushDownContexts with different push down sequence?
static class PushDownContext extends ArrayDeque<PushDownAction> {
@Override
public PushDownContext clone() {
return (PushDownContext) super.clone();
}
}

private interface PushDownAction {
private enum PushDownType {
FILTER,
PROJECT,
// AGGREGATION,
// SORT,
// LIMIT,
// HIGHLIGHT,
// NESTED
}

private record PushDownAction(PushDownType type, Object digest, AbstractAction action) {
static PushDownAction of(PushDownType type, Object digest, AbstractAction action) {
return new PushDownAction(type, digest, action);
}

public String toString() {
return type + ":" + digest;
}

void apply(OpenSearchRequestBuilder requestBuilder) {
action.apply(requestBuilder);
}
}

private interface AbstractAction {
void apply(OpenSearchRequestBuilder requestBuilder);
}
}
Loading