Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.calcite.plan.AliasFieldsWrappable;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
Expand Down Expand Up @@ -178,6 +179,16 @@ public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) {
return unresolved.accept(this, context);
}

/**
* Resolve a relation's data source, validate it for Calcite support, push a table scan onto
* the relBuilder, and return the resulting RelNode (wrapping it for alias fields when supported).
*
* @param node the unresolved relation node containing the qualified table name
* @param context the current Calcite plan context holding the relBuilder and other state
* @return the pushed table scan RelNode, or a project-wrapped RelNode when the scan supports alias-field wrapping
* @throws CalciteUnsupportedException if the relation references a non-default datasource, the DATASOURCES
* table, or the information_schema, which are unsupported by Calcite
*/
@Override
public RelNode visitRelation(Relation node, CalcitePlanContext context) {
DataSourceSchemaIdentifierNameResolver nameResolver =
Expand All @@ -196,7 +207,11 @@ public RelNode visitRelation(Relation node, CalcitePlanContext context) {
throw new CalciteUnsupportedException("information_schema is unsupported in Calcite");
}
context.relBuilder.scan(node.getTableQualifiedName().getParts());
return context.relBuilder.peek();
RelNode scan = context.relBuilder.peek();
if (scan instanceof AliasFieldsWrappable) {
return ((AliasFieldsWrappable) scan).wrapProjectForAliasFields(context.relBuilder);
}
return scan;
}

