Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public enum Key {
PPL_REX_MAX_MATCH_LIMIT("plugins.ppl.rex.max_match.limit"),
PPL_VALUES_MAX_LIMIT("plugins.ppl.values.max.limit"),
PPL_SYNTAX_LEGACY_PREFERRED("plugins.ppl.syntax.legacy.preferred"),
PPL_SUBSEARCH_MAXOUT("plugins.ppl.subsearch.maxout"),
PPL_JOIN_SUBSEARCH_MAXOUT("plugins.ppl.join.subsearch_maxout"),

/** Enable Calcite as execution engine */
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class CalcitePlanContext {
public final ExtendedRexBuilder rexBuilder;
public final FunctionProperties functionProperties;
public final QueryType queryType;
public final Integer querySizeLimit;
public final SysLimit sysLimit;

/** This thread local variable is only used to skip script encoding in script pushdown. */
public static final ThreadLocal<Boolean> skipEncoding = ThreadLocal.withInitial(() -> false);
Expand All @@ -61,9 +61,9 @@ public class CalcitePlanContext {

@Getter public Map<String, RexLambdaRef> rexLambdaRefMap;

private CalcitePlanContext(FrameworkConfig config, Integer querySizeLimit, QueryType queryType) {
private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
this.config = config;
this.querySizeLimit = querySizeLimit;
this.sysLimit = sysLimit;
this.queryType = queryType;
this.connection = CalciteToolsHelper.connect(config, TYPE_FACTORY);
this.relBuilder = CalciteToolsHelper.create(config, TYPE_FACTORY, connection);
Expand Down Expand Up @@ -102,12 +102,12 @@ public Optional<RexCorrelVariable> peekCorrelVar() {
}

public CalcitePlanContext clone() {
return new CalcitePlanContext(config, querySizeLimit, queryType);
return new CalcitePlanContext(config, sysLimit, queryType);
}

public static CalcitePlanContext create(
FrameworkConfig config, Integer querySizeLimit, QueryType queryType) {
return new CalcitePlanContext(config, querySizeLimit, queryType);
FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
return new CalcitePlanContext(config, sysLimit, queryType);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
import org.opensearch.sql.calcite.utils.BinUtils;
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;
Expand Down Expand Up @@ -1136,6 +1138,15 @@ private Optional<RexLiteral> extractAliasLiteral(RexNode node) {
public RelNode visitJoin(Join node, CalcitePlanContext context) {
List<UnresolvedPlan> children = node.getChildren();
children.forEach(c -> analyze(c, context));
// add join.subsearch_maxout limit to subsearch side
if (context.sysLimit.joinSubsearchLimit() >= 0) {
PlanUtils.replaceTop(
context.relBuilder,
LogicalSystemLimit.create(
SystemLimitType.JOIN_SUBSEARCH_MAXOUT,
context.relBuilder.peek(),
context.relBuilder.literal(context.sysLimit.joinSubsearchLimit())));
}
if (node.getJoinCondition().isEmpty()) {
// join-with-field-list grammar
List<String> leftColumns = context.relBuilder.peek(1).getRowType().getFieldNames();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,13 @@
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
import org.opensearch.sql.ast.expression.subquery.InSubquery;
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
import org.opensearch.sql.ast.expression.subquery.SubqueryExpression;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
import org.opensearch.sql.calcite.utils.PlanUtils;
import org.opensearch.sql.calcite.utils.SubsearchUtils;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.exception.CalciteUnsupportedException;
Expand Down Expand Up @@ -465,7 +469,7 @@ private RexNode extractRexNodeFromAlias(RexNode node) {
public RexNode visitInSubquery(InSubquery node, CalcitePlanContext context) {
List<RexNode> nodes = node.getChild().stream().map(child -> analyze(child, context)).toList();
UnresolvedPlan subquery = node.getQuery();
RelNode subqueryRel = resolveSubqueryPlan(subquery, context);
RelNode subqueryRel = resolveSubqueryPlan(subquery, node, context);
if (subqueryRel.getRowType().getFieldCount() != nodes.size()) {
throw new SemanticCheckException(
"The number of columns in the left hand side of an IN subquery does not match the number"
Expand All @@ -489,7 +493,7 @@ public RexNode visitScalarSubquery(ScalarSubquery node, CalcitePlanContext conte
return context.relBuilder.scalarQuery(
b -> {
UnresolvedPlan subquery = node.getQuery();
return resolveSubqueryPlan(subquery, context);
return resolveSubqueryPlan(subquery, node, context);
});
}

Expand All @@ -498,21 +502,44 @@ public RexNode visitExistsSubquery(ExistsSubquery node, CalcitePlanContext conte
return context.relBuilder.exists(
b -> {
UnresolvedPlan subquery = node.getQuery();
return resolveSubqueryPlan(subquery, context);
return resolveSubqueryPlan(subquery, node, context);
});
}

private RelNode resolveSubqueryPlan(UnresolvedPlan subquery, CalcitePlanContext context) {
private RelNode resolveSubqueryPlan(
UnresolvedPlan subquery, SubqueryExpression subqueryExpression, CalcitePlanContext context) {
boolean isNestedSubquery = context.isResolvingSubquery();
context.setResolvingSubquery(true);
// clear and store the outer state
boolean isResolvingJoinConditionOuter = context.isResolvingJoinCondition();
if (isResolvingJoinConditionOuter) {
context.setResolvingJoinCondition(false);
}
RelNode subqueryRel = subquery.accept(planVisitor, context);
subquery.accept(planVisitor, context);

if (context.sysLimit.subsearchLimit() > 0 && !(subqueryExpression instanceof ScalarSubquery)) {
// Add subsearch.maxout limit to exists-in subsearch:
// Cannot add system limit to the top of subquery simply.
// Instead, add system limit under the correlated conditions.
SubsearchUtils.SystemLimitInsertionShuttle shuttle =
new SubsearchUtils.SystemLimitInsertionShuttle(context);
RelNode replacement = context.relBuilder.peek().accept(shuttle);
if (!shuttle.isCorrelatedConditionFound()) {
// If no correlated condition found, add system limit to the top of subquery.
replacement =
LogicalSystemLimit.create(
SystemLimitType.SUBSEARCH_MAXOUT,
replacement,
context.relBuilder.literal(context.sysLimit.subsearchLimit()));
}
PlanUtils.replaceTop(context.relBuilder, replacement);
}
// pop the inner plan
context.relBuilder.build();
RelNode subqueryRel = context.relBuilder.build();
// if maxout = 0, return empty results
if (context.sysLimit.subsearchLimit() == 0) {
subqueryRel = context.relBuilder.values(subqueryRel.getRowType()).build();
}
// clear the exists subquery resolving state
// restore to the previous state
if (isResolvingJoinConditionOuter) {
Expand Down
26 changes: 26 additions & 0 deletions core/src/main/java/org/opensearch/sql/calcite/SysLimit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite;

import org.opensearch.sql.common.setting.Settings;

/**
 * Immutable bundle of the system-level row limits used while building a Calcite plan.
 *
 * <p>The predefined {@link #UNLIMITED_SUBSEARCH} instance uses {@code -1} for both subsearch
 * limits to express "no limit".
 *
 * <p>NOTE(review): the components are boxed {@code Integer}s populated straight from {@link
 * Settings#getSettingValue}; callers that unbox them presumably rely on the settings always being
 * registered with a non-null default -- confirm.
 *
 * @param querySizeLimit value of {@code Settings.Key.QUERY_SIZE_LIMIT}
 * @param subsearchLimit value of {@code Settings.Key.PPL_SUBSEARCH_MAXOUT}
 * @param joinSubsearchLimit value of {@code Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT}
 */
public record SysLimit(Integer querySizeLimit, Integer subsearchLimit, Integer joinSubsearchLimit) {

  // The shared instances below are declared final so that no caller can reassign them.

  /** No limitation on subsearch */
  public static final SysLimit UNLIMITED_SUBSEARCH = new SysLimit(10000, -1, -1);

  /** For testing only */
  public static final SysLimit DEFAULT = new SysLimit(10000, 10000, 50000);

  /**
   * Create SysLimit from Settings.
   *
   * @param settings the settings to read the three limits from; may be {@code null}
   * @return {@link #UNLIMITED_SUBSEARCH} when {@code settings} is {@code null}, otherwise a new
   *     instance holding the configured values
   */
  public static SysLimit fromSettings(Settings settings) {
    if (settings == null) {
      return UNLIMITED_SUBSEARCH;
    }
    return new SysLimit(
        settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT),
        settings.getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT),
        settings.getSettingValue(Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ public enum SystemLimitType {
*
* <p>This type is used to indicate that the limit is applied to the system level.
*/
QUERY_SIZE_LIMIT
QUERY_SIZE_LIMIT,
/** The max output from subsearch to join against. */
JOIN_SUBSEARCH_MAXOUT,
/** Max output to return from a subsearch. */
SUBSEARCH_MAXOUT,
}

@Getter private final SystemLimitType type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@

import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_ENGINE_ENABLED;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

@UtilityClass
public class CalciteUtils {
Expand All @@ -16,4 +23,10 @@ public static UnsupportedOperationException getOnlyForCalciteException(String fe
return new UnsupportedOperationException(
feature + " is supported only when " + CALCITE_ENGINE_ENABLED.getKeyValue() + "=true");
}

/**
 * Splits a collection into two lists according to a predicate.
 *
 * @param collection the elements to split
 * @param predicate the test applied to every element
 * @return a pair whose left list holds the elements that satisfy {@code predicate} and whose
 *     right list holds the remaining elements
 */
public static <T> Pair<List<T>, List<T>> partition(
    Collection<T> collection, Predicate<T> predicate) {
  Map<Boolean, List<T>> byOutcome =
      collection.stream().collect(Collectors.partitioningBy(predicate));
  List<T> accepted = byOutcome.get(true);
  List<T> rejected = byOutcome.get(false);
  return new ImmutablePair<>(accepted, rejected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static org.apache.calcite.rex.RexWindowBounds.preceding;

import com.google.common.collect.ImmutableList;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand All @@ -26,6 +27,7 @@
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexOver;
Expand Down Expand Up @@ -433,4 +435,36 @@ public static String getActualSignature(List<RelDataType> argTypes) {
.collect(Collectors.joining(","))
+ "]";
}

/**
 * Check if the RexNode contains any CorrelVariable.
 *
 * <p>The expression is walked with a {@link RexVisitorImpl} constructed with {@code true} (deep
 * traversal), and a flag is raised whenever {@code visitCorrelVariable} is invoked. Unlike an
 * exception-based early exit, a failure raised during traversal is propagated to the caller
 * instead of being reported as "correlation variable found".
 *
 * @param node the RexNode to check
 * @return true if the RexNode contains any CorrelVariable, false otherwise
 */
static boolean containsCorrelVariable(RexNode node) {
  // Single-element array: a mutable cell the anonymous visitor below is allowed to capture.
  final boolean[] found = {false};
  node.accept(
      new RexVisitorImpl<Void>(true) {
        @Override
        public Void visitCorrelVariable(RexCorrelVariable correlVar) {
          found[0] = true;
          return null;
        }
      });
  return found[0];
}

/**
 * Adds a rel node to the top of the stack while preserving the field names and aliases.
 *
 * <p>Delegates to the {@code replaceTop(RelNode)} method declared on {@link RelBuilder}, which is
 * looked up reflectively and made accessible before being invoked.
 *
 * @param relBuilder the builder whose top entry is replaced
 * @param relNode the node to place on top of the builder's stack
 * @throws IllegalStateException if the method cannot be found or invoked, or if the invocation
 *     itself fails; the original failure is kept as the cause
 */
static void replaceTop(RelBuilder relBuilder, RelNode relNode) {
  try {
    final Method hiddenReplaceTop =
        RelBuilder.class.getDeclaredMethod("replaceTop", RelNode.class);
    hiddenReplaceTop.setAccessible(true);
    hiddenReplaceTop.invoke(relBuilder, relNode);
  } catch (Exception cause) {
    throw new IllegalStateException("Unable to invoke RelBuilder.replaceTop", cause);
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.utils;

import java.util.List;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.experimental.UtilityClass;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.logical.LogicalCorrelate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalIntersect;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalMinus;
import org.apache.calcite.rel.logical.LogicalUnion;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;

@UtilityClass
public class SubsearchUtils {

  /**
   * Rewrites a filter so that a {@code SUBSEARCH_MAXOUT} system limit sits between its correlated
   * and its non-correlated conjuncts.
   *
   * <pre>
   * Before:
   *   LogicalFilter(condition=[AND(=($cor0.SAL, $2), >($1, 1000.0:DECIMAL(5, 1)))])
   * After:
   *   LogicalFilter(condition=[=($cor0.SAL, $2)])
   *     LogicalSystemLimit(fetch=[1], type=[SUBSEARCH_MAXOUT])
   *       LogicalFilter(condition=[>($1, 1000.0:DECIMAL(5, 1))])
   * </pre>
   *
   * @param logicalFilter the filter to inspect
   * @param context plan context supplying the rex/rel builders and the subsearch limit
   * @return the very same {@code logicalFilter} instance when none of its conjuncts references a
   *     correlation variable, otherwise the rewritten sub-plan
   */
  private static RelNode insertSysLimitUnderCorrelateConditions(
      LogicalFilter logicalFilter, CalcitePlanContext context) {
    Pair<List<RexNode>, List<RexNode>> split =
        CalciteUtils.partition(
            RelOptUtil.conjunctions(logicalFilter.getCondition()),
            PlanUtils::containsCorrelVariable);
    List<RexNode> correlated = split.getLeft();
    List<RexNode> uncorrelated = split.getRight();

    // Nothing correlated: hand back the untouched instance so callers can detect "no change".
    if (correlated.isEmpty()) {
      return logicalFilter;
    }

    // Non-correlated conjuncts (if any) stay below the limit.
    RelNode belowLimit = logicalFilter.getInput();
    if (!uncorrelated.isEmpty()) {
      belowLimit =
          LogicalFilter.create(
              belowLimit, RexUtil.composeConjunction(context.rexBuilder, uncorrelated));
    }

    RelNode limited =
        LogicalSystemLimit.create(
            LogicalSystemLimit.SystemLimitType.SUBSEARCH_MAXOUT,
            belowLimit,
            context.relBuilder.literal(context.sysLimit.subsearchLimit()));

    // Correlated conjuncts are re-applied on top of the limit.
    return LogicalFilter.create(
        limited, RexUtil.composeConjunction(context.rexBuilder, correlated));
  }

  /**
   * Shuttle that walks a plan tree and inserts a system limit beneath the first filter it meets
   * on each path that carries correlated conditions. Joins, correlates and set operations
   * (union, intersect, minus) are returned as-is without descending into them.
   */
  @RequiredArgsConstructor
  public static class SystemLimitInsertionShuttle extends RelShuttleImpl {

    private final CalcitePlanContext context;

    /** Set to true once any visited filter has been rewritten. */
    @Getter private boolean correlatedConditionFound = false;

    @Override
    public RelNode visit(LogicalFilter filter) {
      RelNode rewritten = insertSysLimitUnderCorrelateConditions(filter, context);
      if (rewritten == filter) {
        // Filter has no correlated condition: keep searching below it.
        return super.visitChildren(filter);
      }
      correlatedConditionFound = true;
      return rewritten;
    }

    @Override
    public RelNode visit(LogicalJoin node) {
      return node;
    }

    @Override
    public RelNode visit(LogicalCorrelate node) {
      return node;
    }

    @Override
    public RelNode visit(LogicalUnion node) {
      return node;
    }

    @Override
    public RelNode visit(LogicalIntersect node) {
      return node;
    }

    @Override
    public RelNode visit(LogicalMinus node) {
      return node;
    }
  }
}
Loading
Loading