Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -17,13 +18,21 @@
import java.util.function.BiFunction;
import lombok.Getter;
import lombok.Setter;
import org.apache.calcite.config.NullCollation;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexLambdaRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.server.CalciteServerStatement;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RelBuilder;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
import org.opensearch.sql.calcite.validate.OpenSearchSparkSqlDialect;
import org.opensearch.sql.calcite.validate.PplTypeCoercion;
import org.opensearch.sql.calcite.validate.PplTypeCoercionRule;
import org.opensearch.sql.calcite.validate.PplValidator;
import org.opensearch.sql.calcite.validate.SqlOperatorTableProvider;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.executor.QueryType;
import org.opensearch.sql.expression.function.FunctionProperties;
Expand Down Expand Up @@ -72,6 +81,25 @@ public class CalcitePlanContext {
/** Whether we're currently inside a lambda context. */
@Getter @Setter private boolean inLambdaContext = false;

/**
* -- SETTER -- Sets the SQL operator table provider. This must be called during initialization by
* the opensearch module.
*
* @param provider the provider to use for obtaining operator tables
*/
@Setter private static SqlOperatorTableProvider operatorTableProvider;

/** Cached SqlValidator instance (lazy initialized). */
private SqlValidator validator;

/**
* Initialize a new CalcitePlanContext and set up Calcite connection, builders, function
* properties, and lambda/capture state.
*
* @param config the Calcite FrameworkConfig to use for planning
* @param sysLimit system limits governing planning behaviour
* @param queryType the QueryType for this planning context
*/
private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
this.config = config;
this.sysLimit = sysLimit;
Expand All @@ -85,8 +113,11 @@ private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType
}

