Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public enum Key {
PPL_REX_MAX_MATCH_LIMIT("plugins.ppl.rex.max_match.limit"),
PPL_VALUES_MAX_LIMIT("plugins.ppl.values.max.limit"),
PPL_SYNTAX_LEGACY_PREFERRED("plugins.ppl.syntax.legacy.preferred"),
PPL_SUBSEARCH_MAXOUT("plugins.ppl.subsearch.maxout"),
PPL_JOIN_SUBSEARCH_MAXOUT("plugins.ppl.join.subsearch_maxout"),

/** Enable Calcite as execution engine */
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class CalcitePlanContext {
public final ExtendedRexBuilder rexBuilder;
public final FunctionProperties functionProperties;
public final QueryType queryType;
public final Integer querySizeLimit;
public final SysLimit sysLimit;

/** This thread local variable is only used to skip script encoding in script pushdown. */
public static final ThreadLocal<Boolean> skipEncoding = ThreadLocal.withInitial(() -> false);
Expand All @@ -61,9 +61,9 @@ public class CalcitePlanContext {

@Getter public Map<String, RexLambdaRef> rexLambdaRefMap;

private CalcitePlanContext(FrameworkConfig config, Integer querySizeLimit, QueryType queryType) {
private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
this.config = config;
this.querySizeLimit = querySizeLimit;
this.sysLimit = sysLimit;
this.queryType = queryType;
this.connection = CalciteToolsHelper.connect(config, TYPE_FACTORY);
this.relBuilder = CalciteToolsHelper.create(config, TYPE_FACTORY, connection);
Expand Down Expand Up @@ -102,12 +102,12 @@ public Optional<RexCorrelVariable> peekCorrelVar() {
}

public CalcitePlanContext clone() {
return new CalcitePlanContext(config, querySizeLimit, queryType);
return new CalcitePlanContext(config, sysLimit, queryType);
}

public static CalcitePlanContext create(
FrameworkConfig config, Integer querySizeLimit, QueryType queryType) {
return new CalcitePlanContext(config, querySizeLimit, queryType);
FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
return new CalcitePlanContext(config, sysLimit, queryType);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
import org.opensearch.sql.calcite.utils.BinUtils;
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;
Expand Down Expand Up @@ -1139,6 +1141,15 @@ private Optional<RexLiteral> extractAliasLiteral(RexNode node) {
public RelNode visitJoin(Join node, CalcitePlanContext context) {
List<UnresolvedPlan> children = node.getChildren();
children.forEach(c -> analyze(c, context));
// add join.subsearch_maxout limit to subsearch side
if (context.sysLimit.joinSubsearchLimit() >= 0) {
PlanUtils.replaceTop(
context.relBuilder,
LogicalSystemLimit.create(
SystemLimitType.JOIN_SUBSEARCH_MAXOUT,
context.relBuilder.peek(),
context.relBuilder.literal(context.sysLimit.joinSubsearchLimit())));
}
if (node.getJoinCondition().isEmpty()) {
// join-with-field-list grammar
List<String> leftColumns = context.relBuilder.peek(1).getRowType().getFieldNames();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,13 @@
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
import org.opensearch.sql.ast.expression.subquery.InSubquery;
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
import org.opensearch.sql.ast.expression.subquery.SubqueryExpression;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
import org.opensearch.sql.calcite.utils.PlanUtils;
import org.opensearch.sql.calcite.utils.SubsearchUtils;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.exception.CalciteUnsupportedException;
Expand Down Expand Up @@ -466,32 +470,31 @@ private RexNode extractRexNodeFromAlias(RexNode node) {
public RexNode visitInSubquery(InSubquery node, CalcitePlanContext context) {
List<RexNode> nodes = node.getChild().stream().map(child -> analyze(child, context)).collect(Collectors.toList());
UnresolvedPlan subquery = node.getQuery();
RelNode subqueryRel = resolveSubqueryPlan(subquery, context);
try {
return context.relBuilder.in(subqueryRel, nodes);
// TODO
// The {@link org.apache.calcite.tools.RelBuilder#in(RexNode,java.util.function.Function)}
// only support one expression. Change to follow code after calcite fixed.
// return context.relBuilder.in(
// nodes.getFirst(),
// b -> {
// RelNode subqueryRel = subquery.accept(planVisitor, context);
// b.build();
// return subqueryRel;
// });
} catch (AssertionError e) {
RelNode subqueryRel = resolveSubqueryPlan(subquery, node, context);
if (subqueryRel.getRowType().getFieldCount() != nodes.size()) {
throw new SemanticCheckException(
"The number of columns in the left hand side of an IN subquery does not match the number"
+ " of columns in the output of subquery");
}
// TODO
// The {@link org.apache.calcite.tools.RelBuilder#in(RexNode,java.util.function.Function)}
// only support one expression. Change to follow code after calcite fixed.
// return context.relBuilder.in(
// nodes.getFirst(),
// b -> {
// RelNode subqueryRel = subquery.accept(planVisitor, context);
// b.build();
// return subqueryRel;
// });
return context.relBuilder.in(subqueryRel, nodes);
}

@Override
public RexNode visitScalarSubquery(ScalarSubquery node, CalcitePlanContext context) {
return context.relBuilder.scalarQuery(
b -> {
UnresolvedPlan subquery = node.getQuery();
return resolveSubqueryPlan(subquery, context);
return resolveSubqueryPlan(subquery, node, context);
});
}

Expand All @@ -500,21 +503,44 @@ public RexNode visitExistsSubquery(ExistsSubquery node, CalcitePlanContext conte
return context.relBuilder.exists(
b -> {
UnresolvedPlan subquery = node.getQuery();
return resolveSubqueryPlan(subquery, context);
return resolveSubqueryPlan(subquery, node, context);
});
}

private RelNode resolveSubqueryPlan(UnresolvedPlan subquery, CalcitePlanContext context) {
private RelNode resolveSubqueryPlan(
UnresolvedPlan subquery, SubqueryExpression subqueryExpression, CalcitePlanContext context) {
boolean isNestedSubquery = context.isResolvingSubquery();
context.setResolvingSubquery(true);
// clear and store the outer state
boolean isResolvingJoinConditionOuter = context.isResolvingJoinCondition();
if (isResolvingJoinConditionOuter) {
context.setResolvingJoinCondition(false);
}
RelNode subqueryRel = subquery.accept(planVisitor, context);
subquery.accept(planVisitor, context);

if (context.sysLimit.subsearchLimit() > 0 && !(subqueryExpression instanceof ScalarSubquery)) {
// Add subsearch.maxout limit to exists-in subsearch:
// Cannot add system limit to the top of subquery simply.
// Instead, add system limit under the correlated conditions.
SubsearchUtils.SystemLimitInsertionShuttle shuttle =
new SubsearchUtils.SystemLimitInsertionShuttle(context);
RelNode replacement = context.relBuilder.peek().accept(shuttle);
if (!shuttle.isCorrelatedConditionFound()) {
// If no correlated condition found, add system limit to the top of subquery.
replacement =
LogicalSystemLimit.create(
SystemLimitType.SUBSEARCH_MAXOUT,
replacement,
context.relBuilder.literal(context.sysLimit.subsearchLimit()));
}
PlanUtils.replaceTop(context.relBuilder, replacement);
}
// pop the inner plan
context.relBuilder.build();
RelNode subqueryRel = context.relBuilder.build();
// if maxout = 0, return empty results
if (context.sysLimit.subsearchLimit() == 0) {
subqueryRel = context.relBuilder.values(subqueryRel.getRowType()).build();
}
// clear the exists subquery resolving state
// restore to the previous state
if (isResolvingJoinConditionOuter) {
Expand Down
46 changes: 46 additions & 0 deletions core/src/main/java/org/opensearch/sql/calcite/SysLimit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite;

import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.common.setting.Settings;

@RequiredArgsConstructor
@EqualsAndHashCode
public class SysLimit {
private final Integer querySizeLimit;
private final Integer subsearchLimit;
private final Integer joinSubsearchLimit;

public Integer querySizeLimit() {
return querySizeLimit;
}

public Integer subsearchLimit() {
return subsearchLimit;
}

public Integer joinSubsearchLimit() {
return joinSubsearchLimit;
}

/** Create SysLimit from Settings. */
public static SysLimit fromSettings(Settings settings) {
return settings == null
? UNLIMITED_SUBSEARCH
: new SysLimit(
settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT),
settings.getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT),
settings.getSettingValue(Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT));
}

/** No limitation on subsearch */
public static SysLimit UNLIMITED_SUBSEARCH = new SysLimit(10000, -1, -1);

/** For testing only */
public static SysLimit DEFAULT = new SysLimit(10000, 10000, 50000);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ public enum SystemLimitType {
*
* <p>This type is used to indicate that the limit is applied to the system level.
*/
QUERY_SIZE_LIMIT
QUERY_SIZE_LIMIT,
/** The max output from subsearch to join against. */
JOIN_SUBSEARCH_MAXOUT,
/** Max output to return from a subsearch. */
SUBSEARCH_MAXOUT,
}

@Getter private final SystemLimitType type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@

import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_ENGINE_ENABLED;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

@UtilityClass
public class CalciteUtils {
Expand All @@ -16,4 +23,10 @@ public static UnsupportedOperationException getOnlyForCalciteException(String fe
return new UnsupportedOperationException(
feature + " is supported only when " + CALCITE_ENGINE_ENABLED.getKeyValue() + "=true");
}

public static <T> Pair<List<T>, List<T>> partition(
Collection<T> collection, Predicate<T> predicate) {
Map<Boolean, List<T>> map = collection.stream().collect(Collectors.partitioningBy(predicate));
return new ImmutablePair<>(map.get(true), map.get(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static org.apache.calcite.rex.RexWindowBounds.preceding;

import com.google.common.collect.ImmutableList;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand All @@ -26,6 +27,7 @@
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexOver;
Expand Down Expand Up @@ -460,4 +462,36 @@ public static String getActualSignature(List<RelDataType> argTypes) {
.collect(Collectors.joining(","))
+ "]";
}

/**
* Check if the RexNode contains any CorrelVariable.
*
* @param node the RexNode to check
* @return true if the RexNode contains any CorrelVariable, false otherwise
*/
static boolean containsCorrelVariable(RexNode node) {
try {
node.accept(
new RexVisitorImpl<Void>(true) {
@Override
public Void visitCorrelVariable(RexCorrelVariable correlVar) {
throw new RuntimeException("Correl found");
}
});
return false;
} catch (Exception e) {
return true;
}
}

/** Adds a rel node to the top of the stack while preserving the field names and aliases. */
static void replaceTop(RelBuilder relBuilder, RelNode relNode) {
try {
Method method = RelBuilder.class.getDeclaredMethod("replaceTop", RelNode.class);
method.setAccessible(true);
method.invoke(relBuilder, relNode);
} catch (Exception e) {
throw new IllegalStateException("Unable to invoke RelBuilder.replaceTop", e);
}
}
}
Loading
Loading