Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.calcite.plan.AliasFieldsWrappable;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
Expand Down Expand Up @@ -196,7 +197,11 @@ public RelNode visitRelation(Relation node, CalcitePlanContext context) {
throw new CalciteUnsupportedException("information_schema is unsupported in Calcite");
}
context.relBuilder.scan(node.getTableQualifiedName().getParts());
return context.relBuilder.peek();
RelNode scan = context.relBuilder.peek();
if (scan instanceof AliasFieldsWrappable) {
return ((AliasFieldsWrappable) scan).wrapProjectForAliasFields(context.relBuilder);
}
return scan;
}

// This is a tool method to add an existed RelOptTable to builder stack, not used for now
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.plan;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;

/**
* Wrapper for TableScan to add alias fields by creating another project with alias upon on it. This
* allows TableScan or Table to emit alias type fields in its schema, while it still supports
* resolving these fields used in the query.
*/
public interface AliasFieldsWrappable {

Map<String, String> getAliasMapping();

default RelNode wrapProjectForAliasFields(RelBuilder relBuilder) {
assert relBuilder.peek() instanceof AliasFieldsWrappable
: "The top node in RelBuilder must be AliasFieldsWrappable";
Set<Entry<String, String>> aliasFieldsSet = this.getAliasMapping().entrySet();
// Adding alias referring to the original field.
List<RexNode> aliasFieldsNew =
aliasFieldsSet.stream()
.map(entry -> relBuilder.alias(relBuilder.field(entry.getValue()), entry.getKey()))
.toList();
return relBuilder.projectPlus(aliasFieldsNew).peek();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ public static RelDataType convertSchema(Table table) {
Map<String, ExprType> fieldTypes = new LinkedHashMap<>(table.getFieldTypes());
fieldTypes.putAll(table.getReservedFieldTypes());
for (Entry<String, ExprType> entry : fieldTypes.entrySet()) {
// skip alias type fields when constructing schema
if (entry.getValue().getOriginalPath().isPresent()) continue;
fieldNameList.add(entry.getKey());
typeList.add(OpenSearchTypeFactory.convertExprTypeToRelDataType(entry.getValue()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.calcite.remote;

import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ALIAS;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK_WITH_NULL_VALUES;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS;
Expand Down Expand Up @@ -42,6 +43,7 @@ public void init() throws Exception {
loadIndex(Index.WORKER);
loadIndex(Index.WORK_INFORMATION);
loadIndex(Index.WEBLOG);
loadIndex(Index.DATA_TYPE_ALIAS);
}

@Override
Expand Down Expand Up @@ -1953,4 +1955,15 @@ public void testDedupTextTypeNotPushdown() throws IOException {
assertYamlEqualsIgnoreId(
expected, explainQueryYaml(String.format("source=%s | dedup email", TEST_INDEX_BANK)));
}

@Test
public void testAliasTypeField() throws IOException {
String expected = loadExpectedPlan("explain_alias_type_field.yaml");
assertYamlEqualsIgnoreId(
expected,
explainQueryYaml(
String.format(
"source=%s | fields alias_col | where alias_col > 10 | stats avg(alias_col)",
TEST_INDEX_ALIAS)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalAggregate(group=[{}], avg(alias_col)=[AVG($0)])
LogicalFilter(condition=[>($0, 10)])
LogicalProject(alias_col=[$0])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_alias]])
physical: |
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_alias]], PushDownContext=[[FILTER->>($0, 10), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},avg(alias_col)=AVG($0)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"range":{"original_col":{"from":10,"to":null,"include_lower":false,"include_upper":true,"boost":1.0}}},"aggregations":{"avg(alias_col)":{"avg":{"field":"original_col"}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalAggregate(group=[{}], avg(alias_col)=[AVG($0)])
LogicalFilter(condition=[>($0, 10)])
LogicalProject(alias_col=[$0])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_alias]])
physical: |
EnumerableLimit(fetch=[10000])
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[0], expr#3=[=($t1, $t2)], expr#4=[null:BIGINT], expr#5=[CASE($t3, $t4, $t0)], expr#6=[CAST($t5):DOUBLE], expr#7=[/($t6, $t1)], avg(alias_col)=[$t7])
EnumerableAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[COUNT($0)])
EnumerableCalc(expr#0..6=[{inputs}], expr#7=[10], expr#8=[>($t0, $t7)], proj#0..6=[{exprs}], $condition=[$t8])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_alias]])
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import lombok.Getter;
import org.apache.calcite.adapter.enumerable.EnumerableMergeJoin;
Expand Down Expand Up @@ -43,6 +44,7 @@
import org.opensearch.search.sort.SortBuilder;
import org.opensearch.search.sort.SortBuilders;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.sql.calcite.plan.AliasFieldsWrappable;
import org.opensearch.sql.common.setting.Settings.Key;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.opensearch.data.type.OpenSearchTextType;
Expand All @@ -61,7 +63,7 @@

/** An abstract relational operator representing a scan of an OpenSearchIndex type. */
@Getter
public abstract class AbstractCalciteIndexScan extends TableScan {
public abstract class AbstractCalciteIndexScan extends TableScan implements AliasFieldsWrappable {
private static final Logger LOG = LogManager.getLogger(AbstractCalciteIndexScan.class);
public final OpenSearchIndex osIndex;
// The schema of this scan operator, it's initialized with the row type of the table, but may be
Expand Down Expand Up @@ -252,6 +254,11 @@ protected abstract AbstractCalciteIndexScan buildScan(
RelDataType schema,
PushDownContext pushDownContext);

@Override
public Map<String, String> getAliasMapping() {
return osIndex.getAliasMapping();
}

protected List<String> getCollationNames(List<RelFieldCollation> collations) {
return collations.stream()
.map(collation -> getRowType().getFieldNames().get(collation.getFieldIndex()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,12 @@ public Enumerator<Object> enumerator() {
OpenSearchRequestBuilder requestBuilder = pushDownContext.createRequestBuilder();
return new OpenSearchIndexEnumerator(
osIndex.getClient(),
getFieldPath(),
getRowType().getFieldNames(),
requestBuilder.getMaxResponseSize(),
requestBuilder.getMaxResultWindow(),
osIndex.buildRequest(requestBuilder),
osIndex.createOpenSearchResourceMonitor());
}
};
}

private List<String> getFieldPath() {
return getRowType().getFieldNames().stream()
.map(f -> osIndex.getAliasMapping().getOrDefault(f, f))
.toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,10 @@ public CalciteLogicalIndexScan pushDownProject(List<Integer> selectedColumns) {
// For aggregate, we do nothing on query builder but only change the schema of the scan.
action = (AggregationBuilderAction) aggAction -> {};
} else {
Map<String, String> aliasMapping = this.osIndex.getAliasMapping();
// For alias types, we need to push down its original path instead of the alias name.
List<String> projectedFields =
newSchema.getFieldNames().stream()
.map(fieldName -> aliasMapping.getOrDefault(fieldName, fieldName))
.toList();
action =
(OSRequestBuilderAction)
requestBuilder -> requestBuilder.pushDownProjectStream(projectedFields.stream());
requestBuilder ->
requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream());
}
newScan.pushDownContext.add(PushDownType.PROJECT, newSchema.getFieldNames(), action);
return newScan;
Expand Down
Loading