diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 6a556eccc92..8049afce825 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -143,6 +143,7 @@ import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.ast.tree.Values; import org.opensearch.sql.ast.tree.Window; +import org.opensearch.sql.calcite.plan.AliasFieldsWrappable; import org.opensearch.sql.calcite.plan.LogicalSystemLimit; import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType; import org.opensearch.sql.calcite.plan.OpenSearchConstants; @@ -196,7 +197,11 @@ public RelNode visitRelation(Relation node, CalcitePlanContext context) { throw new CalciteUnsupportedException("information_schema is unsupported in Calcite"); } context.relBuilder.scan(node.getTableQualifiedName().getParts()); - return context.relBuilder.peek(); + RelNode scan = context.relBuilder.peek(); + if (scan instanceof AliasFieldsWrappable) { + return ((AliasFieldsWrappable) scan).wrapProjectForAliasFields(context.relBuilder); + } + return scan; } // This is a tool method to add an existed RelOptTable to builder stack, not used for now diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/AliasFieldsWrappable.java b/core/src/main/java/org/opensearch/sql/calcite/plan/AliasFieldsWrappable.java new file mode 100644 index 00000000000..1a4080ead47 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/AliasFieldsWrappable.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.plan; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import 
org.apache.calcite.tools.RelBuilder; + +/** + * Wrapper for TableScan to add alias fields by creating another project with alias on top of it. + * This allows TableScan or Table to omit alias type fields in its schema, while it still supports + * resolving these fields used in the query. + */ +public interface AliasFieldsWrappable { + + Map getAliasMapping(); + + default RelNode wrapProjectForAliasFields(RelBuilder relBuilder) { + assert relBuilder.peek() instanceof AliasFieldsWrappable + : "The top node in RelBuilder must be AliasFieldsWrappable"; + Set> aliasFieldsSet = this.getAliasMapping().entrySet(); + // Adding alias referring to the original field. + List aliasFieldsNew = + aliasFieldsSet.stream() + .map(entry -> relBuilder.alias(relBuilder.field(entry.getValue()), entry.getKey())) + .toList(); + return relBuilder.projectPlus(aliasFieldsNew).peek(); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java index c505a431c27..17d99fb4fbb 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java @@ -313,6 +313,8 @@ public static RelDataType convertSchema(Table table) { Map fieldTypes = new LinkedHashMap<>(table.getFieldTypes()); fieldTypes.putAll(table.getReservedFieldTypes()); for (Entry entry : fieldTypes.entrySet()) { + // skip alias type fields when constructing schema + if (entry.getValue().getOriginalPath().isPresent()) continue; fieldNameList.add(entry.getKey()); typeList.add(OpenSearchTypeFactory.convertExprTypeToRelDataType(entry.getValue())); } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index d0cac82b23f..84591568b07 100644 --- 
a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -6,6 +6,7 @@ package org.opensearch.sql.calcite.remote; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ALIAS; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK_WITH_NULL_VALUES; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS; @@ -42,6 +43,7 @@ public void init() throws Exception { loadIndex(Index.WORKER); loadIndex(Index.WORK_INFORMATION); loadIndex(Index.WEBLOG); + loadIndex(Index.DATA_TYPE_ALIAS); } @Override @@ -1953,4 +1955,15 @@ public void testDedupTextTypeNotPushdown() throws IOException { assertYamlEqualsIgnoreId( expected, explainQueryYaml(String.format("source=%s | dedup email", TEST_INDEX_BANK))); } + + @Test + public void testAliasTypeField() throws IOException { + String expected = loadExpectedPlan("explain_alias_type_field.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryYaml( + String.format( + "source=%s | fields alias_col | where alias_col > 10 | stats avg(alias_col)", + TEST_INDEX_ALIAS))); + } } diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_alias_type_field.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_alias_type_field.yaml new file mode 100644 index 00000000000..9465b18b2b1 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_alias_type_field.yaml @@ -0,0 +1,9 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalAggregate(group=[{}], avg(alias_col)=[AVG($0)]) + LogicalFilter(condition=[>($0, 10)]) + LogicalProject(alias_col=[$0]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_alias]]) + physical: | + 
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_alias]], PushDownContext=[[FILTER->>($0, 10), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},avg(alias_col)=AVG($0)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"range":{"original_col":{"from":10,"to":null,"include_lower":false,"include_upper":true,"boost":1.0}}},"aggregations":{"avg(alias_col)":{"avg":{"field":"original_col"}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_alias_type_field.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_alias_type_field.yaml new file mode 100644 index 00000000000..a065c90865d --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_alias_type_field.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalAggregate(group=[{}], avg(alias_col)=[AVG($0)]) + LogicalFilter(condition=[>($0, 10)]) + LogicalProject(alias_col=[$0]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_alias]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..1=[{inputs}], expr#2=[0], expr#3=[=($t1, $t2)], expr#4=[null:BIGINT], expr#5=[CASE($t3, $t4, $t0)], expr#6=[CAST($t5):DOUBLE], expr#7=[/($t6, $t1)], avg(alias_col)=[$t7]) + EnumerableAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[COUNT($0)]) + EnumerableCalc(expr#0..6=[{inputs}], expr#7=[10], expr#8=[>($t0, $t7)], proj#0..6=[{exprs}], $condition=[$t8]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_alias]]) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java index 29b240613b5..80ad06422e2 100644 --- 
a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java @@ -11,6 +11,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.stream.Stream; import lombok.Getter; import org.apache.calcite.adapter.enumerable.EnumerableMergeJoin; @@ -43,6 +44,7 @@ import org.opensearch.search.sort.SortBuilder; import org.opensearch.search.sort.SortBuilders; import org.opensearch.search.sort.SortOrder; +import org.opensearch.sql.calcite.plan.AliasFieldsWrappable; import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.opensearch.data.type.OpenSearchTextType; @@ -61,7 +63,7 @@ /** An abstract relational operator representing a scan of an OpenSearchIndex type. */ @Getter -public abstract class AbstractCalciteIndexScan extends TableScan { +public abstract class AbstractCalciteIndexScan extends TableScan implements AliasFieldsWrappable { private static final Logger LOG = LogManager.getLogger(AbstractCalciteIndexScan.class); public final OpenSearchIndex osIndex; // The schema of this scan operator, it's initialized with the row type of the table, but may be @@ -252,6 +254,11 @@ protected abstract AbstractCalciteIndexScan buildScan( RelDataType schema, PushDownContext pushDownContext); + @Override + public Map getAliasMapping() { + return osIndex.getAliasMapping(); + } + protected List getCollationNames(List collations) { return collations.stream() .map(collation -> getRowType().getFieldNames().get(collation.getFieldIndex())) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java index bc5a289f465..ddf14cfee38 100644 --- 
a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java @@ -115,7 +115,7 @@ public Enumerator enumerator() { OpenSearchRequestBuilder requestBuilder = pushDownContext.createRequestBuilder(); return new OpenSearchIndexEnumerator( osIndex.getClient(), - getFieldPath(), + getRowType().getFieldNames(), requestBuilder.getMaxResponseSize(), requestBuilder.getMaxResultWindow(), osIndex.buildRequest(requestBuilder), @@ -123,10 +123,4 @@ public Enumerator enumerator() { } }; } - - private List getFieldPath() { - return getRowType().getFieldNames().stream() - .map(f -> osIndex.getAliasMapping().getOrDefault(f, f)) - .toList(); - } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java index 2821aa037da..4aaac41cc75 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java @@ -272,15 +272,10 @@ public CalciteLogicalIndexScan pushDownProject(List selectedColumns) { // For aggregate, we do nothing on query builder but only change the schema of the scan. action = (AggregationBuilderAction) aggAction -> {}; } else { - Map aliasMapping = this.osIndex.getAliasMapping(); - // For alias types, we need to push down its original path instead of the alias name. 
- List projectedFields = - newSchema.getFieldNames().stream() - .map(fieldName -> aliasMapping.getOrDefault(fieldName, fieldName)) - .toList(); action = (OSRequestBuilderAction) - requestBuilder -> requestBuilder.pushDownProjectStream(projectedFields.stream()); + requestBuilder -> + requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream()); } newScan.pushDownContext.add(PushDownType.PROJECT, newSchema.getFieldNames(), action); return newScan;