diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index 63ee60b7683..6225d8fe0b7 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -31,6 +31,7 @@ public enum Key { /** Enable Calcite as execution engine */ CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"), CALCITE_FALLBACK_ALLOWED("plugins.calcite.fallback.allowed"), + CALCITE_PUSHDOWN_ENABLED("plugins.calcite.pushdown.enabled"), /** Query Settings. */ FIELD_TYPE_TOLERANCE("plugins.query.field_type_tolerance"), diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index 16845b4abb4..6cb27e1480d 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -15,7 +15,11 @@ import lombok.RequiredArgsConstructor; import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.logical.LogicalSort; import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.parser.SqlParser; @@ -129,7 +133,31 @@ public void executePlanByCalcite( RelNode plan, CalcitePlanContext context, ResponseListener listener) { - executionEngine.execute(optimize(plan), context, listener); + executionEngine.execute(convertToCalcitePlan(optimize(plan)), context, listener); + } + + /** + * Convert OpenSearch Plan to Calcite Plan. 
Although both plans consist of Calcite RelNodes, there + * are some differences in the topological structures or semantics between them. + * + * @param osPlan Logical Plan derived from OpenSearch PPL + */ + private static RelNode convertToCalcitePlan(RelNode osPlan) { + RelNode calcitePlan = osPlan; + + /* Calcite only ensures collation of the final result produced from the root sort operator. + * While we expect that the collation can be preserved through the pipes over PPL, we need to + * explicitly add a sort operator on top of the original plan + * to ensure the correct collation of the final result. + * See logic in {@link CalcitePrepareImpl}. + * If the added sort is redundant, we rely on the Calcite optimizer to eliminate it. + */ + RelCollation collation = osPlan.getTraitSet().getCollation(); + if (!(osPlan instanceof Sort) && collation != RelCollations.EMPTY) { + calcitePlan = LogicalSort.create(osPlan, collation, null, null); + } + + return calcitePlan; } /** diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java index 9a366c2a2a0..4384bee0f48 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java @@ -104,6 +104,7 @@ private Settings defaultSettings() { .put(Key.FIELD_TYPE_TOLERANCE, true) .put(Key.CALCITE_ENGINE_ENABLED, true) .put(Key.CALCITE_FALLBACK_ALLOWED, false) + .put(Key.CALCITE_PUSHDOWN_ENABLED, false) .build(); @Override diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLSortIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLSortIT.java index 2100efaa3d8..545bed6e1a8 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLSortIT.java +++ 
b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLSortIT.java @@ -8,11 +8,8 @@ import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; import java.io.IOException; -import org.junit.Ignore; import org.junit.jupiter.api.Test; -/** testSortXXAndXX could fail. TODO Remove this @Ignore when the issue fixed. */ -@Ignore public class CalcitePPLSortIT extends CalcitePPLIntegTestCase { @Override diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java index f8beb339fe1..621bfd8c6fd 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java @@ -42,8 +42,9 @@ public void onMatch(RelOptRuleCall call) { } protected void apply(RelOptRuleCall call, Filter filter, CalciteOpenSearchIndexScan scan) { - if (scan.pushDownFilter(filter)) { - call.transformTo(scan); + CalciteOpenSearchIndexScan newScan = scan.pushDownFilter(filter); + if (newScan != null) { + call.transformTo(newScan); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index 53bf6536c96..96c74164025 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -99,6 +99,13 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting CALCITE_PUSHDOWN_ENABLED_SETTING = + Setting.boolSetting( + Key.CALCITE_PUSHDOWN_ENABLED.getKeyValue(), + false, + Setting.Property.NodeScope, + 
Setting.Property.Dynamic); + public static final Setting QUERY_MEMORY_LIMIT_SETTING = new Setting<>( Key.QUERY_MEMORY_LIMIT.getKeyValue(), @@ -302,6 +309,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.CALCITE_FALLBACK_ALLOWED, CALCITE_FALLBACK_ALLOWED_SETTING, new Updater(Key.CALCITE_FALLBACK_ALLOWED)); + register( + settingBuilder, + clusterSettings, + Key.CALCITE_PUSHDOWN_ENABLED, + CALCITE_PUSHDOWN_ENABLED_SETTING, + new Updater(Key.CALCITE_PUSHDOWN_ENABLED)); register( settingBuilder, clusterSettings, @@ -478,6 +491,7 @@ public static List> pluginSettings() { .add(PPL_ENABLED_SETTING) .add(CALCITE_ENGINE_ENABLED_SETTING) .add(CALCITE_FALLBACK_ALLOWED_SETTING) + .add(CALCITE_PUSHDOWN_ENABLED_SETTING) .add(QUERY_MEMORY_LIMIT_SETTING) .add(QUERY_SIZE_LIMIT_SETTING) .add(METRICS_ROLLING_WINDOW_SETTING) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index 99df0465bdc..c77bb3c94d7 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -68,7 +68,7 @@ public class OpenSearchIndex extends OpenSearchTable { /** OpenSearch client connection. */ @Getter private final OpenSearchClient client; - private final Settings settings; + @Getter private final Settings settings; /** {@link OpenSearchRequest.IndexName}. 
*/ private final OpenSearchRequest.IndexName indexName; diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java index 20fb52c1125..6c1386c5a3f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java @@ -7,6 +7,7 @@ import static java.util.Objects.requireNonNull; +import java.util.ArrayDeque; import java.util.List; import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; import org.apache.calcite.adapter.enumerable.PhysType; @@ -32,11 +33,10 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.opensearch.index.query.QueryBuilder; import org.opensearch.sql.calcite.plan.OpenSearchTableScan; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.opensearch.planner.physical.OpenSearchIndexRules; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.request.PredicateAnalyzer; -import org.opensearch.sql.opensearch.request.PredicateAnalyzer.ExpressionNotAnalyzableException; -import org.opensearch.sql.opensearch.request.PredicateAnalyzer.PredicateAnalyzerException; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; /** Relational expression representing a scan of an OpenSearchIndex type. */ @@ -44,8 +44,15 @@ public class CalciteOpenSearchIndexScan extends OpenSearchTableScan { private static final Logger LOG = LogManager.getLogger(CalciteOpenSearchIndexScan.class); private final OpenSearchIndex osIndex; - private final OpenSearchRequestBuilder requestBuilder; + // The schema of this scan operator, it's initialized with the row type of the table, but may be + // changed by push down operations. 
private final RelDataType schema; + // This context maintains all the push down actions, which will be applied to the requestBuilder + // when it begins to scan data from OpenSearch. + // OpenSearchRequestBuilder doesn't support deep copy, yet we want to keep the + // requestBuilder independent among different plans produced in the optimization process, + // so we cannot apply these actions right away. + private final PushDownContext pushDownContext; /** * Creates an CalciteOpenSearchIndexScan. @@ -56,24 +63,31 @@ public class CalciteOpenSearchIndexScan extends OpenSearchTableScan { */ public CalciteOpenSearchIndexScan( RelOptCluster cluster, RelOptTable table, OpenSearchIndex index) { - this(cluster, table, index, index.createRequestBuilder(), table.getRowType()); + this(cluster, table, index, table.getRowType(), new PushDownContext()); } - public CalciteOpenSearchIndexScan( + private CalciteOpenSearchIndexScan( RelOptCluster cluster, RelOptTable table, OpenSearchIndex index, - OpenSearchRequestBuilder requestBuilder, - RelDataType schema) { + RelDataType schema, + PushDownContext pushDownContext) { super(cluster, table); this.osIndex = requireNonNull(index, "OpenSearch index"); - this.requestBuilder = requestBuilder; this.schema = schema; + this.pushDownContext = pushDownContext; + } + + public CalciteOpenSearchIndexScan copy() { + return new CalciteOpenSearchIndexScan( + getCluster(), table, osIndex, this.schema, pushDownContext.clone()); } public CalciteOpenSearchIndexScan copyWithNewSchema(RelDataType schema) { - // TODO: need to do deep-copy on requestBuilder in case non-idempotent push down. - return new CalciteOpenSearchIndexScan(getCluster(), table, osIndex, requestBuilder, schema); + // Shallow-copy the pushDownContext so that the push down actions of different plans produced + // in the optimization process won't affect each other. 
+ return new CalciteOpenSearchIndexScan( + getCluster(), table, osIndex, schema, pushDownContext.clone()); } @Override @@ -85,8 +99,10 @@ public RelNode copy(RelTraitSet traitSet, List inputs) { @Override public void register(RelOptPlanner planner) { super.register(planner); - for (RelOptRule rule : OpenSearchIndexRules.OPEN_SEARCH_INDEX_SCAN_RULES) { - planner.addRule(rule); + if (osIndex.getSettings().getSettingValue(Settings.Key.CALCITE_PUSHDOWN_ENABLED)) { + for (RelOptRule rule : OpenSearchIndexRules.OPEN_SEARCH_INDEX_SCAN_RULES) { + planner.addRule(rule); + } } } @@ -97,15 +113,23 @@ public RelDataType deriveRowType() { @Override public Result implement(EnumerableRelImplementor implementor, Prefer pref) { - // Avoid optimizing the java row type since the scan will always return an array. + /* In Calcite enumerable operators, row of single column will be optimized to a scalar value. + * See {@link PhysTypeImpl}. + * Since we need to combine this operator with their original ones, + * let's follow this convention to apply the optimization here and ensure `scan` method + * returns the correct data format for single column rows. 
+ * See {@link OpenSearchIndexEnumerator} + */ PhysType physType = - PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray(), false); + PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray()); Expression scanOperator = implementor.stash(this, CalciteOpenSearchIndexScan.class); return implementor.result(physType, Blocks.toBlock(Expressions.call(scanOperator, "scan"))); } public Enumerable<@Nullable Object> scan() { + OpenSearchRequestBuilder requestBuilder = osIndex.createRequestBuilder(); + pushDownContext.forEach(action -> action.apply(requestBuilder)); return new AbstractEnumerable<>() { @Override public Enumerator enumerator() { @@ -118,17 +142,18 @@ public Enumerator enumerator() { }; } - public boolean pushDownFilter(Filter filter) { + public CalciteOpenSearchIndexScan pushDownFilter(Filter filter) { try { + CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(filter.getRowType()); List schema = this.getRowType().getFieldNames(); QueryBuilder filterBuilder = PredicateAnalyzer.analyze(filter.getCondition(), schema); - requestBuilder.pushDownFilter(filterBuilder); + newScan.pushDownContext.add(requestBuilder -> requestBuilder.pushDownFilter(filterBuilder)); // TODO: handle the case where condition contains a score function - return true; - } catch (ExpressionNotAnalyzableException | PredicateAnalyzerException e) { + return newScan; + } catch (Exception e) { LOG.warn("Cannot analyze the filter condition {}", filter.getCondition(), e); } - return false; + return null; } /** @@ -143,7 +168,19 @@ public CalciteOpenSearchIndexScan pushDownProject(List selectedColumns) } RelDataType newSchema = builder.build(); CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(newSchema); - newScan.requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream()); + newScan.pushDownContext.add( + requestBuilder -> requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream())); return newScan; } + + 
static class PushDownContext extends ArrayDeque { + @Override + public PushDownContext clone() { + return (PushDownContext) super.clone(); + } + } + + private interface PushDownAction { + void apply(OpenSearchRequestBuilder requestBuilder); + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java index e4db98ac4b9..6e778422db2 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java @@ -11,7 +11,6 @@ import lombok.EqualsAndHashCode; import lombok.ToString; import org.apache.calcite.linq4j.Enumerator; -import org.opensearch.sql.data.model.ExprNullValue; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.request.OpenSearchRequest; @@ -62,11 +61,13 @@ private void fetchNextBatch() { @Override public Object current() { - Object[] p = - fields.stream() - .map(k -> current.tupleValue().getOrDefault(k, ExprNullValue.of()).valueForCalcite()) - .toArray(); - return p; + /* In Calcite enumerable operators, row of single column will be optimized to a scalar value. + * See {@link PhysTypeImpl} + */ + if (fields.size() == 1) { + return current.tupleValue().get(fields.getFirst()).valueForCalcite(); + } + return fields.stream().map(k -> current.tupleValue().get(k).valueForCalcite()).toArray(); } @Override