diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 6a556eccc92..d7b7bb290ad 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -143,6 +143,7 @@ import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.ast.tree.Values; import org.opensearch.sql.ast.tree.Window; +import org.opensearch.sql.calcite.plan.AliasFieldsWrappable; import org.opensearch.sql.calcite.plan.LogicalSystemLimit; import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType; import org.opensearch.sql.calcite.plan.OpenSearchConstants; @@ -178,6 +179,16 @@ public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) { return unresolved.accept(this, context); } + /** + * Resolve a relation's data source, validate it for Calcite support, push a table scan onto + * the relBuilder, and return the resulting RelNode (wrapping it for alias fields when supported). 
+ * + * @param node the unresolved relation node containing the qualified table name + * @param context the current Calcite plan context holding the relBuilder and other state + * @return the pushed table scan RelNode, or a project-wrapped RelNode when the scan supports alias-field wrapping + * @throws CalciteUnsupportedException if the relation references a non-default datasource, the DATASOURCES + * table, or the information_schema, which are unsupported by Calcite + */ @Override public RelNode visitRelation(Relation node, CalcitePlanContext context) { DataSourceSchemaIdentifierNameResolver nameResolver = @@ -196,7 +207,11 @@ public RelNode visitRelation(Relation node, CalcitePlanContext context) { throw new CalciteUnsupportedException("information_schema is unsupported in Calcite"); } context.relBuilder.scan(node.getTableQualifiedName().getParts()); - return context.relBuilder.peek(); + RelNode scan = context.relBuilder.peek(); + if (scan instanceof AliasFieldsWrappable) { + return ((AliasFieldsWrappable) scan).wrapProjectForAliasFields(context.relBuilder); + } + return scan; } // This is a tool method to add an existed RelOptTable to builder stack, not used for now @@ -3234,4 +3249,4 @@ private RexNode createOptimizedTransliteration( throw new RuntimeException("Failed to optimize sed expression: " + sedExpression, e); } } -} +} \ No newline at end of file diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/AliasFieldsWrappable.java b/core/src/main/java/org/opensearch/sql/calcite/plan/AliasFieldsWrappable.java new file mode 100644 index 00000000000..a3db4859d87 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/AliasFieldsWrappable.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.plan; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import org.apache.calcite.rel.RelNode; +import 
org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; + +/** + * Wrapper for TableScan to add alias fields by creating another project with aliases above it. This + * allows TableScan or Table to emit alias type fields in its schema, while it still supports + * resolving these fields used in the query. + */ +public interface AliasFieldsWrappable { + + /** + * Provide the alias-to-original field name mapping for this rel node. + * + * The map's keys are alias field names exposed in the schema and the values are + * the corresponding original field names in the underlying relational expression. + * + * @return a map where each key is an alias field name and each value is the original field name it aliases + */ +Map getAliasMapping(); + + /** + * Wraps the current relational node with a projection that adds alias fields defined by {@link #getAliasMapping()}. + * + *

The method creates aliased expressions for each mapping (alias -> original field) and applies them + * on top of the existing node using {@code projectPlus}. + * + * @param relBuilder RelBuilder whose top node must implement {@code AliasFieldsWrappable}; the projection is applied to that node + * @return the resulting {@link RelNode} after adding the alias-field projection + */ + default RelNode wrapProjectForAliasFields(RelBuilder relBuilder) { + assert relBuilder.peek() instanceof AliasFieldsWrappable + : "The top node in RelBuilder must be AliasFieldsWrappable"; + Set> aliasFieldsSet = this.getAliasMapping().entrySet(); + // Adding alias referring to the original field. + List aliasFieldsNew = + aliasFieldsSet.stream() + .map(entry -> relBuilder.alias(relBuilder.field(entry.getValue()), entry.getKey())) + .toList(); + return relBuilder.projectPlus(aliasFieldsNew).peek(); + } +} \ No newline at end of file diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java index c505a431c27..ba8483a3249 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java @@ -307,12 +307,25 @@ public static ExprValue getExprValueByExprType(ExprType type, Object value) { } } + /** + * Builds a Calcite struct type that represents the given table's schema. + * + *

Combines the table's declared and reserved fields, omits fields whose + * ExprType has an original path (type/alias entries), converts each remaining + * ExprType to a corresponding RelDataType, and returns a struct with all + * fields marked nullable. + * + * @param table the table whose fields will be converted into a struct type + * @return a struct {@link RelDataType} containing the table's converted fields (all nullable); fields with an original path are excluded + */ public static RelDataType convertSchema(Table table) { List fieldNameList = new ArrayList<>(); List typeList = new ArrayList<>(); Map fieldTypes = new LinkedHashMap<>(table.getFieldTypes()); fieldTypes.putAll(table.getReservedFieldTypes()); for (Entry entry : fieldTypes.entrySet()) { + // skip alias type fields when constructing schema + if (entry.getValue().getOriginalPath().isPresent()) continue; fieldNameList.add(entry.getKey()); typeList.add(OpenSearchTypeFactory.convertExprTypeToRelDataType(entry.getValue())); } @@ -406,4 +419,4 @@ public static boolean isTimeBasedType(RelDataType fieldType) { // Fallback check if type string contains EXPR_TIMESTAMP return fieldType.toString().contains("EXPR_TIMESTAMP"); } -} +} \ No newline at end of file diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index d0cac82b23f..91a120c8ed9 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -6,6 +6,7 @@ package org.opensearch.sql.calcite.remote; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ALIAS; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK_WITH_NULL_VALUES; import 
static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS; @@ -27,6 +28,14 @@ import org.opensearch.sql.ppl.ExplainIT; public class CalciteExplainIT extends ExplainIT { + /** + * Initialize the test fixture for Calcite explain integration tests. + * + * Sets up Calcite mode, configures the query bucket size, and loads the test indices required + * by the Calcite-specific explain test suite. + * + * @throws Exception if initialization or index loading fails + */ @Override public void init() throws Exception { super.init(); @@ -42,6 +51,7 @@ public void init() throws Exception { loadIndex(Index.WORKER); loadIndex(Index.WORK_INFORMATION); loadIndex(Index.WEBLOG); + loadIndex(Index.DATA_TYPE_ALIAS); } @Override @@ -1946,6 +1956,14 @@ public void testDedupRename2() throws IOException { + " -new_state | dedup 2 new_gender, new_state")); } + /** + * Checks that a dedup operation on a text field is not pushed down in the explain plan. + * + * Loads the expected YAML plan and asserts the actual explain plan for `source=... | dedup email` + * matches the expectation when pushdown is enabled. 
+ * + * @throws IOException if the expected plan file cannot be read + */ @Test public void testDedupTextTypeNotPushdown() throws IOException { enabledOnlyWhenPushdownIsEnabled(); @@ -1953,4 +1971,15 @@ public void testDedupTextTypeNotPushdown() throws IOException { assertYamlEqualsIgnoreId( expected, explainQueryYaml(String.format("source=%s | dedup email", TEST_INDEX_BANK))); } -} + + @Test + public void testAliasTypeField() throws IOException { + String expected = loadExpectedPlan("explain_alias_type_field.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryYaml( + String.format( + "source=%s | fields alias_col | where alias_col > 10 | stats avg(alias_col)", + TEST_INDEX_ALIAS))); + } +} \ No newline at end of file diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java index 29b240613b5..2f641d79a1c 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java @@ -11,6 +11,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.stream.Stream; import lombok.Getter; import org.apache.calcite.adapter.enumerable.EnumerableMergeJoin; @@ -43,6 +44,7 @@ import org.opensearch.search.sort.SortBuilder; import org.opensearch.search.sort.SortBuilders; import org.opensearch.search.sort.SortOrder; +import org.opensearch.sql.calcite.plan.AliasFieldsWrappable; import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.opensearch.data.type.OpenSearchTextType; @@ -61,7 +63,7 @@ /** An abstract relational operator representing a scan of an OpenSearchIndex type. 
*/ @Getter -public abstract class AbstractCalciteIndexScan extends TableScan { +public abstract class AbstractCalciteIndexScan extends TableScan implements AliasFieldsWrappable { private static final Logger LOG = LogManager.getLogger(AbstractCalciteIndexScan.class); public final OpenSearchIndex osIndex; // The schema of this scan operator, it's initialized with the row type of the table, but may be @@ -243,7 +245,19 @@ private static float getAggMultiplier(PushDownOperation operation) { return multiplier; } - protected abstract AbstractCalciteIndexScan buildScan( + /** + * Create a new scan instance configured with the given cluster, traits, table and push-down context. + * + * @param cluster the optimization cluster for the new relational node + * @param traitSet the trait set to assign to the new scan + * @param hints planner hints to attach to the created scan + * @param table the table metadata that the scan will read from + * @param osIndex the OpenSearchIndex backing the scan + * @param schema the output row type/schema for the created scan + * @param pushDownContext the push-down actions to apply to the scan + * @return a new AbstractCalciteIndexScan instance configured with the supplied parameters + */ + protected abstract AbstractCalciteIndexScan buildScan( RelOptCluster cluster, RelTraitSet traitSet, List hints, @@ -252,6 +266,22 @@ protected abstract AbstractCalciteIndexScan buildScan( RelDataType schema, PushDownContext pushDownContext); + /** + * Provides the mapping from alias names to underlying field names for the scanned index. + * + * @return a map where each key is an alias and the corresponding value is the actual field name + */ + @Override + public Map getAliasMapping() { + return osIndex.getAliasMapping(); + } + + /** + * Map a list of field collations to their corresponding field names in the current row type. 
+ * + * @param collations list of field collations whose field indexes are resolved against this scan's row type + * @return a list of field names in the same order as the provided collations + */ protected List getCollationNames(List collations) { return collations.stream() .map(collation -> getRowType().getFieldNames().get(collation.getFieldIndex())) @@ -432,4 +462,4 @@ public boolean isScriptPushed() { public boolean isProjectPushed() { return this.getPushDownContext().isProjectPushed(); } -} +} \ No newline at end of file diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java index bc5a289f465..83e69b339b6 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java @@ -103,19 +103,29 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) { } /** - * This Enumerator may be iterated for multiple times, so we need to create opensearch request for - * each time to avoid reusing source builder. That's because the source builder has stats like PIT - * or SearchAfter recorded during previous search. + * Provide an enumerable over rows read from the OpenSearch index. + * + *

Each call to {@code enumerator()} builds a fresh OpenSearch request and returns an + * OpenSearchIndexEnumerator so the source builder's mutable search state (for example PIT or + * search_after) is not reused across iterations. + * + * @return an Enumerable that yields row objects (nullable); each enumerator constructs a new + * OpenSearchIndexEnumerator using the current row type field names and a fresh request */ @Override public Enumerable<@Nullable Object> scan() { return new AbstractEnumerable<>() { + /** + * Creates an Enumerator that iterates over documents returned by the configured OpenSearch index scan. + * + * @return an {@code Enumerator} that produces row objects from the OpenSearch scan. + */ @Override public Enumerator enumerator() { OpenSearchRequestBuilder requestBuilder = pushDownContext.createRequestBuilder(); return new OpenSearchIndexEnumerator( osIndex.getClient(), - getFieldPath(), + getRowType().getFieldNames(), requestBuilder.getMaxResponseSize(), requestBuilder.getMaxResultWindow(), osIndex.buildRequest(requestBuilder), @@ -123,10 +133,4 @@ public Enumerator enumerator() { } }; } - - private List getFieldPath() { - return getRowType().getFieldNames().stream() - .map(f -> osIndex.getAliasMapping().getOrDefault(f, f)) - .toList(); - } -} +} \ No newline at end of file diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java index 2821aa037da..f127c550acc 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java @@ -234,8 +234,19 @@ public CalciteLogicalIndexScan pushDownCollapse(Project finalOutput, String fiel } /** - * When pushing down a project, we need to create a new CalciteLogicalIndexScan with the updated - * schema since we cannot 
override getRowType() which is defined to be final. + * Create a new scan whose row type is the projection of this scan to the given column indices + * and register a PROJECT pushdown for that projection. + * + * The method builds a new row type from the specified column indices, reindexes any collations + * to match the projected field positions, clones the current pushDownContext, and attaches a + * pushdown action: a no-op for an already-pushed aggregation or a request-builder action that + * instructs the request to stream only the projected field names. + * + * @param selectedColumns the list of field indices (from this scan's current row type) to include + * in the projection, in desired output order + * @return the new CalciteLogicalIndexScan with the projected schema and a registered PROJECT + * pushdown, or {@code null} if the projection cannot be pushed down (for example if it would + * create a circular pushdown) */ public CalciteLogicalIndexScan pushDownProject(List selectedColumns) { final RelDataTypeFactory.Builder builder = getCluster().getTypeFactory().builder(); @@ -272,15 +283,10 @@ public CalciteLogicalIndexScan pushDownProject(List selectedColumns) { // For aggregate, we do nothing on query builder but only change the schema of the scan. action = (AggregationBuilderAction) aggAction -> {}; } else { - Map aliasMapping = this.osIndex.getAliasMapping(); - // For alias types, we need to push down its original path instead of the alias name. 
- List projectedFields = - newSchema.getFieldNames().stream() - .map(fieldName -> aliasMapping.getOrDefault(fieldName, fieldName)) - .toList(); action = (OSRequestBuilderAction) - requestBuilder -> requestBuilder.pushDownProjectStream(projectedFields.stream()); + requestBuilder -> + requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream()); } newScan.pushDownContext.add(PushDownType.PROJECT, newSchema.getFieldNames(), action); return newScan; @@ -533,4 +539,4 @@ private ScriptSortType getScriptSortType(RelDataType relDataType) { "Unsupported type for sort expression pushdown: " + relDataType); } } -} +} \ No newline at end of file