// This is a tool method to add an existed RelOptTable to builder stack, not used for now
Expand Down Expand Up @@ -3234,4 +3249,4 @@ private RexNode createOptimizedTransliteration(
throw new RuntimeException("Failed to optimize sed expression: " + sedExpression, e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.plan;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;

/**
 * Wrapper for TableScan to add alias fields by creating another project with aliases upon it. This
 * allows TableScan or Table to emit alias type fields in its schema, while it still supports
 * resolving these fields used in the query.
 */
public interface AliasFieldsWrappable {

  /**
   * Supplies the alias-to-original field name mapping for this rel node.
   *
   * <p>Keys are the alias field names exposed in the schema; values are the names of the original
   * fields in the underlying relational expression that each alias refers to.
   *
   * @return map from alias field name to the original field name it aliases
   */
  Map<String, String> getAliasMapping();

  /**
   * Wraps the current relational node with a projection that appends the alias fields defined by
   * {@link #getAliasMapping()}.
   *
   * <p>One aliased reference to the original field is created per mapping entry and added on top
   * of the existing node via {@code projectPlus}.
   *
   * @param relBuilder builder whose top node must implement {@code AliasFieldsWrappable}; the
   *     projection is applied to that node
   * @return the resulting {@link RelNode} after the alias-field projection has been added
   */
  default RelNode wrapProjectForAliasFields(RelBuilder relBuilder) {
    assert relBuilder.peek() instanceof AliasFieldsWrappable
        : "The top node in RelBuilder must be AliasFieldsWrappable";
    Map<String, String> mapping = this.getAliasMapping();
    // One aliased field reference per (alias -> original) entry.
    List<RexNode> aliasProjections =
        mapping.entrySet().stream()
            .map(e -> relBuilder.alias(relBuilder.field(e.getValue()), e.getKey()))
            .toList();
    relBuilder.projectPlus(aliasProjections);
    return relBuilder.peek();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,25 @@ public static ExprValue getExprValueByExprType(ExprType type, Object value) {
}
}

/**
* Builds a Calcite struct type that represents the given table's schema.
*
* <p>Combines the table's declared and reserved fields, omits fields whose
* ExprType has an original path (type/alias entries), converts each remaining
* ExprType to a corresponding RelDataType, and returns a struct with all
* fields marked nullable.
*
* @param table the table whose fields will be converted into a struct type
* @return a struct {@link RelDataType} containing the table's converted fields (all nullable); fields with an original path are excluded
*/
public static RelDataType convertSchema(Table table) {
List<String> fieldNameList = new ArrayList<>();
List<RelDataType> typeList = new ArrayList<>();
Map<String, ExprType> fieldTypes = new LinkedHashMap<>(table.getFieldTypes());
fieldTypes.putAll(table.getReservedFieldTypes());
for (Entry<String, ExprType> entry : fieldTypes.entrySet()) {
// skip alias type fields when constructing schema
if (entry.getValue().getOriginalPath().isPresent()) continue;
fieldNameList.add(entry.getKey());
typeList.add(OpenSearchTypeFactory.convertExprTypeToRelDataType(entry.getValue()));
}
Expand Down Expand Up @@ -406,4 +419,4 @@ public static boolean isTimeBasedType(RelDataType fieldType) {
// Fallback check if type string contains EXPR_TIMESTAMP
return fieldType.toString().contains("EXPR_TIMESTAMP");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.calcite.remote;

import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ALIAS;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK_WITH_NULL_VALUES;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS;
Expand All @@ -27,6 +28,14 @@
import org.opensearch.sql.ppl.ExplainIT;

public class CalciteExplainIT extends ExplainIT {
/**
* Initialize the test fixture for Calcite explain integration tests.
*
* Sets up Calcite mode, configures the query bucket size, and loads the test indices required
* by the Calcite-specific explain test suite.
*
* @throws Exception if initialization or index loading fails
*/
@Override
public void init() throws Exception {
super.init();
Expand All @@ -42,6 +51,7 @@ public void init() throws Exception {
loadIndex(Index.WORKER);
loadIndex(Index.WORK_INFORMATION);
loadIndex(Index.WEBLOG);
loadIndex(Index.DATA_TYPE_ALIAS);
}

@Override
Expand Down Expand Up @@ -1946,11 +1956,30 @@ public void testDedupRename2() throws IOException {
+ " -new_state | dedup 2 new_gender, new_state"));
}

/**
* Checks that a dedup operation on a text field is not pushed down in the explain plan.
*
* Loads the expected YAML plan and asserts the actual explain plan for `source=... | dedup email`
* matches the expectation when pushdown is enabled.
*
* @throws IOException if the expected plan file cannot be read
*/
@Test
public void testDedupTextTypeNotPushdown() throws IOException {
enabledOnlyWhenPushdownIsEnabled();
String expected = loadExpectedPlan("explain_dedup_text_type_no_push.yaml");
assertYamlEqualsIgnoreId(
expected, explainQueryYaml(String.format("source=%s | dedup email", TEST_INDEX_BANK)));
}
}

@Test
public void testAliasTypeField() throws IOException {
String expected = loadExpectedPlan("explain_alias_type_field.yaml");
assertYamlEqualsIgnoreId(
expected,
explainQueryYaml(
String.format(
"source=%s | fields alias_col | where alias_col > 10 | stats avg(alias_col)",
TEST_INDEX_ALIAS)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import lombok.Getter;
import org.apache.calcite.adapter.enumerable.EnumerableMergeJoin;
Expand Down Expand Up @@ -43,6 +44,7 @@
import org.opensearch.search.sort.SortBuilder;
import org.opensearch.search.sort.SortBuilders;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.sql.calcite.plan.AliasFieldsWrappable;
import org.opensearch.sql.common.setting.Settings.Key;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.opensearch.data.type.OpenSearchTextType;
Expand All @@ -61,7 +63,7 @@

/** An abstract relational operator representing a scan of an OpenSearchIndex type. */
@Getter
public abstract class AbstractCalciteIndexScan extends TableScan {
public abstract class AbstractCalciteIndexScan extends TableScan implements AliasFieldsWrappable {
private static final Logger LOG = LogManager.getLogger(AbstractCalciteIndexScan.class);
public final OpenSearchIndex osIndex;
// The schema of this scan operator, it's initialized with the row type of the table, but may be
Expand Down Expand Up @@ -243,7 +245,19 @@ private static float getAggMultiplier(PushDownOperation operation) {
return multiplier;
}

protected abstract AbstractCalciteIndexScan buildScan(
/**
* Create a new scan instance configured with the given cluster, traits, table and push-down context.
*
* @param cluster the optimization cluster for the new relational node
* @param traitSet the trait set to assign to the new scan
* @param hints planner hints to attach to the created scan
* @param table the table metadata that the scan will read from
* @param osIndex the OpenSearchIndex backing the scan
* @param schema the output row type/schema for the created scan
* @param pushDownContext the push-down actions to apply to the scan
* @return a new AbstractCalciteIndexScan instance configured with the supplied parameters
*/
protected abstract AbstractCalciteIndexScan buildScan(
RelOptCluster cluster,
RelTraitSet traitSet,
List<RelHint> hints,
Expand All @@ -252,6 +266,22 @@ protected abstract AbstractCalciteIndexScan buildScan(
RelDataType schema,
PushDownContext pushDownContext);

  /**
   * Provides the mapping from alias names to underlying field names for the scanned index.
   *
   * <p>Delegates to the backing {@code osIndex}; keys are alias field names exposed in the schema
   * and values are the original field paths they refer to.
   *
   * @return a map where each key is an alias and the corresponding value is the actual field name
   */
  @Override
  public Map<String, String> getAliasMapping() {
    return osIndex.getAliasMapping();
  }

/**
* Map a list of field collations to their corresponding field names in the current row type.
*
* @param collations list of field collations whose field indexes are resolved against this scan's row type
* @return a list of field names in the same order as the provided collations
*/
protected List<String> getCollationNames(List<RelFieldCollation> collations) {
return collations.stream()
.map(collation -> getRowType().getFieldNames().get(collation.getFieldIndex()))
Expand Down Expand Up @@ -432,4 +462,4 @@ public boolean isScriptPushed() {
public boolean isProjectPushed() {
return this.getPushDownContext().isProjectPushed();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,30 +103,34 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
}

/**
* This Enumerator may be iterated for multiple times, so we need to create opensearch request for
* each time to avoid reusing source builder. That's because the source builder has stats like PIT
* or SearchAfter recorded during previous search.
* Provide an enumerable over rows read from the OpenSearch index.
*
* <p>Each call to {@code enumerator()} builds a fresh OpenSearch request and returns an
* OpenSearchIndexEnumerator so the source builder's mutable search state (for example PIT or
* search_after) is not reused across iterations.
*
* @return an Enumerable that yields row objects (nullable); each enumerator constructs a new
* OpenSearchIndexEnumerator using the current row type field names and a fresh request
*/
@Override
public Enumerable<@Nullable Object> scan() {
return new AbstractEnumerable<>() {
/**
* Creates an Enumerator that iterates over documents returned by the configured OpenSearch index scan.
*
* @return an {@code Enumerator<Object>} that produces row objects from the OpenSearch scan.
*/
@Override
public Enumerator<Object> enumerator() {
OpenSearchRequestBuilder requestBuilder = pushDownContext.createRequestBuilder();
return new OpenSearchIndexEnumerator(
osIndex.getClient(),
getFieldPath(),
getRowType().getFieldNames(),
requestBuilder.getMaxResponseSize(),
requestBuilder.getMaxResultWindow(),
osIndex.buildRequest(requestBuilder),
osIndex.createOpenSearchResourceMonitor());
}
};
}

private List<String> getFieldPath() {
return getRowType().getFieldNames().stream()
.map(f -> osIndex.getAliasMapping().getOrDefault(f, f))
.toList();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,19 @@ public CalciteLogicalIndexScan pushDownCollapse(Project finalOutput, String fiel
}

/**
* When pushing down a project, we need to create a new CalciteLogicalIndexScan with the updated
* schema since we cannot override getRowType() which is defined to be final.
* Create a new scan whose row type is the projection of this scan to the given column indices
* and register a PROJECT pushdown for that projection.
*
* The method builds a new row type from the specified column indices, reindexes any collations
* to match the projected field positions, clones the current pushDownContext, and attaches a
* pushdown action: a no-op for an already-pushed aggregation or a request-builder action that
* instructs the request to stream only the projected field names.
*
* @param selectedColumns the list of field indices (from this scan's current row type) to include
* in the projection, in desired output order
* @return the new CalciteLogicalIndexScan with the projected schema and a registered PROJECT
* pushdown, or `null` if the projection cannot be pushed down (for example if it would
* create a circular pushdown)
*/
public CalciteLogicalIndexScan pushDownProject(List<Integer> selectedColumns) {
final RelDataTypeFactory.Builder builder = getCluster().getTypeFactory().builder();
Expand Down Expand Up @@ -272,15 +283,10 @@ public CalciteLogicalIndexScan pushDownProject(List<Integer> selectedColumns) {
// For aggregate, we do nothing on query builder but only change the schema of the scan.
action = (AggregationBuilderAction) aggAction -> {};
} else {
Map<String, String> aliasMapping = this.osIndex.getAliasMapping();
// For alias types, we need to push down its original path instead of the alias name.
List<String> projectedFields =
newSchema.getFieldNames().stream()
.map(fieldName -> aliasMapping.getOrDefault(fieldName, fieldName))
.toList();
action =
(OSRequestBuilderAction)
requestBuilder -> requestBuilder.pushDownProjectStream(projectedFields.stream());
requestBuilder ->
requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream());
}
newScan.pushDownContext.add(PushDownType.PROJECT, newSchema.getFieldNames(), action);
return newScan;
Expand Down Expand Up @@ -533,4 +539,4 @@ private ScriptSortType getScriptSortType(RelDataType relDataType) {
"Unsupported type for sort expression pushdown: " + relDataType);
}
}
}
}
Loading