Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public enum Key {
PPL_REX_MAX_MATCH_LIMIT("plugins.ppl.rex.max_match.limit"),
PPL_VALUES_MAX_LIMIT("plugins.ppl.values.max.limit"),
PPL_SYNTAX_LEGACY_PREFERRED("plugins.ppl.syntax.legacy.preferred"),
PPL_SUBSEARCH_MAXOUT("plugins.ppl.subsearch.maxout"),
PPL_JOIN_SUBSEARCH_MAXOUT("plugins.ppl.join.subsearch_maxout"),

/** Enable Calcite as execution engine */
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class CalcitePlanContext {
public final ExtendedRexBuilder rexBuilder;
public final FunctionProperties functionProperties;
public final QueryType queryType;
public final Integer querySizeLimit;
public final SysLimit sysLimit;

/** This thread local variable is only used to skip script encoding in script pushdown. */
public static final ThreadLocal<Boolean> skipEncoding = ThreadLocal.withInitial(() -> false);
Expand All @@ -56,9 +56,9 @@ public class CalcitePlanContext {

@Getter public Map<String, RexLambdaRef> rexLambdaRefMap;

private CalcitePlanContext(FrameworkConfig config, Integer querySizeLimit, QueryType queryType) {
private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
this.config = config;
this.querySizeLimit = querySizeLimit;
this.sysLimit = sysLimit;
this.queryType = queryType;
this.connection = CalciteToolsHelper.connect(config, TYPE_FACTORY);
this.relBuilder = CalciteToolsHelper.create(config, TYPE_FACTORY, connection);
Expand Down Expand Up @@ -97,12 +97,12 @@ public Optional<RexCorrelVariable> peekCorrelVar() {
}

public CalcitePlanContext clone() {
return new CalcitePlanContext(config, querySizeLimit, queryType);
return new CalcitePlanContext(config, sysLimit, queryType);
}

public static CalcitePlanContext create(
FrameworkConfig config, Integer querySizeLimit, QueryType queryType) {
return new CalcitePlanContext(config, querySizeLimit, queryType);
FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
return new CalcitePlanContext(config, sysLimit, queryType);
}

public void putRexLambdaRefMap(Map<String, RexLambdaRef> candidateMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -133,6 +134,8 @@
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
import org.opensearch.sql.calcite.utils.BinUtils;
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;
Expand Down Expand Up @@ -166,6 +169,17 @@ public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) {
return unresolved.accept(this, context);
}

/**
 * Swaps the rel node on top of the builder's stack for {@code relNode} while preserving the field
 * names and aliases.
 *
 * <p>{@code RelBuilder.replaceTop} is resolved with {@code getDeclaredMethod}, made accessible,
 * and invoked reflectively on {@code relBuilder}.
 *
 * @param relBuilder builder whose top stack entry is replaced
 * @param relNode rel node to put on top of the stack
 * @throws IllegalStateException if the reflective lookup or invocation fails for any reason
 */
public void replaceTop(RelBuilder relBuilder, RelNode relNode) {
  try {
    final Method replaceTopMethod =
        RelBuilder.class.getDeclaredMethod("replaceTop", RelNode.class);
    replaceTopMethod.setAccessible(true);
    replaceTopMethod.invoke(relBuilder, relNode);
  } catch (Exception failure) {
    throw new IllegalStateException("Unable to invoke RelBuilder.replaceTop", failure);
  }
}

@Override
public RelNode visitRelation(Relation node, CalcitePlanContext context) {
DataSourceSchemaIdentifierNameResolver nameResolver =
Expand Down Expand Up @@ -1136,6 +1150,15 @@ private Optional<RexLiteral> extractAliasLiteral(RexNode node) {
public RelNode visitJoin(Join node, CalcitePlanContext context) {
List<UnresolvedPlan> children = node.getChildren();
children.forEach(c -> analyze(c, context));
// add join.subsearch_maxout limit to subsearch side
if (context.sysLimit.joinSubsearchLimit() >= 0) {
replaceTop(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is it possible to avoid accessing a private method?

My 2 cents: add a frame to CalcitePlanContext, where a frame marks the boundary of a subsearch, and define the limit on the frame. When visiting a subsearch, append a LogicalSystemLimit to the subsearch for each frame.

Copy link
Member Author

@LantaoJin LantaoJin Oct 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, is it possible to avoid access private method?

I don't think so.

When the subsearch side (the right side of a join, for example) is visited, the right plan is pushed onto the stack.

public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) {
    return unresolved.accept(this, context);
  }

RelBuilder.pop() is private as well, so we don't have a way to replace it.

Here was my previous try code for join

  public RelNode visitJoin(Join node, CalcitePlanContext context) {
    // visit the main side
    analyze(node.getLeft(), context);
    if (context.sysLimit.joinSubsearchLimit() >= 0) {
      // add join.subsearch_maxout limit to subsearch side
      RelNode withLimit = context.relBuilder.with(
          analyze(node.getRight(), context),
          r -> LogicalSystemLimit.create(
            SystemLimitType.JOIN_SUBSEARCH_MAXOUT,
            r.peek(),
            r.literal(context.sysLimit.joinSubsearchLimit())));
      context.relBuilder.push(withLimit); // push the new subsearch plan
    } else {
      // visit the subsearch side
      analyze(node.getRight(), context);
    }

The code uses relBuilder.with(), but evaluating its first argument, analyze(node.getRight(), context), already pushes the subsearch onto the stack, and then the with() method pushes it a second time.

  /** Evaluates an expression with a relational expression temporarily on the
   * stack. */
  public <E> E with(RelNode r, Function<RelBuilder, E> fn) {
    try {
      push(r);
      return fn.apply(this);
    } finally {
      stack.pop();
    }
  }
  1. push left plan by analyze(node.getLeft(), context), stack size is 1
  2. push right plan by the first parameter of with(analyze(node.getRight(), context)), stack size is 2
  3. push duplicated right plan by push in with, stack size is 3
  4. pop duplicated right plan by pop in with, stack size is 2
  5. push new right plan by context.relBuilder.push(withLimit), stack size is 3 (incorrect)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it work to use relbuilder.build() + relbuilder.push(newTop)? relbuilder.build() pops the stack and is public.

Copy link
Member Author

@LantaoJin LantaoJin Oct 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synced offline. We still cannot use relbuilder.build() + relbuilder.push(newTop), since it would empty the fields of the Frame.

  private void replaceTop(RelNode node) {
    final Frame frame = stack.pop();
    stack.push(new Frame(node, frame.fields));    // <--- frame.fields will be kept all the time
  }

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does a SQL join get translated to a RelNode? Does it use the private method?

context.relBuilder,
LogicalSystemLimit.create(
SystemLimitType.JOIN_SUBSEARCH_MAXOUT,
context.relBuilder.peek(),
context.relBuilder.literal(context.sysLimit.joinSubsearchLimit())));
}
if (node.getJoinCondition().isEmpty()) {
// join-with-field-list grammar
List<String> leftColumns = context.relBuilder.peek(1).getRowType().getFieldNames();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,25 @@
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLambdaRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.ArraySqlType;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.DateString;
import org.apache.calcite.util.TimeString;
import org.apache.calcite.util.TimestampString;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Alias;
Expand Down Expand Up @@ -67,7 +72,11 @@
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
import org.opensearch.sql.ast.expression.subquery.InSubquery;
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
import org.opensearch.sql.ast.expression.subquery.SubqueryExpression;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.utils.CalciteUtils;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
import org.opensearch.sql.calcite.utils.PlanUtils;
import org.opensearch.sql.common.utils.StringUtils;
Expand Down Expand Up @@ -463,7 +472,7 @@ private RexNode extractRexNodeFromAlias(RexNode node) {
public RexNode visitInSubquery(InSubquery node, CalcitePlanContext context) {
List<RexNode> nodes = node.getChild().stream().map(child -> analyze(child, context)).toList();
UnresolvedPlan subquery = node.getQuery();
RelNode subqueryRel = resolveSubqueryPlan(subquery, context);
RelNode subqueryRel = resolveSubqueryPlan(subquery, node, context);
if (subqueryRel.getRowType().getFieldCount() != nodes.size()) {
throw new SemanticCheckException(
"The number of columns in the left hand side of an IN subquery does not match the number"
Expand All @@ -487,7 +496,7 @@ public RexNode visitScalarSubquery(ScalarSubquery node, CalcitePlanContext conte
return context.relBuilder.scalarQuery(
b -> {
UnresolvedPlan subquery = node.getQuery();
return resolveSubqueryPlan(subquery, context);
return resolveSubqueryPlan(subquery, node, context);
});
}

Expand All @@ -496,21 +505,104 @@ public RexNode visitExistsSubquery(ExistsSubquery node, CalcitePlanContext conte
return context.relBuilder.exists(
b -> {
UnresolvedPlan subquery = node.getQuery();
return resolveSubqueryPlan(subquery, context);
return resolveSubqueryPlan(subquery, node, context);
});
}

private RelNode resolveSubqueryPlan(UnresolvedPlan subquery, CalcitePlanContext context) {
/**
 * Inserts a system limit under the correlated conditions of a filter.
 *
 * <p>The filter condition is split into its conjuncts. Conjuncts that contain a correlation
 * variable stay in a filter on top; the remaining conjuncts (if any) are moved into a filter
 * below a {@code SUBSEARCH_MAXOUT} system limit:
 *
 * <pre>{@code
 * Before:
 *   LogicalFilter(condition=[AND(=($cor0.SAL, $2), >($1, 1000.0:DECIMAL(5, 1)))])
 * After:
 *   LogicalFilter(condition=[=($cor0.SAL, $2)])
 *     LogicalSystemLimit(fetch=[1], type=[SUBSEARCH_MAXOUT])
 *       LogicalFilter(condition=[>($1, 1000.0:DECIMAL(5, 1))])
 * }</pre>
 *
 * @param logicalFilter the filter to rewrite
 * @param context plan context supplying the rex/rel builders and the subsearch limit
 * @return {@code logicalFilter} itself when none of its conjuncts is correlated; otherwise the
 *     rewritten filter with the system limit beneath it
 */
private RelNode insertSysLimitUnderCorrelateConditions(
    LogicalFilter logicalFilter, CalcitePlanContext context) {
  List<RexNode> conjuncts = RelOptUtil.conjunctions(logicalFilter.getCondition());
  Pair<List<RexNode>, List<RexNode>> partitioned =
      CalciteUtils.partition(conjuncts, PlanUtils::containsCorrelVariable);
  List<RexNode> correlated = partitioned.getLeft();
  List<RexNode> uncorrelated = partitioned.getRight();
  if (correlated.isEmpty()) {
    // Nothing correlated in this filter: leave it untouched so callers can detect "no change".
    return logicalFilter;
  }

  RelNode input = logicalFilter.getInput();
  if (!uncorrelated.isEmpty()) {
    // Keep the non-correlated predicates below the limit.
    RexNode nonCorrelCondition = RexUtil.composeConjunction(context.rexBuilder, uncorrelated);
    input = LogicalFilter.create(input, nonCorrelCondition);
  }
  input =
      LogicalSystemLimit.create(
          SystemLimitType.SUBSEARCH_MAXOUT,
          input,
          context.relBuilder.literal(context.sysLimit.subsearchLimit()));
  // The correlated list is known to be non-empty here (see the early return above), so the
  // correlated filter is always re-applied on top of the limit.
  RexNode correlCondition = RexUtil.composeConjunction(context.rexBuilder, correlated);
  return LogicalFilter.create(input, correlCondition);
}

private RelNode resolveSubqueryPlan(
UnresolvedPlan subquery, SubqueryExpression subqueryExpression, CalcitePlanContext context) {
boolean isNestedSubquery = context.isResolvingSubquery();
context.setResolvingSubquery(true);
// clear and store the outer state
boolean isResolvingJoinConditionOuter = context.isResolvingJoinCondition();
if (isResolvingJoinConditionOuter) {
context.setResolvingJoinCondition(false);
}
RelNode subqueryRel = subquery.accept(planVisitor, context);
subquery.accept(planVisitor, context);

if (context.sysLimit.subsearchLimit() > 0) {
// add subsearch.maxout limit to subsearch
if (subqueryExpression instanceof ExistsSubquery) {
// For exists-subquery, we cannot add system limit to the top of subquery simply.
// Instead, add system limit under the correlated conditions.
RelNode replacement =
context
.relBuilder
.peek()
.accept(
new RelShuttleImpl() {
@Override
public RelNode visit(LogicalFilter filter) {
RelNode newFilter = insertSysLimitUnderCorrelateConditions(filter, context);
if (newFilter != filter) {
return newFilter;
}
return visit((RelNode) filter);
}

@Override
public RelNode visit(RelNode other) {
RelNode newInput =
other.getInputs().isEmpty() ? null : other.getInput(0).accept(this);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] Can there be a join or union inside a subsearch? In those cases the operator has more than one input, and if so the current code will construct an incorrect plan.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in latest commit. For BiRel or SetOp, just return.

if (newInput == null || newInput == other.getInput(0)) {
return other;
}
return other.copy(other.getTraitSet(), Collections.singletonList(newInput));
}
});
planVisitor.replaceTop(context.relBuilder, replacement);
}
if (subqueryExpression instanceof InSubquery) {
Copy link
Collaborator

@qianheng-aws qianheng-aws Oct 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQL supports correlated conditions in IN subqueries and scalar subqueries, so Calcite should support them as well.
e.g.

SELECT * FROM EMPLOYEE WHERE location in (select location from DEPART where EMPLOYEE.dept = DEPART.name) limit 1

If there is a correlated condition in an IN or scalar subsearch, should we do something similar to the above?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the same logic for correlated in-subquery. For correlated scalar-subquery, since an aggregation is always performed in the subquery, sysLimit is not necessary.

// For in-subquery, add system limit to the top of subquery.
planVisitor.replaceTop(
context.relBuilder,
LogicalSystemLimit.create(
SystemLimitType.SUBSEARCH_MAXOUT,
context.relBuilder.peek(),
context.relBuilder.literal(context.sysLimit.subsearchLimit())));
}
}
// pop the inner plan
context.relBuilder.build();
RelNode subqueryRel = context.relBuilder.build();
// if maxout = 0, return empty results
if (context.sysLimit.subsearchLimit() == 0) {
subqueryRel = context.relBuilder.values(subqueryRel.getRowType()).build();
}
// clear the exists subquery resolving state
// restore to the previous state
if (isResolvingJoinConditionOuter) {
Expand Down
26 changes: 26 additions & 0 deletions core/src/main/java/org/opensearch/sql/calcite/SysLimit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite;

import org.opensearch.sql.common.setting.Settings;

/**
 * System-level limits read from plugin settings.
 *
 * @param querySizeLimit value of {@code Settings.Key.QUERY_SIZE_LIMIT}
 * @param subsearchLimit value of {@code Settings.Key.PPL_SUBSEARCH_MAXOUT}
 * @param joinSubsearchLimit value of {@code Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT}
 */
public record SysLimit(Integer querySizeLimit, Integer subsearchLimit, Integer joinSubsearchLimit) {
  /** No limitation on subsearch */
  public static final SysLimit UNLIMITED_SUBSEARCH = new SysLimit(10000, -1, -1);

  /** For testing only */
  public static final SysLimit DEFAULT = new SysLimit(10000, 10000, 50000);

  /**
   * Create SysLimit from Settings.
   *
   * @param settings the settings to read the three limits from; may be {@code null}
   * @return {@link #UNLIMITED_SUBSEARCH} when {@code settings} is {@code null}, otherwise a new
   *     instance populated from the setting values
   */
  public static SysLimit fromSettings(Settings settings) {
    if (settings == null) {
      return UNLIMITED_SUBSEARCH;
    }
    return new SysLimit(
        settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT),
        settings.getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT),
        settings.getSettingValue(Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ public enum SystemLimitType {
*
* <p>This type is used to indicate that the limit is applied to the system level.
*/
QUERY_SIZE_LIMIT
QUERY_SIZE_LIMIT,
/** The max output from subsearch to join against. */
JOIN_SUBSEARCH_MAXOUT,
/** Max output to return from a subsearch. */
SUBSEARCH_MAXOUT,
}

@Getter private final SystemLimitType type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@

import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_ENGINE_ENABLED;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

@UtilityClass
public class CalciteUtils {
Expand All @@ -16,4 +23,10 @@ public static UnsupportedOperationException getOnlyForCalciteException(String fe
return new UnsupportedOperationException(
feature + " is supported only when " + CALCITE_ENGINE_ENABLED.getKeyValue() + "=true");
}

/**
 * Splits a collection into the elements that satisfy a predicate and those that do not.
 *
 * @param collection the elements to split
 * @param predicate the test applied to each element
 * @param <T> element type
 * @return a pair whose left list holds the elements for which {@code predicate} is true and whose
 *     right list holds the remaining elements
 */
public static <T> Pair<List<T>, List<T>> partition(
    Collection<T> collection, Predicate<T> predicate) {
  Map<Boolean, List<T>> buckets =
      collection.stream().collect(Collectors.partitioningBy(predicate));
  List<T> matching = buckets.get(true);
  List<T> nonMatching = buckets.get(false);
  return new ImmutablePair<>(matching, nonMatching);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexOver;
Expand Down Expand Up @@ -433,4 +434,25 @@ public static String getActualSignature(List<RelDataType> argTypes) {
.collect(Collectors.joining(","))
+ "]";
}

/**
 * Check if the RexNode contains any CorrelVariable.
 *
 * <p>The node is walked with a deep {@code RexVisitorImpl}; a flag records whether {@code
 * visitCorrelVariable} was reached. Exceptions raised during the traversal are propagated to the
 * caller instead of being interpreted as "correlation variable found".
 *
 * @param node the RexNode to check
 * @return true if the RexNode contains any CorrelVariable, false otherwise
 */
static boolean containsCorrelVariable(RexNode node) {
  // Single-element array so the anonymous visitor can record the result.
  final boolean[] found = {false};
  node.accept(
      new RexVisitorImpl<Void>(true) {
        @Override
        public Void visitCorrelVariable(RexCorrelVariable correlVar) {
          found[0] = true;
          return null;
        }
      });
  return found[0];
}
}
Loading
Loading