From 89df3cb4b3e99a7f08f131f8afbb8767b5594f7f Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Fri, 28 Feb 2025 15:38:39 +0800 Subject: [PATCH] Enable push down optimization by default Signed-off-by: Heng Qian --- .../standalone/CalcitePPLIntegTestCase.java | 2 +- .../OpenSearchProjectIndexScanRule.java | 18 +++---- .../setting/OpenSearchSettings.java | 2 +- .../scan/CalciteOpenSearchIndexScan.java | 47 +++++++++++++++++-- 4 files changed, 56 insertions(+), 13 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java index 4384bee0f48..9703dfaa631 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java @@ -104,7 +104,7 @@ private Settings defaultSettings() { .put(Key.FIELD_TYPE_TOLERANCE, true) .put(Key.CALCITE_ENGINE_ENABLED, true) .put(Key.CALCITE_FALLBACK_ALLOWED, false) - .put(Key.CALCITE_PUSHDOWN_ENABLED, false) + .put(Key.CALCITE_PUSHDOWN_ENABLED, true) .build(); @Override diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java index 0e6c755fa43..e865a9eb024 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java @@ -67,15 +67,17 @@ public Void visitInputRef(RexInputRef inputRef) { } }; visitor.visitEach(project.getProjects()); + // Only do push down when an actual projection happens + if (!selectedColumns.isEmpty() && selectedColumns.size() != scan.getRowType().getFieldCount()) { + Mapping mapping = Mappings.target(selectedColumns, scan.getRowType().getFieldCount()); + CalciteOpenSearchIndexScan newScan = scan.pushDownProject(selectedColumns); + final List newProjectRexNodes = RexUtil.apply(mapping, project.getProjects()); - Mapping mapping = Mappings.target(selectedColumns, scan.getRowType().getFieldCount()); - CalciteOpenSearchIndexScan newScan = scan.pushDownProject(selectedColumns); - final List newProjectRexNodes = RexUtil.apply(mapping, project.getProjects()); - - if (RexUtil.isIdentity(newProjectRexNodes, newScan.getRowType())) { - call.transformTo(newScan); - } else { - call.transformTo(call.builder().push(newScan).project(newProjectRexNodes).build()); + if (RexUtil.isIdentity(newProjectRexNodes, newScan.getRowType())) { + call.transformTo(newScan); + } else { + call.transformTo(call.builder().push(newScan).project(newProjectRexNodes).build()); + } } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index 96c74164025..5edf9cf881f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -102,7 +102,7 @@ public class OpenSearchSettings extends Settings { public static final Setting CALCITE_PUSHDOWN_ENABLED_SETTING = Setting.boolSetting( Key.CALCITE_PUSHDOWN_ENABLED.getKeyValue(), - false, + true, Setting.Property.NodeScope, Setting.Property.Dynamic); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java index 6c1386c5a3f..cda92c55f20 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java @@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; @@ -96,6 +97,12 @@ public RelNode copy(RelTraitSet traitSet, List inputs) { return new CalciteOpenSearchIndexScan(getCluster(), table, osIndex); } + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .itemIf("PushDownContext", pushDownContext, !pushDownContext.isEmpty()); + } + @Override public void register(RelOptPlanner planner) { super.register(planner); @@ -147,7 +154,12 @@ public CalciteOpenSearchIndexScan pushDownFilter(Filter filter) { CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(filter.getRowType()); List schema = this.getRowType().getFieldNames(); QueryBuilder filterBuilder = PredicateAnalyzer.analyze(filter.getCondition(), schema); - newScan.pushDownContext.add(requestBuilder -> requestBuilder.pushDownFilter(filterBuilder)); + newScan.pushDownContext.add( + PushDownAction.of( + PushDownType.FILTER, + filter.getCondition(), + requestBuilder -> requestBuilder.pushDownFilter(filterBuilder))); + // TODO: handle the case where condition contains a score function return newScan; } catch (Exception e) { @@ -169,10 +181,15 @@ public CalciteOpenSearchIndexScan pushDownProject(List selectedColumns) RelDataType newSchema = builder.build(); CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(newSchema); newScan.pushDownContext.add( - requestBuilder -> requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream())); + PushDownAction.of( + PushDownType.PROJECT, + newSchema.getFieldNames(), + requestBuilder -> + requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream()))); return newScan; } + // TODO: should we consider equivalent among PushDownContexts with different push down sequence? static class PushDownContext extends ArrayDeque { @Override public PushDownContext clone() { @@ -180,7 +197,31 @@ public PushDownContext clone() { } } - private interface PushDownAction { + private enum PushDownType { + FILTER, + PROJECT, + // AGGREGATION, + // SORT, + // LIMIT, + // HIGHLIGHT, + // NESTED + } + + private record PushDownAction(PushDownType type, Object digest, AbstractAction action) { + static PushDownAction of(PushDownType type, Object digest, AbstractAction action) { + return new PushDownAction(type, digest, action); + } + + public String toString() { + return type + ":" + digest; + } + + void apply(OpenSearchRequestBuilder requestBuilder) { + action.apply(requestBuilder); + } + } + + private interface AbstractAction { void apply(OpenSearchRequestBuilder requestBuilder); } }