/**
* Private constructor for creating a context that shares relBuilder with parent. Used by clone()
* to create lambda contexts that can resolve fields from the parent context.
* Creates a child CalcitePlanContext that shares relational builders with the given parent while isolating lambda-specific state.
*
* <p>The child context reuses the parent's relBuilder and rexBuilder so it can resolve fields against the parent, but it initializes independent maps/lists for lambda references and captured variables and marks the context as a lambda context.</p>
*
* @param parent the parent context whose builders and configuration are reused
*/
private CalcitePlanContext(CalcitePlanContext parent) {
this.config = parent.config;
Expand All @@ -101,6 +132,59 @@ private CalcitePlanContext(CalcitePlanContext parent) {
this.inLambdaContext = true; // Mark that we're inside a lambda
}

/**
* Provides the lazily initialized SqlValidator configured for PPL within this context.
*
* @return the SqlValidator configured with PPL type coercion rules, Spark SQL conformance,
* Spark NULL collation, and identifier expansion
* @throws IllegalStateException if the global SqlOperatorTableProvider has not been set
* @throws RuntimeException if creating or unwrapping the underlying Calcite statement fails
*/
public SqlValidator getValidator() {
if (validator == null) {
final CalciteServerStatement statement;
try {
statement = connection.createStatement().unwrap(CalciteServerStatement.class);
} catch (SQLException e) {
throw new RuntimeException(e);
}
if (operatorTableProvider == null) {
throw new IllegalStateException(
"SqlOperatorTableProvider must be set before creating CalcitePlanContext");
}
SqlValidator.Config validatorConfig =
SqlValidator.Config.DEFAULT
.withTypeCoercionRules(PplTypeCoercionRule.instance())
.withTypeCoercionFactory(PplTypeCoercion::create)
// Use lenient conformance for PPL compatibility
.withConformance(OpenSearchSparkSqlDialect.DEFAULT.getConformance())
// Use Spark SQL's NULL collation (NULLs sorted LOW/FIRST)
.withDefaultNullCollation(NullCollation.LOW)
// This ensures that coerced arguments are replaced with cast version in sql select
// list because coercion is performed during select list expansion during sql
// validation. Affects 4356.yml
// See SqlValidatorImpl#validateSelectList and AggConverter#translateAgg
.withIdentifierExpansion(true);
validator =
PplValidator.create(
statement,
config,
operatorTableProvider.getOperatorTable(),
TYPE_FACTORY,
validatorConfig);
}
return validator;
}

/**
* Temporarily marks the context as resolving a join condition and applies a transformation to the given unresolved expression.
*
* <p>The context flag indicating join-condition resolution is set to true before invoking the transform and reset to false afterwards.
*
* @param expr the unresolved expression representing the join condition
* @param transformFunction a function that converts the unresolved expression into a {@code RexNode} using this context
* @return the {@code RexNode} produced by applying {@code transformFunction} to {@code expr}
*/
public RexNode resolveJoinCondition(
UnresolvedExpression expr,
BiFunction<UnresolvedExpression, CalcitePlanContext, RexNode> transformFunction) {
Expand Down Expand Up @@ -206,4 +290,4 @@ public RexLambdaRef captureVariable(RexNode fieldRef, String fieldName) {

return lambdaRef;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.ViewExpanders;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.JoinRelType;
Expand All @@ -55,10 +58,14 @@
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexFieldCollation;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexOver;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.rex.RexWindow;
import org.apache.calcite.rex.RexWindowBounds;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
Expand Down Expand Up @@ -1678,6 +1685,17 @@ private void validateFillNullTypeCompatibility(
}
}

/**
* Builds a Calcite relational plan that implements the streaming window operation defined by the given AST node.
*
* The produced plan handles reset-before/reset-after semantics, global sliding windows, grouped windows, and
* per-window function expressions; it also embeds existing plan collations into window OVER clauses and
* adds any required helper columns (sequence/segment/reset flags) used by the stream-window logic.
*
* @param node the StreamWindow AST node describing window type, grouping, window size, reset options and functions
* @param context the planning context containing the RelBuilder, visitors, and planner state
* @return a RelNode representing the compiled stream window operation ready for integration into the surrounding plan
*/
@Override
public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context) {
visitChildren(node, context);
Expand Down Expand Up @@ -1740,6 +1758,7 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
// Default: first get rawExpr
List<RexNode> overExpressions =
node.getWindowFunctionList().stream().map(w -> rexVisitor.analyze(w, context)).toList();
overExpressions = embedExistingCollationsIntoOver(overExpressions, context);

if (hasGroup) {
// only build sequence when there is by condition
Expand Down Expand Up @@ -1781,6 +1800,101 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
return context.relBuilder.peek();
}

/**
* Embed the current plan's collation into any RexOver window expressions.
*
* <p>Window frames such as ROWS n PRECEDING require an ORDER BY to define row order; this method
* adds the RelBuilder's existing collation as the OVER clause's ORDER BY for each encountered
* RexOver.
*
* @param overExpressions window function expressions that may contain nested {@link RexOver}
* @param context plan context used to obtain the current RelBuilder and RexBuilder
* @return a list of expressions where each {@link RexOver} has its ORDER BY populated from the
* current plan collation (unchanged expressions are returned as-is)
*/
private List<RexNode> embedExistingCollationsIntoOver(
List<RexNode> overExpressions, CalcitePlanContext context) {
RelCollation existingCollation = context.relBuilder.peek().getTraitSet().getCollation();
List<@NonNull RelFieldCollation> relCollations =
existingCollation == null ? List.of() : existingCollation.getFieldCollations();
ImmutableList<@NonNull RexFieldCollation> rexCollations =
relCollations.stream()
.map(f -> relCollationToRexCollation(f, context.relBuilder))
.collect(ImmutableList.toImmutableList());
return overExpressions.stream()
.map(
n ->
n.accept(
new RexShuttle() {
/**
* Rewrite the given windowed aggregation to attach the visitor's collected collations.
*
* @param over the original RexOver node to be rewritten
* @return a RexNode representing the same windowed aggregation with the visitor's
* accumulated RexFieldCollation ordering embedded into its OVER specification
*/
@Override
public RexNode visitOver(RexOver over) {
RexWindow window = over.getWindow();
return context.rexBuilder.makeOver(
over.getType(),
over.getAggOperator(),
over.getOperands(),
window.partitionKeys,
rexCollations,
window.getLowerBound(),
window.getUpperBound(),
window.isRows(),
true,
false,
over.isDistinct(),
over.ignoreNulls());
}
}))
.toList();
}

/**
* Create a RexFieldCollation that mirrors the given RelFieldCollation by referencing
* the corresponding field through the provided RelBuilder.
*
* @param relCollation the source RelFieldCollation whose field index, direction,
* and null ordering will be translated
* @param builder RelBuilder used to produce a RexNode referencing the field
* @return a RexFieldCollation with equivalent sort direction and null-ordering flags
*/
private static RexFieldCollation relCollationToRexCollation(
RelFieldCollation relCollation, RelBuilder builder) {
RexNode fieldRef = builder.field(relCollation.getFieldIndex());

// Convert direction flags to SqlKind set
Set<SqlKind> flags = new HashSet<>();
if (relCollation.direction == RelFieldCollation.Direction.DESCENDING
|| relCollation.direction == RelFieldCollation.Direction.STRICTLY_DESCENDING) {
flags.add(SqlKind.DESCENDING);
}
if (relCollation.nullDirection == RelFieldCollation.NullDirection.FIRST) {
flags.add(SqlKind.NULLS_FIRST);
} else if (relCollation.nullDirection == RelFieldCollation.NullDirection.LAST) {
flags.add(SqlKind.NULLS_LAST);
}

return new RexFieldCollation(fieldRef, flags);
}

/**
* Wraps each window expression in a CASE that yields NULL when the provided group-not-null
* predicate is false, preserving any explicit alias on the original expression.
*
* <p>Each expression is transformed to: CASE groupNotNull WHEN TRUE THEN <original> ELSE NULL END.
*
* @param overExpressions list of window expressions to wrap; if an expression is of the form
* `expr AS alias` the alias is preserved on the wrapped expression
* @param groupNotNull predicate used as the CASE condition to determine non-null groups
* @param context plan context providing RexBuilder/RelBuilder utilities
* @return a new list of `RexNode` where each input expression has been wrapped with the CASE
* nulling logic and retains its original alias when present
*/
private List<RexNode> wrapWindowFunctionsWithGroupNotNull(
List<RexNode> overExpressions, RexNode groupNotNull, CalcitePlanContext context) {
List<RexNode> wrappedOverExprs = new ArrayList<>(overExpressions.size());
Expand Down Expand Up @@ -3499,4 +3613,4 @@ private RexNode createOptimizedTransliteration(
throw new RuntimeException("Failed to optimize sed expression: " + sedExpression, e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,14 @@ public RexNode visitFunction(Function node, CalcitePlanContext context) {
throw new IllegalArgumentException("Unsupported operator: " + node.getFuncName());
}

/**
* Translates a window function expression into a Calcite RexNode representing an OVER() window call.
*
* @param node the window function expression to translate
* @param context the planning context used for resolving arguments, partitions, and frame
* @return a RexNode for the windowed function call (an OVER expression)
* @throws UnsupportedOperationException if the function name is not a supported window function
*/
@Override
public RexNode visitWindowFunction(WindowFunction node, CalcitePlanContext context) {
Function windowFunction = (Function) node.getFunction();
Expand All @@ -480,26 +488,8 @@ public RexNode visitWindowFunction(WindowFunction node, CalcitePlanContext conte
(arguments.isEmpty() || arguments.size() == 1)
? Collections.emptyList()
: arguments.subList(1, arguments.size());
List<RexNode> nodes =
PPLFuncImpTable.INSTANCE.validateAggFunctionSignature(
functionName, field, args, context.rexBuilder);
return nodes != null
? PlanUtils.makeOver(
context,
functionName,
nodes.getFirst(),
nodes.size() <= 1 ? Collections.emptyList() : nodes.subList(1, nodes.size()),
partitions,
List.of(),
node.getWindowFrame())
: PlanUtils.makeOver(
context,
functionName,
field,
args,
partitions,
List.of(),
node.getWindowFrame());
return PlanUtils.makeOver(
context, functionName, field, args, partitions, List.of(), node.getWindowFrame());
})
.orElseThrow(
() ->
Expand Down Expand Up @@ -680,4 +670,4 @@ public RexNode visitUnresolvedArgument(UnresolvedArgument node, CalcitePlanConte
context.rexBuilder.makeLiteral(node.getArgName()),
value);
}
}
}
Loading
Loading