diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index 32729696658..878cad52196 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -33,6 +33,8 @@ public enum Key { PPL_REX_MAX_MATCH_LIMIT("plugins.ppl.rex.max_match.limit"), PPL_VALUES_MAX_LIMIT("plugins.ppl.values.max.limit"), PPL_SYNTAX_LEGACY_PREFERRED("plugins.ppl.syntax.legacy.preferred"), + PPL_SUBSEARCH_MAXOUT("plugins.ppl.subsearch.maxout"), + PPL_JOIN_SUBSEARCH_MAXOUT("plugins.ppl.join.subsearch_maxout"), /** Enable Calcite as execution engine */ CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"), diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index 4586f973a09..669d8452dc0 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -35,7 +35,7 @@ public class CalcitePlanContext { public final ExtendedRexBuilder rexBuilder; public final FunctionProperties functionProperties; public final QueryType queryType; - public final Integer querySizeLimit; + public final SysLimit sysLimit; /** This thread local variable is only used to skip script encoding in script pushdown. */ public static final ThreadLocal skipEncoding = ThreadLocal.withInitial(() -> false); @@ -61,9 +61,9 @@ public class CalcitePlanContext { @Getter public Map rexLambdaRefMap; - private CalcitePlanContext(FrameworkConfig config, Integer querySizeLimit, QueryType queryType) { + private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType queryType) { this.config = config; - this.querySizeLimit = querySizeLimit; + this.sysLimit = sysLimit; this.queryType = queryType; this.connection = CalciteToolsHelper.connect(config, TYPE_FACTORY); this.relBuilder = CalciteToolsHelper.create(config, TYPE_FACTORY, connection); @@ -102,12 +102,12 @@ public Optional peekCorrelVar() { } public CalcitePlanContext clone() { - return new CalcitePlanContext(config, querySizeLimit, queryType); + return new CalcitePlanContext(config, sysLimit, queryType); } public static CalcitePlanContext create( - FrameworkConfig config, Integer querySizeLimit, QueryType queryType) { - return new CalcitePlanContext(config, querySizeLimit, queryType); + FrameworkConfig config, SysLimit sysLimit, QueryType queryType) { + return new CalcitePlanContext(config, sysLimit, queryType); } /** diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index f4a37acd280..8abee3e5a8e 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -133,6 +133,8 @@ import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.ast.tree.Values; import org.opensearch.sql.ast.tree.Window; +import org.opensearch.sql.calcite.plan.LogicalSystemLimit; +import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType; import org.opensearch.sql.calcite.plan.OpenSearchConstants; import org.opensearch.sql.calcite.utils.BinUtils; import org.opensearch.sql.calcite.utils.JoinAndLookupUtils; @@ -1136,6 +1138,15 @@ private Optional extractAliasLiteral(RexNode node) { public RelNode visitJoin(Join node, CalcitePlanContext context) { List children = node.getChildren(); children.forEach(c -> analyze(c, context)); + // add join.subsearch_maxout limit to subsearch side + if (context.sysLimit.joinSubsearchLimit() >= 0) { + PlanUtils.replaceTop( + context.relBuilder, + LogicalSystemLimit.create( + SystemLimitType.JOIN_SUBSEARCH_MAXOUT, + context.relBuilder.peek(), + context.relBuilder.literal(context.sysLimit.joinSubsearchLimit()))); + } if (node.getJoinCondition().isEmpty()) { // join-with-field-list grammar List leftColumns = context.relBuilder.peek(1).getRowType().getFieldNames(); diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java index f1f72dfb2cd..8e06894c2b3 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java @@ -68,9 +68,13 @@ import org.opensearch.sql.ast.expression.subquery.ExistsSubquery; import org.opensearch.sql.ast.expression.subquery.InSubquery; import org.opensearch.sql.ast.expression.subquery.ScalarSubquery; +import org.opensearch.sql.ast.expression.subquery.SubqueryExpression; import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.calcite.plan.LogicalSystemLimit; +import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType; import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; import org.opensearch.sql.calcite.utils.PlanUtils; +import org.opensearch.sql.calcite.utils.SubsearchUtils; import org.opensearch.sql.common.utils.StringUtils; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.exception.CalciteUnsupportedException; @@ -465,7 +469,7 @@ private RexNode extractRexNodeFromAlias(RexNode node) { public RexNode visitInSubquery(InSubquery node, CalcitePlanContext context) { List nodes = node.getChild().stream().map(child -> analyze(child, context)).toList(); UnresolvedPlan subquery = node.getQuery(); - RelNode subqueryRel = resolveSubqueryPlan(subquery, context); + RelNode subqueryRel = resolveSubqueryPlan(subquery, node, context); if (subqueryRel.getRowType().getFieldCount() != nodes.size()) { throw new SemanticCheckException( "The number of columns in the left hand side of an IN subquery does not match the number" @@ -489,7 +493,7 @@ public RexNode visitScalarSubquery(ScalarSubquery node, CalcitePlanContext conte return context.relBuilder.scalarQuery( b -> { UnresolvedPlan subquery = node.getQuery(); - return resolveSubqueryPlan(subquery, context); + return resolveSubqueryPlan(subquery, node, context); }); } @@ -498,11 +502,12 @@ public RexNode visitExistsSubquery(ExistsSubquery node, CalcitePlanContext conte return context.relBuilder.exists( b -> { UnresolvedPlan subquery = node.getQuery(); - return resolveSubqueryPlan(subquery, context); + return resolveSubqueryPlan(subquery, node, context); }); } - private RelNode resolveSubqueryPlan(UnresolvedPlan subquery, CalcitePlanContext context) { + private RelNode resolveSubqueryPlan( + UnresolvedPlan subquery, SubqueryExpression subqueryExpression, CalcitePlanContext context) { boolean isNestedSubquery = context.isResolvingSubquery(); context.setResolvingSubquery(true); // clear and store the outer state @@ -510,9 +515,31 @@ private RelNode resolveSubqueryPlan(UnresolvedPlan subquery, CalcitePlanContext if (isResolvingJoinConditionOuter) { context.setResolvingJoinCondition(false); } - RelNode subqueryRel = subquery.accept(planVisitor, context); + subquery.accept(planVisitor, context); + + if (context.sysLimit.subsearchLimit() > 0 && !(subqueryExpression instanceof ScalarSubquery)) { + // Add subsearch.maxout limit to exists-in subsearch: + // Cannot add system limit to the top of subquery simply. + // Instead, add system limit under the correlated conditions. + SubsearchUtils.SystemLimitInsertionShuttle shuttle = + new SubsearchUtils.SystemLimitInsertionShuttle(context); + RelNode replacement = context.relBuilder.peek().accept(shuttle); + if (!shuttle.isCorrelatedConditionFound()) { + // If no correlated condition found, add system limit to the top of subquery. + replacement = + LogicalSystemLimit.create( + SystemLimitType.SUBSEARCH_MAXOUT, + replacement, + context.relBuilder.literal(context.sysLimit.subsearchLimit())); + } + PlanUtils.replaceTop(context.relBuilder, replacement); + } // pop the inner plan - context.relBuilder.build(); + RelNode subqueryRel = context.relBuilder.build(); + // if maxout = 0, return empty results + if (context.sysLimit.subsearchLimit() == 0) { + subqueryRel = context.relBuilder.values(subqueryRel.getRowType()).build(); + } // clear the exists subquery resolving state // restore to the previous state if (isResolvingJoinConditionOuter) { diff --git a/core/src/main/java/org/opensearch/sql/calcite/SysLimit.java b/core/src/main/java/org/opensearch/sql/calcite/SysLimit.java new file mode 100644 index 00000000000..2a17eb31bba --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/SysLimit.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite; + +import org.opensearch.sql.common.setting.Settings; + +public record SysLimit(Integer querySizeLimit, Integer subsearchLimit, Integer joinSubsearchLimit) { + /** Create SysLimit from Settings. */ + public static SysLimit fromSettings(Settings settings) { + return settings == null + ? UNLIMITED_SUBSEARCH + : new SysLimit( + settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT), + settings.getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT), + settings.getSettingValue(Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT)); + } + + /** No limitation on subsearch */ + public static SysLimit UNLIMITED_SUBSEARCH = new SysLimit(10000, -1, -1); + + /** For testing only */ + public static SysLimit DEFAULT = new SysLimit(10000, 10000, 50000); +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/LogicalSystemLimit.java b/core/src/main/java/org/opensearch/sql/calcite/plan/LogicalSystemLimit.java index f910d6dcc61..c33854ebe52 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/LogicalSystemLimit.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/LogicalSystemLimit.java @@ -30,7 +30,11 @@ public enum SystemLimitType { * *

This type is used to indicate that the limit is applied to the system level. */ - QUERY_SIZE_LIMIT + QUERY_SIZE_LIMIT, + /** The max output from subsearch to join against. */ + JOIN_SUBSEARCH_MAXOUT, + /** Max output to return from a subsearch. */ + SUBSEARCH_MAXOUT, } @Getter private final SystemLimitType type; diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteUtils.java index e995d7efd52..a76fa4a39de 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteUtils.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteUtils.java @@ -7,7 +7,14 @@ import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_ENGINE_ENABLED; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; +import java.util.stream.Collectors; import lombok.experimental.UtilityClass; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; @UtilityClass public class CalciteUtils { @@ -16,4 +23,10 @@ public static UnsupportedOperationException getOnlyForCalciteException(String fe return new UnsupportedOperationException( feature + " is supported only when " + CALCITE_ENGINE_ENABLED.getKeyValue() + "=true"); } + + public static Pair, List> partition( + Collection collection, Predicate predicate) { + Map> map = collection.stream().collect(Collectors.partitioningBy(predicate)); + return new ImmutablePair<>(map.get(true), map.get(false)); + } } diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java index 4d3bef062fa..41fc8714e17 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java @@ -12,6 +12,7 @@ import static org.apache.calcite.rex.RexWindowBounds.preceding; import com.google.common.collect.ImmutableList; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -26,6 +27,7 @@ import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCorrelVariable; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexOver; @@ -433,4 +435,36 @@ public static String getActualSignature(List argTypes) { .collect(Collectors.joining(",")) + "]"; } + + /** + * Check if the RexNode contains any CorrelVariable. + * + * @param node the RexNode to check + * @return true if the RexNode contains any CorrelVariable, false otherwise + */ + static boolean containsCorrelVariable(RexNode node) { + try { + node.accept( + new RexVisitorImpl(true) { + @Override + public Void visitCorrelVariable(RexCorrelVariable correlVar) { + throw new RuntimeException("Correl found"); + } + }); + return false; + } catch (Exception e) { + return true; + } + } + + /** Adds a rel node to the top of the stack while preserving the field names and aliases. */ + static void replaceTop(RelBuilder relBuilder, RelNode relNode) { + try { + Method method = RelBuilder.class.getDeclaredMethod("replaceTop", RelNode.class); + method.setAccessible(true); + method.invoke(relBuilder, relNode); + } catch (Exception e) { + throw new IllegalStateException("Unable to invoke RelBuilder.replaceTop", e); + } + } } diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/SubsearchUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/SubsearchUtils.java new file mode 100644 index 00000000000..221e4275f64 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/SubsearchUtils.java @@ -0,0 +1,107 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.utils; + +import java.util.List; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.experimental.UtilityClass; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelShuttleImpl; +import org.apache.calcite.rel.logical.LogicalCorrelate; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalIntersect; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.logical.LogicalMinus; +import org.apache.calcite.rel.logical.LogicalUnion; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; +import org.apache.commons.lang3.tuple.Pair; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.calcite.plan.LogicalSystemLimit; + +@UtilityClass +public class SubsearchUtils { + + /** Insert a system_limit under correlate conditions. */ + private static RelNode insertSysLimitUnderCorrelateConditions( + LogicalFilter logicalFilter, CalcitePlanContext context) { + // Before: + // LogicalFilter(condition=[AND(=($cor0.SAL, $2), >($1, 1000.0:DECIMAL(5, 1)))]) + // After: + // LogicalFilter(condition=[=($cor0.SAL, $2)]) + // LogicalSystemLimit(fetch=[1], type=[SUBSEARCH_MAXOUT]) + // LogicalFilter(condition=[>($1, 1000.0:DECIMAL(5, 1))]) + RexNode originalCondition = logicalFilter.getCondition(); + List conditions = RelOptUtil.conjunctions(originalCondition); + Pair, List> result = + CalciteUtils.partition(conditions, PlanUtils::containsCorrelVariable); + if (result.getLeft().isEmpty()) { + return logicalFilter; + } + + RelNode input = logicalFilter.getInput(); + if (!result.getRight().isEmpty()) { + RexNode nonCorrelCondition = + RexUtil.composeConjunction(context.rexBuilder, result.getRight()); + input = LogicalFilter.create(input, nonCorrelCondition); + } + input = + LogicalSystemLimit.create( + LogicalSystemLimit.SystemLimitType.SUBSEARCH_MAXOUT, + input, + context.relBuilder.literal(context.sysLimit.subsearchLimit())); + if (!result.getLeft().isEmpty()) { + RexNode correlCondition = RexUtil.composeConjunction(context.rexBuilder, result.getLeft()); + input = LogicalFilter.create(input, correlCondition); + } + return input; + } + + /** Insert a system_limit under correlated conditions by visiting a plan tree. */ + @RequiredArgsConstructor + public static class SystemLimitInsertionShuttle extends RelShuttleImpl { + + private final CalcitePlanContext context; + @Getter private boolean correlatedConditionFound = false; + + @Override + public RelNode visit(LogicalFilter filter) { + RelNode newFilter = insertSysLimitUnderCorrelateConditions(filter, context); + if (newFilter != filter) { + correlatedConditionFound = true; + return newFilter; + } + return super.visitChildren(filter); + } + + @Override + public RelNode visit(LogicalJoin node) { + return node; + } + + @Override + public RelNode visit(LogicalCorrelate node) { + return node; + } + + @Override + public RelNode visit(LogicalUnion node) { + return node; + } + + @Override + public RelNode visit(LogicalIntersect node) { + return node; + } + + @Override + public RelNode visit(LogicalMinus node) { + return node; + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index 47d4a5695d6..7cf57132999 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -33,11 +33,11 @@ import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.calcite.CalciteRelNodeVisitor; import org.opensearch.sql.calcite.OpenSearchSchema; +import org.opensearch.sql.calcite.SysLimit; import org.opensearch.sql.calcite.plan.LogicalSystemLimit; import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.common.setting.Settings; -import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.exception.CalciteUnsupportedException; import org.opensearch.sql.exception.NonFallbackCalciteException; @@ -98,7 +98,7 @@ public void executeWithCalcite( () -> { CalcitePlanContext context = CalcitePlanContext.create( - buildFrameworkConfig(), getQuerySizeLimit(), queryType); + buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType); RelNode relNode = analyze(plan, context); RelNode optimized = optimize(relNode, context); RelNode calcitePlan = convertToCalcitePlan(optimized); @@ -138,7 +138,7 @@ public void explainWithCalcite( () -> { CalcitePlanContext context = CalcitePlanContext.create( - buildFrameworkConfig(), getQuerySizeLimit(), queryType); + buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType); context.run( () -> { RelNode relNode = analyze(plan, context); @@ -242,7 +242,9 @@ public void executePlan( ExecutionContext.querySizeLimit( // For pagination, querySizeLimit shouldn't take effect. // See {@link PaginationWindowIT::testQuerySizeLimitDoesNotEffectPageSize} - plan instanceof LogicalPaginate ? null : getQuerySizeLimit()), + plan instanceof LogicalPaginate + ? null + : SysLimit.fromSettings(settings).querySizeLimit()), listener)); } catch (Exception e) { listener.onFailure(e); @@ -269,7 +271,9 @@ public PhysicalPlan plan(LogicalPlan plan) { */ public RelNode optimize(RelNode plan, CalcitePlanContext context) { return LogicalSystemLimit.create( - SystemLimitType.QUERY_SIZE_LIMIT, plan, context.relBuilder.literal(context.querySizeLimit)); + SystemLimitType.QUERY_SIZE_LIMIT, + plan, + context.relBuilder.literal(context.sysLimit.querySizeLimit())); } private boolean isCalciteFallbackAllowed(@Nullable Throwable t) { @@ -298,10 +302,6 @@ private boolean isCalciteEnabled(Settings settings) { } } - private Integer getQuerySizeLimit() { - return settings == null ? null : settings.getSettingValue(Key.QUERY_SIZE_LIMIT); - } - // TODO https://github.com/opensearch-project/sql/issues/3457 // Calcite is not available for SQL query now. Maybe release in 3.1.0? private boolean shouldUseCalcite(QueryType queryType) { diff --git a/core/src/test/java/org/opensearch/sql/calcite/CalciteRexNodeVisitorTest.java b/core/src/test/java/org/opensearch/sql/calcite/CalciteRexNodeVisitorTest.java index ca9deb77061..1ec8d005716 100644 --- a/core/src/test/java/org/opensearch/sql/calcite/CalciteRexNodeVisitorTest.java +++ b/core/src/test/java/org/opensearch/sql/calcite/CalciteRexNodeVisitorTest.java @@ -68,7 +68,7 @@ public void setUpContext() { mockedStatic.when(() -> CalciteToolsHelper.create(any(), any(), any())).thenReturn(relBuilder); - context = CalcitePlanContext.create(frameworkConfig, 100, QueryType.PPL); + context = CalcitePlanContext.create(frameworkConfig, SysLimit.DEFAULT, QueryType.PPL); } @AfterEach diff --git a/docs/user/ppl/admin/settings.rst b/docs/user/ppl/admin/settings.rst index 43ac5e8c924..13e8f3d7185 100644 --- a/docs/user/ppl/admin/settings.rst +++ b/docs/user/ppl/admin/settings.rst @@ -282,3 +282,72 @@ PPL query:: } } } + + +plugins.ppl.subsearch.maxout +============================ + +Description +----------- + +The size configures the maximum of rows to return from subsearch. The default value is: ``10000``. A value of ``-1`` indicates that the restriction is unlimited. + +Version +------- +3.4.0 + +Example +------- + +Change the subsearch.maxout to unlimited:: + + sh$ curl -sS -H 'Content-Type: application/json' \ + ... -X PUT localhost:9200/_plugins/_query/settings \ + ... -d '{"persistent" : {"plugins.ppl.subsearch.maxout" : "-1"}}' + { + "acknowledged": true, + "persistent": { + "plugins": { + "ppl": { + "subsearch": { + "maxout": "-1" + } + } + } + }, + "transient": {} + } + +plugins.ppl.join.subsearch_maxout +================================= + +Description +----------- + +The size configures the maximum of rows from subsearch to join against. This configuration impacts ``join`` command. The default value is: ``50000``. A value of ``-1`` indicates that the restriction is unlimited. + +Version +------- +3.4.0 + +Example +------- + +Change the join.subsearch_maxout to 5000:: + + sh$ curl -sS -H 'Content-Type: application/json' \ + ... -X PUT localhost:9200/_plugins/_query/settings \ + ... -d '{"persistent" : {"plugins.ppl.join.subsearch_maxout" : "5000"}}' + { + "acknowledged": true, + "persistent": { + "plugins": { + "ppl": { + "join": { + "subsearch_maxout": "5000" + } + } + } + }, + "transient": {} + } diff --git a/docs/user/ppl/cmd/join.rst b/docs/user/ppl/cmd/join.rst index 92244f97f69..fd596e1d568 100644 --- a/docs/user/ppl/cmd/join.rst +++ b/docs/user/ppl/cmd/join.rst @@ -39,6 +39,10 @@ Extended syntax since 3.3.0 Configuration ============= + +plugins.calcite.enabled +----------------------- + This command requires Calcite enabled. In 3.0.0, as an experimental the Calcite configuration is disabled by default. Enable Calcite:: @@ -63,6 +67,32 @@ Result set:: "transient": {} } + +plugins.ppl.join.subsearch_maxout +--------------------------------- + +The size configures the maximum of rows from subsearch to join against. The default value is: ``50000``. A value of ``-1`` indicates that the restriction is unlimited. + +Change the join.subsearch_maxout to 5000:: + + sh$ curl -sS -H 'Content-Type: application/json' \ + ... -X PUT localhost:9200/_plugins/_query/settings \ + ... -d '{"persistent" : {"plugins.ppl.join.subsearch_maxout" : "5000"}}' + { + "acknowledged": true, + "persistent": { + "plugins": { + "ppl": { + "join": { + "subsearch_maxout": "5000" + } + } + } + }, + "transient": {} + } + + Usage ===== diff --git a/docs/user/ppl/cmd/subquery.rst b/docs/user/ppl/cmd/subquery.rst index 534db112b7e..98ee6c28157 100644 --- a/docs/user/ppl/cmd/subquery.rst +++ b/docs/user/ppl/cmd/subquery.rst @@ -42,6 +42,10 @@ RelationSubquery:: Configuration ============= + +plugins.calcite.enabled +----------------------- + This command requires Calcite enabled. In 3.0.0-beta, as an experimental the Calcite configuration is disabled by default. Enable Calcite:: @@ -66,6 +70,31 @@ Result set:: "transient": {} } +plugins.ppl.subsearch.maxout +---------------------------- + +The size configures the maximum of rows to return from subsearch. The default value is: ``10000``. A value of ``-1`` indicates that the restriction is unlimited. + +Change the subsearch.maxout to unlimited:: + + sh$ curl -sS -H 'Content-Type: application/json' \ + ... -X PUT localhost:9200/_plugins/_query/settings \ + ... -d '{"persistent" : {"plugins.ppl.subsearch.maxout" : "-1"}}' + { + "acknowledged": true, + "persistent": { + "plugins": { + "ppl": { + "subsearch": { + "maxout": "-1" + } + } + } + }, + "transient": {} + } + + Usage ===== diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index d755c7acc8f..ae89166ce39 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -11,6 +11,8 @@ import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_SIMPLE; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STRINGS; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WEBLOGS; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WORKER; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WORK_INFORMATION; import static org.opensearch.sql.util.MatcherUtils.assertJsonEqualsIgnoreId; import static org.opensearch.sql.util.MatcherUtils.assertYamlEqualsJsonIgnoreId; @@ -31,6 +33,8 @@ public void init() throws Exception { loadIndex(Index.TIME_TEST_DATA2); loadIndex(Index.EVENTS); loadIndex(Index.LOGS); + loadIndex(Index.WORKER); + loadIndex(Index.WORK_INFORMATION); } @Override @@ -99,8 +103,132 @@ public void testJoinWithFieldList() throws IOException { "source=opensearch-sql_test_index_bank | join type=outer account_number" + " opensearch-sql_test_index_bank"; var result = explainQueryToString(query); - String expected = loadExpectedPlan("explain_join_with_fields.json"); - assertJsonEqualsIgnoreId(expected, result); + String expected = loadExpectedPlan("explain_join_with_fields.yaml"); + assertYamlEqualsJsonIgnoreId(expected, result); + } + + @Test + public void testExplainExistsUncorrelatedSubquery() throws IOException { + String expected = loadExpectedPlan("explain_exists_uncorrelated_subquery.yaml"); + assertYamlEqualsJsonIgnoreId( + expected, + explainQueryToString( + String.format( + "source = %s" + + "| where exists [" + + " source = %s | where name = 'Tom'" + + " ]" + + "| sort - salary" + + "| fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION))); + } + + @Test + public void testExplainExistsCorrelatedSubquery() throws IOException { + String expected = loadExpectedPlan("explain_exists_correlated_subquery.yaml"); + assertYamlEqualsJsonIgnoreId( + expected, + explainQueryToString( + String.format( + "source = %s" + + "| where exists [" + + " source = %s | where id = uid and name = 'Tom'" + + " ]" + + "| sort - salary" + + "| fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION))); + } + + @Test + public void testExplainInUncorrelatedSubquery() throws IOException { + String expected = loadExpectedPlan("explain_in_uncorrelated_subquery.yaml"); + assertYamlEqualsJsonIgnoreId( + expected, + explainQueryToString( + String.format( + "source = %s" + + "| where id in [" + + " source = %s | fields uid" + + " ]" + + "| sort - salary" + + "| fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION))); + } + + @Test + public void testExplainInCorrelatedSubquery() throws IOException { + String expected = loadExpectedPlan("explain_in_correlated_subquery.yaml"); + assertYamlEqualsJsonIgnoreId( + expected, + explainQueryToString( + String.format( + "source = %s" + + "| where name in [" + + " source = %s | where id = uid and name = 'Tom' | fields name" + + " ]" + + "| sort - salary | fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION))); + } + + @Test + public void testExplainScalarUncorrelatedSubqueryInSelect() throws IOException { + String expected = loadExpectedPlan("explain_scalar_uncorrelated_subquery_in_select.yaml"); + assertYamlEqualsJsonIgnoreId( + expected, + explainQueryToString( + String.format( + "source = %s" + + "| eval count_dept = [" + + " source = %s | stats count(name)" + + " ]" + + "| fields name, count_dept", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION, TEST_INDEX_WORK_INFORMATION))); + } + + @Test + public void testExplainScalarUncorrelatedSubqueryInWhere() throws IOException { + String expected = loadExpectedPlan("explain_scalar_uncorrelated_subquery_in_where.yaml"); + assertYamlEqualsJsonIgnoreId( + expected, + explainQueryToString( + String.format( + "source = %s" + + "| where id > [" + + " source = %s | stats count(name)" + + " ] + 999" + + "| fields name", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION, TEST_INDEX_WORK_INFORMATION))); + } + + @Test + public void testExplainScalarCorrelatedSubqueryInSelect() throws IOException { + String expected = loadExpectedPlan("explain_scalar_correlated_subquery_in_select.yaml"); + assertYamlEqualsJsonIgnoreId( + expected, + explainQueryToString( + String.format( + "source = %s" + + "| eval count_dept = [" + + " source = %s" + + " | where id = uid | stats count(name)" + + " ]" + + "| fields id, name, count_dept", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION))); + } + + @Test + public void testExplainScalarCorrelatedSubqueryInWhere() throws IOException { + String expected = loadExpectedPlan("explain_scalar_correlated_subquery_in_where.yaml"); + assertYamlEqualsJsonIgnoreId( + expected, + explainQueryToString( + String.format( + "source = %s" + + "| where id = [" + + " source = %s | where id = uid | stats max(uid)" + + " ]" + + "| fields id, name", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION))); } // Only for Calcite @@ -110,8 +238,8 @@ public void supportPushDownSortMergeJoin() throws IOException { "source=opensearch-sql_test_index_bank| join left=l right=r on" + " l.account_number=r.account_number opensearch-sql_test_index_bank"; var result = explainQueryToString(query); - String expected = loadExpectedPlan("explain_merge_join_sort_push.json"); - assertJsonEqualsIgnoreId(expected, result); + String expected = loadExpectedPlan("explain_merge_join_sort_push.yaml"); + assertYamlEqualsJsonIgnoreId(expected, result); } // Only for Calcite diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLExistsSubqueryIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLExistsSubqueryIT.java index e21f828b210..5eb930bb730 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLExistsSubqueryIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLExistsSubqueryIT.java @@ -282,4 +282,172 @@ public void testIssue3566() throws IOException { verifySchemaInOrder(result, schema("count()", "bigint"), schema("country", "string")); verifyDataRows(result, rows(1, null), rows(1, "England"), rows(1, "USA"), rows(2, "Canada")); } + + @Test + public void testSubsearchMaxOut1() throws IOException { + setSubsearchMaxOut(1); + JSONObject result = + executeQuery( + String.format( + "source = %s" + + "| where exists [" + + " source = %s | where id = uid" + + " ]" + + "| sort - salary" + + "| fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION)); + verifyNumOfRows(result, 1); + resetSubsearchMaxOut(); + } + + @Test + public void testSubsearchMaxOut2() throws IOException { + setSubsearchMaxOut(2); + JSONObject result = + executeQuery( + String.format( + "source = %s" + + "| where exists [" + + " source = %s | where id = uid and department = 'DATA'" + + " ]" + + "| sort - salary" + + "| fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION)); + verifyNumOfRows(result, 2); + resetSubsearchMaxOut(); + } + + @Test + public void testSubsearchMaxOut3() throws IOException { + setSubsearchMaxOut(2); + JSONObject result = + executeQuery( + String.format( + "source = %s" + + "| where exists [" + + " source = %s " + + " | where id = uid " + + " | eval dept = department " + + " | where dept = 'DATA' " + + " | sort - dept" + + " ]" + + "| sort - salary" + + "| fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION)); + verifyNumOfRows(result, 1); + resetSubsearchMaxOut(); + } + + @Test + public void testSubsearchMaxOut4() throws IOException { + setSubsearchMaxOut(2); + JSONObject result = + executeQuery( + String.format( + "source = %s" + + "| where exists [" + + " source = %s " + + " | eval dept = department " + + " | where dept = 'DATA' " + + " | where id = uid" + + " ]" + + "| sort - salary" + + "| fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION)); + verifyNumOfRows(result, 2); + resetSubsearchMaxOut(); + } + + @Test + public void testSubsearchMaxOutUncorrelated() throws IOException { + setSubsearchMaxOut(1); + JSONObject result = + executeQuery( + String.format( + "source = %s" + + "| where exists [" + + " source = %s | join type=left uid %s" + + " | eval dept = department " + + " | where dept = 'DATA' " + + " ]" + + "| sort - salary" + + "| fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION, TEST_INDEX_WORK_INFORMATION)); + verifyNumOfRows(result, 7); + resetSubsearchMaxOut(); + } + + @Test + public void testSubsearchMaxOutZero1() throws IOException { + setSubsearchMaxOut(0); + JSONObject result = + executeQuery( + String.format( + "source = %s" + + "| where exists [" + + " source = %s | where name = 'Tom'" + + " ]" + + "| sort - salary" + + "| fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION)); + verifyNumOfRows(result, 0); + + result = + executeQuery( + String.format( + "source = %s" + + "| where not exists [" + + " source = %s | where name = 'Tom'" + + " ]" + + "| sort - salary" + + "| fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION)); + verifyNumOfRows(result, 7); + resetSubsearchMaxOut(); + } + + @Test + public void testSubsearchMaxOutZero2() throws IOException { + setSubsearchMaxOut(0); + JSONObject result = + executeQuery( + String.format( + "source = %s" + + "| where exists [" + + " source = %s | where id = uid" + + " ]" + + "| sort - salary" + + "| fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION)); + verifyNumOfRows(result, 0); + result = + executeQuery( + String.format( + "source = %s" + + "| where not exists [" + + " source = %s | where id = uid" + + " ]" + + "| sort - salary" + + "| fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION)); + verifyNumOfRows(result, 7); + resetSubsearchMaxOut(); + } + + @Test + public void testSubsearchMaxOutUnlimited() throws IOException { + setSubsearchMaxOut(-1); + JSONObject result = + executeQuery( + String.format( + "source = %s" + + "| where exists [" + + " source = %s | where id = uid" + + " ]" + + "| sort - salary" + + "| fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION)); + verifyNumOfRows(result, 5); + resetSubsearchMaxOut(); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLExplainIT.java index d888833c05d..cd4e2f5d694 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLExplainIT.java @@ -16,7 +16,6 @@ public class CalcitePPLExplainIT extends PPLIntegTestCase { @Override public void init() throws Exception { - GlobalPushdownConfig.enabled = false; super.init(); enableCalcite(); diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLInSubqueryIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLInSubqueryIT.java index ca97533b4ac..119a251558f 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLInSubqueryIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLInSubqueryIT.java @@ -13,6 +13,7 @@ import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; import static org.opensearch.sql.util.MatcherUtils.verifyDataRowsInOrder; import static org.opensearch.sql.util.MatcherUtils.verifyErrorMessageContains; +import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows; import static org.opensearch.sql.util.MatcherUtils.verifySchema; import java.io.IOException; @@ -354,4 +355,68 @@ public void testInSubqueryWithTableAlias() throws IOException { verifySchema(result, schema("id", "int"), schema("name", "string"), schema("salary", "int")); verifyDataRowsInOrder(result, rows(1002, "John", 120000), rows(1005, "Jane", 90000)); } + + @Test + public void testInCorrelatedSubquery() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source = %s| where name in [ source = %s | where id = uid and" + + " (like(occupation, '%%ist') or occupation = 'Engineer') | fields name ]|" + + " sort - salary | fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION)); + verifySchema(result, schema("id", "int"), schema("name", "string"), schema("salary", "int")); + verifyDataRowsInOrder( + result, rows(1002, "John", 120000), rows(1000, "Jake", 100000), rows(1005, "Jane", 90000)); + } + + @Test + public void testSubsearchMaxOut() throws IOException { + setSubsearchMaxOut(1); + JSONObject result = + executeQuery( + String.format( + "source = %s" + + "| where id in [" + + " source = %s | fields uid" + + " ]" + + "| sort - salary" + + "| fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION)); + verifySchema(result, schema("id", "int"), schema("name", "string"), schema("salary", "int")); + verifyDataRowsInOrder(result, rows(1000, "Jake", 100000)); + resetSubsearchMaxOut(); + } + + @Test + public void testInCorrelatedSubqueryMaxOut() throws IOException { + setSubsearchMaxOut(1); + JSONObject result = + executeQuery( + String.format( + "source = %s| where name in [ source = %s | where id = uid and" + + " (like(occupation, '%%ist') or occupation = 'Engineer') | fields name ]|" + + " sort - salary | fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION)); + verifyNumOfRows(result, 1); + resetSubsearchMaxOut(); + } + + @Test + public void testSubsearchMaxOutZero() throws IOException { + setSubsearchMaxOut(0); + JSONObject result = + executeQuery( + String.format( + "source = %s" + + "| where id in [" + + " source = %s | fields uid" + + " ]" + + "| sort - salary" + + "| fields id, name, salary", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION)); + verifySchema(result, schema("id", "int"), schema("name", "string"), schema("salary", "int")); + verifyNumOfRows(result, 0); + resetSubsearchMaxOut(); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLJoinIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLJoinIT.java index c9886f687c2..95931157947 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLJoinIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLJoinIT.java @@ -938,4 +938,22 @@ public void testJoinWithoutFieldListMaxEqualsOne() throws IOException { schema("month", "int")); verifyNumOfRows(actual, 8); } + + @Test + public void testJoinSubsearchMaxOut() throws IOException { + setJoinSubsearchMaxOut(5); + JSONObject actual = + executeQuery( + String.format( + "source=%s | where country = 'Canada' | join type=inner max=0 country %s", + TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION)); + verifyNumOfRows(actual, 10); + resetJoinSubsearchMaxOut(); + actual = + executeQuery( + String.format( + "source=%s | where country = 'Canada' | join type=inner max=0 country %s", + TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION)); + verifyNumOfRows(actual, 15); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLScalarSubqueryIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLScalarSubqueryIT.java index 4aac43580f3..9fc4f8b2b12 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLScalarSubqueryIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLScalarSubqueryIT.java @@ -11,6 +11,7 @@ import static org.opensearch.sql.util.MatcherUtils.rows; import static org.opensearch.sql.util.MatcherUtils.schema; import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows; import static org.opensearch.sql.util.MatcherUtils.verifySchema; import java.io.IOException; @@ -308,4 +309,20 @@ public void testNestedScalarSubqueryWithTableAlias() throws IOException { verifySchema(result, schema("id", "int"), schema("name", "string")); verifyDataRows(result, rows(1000, "Jake")); } + + @Test + public void testSubsearchMaxOutZero() throws IOException { + setSubsearchMaxOut(0); + JSONObject result = + executeQuery( + String.format( + "source = %s" + + "| where id = [" + + " source = %s | where id = uid | stats max(uid)" + + " ]" + + "| fields id, name", + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION)); + verifyNumOfRows(result, 0); + resetSubsearchMaxOut(); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java index 0ac42564192..f772d61d823 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java @@ -39,6 +39,8 @@ public abstract class PPLIntegTestCase extends SQLIntegTestCase { "/_plugins/_ppl/_explain?format=extended"; private static final Logger LOG = LogManager.getLogger(); @Rule public final RetryProcessor retryProcessor = new RetryProcessor(); + public static final Integer DEFAULT_SUBSEARCH_MAXOUT = 10000; + public static final Integer DEFAULT_JOIN_SUBSEARCH_MAXOUT = 50000; @Override protected void init() throws Exception { @@ -334,6 +336,34 @@ public void updatePushdownSettings() throws IOException { } } + protected void setSubsearchMaxOut(Integer limit) throws IOException { + updateClusterSettings( + new SQLIntegTestCase.ClusterSetting( + "transient", Key.PPL_SUBSEARCH_MAXOUT.getKeyValue(), limit.toString())); + } + + protected void resetSubsearchMaxOut() throws IOException { + updateClusterSettings( + new SQLIntegTestCase.ClusterSetting( + "transient", + Settings.Key.PPL_SUBSEARCH_MAXOUT.getKeyValue(), + DEFAULT_SUBSEARCH_MAXOUT.toString())); + } + + protected void setJoinSubsearchMaxOut(Integer limit) throws IOException { + updateClusterSettings( + new SQLIntegTestCase.ClusterSetting( + "transient", Key.PPL_JOIN_SUBSEARCH_MAXOUT.getKeyValue(), limit.toString())); + } + + protected void resetJoinSubsearchMaxOut() throws IOException { + updateClusterSettings( + new SQLIntegTestCase.ClusterSetting( + "transient", + Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT.getKeyValue(), + DEFAULT_JOIN_SUBSEARCH_MAXOUT.toString())); + } + /** * Sanitizes the PPL query by removing block comments and replacing new lines with spaces. * diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_exists_correlated_subquery.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_exists_correlated_subquery.yaml new file mode 100644 index 00000000000..920149399c0 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_exists_correlated_subquery.yaml @@ -0,0 +1,22 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$2], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(id=[$2], name=[$0], salary=[$4]) + LogicalSort(sort0=[$4], dir0=[DESC-nulls-last]) + LogicalFilter(condition=[EXISTS({ + LogicalProject(name=[$0], uid=[$1], occupation=[$2], department=[$3]) + LogicalFilter(condition=[=($cor0.id, $1)]) + LogicalSystemLimit(fetch=[10000], type=[SUBSEARCH_MAXOUT]) + LogicalFilter(condition=[=($0, 'Tom')]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + })], variablesSet=[[$cor0]]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableCalc(expr#0..3=[{inputs}], id=[$t1], name=[$t0], salary=[$t2]) + EnumerableLimit(fetch=[10000]) + EnumerableSort(sort0=[$2], dir0=[DESC-nulls-last]) + EnumerableCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1}]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]], PushDownContext=[[PROJECT->[name, id, salary]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["name","id","salary"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableAggregate(group=[{0}]) + EnumerableCalc(expr#0=[{inputs}], expr#1=[true], expr#2=[$cor0], expr#3=[$t2.id], expr#4=[=($t3, $t0)], i=[$t1], $condition=[$t4]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]], PushDownContext=[[PROJECT->[name, uid], FILTER->=($0, 'Tom'), LIMIT->10000, PROJECT->[uid]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10000,"timeout":"1m","query":{"term":{"name":{"value":"Tom","boost":1.0}}},"_source":{"includes":["uid"],"excludes":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_exists_uncorrelated_subquery.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_exists_uncorrelated_subquery.yaml new file mode 100644 index 00000000000..c8c58c090b8 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_exists_uncorrelated_subquery.yaml @@ -0,0 +1,21 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$2], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(id=[$2], name=[$0], salary=[$4]) + LogicalSort(sort0=[$4], dir0=[DESC-nulls-last]) + LogicalFilter(condition=[EXISTS({ + LogicalSystemLimit(fetch=[10000], type=[SUBSEARCH_MAXOUT]) + LogicalProject(name=[$0], uid=[$1], occupation=[$2], department=[$3]) + LogicalFilter(condition=[=($0, 'Tom')]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + })], variablesSet=[[$cor0]]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..3=[{inputs}], id=[$t1], name=[$t0], salary=[$t2]) + EnumerableSort(sort0=[$2], dir0=[DESC-nulls-last]) + EnumerableNestedLoopJoin(condition=[true], joinType=[inner]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]], PushDownContext=[[PROJECT->[name, id, salary]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["name","id","salary"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableAggregate(group=[{0}]) + EnumerableCalc(expr#0=[{inputs}], expr#1=[true], i=[$t1]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]], PushDownContext=[[PROJECT->[name], FILTER->=($0, 'Tom'), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10000,"timeout":"1m","query":{"term":{"name":{"value":"Tom","boost":1.0}}},"_source":{"includes":["name"],"excludes":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_in_correlated_subquery.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_in_correlated_subquery.yaml new file mode 100644 index 00000000000..7c9b83b75f5 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_in_correlated_subquery.yaml @@ -0,0 +1,23 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$2], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(id=[$2], name=[$0], salary=[$4]) + LogicalSort(sort0=[$4], dir0=[DESC-nulls-last]) + LogicalFilter(condition=[IN($0, { + LogicalProject(name=[$0]) + LogicalFilter(condition=[=($cor0.id, $1)]) + LogicalSystemLimit(fetch=[10000], type=[SUBSEARCH_MAXOUT]) + LogicalFilter(condition=[=($0, 'Tom')]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + })], variablesSet=[[$cor0]]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableCalc(expr#0..3=[{inputs}], id=[$t1], name=[$t0], salary=[$t2]) + EnumerableLimit(fetch=[10000]) + EnumerableSort(sort0=[$2], dir0=[DESC-nulls-last]) + EnumerableCalc(expr#0..3=[{inputs}], expr#4=[=($t0, $t3)], proj#0..3=[{exprs}], $condition=[$t4]) + EnumerableCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1}]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]], PushDownContext=[[PROJECT->[name, id, salary]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["name","id","salary"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableAggregate(group=[{0}]) + EnumerableCalc(expr#0..1=[{inputs}], expr#2=[$cor0], expr#3=[$t2.id], expr#4=[=($t3, $t1)], proj#0..1=[{exprs}], $condition=[$t4]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]], PushDownContext=[[PROJECT->[name, uid], FILTER->=($0, 'Tom'), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10000,"timeout":"1m","query":{"term":{"name":{"value":"Tom","boost":1.0}}},"_source":{"includes":["name","uid"],"excludes":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_in_uncorrelated_subquery.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_in_uncorrelated_subquery.yaml new file mode 100644 index 00000000000..42de0ae2f46 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_in_uncorrelated_subquery.yaml @@ -0,0 +1,18 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$2], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(id=[$2], name=[$0], salary=[$4]) + LogicalSort(sort0=[$4], dir0=[DESC-nulls-last]) + LogicalFilter(condition=[IN($2, { + LogicalSystemLimit(fetch=[10000], type=[SUBSEARCH_MAXOUT]) + LogicalProject(uid=[$1]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + })], variablesSet=[[$cor0]]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..2=[{inputs}], id=[$t1], name=[$t0], salary=[$t2]) + EnumerableSort(sort0=[$2], dir0=[DESC-nulls-last]) + EnumerableHashJoin(condition=[=($1, $3)], joinType=[semi]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]], PushDownContext=[[PROJECT->[name, id, salary]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["name","id","salary"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]], PushDownContext=[[PROJECT->[uid], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10000,"timeout":"1m","_source":{"includes":["uid"],"excludes":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_join_with_fields.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_join_with_fields.json deleted file mode 100644 index 0a662c047ee..00000000000 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_join_with_fields.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "calcite": { - "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$13], firstname=[$14], address=[$15], birthdate=[$16], gender=[$17], city=[$18], lastname=[$19], balance=[$20], employer=[$21], state=[$22], age=[$23], email=[$24], male=[$25])\n LogicalJoin(condition=[=($0, $13)], joinType=[left])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n", - "physical": "EnumerableCalc(expr#0..13=[{inputs}], account_number=[$t1], firstname=[$t2], address=[$t3], birthdate=[$t4], gender=[$t5], city=[$t6], lastname=[$t7], balance=[$t8], employer=[$t9], state=[$t10], age=[$t11], email=[$t12], male=[$t13])\n EnumerableLimit(fetch=[10000])\n EnumerableMergeJoin(condition=[=($0, $1)], joinType=[left])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number], SORT->[{\n \"account_number\" : {\n \"order\" : \"asc\",\n \"missing\" : \"_last\"\n }\n}]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\"],\"excludes\":[]},\"sort\":[{\"account_number\":{\"order\":\"asc\",\"missing\":\"_last\"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], SORT->[{\n \"account_number\" : {\n \"order\" : \"asc\",\n \"missing\" : \"_last\"\n }\n}]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"birthdate\",\"gender\",\"city\",\"lastname\",\"balance\",\"employer\",\"state\",\"age\",\"email\",\"male\"],\"excludes\":[]},\"sort\":[{\"account_number\":{\"order\":\"asc\",\"missing\":\"_last\"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" - } -} \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_join_with_fields.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_join_with_fields.yaml new file mode 100644 index 00000000000..0dbd15655b0 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_join_with_fields.yaml @@ -0,0 +1,22 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(account_number=[$13], firstname=[$14], address=[$15], birthdate=[$16], gender=[$17], city=[$18], lastname=[$19], balance=[$20], employer=[$21], state=[$22], age=[$23], email=[$24], male=[$25]) + LogicalJoin(condition=[=($0, $13)], joinType=[left]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) + LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) + physical: | + EnumerableCalc(expr#0..13=[{inputs}], account_number=[$t1], firstname=[$t2], address=[$t3], birthdate=[$t4], gender=[$t5], city=[$t6], lastname=[$t7], balance=[$t8], employer=[$t9], state=[$t10], age=[$t11], email=[$t12], male=[$t13]) + EnumerableLimit(fetch=[10000]) + EnumerableMergeJoin(condition=[=($0, $1)], joinType=[left]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number], SORT->[{ + "account_number" : { + "order" : "asc", + "missing" : "_last" + } + }]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableSort(sort0=[$0], dir0=[ASC]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->50000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":50000,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]}}, requestedTotalSize=50000, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_merge_join_sort_push.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_merge_join_sort_push.json deleted file mode 100644 index 19bfbbaa6b9..00000000000 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_merge_join_sort_push.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "calcite": { - "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], r.account_number=[$13], r.firstname=[$14], r.address=[$15], r.birthdate=[$16], r.gender=[$17], r.city=[$18], r.lastname=[$19], r.balance=[$20], r.employer=[$21], r.state=[$22], r.age=[$23], r.email=[$24], r.male=[$25])\n LogicalJoin(condition=[=($0, $13)], joinType=[inner])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n", - "physical": "EnumerableLimit(fetch=[10000])\n EnumerableMergeJoin(condition=[=($0, $13)], joinType=[inner])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], SORT->[{\n \"account_number\" : {\n \"order\" : \"asc\",\n \"missing\" : \"_last\"\n }\n}]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"birthdate\",\"gender\",\"city\",\"lastname\",\"balance\",\"employer\",\"state\",\"age\",\"email\",\"male\"],\"excludes\":[]},\"sort\":[{\"account_number\":{\"order\":\"asc\",\"missing\":\"_last\"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], SORT->[{\n \"account_number\" : {\n \"order\" : \"asc\",\n \"missing\" : \"_last\"\n }\n}]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"birthdate\",\"gender\",\"city\",\"lastname\",\"balance\",\"employer\",\"state\",\"age\",\"email\",\"male\"],\"excludes\":[]},\"sort\":[{\"account_number\":{\"order\":\"asc\",\"missing\":\"_last\"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" - } -} diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_merge_join_sort_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_merge_join_sort_push.yaml new file mode 100644 index 00000000000..13a5dfe1d7b --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_merge_join_sort_push.yaml @@ -0,0 +1,21 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], r.account_number=[$13], r.firstname=[$14], r.address=[$15], r.birthdate=[$16], r.gender=[$17], r.city=[$18], r.lastname=[$19], r.balance=[$20], r.employer=[$21], r.state=[$22], r.age=[$23], r.email=[$24], r.male=[$25]) + LogicalJoin(condition=[=($0, $13)], joinType=[inner]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) + LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableMergeJoin(condition=[=($0, $13)], joinType=[inner]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], SORT->[{ + "account_number" : { + "order" : "asc", + "missing" : "_last" + } + }]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableSort(sort0=[$0], dir0=[ASC]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->50000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":50000,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]}}, requestedTotalSize=50000, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_correlated_subquery_in_select.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_correlated_subquery_in_select.yaml new file mode 100644 index 00000000000..0cacee911dd --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_correlated_subquery_in_select.yaml @@ -0,0 +1,22 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(variablesSet=[[$cor0]], id=[$2], name=[$0], count_dept=[$SCALAR_QUERY({ + LogicalAggregate(group=[{}], count(name)=[COUNT($0)]) + LogicalProject(name=[$0]) + LogicalFilter(condition=[IS NOT NULL($0)]) + LogicalFilter(condition=[=($cor0.id, $1)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + })]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableCalc(expr#0..3=[{inputs}], expr#4=[IS NULL($t3)], expr#5=[0:BIGINT], expr#6=[CASE($t4, $t5, $t3)], id=[$t1], name=[$t0], count_dept=[$t6]) + EnumerableLimit(fetch=[10000]) + EnumerableMergeJoin(condition=[=($1, $2)], joinType=[left]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]], PushDownContext=[[PROJECT->[name, id], SORT->[{ + "id" : { + "order" : "asc", + "missing" : "_last" + } + }]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["name","id"],"excludes":[]},"sort":[{"id":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]], PushDownContext=[[PROJECT->[name, uid], FILTER->AND(IS NOT NULL($1), IS NOT NULL($0)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count(name)=COUNT($1)), SORT->[0]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"uid","boost":1.0}},{"exists":{"field":"name","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["name","uid"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"uid":{"terms":{"field":"uid","missing_bucket":true,"missing_order":"last","order":"asc"}}}]},"aggregations":{"count(name)":{"value_count":{"field":"name"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_correlated_subquery_in_where.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_correlated_subquery_in_where.yaml new file mode 100644 index 00000000000..3f4cb15d194 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_correlated_subquery_in_where.yaml @@ -0,0 +1,18 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(id=[$2], name=[$0]) + LogicalFilter(condition=[=($2, $SCALAR_QUERY({ + LogicalAggregate(group=[{}], max(uid)=[MAX($0)]) + LogicalProject(uid=[$1]) + LogicalFilter(condition=[=($cor0.id, $1)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + }))], variablesSet=[[$cor0]]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..1=[{inputs}], id=[$t1], name=[$t0]) + EnumerableHashJoin(condition=[=($1, $2)], joinType=[semi]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]], PushDownContext=[[PROJECT->[name, id]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["name","id"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableCalc(expr#0..1=[{inputs}], expr#2=[=($t0, $t1)], proj#0..1=[{exprs}], $condition=[$t2]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]], PushDownContext=[[PROJECT->[uid], FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},max(uid)=MAX($1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"uid","boost":1.0}},"_source":{"includes":["uid"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"uid1":{"terms":{"field":"uid","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"max(uid)":{"max":{"field":"uid"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_uncorrelated_subquery_in_select.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_uncorrelated_subquery_in_select.yaml new file mode 100644 index 00000000000..70fcf1c804d --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_uncorrelated_subquery_in_select.yaml @@ -0,0 +1,15 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(variablesSet=[[$cor0]], name=[$0], count_dept=[$SCALAR_QUERY({ + LogicalAggregate(group=[{}], count(name)=[COUNT($0)]) + LogicalProject(name=[$0]) + LogicalFilter(condition=[IS NOT NULL($0)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + })]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableNestedLoopJoin(condition=[true], joinType=[left]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]], PushDownContext=[[PROJECT->[name]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["name"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]], PushDownContext=[[FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},count(name)=COUNT($0))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"name","boost":1.0}},"track_total_hits":2147483647}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_uncorrelated_subquery_in_where.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_uncorrelated_subquery_in_where.yaml new file mode 100644 index 00000000000..a14c422cfb5 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_uncorrelated_subquery_in_where.yaml @@ -0,0 +1,17 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(name=[$0]) + LogicalFilter(condition=[>($2, +($SCALAR_QUERY({ + LogicalAggregate(group=[{}], count(name)=[COUNT($0)]) + LogicalProject(name=[$0]) + LogicalFilter(condition=[IS NOT NULL($0)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + }), 999))], variablesSet=[[$cor0]]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..2=[{inputs}], name=[$t0]) + EnumerableNestedLoopJoin(condition=[>($1, +($2, 999))], joinType=[inner]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]], PushDownContext=[[PROJECT->[name, id]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["name","id"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]], PushDownContext=[[FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},count(name)=COUNT($0))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"name","boost":1.0}},"track_total_hits":2147483647}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_exists_correlated_subquery.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_exists_correlated_subquery.yaml new file mode 100644 index 00000000000..400bd549ee8 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_exists_correlated_subquery.yaml @@ -0,0 +1,25 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$2], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(id=[$2], name=[$0], salary=[$4]) + LogicalSort(sort0=[$4], dir0=[DESC-nulls-last]) + LogicalFilter(condition=[EXISTS({ + LogicalProject(name=[$0], uid=[$1], occupation=[$2], department=[$3]) + LogicalFilter(condition=[=($cor0.id, $1)]) + LogicalSystemLimit(fetch=[10000], type=[SUBSEARCH_MAXOUT]) + LogicalFilter(condition=[=($0, 'Tom')]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + })], variablesSet=[[$cor0]]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..3=[{inputs}], id=[$t1], name=[$t0], salary=[$t2]) + EnumerableSort(sort0=[$2], dir0=[DESC-nulls-last]) + EnumerableCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1}]) + EnumerableCalc(expr#0..10=[{inputs}], name=[$t0], id=[$t2], salary=[$t4]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + EnumerableAggregate(group=[{0}]) + EnumerableCalc(expr#0..1=[{inputs}], expr#2=[true], expr#3=[$cor0], expr#4=[$t3.id], expr#5=[=($t4, $t1)], i=[$t2], $condition=[$t5]) + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..9=[{inputs}], expr#10=['Tom':VARCHAR], expr#11=[=($t0, $t10)], proj#0..1=[{exprs}], $condition=[$t11]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_exists_uncorrelated_subquery.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_exists_uncorrelated_subquery.yaml new file mode 100644 index 00000000000..3b4e34539c7 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_exists_uncorrelated_subquery.yaml @@ -0,0 +1,24 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$2], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(id=[$2], name=[$0], salary=[$4]) + LogicalSort(sort0=[$4], dir0=[DESC-nulls-last]) + LogicalFilter(condition=[EXISTS({ + LogicalSystemLimit(fetch=[10000], type=[SUBSEARCH_MAXOUT]) + LogicalProject(name=[$0], uid=[$1], occupation=[$2], department=[$3]) + LogicalFilter(condition=[=($0, 'Tom')]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + })], variablesSet=[[$cor0]]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..3=[{inputs}], id=[$t1], name=[$t0], salary=[$t2]) + EnumerableSort(sort0=[$2], dir0=[DESC-nulls-last]) + EnumerableNestedLoopJoin(condition=[true], joinType=[inner]) + EnumerableCalc(expr#0..10=[{inputs}], name=[$t0], id=[$t2], salary=[$t4]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + EnumerableAggregate(group=[{0}]) + EnumerableCalc(expr#0..9=[{inputs}], expr#10=[true], i=[$t10]) + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..9=[{inputs}], expr#10=['Tom':VARCHAR], expr#11=[=($t0, $t10)], proj#0..9=[{exprs}], $condition=[$t11]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_in_correlated_subquery.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_in_correlated_subquery.yaml new file mode 100644 index 00000000000..cb17a67d1cd --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_in_correlated_subquery.yaml @@ -0,0 +1,26 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$2], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(id=[$2], name=[$0], salary=[$4]) + LogicalSort(sort0=[$4], dir0=[DESC-nulls-last]) + LogicalFilter(condition=[IN($0, { + LogicalProject(name=[$0]) + LogicalFilter(condition=[=($cor0.id, $1)]) + LogicalSystemLimit(fetch=[10000], type=[SUBSEARCH_MAXOUT]) + LogicalFilter(condition=[=($0, 'Tom')]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + })], variablesSet=[[$cor0]]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableCalc(expr#0..3=[{inputs}], id=[$t1], name=[$t0], salary=[$t2]) + EnumerableLimit(fetch=[10000]) + EnumerableSort(sort0=[$2], dir0=[DESC-nulls-last]) + EnumerableCalc(expr#0..3=[{inputs}], expr#4=[=($t0, $t3)], proj#0..3=[{exprs}], $condition=[$t4]) + EnumerableCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1}]) + EnumerableCalc(expr#0..10=[{inputs}], name=[$t0], id=[$t2], salary=[$t4]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + EnumerableAggregate(group=[{0}]) + EnumerableCalc(expr#0..1=[{inputs}], expr#2=[$cor0], expr#3=[$t2.id], expr#4=[=($t3, $t1)], proj#0..1=[{exprs}], $condition=[$t4]) + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..9=[{inputs}], expr#10=['Tom':VARCHAR], expr#11=[=($t0, $t10)], proj#0..1=[{exprs}], $condition=[$t11]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_in_uncorrelated_subquery.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_in_uncorrelated_subquery.yaml new file mode 100644 index 00000000000..e94a46d70d8 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_in_uncorrelated_subquery.yaml @@ -0,0 +1,20 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$2], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(id=[$2], name=[$0], salary=[$4]) + LogicalSort(sort0=[$4], dir0=[DESC-nulls-last]) + LogicalFilter(condition=[IN($2, { + LogicalSystemLimit(fetch=[10000], type=[SUBSEARCH_MAXOUT]) + LogicalProject(uid=[$1]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + })], variablesSet=[[$cor0]]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..2=[{inputs}], id=[$t1], name=[$t0], salary=[$t2]) + EnumerableSort(sort0=[$2], dir0=[DESC-nulls-last]) + EnumerableHashJoin(condition=[=($1, $4)], joinType=[semi]) + EnumerableCalc(expr#0..10=[{inputs}], name=[$t0], id=[$t2], salary=[$t4]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + EnumerableLimit(fetch=[10000]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_join_with_fields.json b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_join_with_fields.json deleted file mode 100644 index 21cbcfab737..00000000000 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_join_with_fields.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "calcite": { - "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$13], firstname=[$14], address=[$15], birthdate=[$16], gender=[$17], city=[$18], lastname=[$19], balance=[$20], employer=[$21], state=[$22], age=[$23], email=[$24], male=[$25])\n LogicalJoin(condition=[=($0, $13)], joinType=[left])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n", - "physical": "EnumerableCalc(expr#0..13=[{inputs}], account_number=[$t1], firstname=[$t2], address=[$t3], birthdate=[$t4], gender=[$t5], city=[$t6], lastname=[$t7], balance=[$t8], employer=[$t9], state=[$t10], age=[$t11], email=[$t12], male=[$t13])\n EnumerableLimit(fetch=[10000])\n EnumerableMergeJoin(condition=[=($0, $1)], joinType=[left])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n EnumerableCalc(expr#0..18=[{inputs}], account_number=[$t0])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n EnumerableCalc(expr#0..18=[{inputs}], proj#0..12=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n" - } -} \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_join_with_fields.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_join_with_fields.yaml new file mode 100644 index 00000000000..bf397010f6b --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_join_with_fields.yaml @@ -0,0 +1,21 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(account_number=[$13], firstname=[$14], address=[$15], birthdate=[$16], gender=[$17], city=[$18], lastname=[$19], balance=[$20], employer=[$21], state=[$22], age=[$23], email=[$24], male=[$25]) + LogicalJoin(condition=[=($0, $13)], joinType=[left]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) + LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) + physical: | + EnumerableCalc(expr#0..13=[{inputs}], account_number=[$t1], firstname=[$t2], address=[$t3], birthdate=[$t4], gender=[$t5], city=[$t6], lastname=[$t7], balance=[$t8], employer=[$t9], state=[$t10], age=[$t11], email=[$t12], male=[$t13]) + EnumerableLimit(fetch=[10000]) + EnumerableMergeJoin(condition=[=($0, $1)], joinType=[left]) + EnumerableSort(sort0=[$0], dir0=[ASC]) + EnumerableCalc(expr#0..18=[{inputs}], account_number=[$t0]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) + EnumerableSort(sort0=[$0], dir0=[ASC]) + EnumerableLimit(fetch=[50000]) + EnumerableCalc(expr#0..18=[{inputs}], proj#0..12=[{exprs}]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_merge_join_sort_push.json b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_merge_join_sort_push.json deleted file mode 100644 index 084817d2f7d..00000000000 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_merge_join_sort_push.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "calcite": { - "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], r.account_number=[$13], r.firstname=[$14], r.address=[$15], r.birthdate=[$16], r.gender=[$17], r.city=[$18], r.lastname=[$19], r.balance=[$20], r.employer=[$21], r.state=[$22], r.age=[$23], r.email=[$24], r.male=[$25])\n LogicalJoin(condition=[=($0, $13)], joinType=[inner])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n", - "physical": "EnumerableLimit(fetch=[10000])\n EnumerableMergeJoin(condition=[=($0, $13)], joinType=[inner])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n EnumerableCalc(expr#0..18=[{inputs}], proj#0..12=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n EnumerableCalc(expr#0..18=[{inputs}], proj#0..12=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n" - } -} diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_merge_join_sort_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_merge_join_sort_push.yaml new file mode 100644 index 00000000000..843fa505511 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_merge_join_sort_push.yaml @@ -0,0 +1,20 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], r.account_number=[$13], r.firstname=[$14], r.address=[$15], r.birthdate=[$16], r.gender=[$17], r.city=[$18], r.lastname=[$19], r.balance=[$20], r.employer=[$21], r.state=[$22], r.age=[$23], r.email=[$24], r.male=[$25]) + LogicalJoin(condition=[=($0, $13)], joinType=[inner]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) + LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableMergeJoin(condition=[=($0, $13)], joinType=[inner]) + EnumerableSort(sort0=[$0], dir0=[ASC]) + EnumerableCalc(expr#0..18=[{inputs}], proj#0..12=[{exprs}]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) + EnumerableSort(sort0=[$0], dir0=[ASC]) + EnumerableLimit(fetch=[50000]) + EnumerableCalc(expr#0..18=[{inputs}], proj#0..12=[{exprs}]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_scalar_correlated_subquery_in_select.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_scalar_correlated_subquery_in_select.yaml new file mode 100644 index 00000000000..3dbadfef106 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_scalar_correlated_subquery_in_select.yaml @@ -0,0 +1,22 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(variablesSet=[[$cor0]], id=[$2], name=[$0], count_dept=[$SCALAR_QUERY({ + LogicalAggregate(group=[{}], count(name)=[COUNT($0)]) + LogicalProject(name=[$0]) + LogicalFilter(condition=[IS NOT NULL($0)]) + LogicalFilter(condition=[=($cor0.id, $1)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + })]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableCalc(expr#0..3=[{inputs}], expr#4=[IS NULL($t3)], expr#5=[0:BIGINT], expr#6=[CASE($t4, $t5, $t3)], id=[$t1], name=[$t0], count_dept=[$t6]) + EnumerableLimit(fetch=[10000]) + EnumerableMergeJoin(condition=[=($1, $2)], joinType=[left]) + EnumerableSort(sort0=[$1], dir0=[ASC]) + EnumerableCalc(expr#0..10=[{inputs}], name=[$t0], id=[$t2]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + EnumerableSort(sort0=[$0], dir0=[ASC]) + EnumerableAggregate(group=[{1}], count(name)=[COUNT($0)]) + EnumerableCalc(expr#0..9=[{inputs}], expr#10=[IS NOT NULL($t1)], expr#11=[IS NOT NULL($t0)], expr#12=[AND($t10, $t11)], proj#0..9=[{exprs}], $condition=[$t12]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_scalar_correlated_subquery_in_where.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_scalar_correlated_subquery_in_where.yaml new file mode 100644 index 00000000000..ed28cb93b50 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_scalar_correlated_subquery_in_where.yaml @@ -0,0 +1,21 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(id=[$2], name=[$0]) + LogicalFilter(condition=[=($2, $SCALAR_QUERY({ + LogicalAggregate(group=[{}], max(uid)=[MAX($0)]) + LogicalProject(uid=[$1]) + LogicalFilter(condition=[=($cor0.id, $1)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + }))], variablesSet=[[$cor0]]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..1=[{inputs}], id=[$t1], name=[$t0]) + EnumerableHashJoin(condition=[=($1, $2)], joinType=[semi]) + EnumerableCalc(expr#0..10=[{inputs}], name=[$t0], id=[$t2]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + EnumerableCalc(expr#0..1=[{inputs}], expr#2=[=($t0, $t1)], proj#0..1=[{exprs}], $condition=[$t2]) + EnumerableAggregate(group=[{1}], max(uid)=[MAX($1)]) + EnumerableCalc(expr#0..9=[{inputs}], expr#10=[IS NOT NULL($t1)], proj#0..9=[{exprs}], $condition=[$t10]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_scalar_uncorrelated_subquery_in_select.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_scalar_uncorrelated_subquery_in_select.yaml new file mode 100644 index 00000000000..a229cdc7bc0 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_scalar_uncorrelated_subquery_in_select.yaml @@ -0,0 +1,18 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(variablesSet=[[$cor0]], name=[$0], count_dept=[$SCALAR_QUERY({ + LogicalAggregate(group=[{}], count(name)=[COUNT($0)]) + LogicalProject(name=[$0]) + LogicalFilter(condition=[IS NOT NULL($0)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + })]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableNestedLoopJoin(condition=[true], joinType=[left]) + EnumerableCalc(expr#0..10=[{inputs}], name=[$t0]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + EnumerableAggregate(group=[{}], count(name)=[COUNT($0)]) + EnumerableCalc(expr#0..9=[{inputs}], expr#10=[IS NOT NULL($t0)], proj#0..9=[{exprs}], $condition=[$t10]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_scalar_uncorrelated_subquery_in_where.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_scalar_uncorrelated_subquery_in_where.yaml new file mode 100644 index 00000000000..ba13359c44d --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_scalar_uncorrelated_subquery_in_where.yaml @@ -0,0 +1,20 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(name=[$0]) + LogicalFilter(condition=[>($2, +($SCALAR_QUERY({ + LogicalAggregate(group=[{}], count(name)=[COUNT($0)]) + LogicalProject(name=[$0]) + LogicalFilter(condition=[IS NOT NULL($0)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) + }), 999))], variablesSet=[[$cor0]]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..2=[{inputs}], name=[$t0]) + EnumerableNestedLoopJoin(condition=[>($1, +($2, 999))], joinType=[inner]) + EnumerableCalc(expr#0..10=[{inputs}], name=[$t0], id=[$t2]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]]) + EnumerableAggregate(group=[{}], count(name)=[COUNT($0)]) + EnumerableCalc(expr#0..9=[{inputs}], expr#10=[IS NOT NULL($t0)], proj#0..9=[{exprs}], $condition=[$t10]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]]) \ No newline at end of file diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index b5c2d6edccc..efc79662f3c 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -205,7 +205,8 @@ public void execute( () -> { try (PreparedStatement statement = OpenSearchRelRunners.run(context, rel)) { ResultSet result = statement.executeQuery(); - buildResultSet(result, rel.getRowType(), context.querySizeLimit, listener); + buildResultSet( + result, rel.getRowType(), context.sysLimit.querySizeLimit(), listener); } catch (SQLException e) { throw new RuntimeException(e); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index d1bc05c3895..c201408255a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -125,7 +125,23 @@ public class OpenSearchSettings extends Settings { Setting.intSetting( Key.PPL_VALUES_MAX_LIMIT.getKeyValue(), 0, - 0, + -1, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + + public static final Setting PPL_SUBSEARCH_MAXOUT_SETTING = + Setting.intSetting( + Key.PPL_SUBSEARCH_MAXOUT.getKeyValue(), + 10000, + -1, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + + public static final Setting PPL_JOIN_SUBSEARCH_MAXOUT_SETTING = + Setting.intSetting( + Key.PPL_JOIN_SUBSEARCH_MAXOUT.getKeyValue(), + 50000, + -1, Setting.Property.NodeScope, Setting.Property.Dynamic); @@ -388,6 +404,18 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.PPL_VALUES_MAX_LIMIT, PPL_VALUES_MAX_LIMIT_SETTING, new Updater(Key.PPL_VALUES_MAX_LIMIT)); + register( + settingBuilder, + clusterSettings, + Key.PPL_SUBSEARCH_MAXOUT, + PPL_SUBSEARCH_MAXOUT_SETTING, + new Updater(Key.PPL_SUBSEARCH_MAXOUT)); + register( + settingBuilder, + clusterSettings, + Key.PPL_JOIN_SUBSEARCH_MAXOUT, + PPL_JOIN_SUBSEARCH_MAXOUT_SETTING, + new Updater(Key.PPL_JOIN_SUBSEARCH_MAXOUT)); register( settingBuilder, clusterSettings, @@ -603,6 +631,8 @@ public static List> pluginSettings() { .add(DEFAULT_PATTERN_SHOW_NUMBERED_TOKEN_SETTING) .add(PPL_REX_MAX_MATCH_LIMIT_SETTING) .add(PPL_VALUES_MAX_LIMIT_SETTING) + .add(PPL_SUBSEARCH_MAXOUT_SETTING) + .add(PPL_JOIN_SUBSEARCH_MAXOUT_SETTING) .add(QUERY_MEMORY_LIMIT_SETTING) .add(QUERY_SIZE_LIMIT_SETTING) .add(METRICS_ROLLING_WINDOW_SETTING) diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java index 56041984c4d..9dd01b30df5 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java @@ -40,8 +40,8 @@ import org.opensearch.sql.ast.statement.Query; import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.calcite.CalciteRelNodeVisitor; +import org.opensearch.sql.calcite.SysLimit; import org.opensearch.sql.common.setting.Settings; -import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.exception.ExpressionEvaluationException; import org.opensearch.sql.ppl.antlr.PPLSyntaxParser; @@ -69,6 +69,8 @@ public void init() { doReturn(true).when(settings).getSettingValue(Settings.Key.CALCITE_ENGINE_ENABLED); doReturn(true).when(settings).getSettingValue(Settings.Key.CALCITE_SUPPORT_ALL_JOIN_TYPES); doReturn(true).when(settings).getSettingValue(Settings.Key.PPL_SYNTAX_LEGACY_PREFERRED); + doReturn(-1).when(settings).getSettingValue(Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT); + doReturn(-1).when(settings).getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT); doReturn(false).when(dataSourceService).dataSourceExists(any()); } @@ -90,8 +92,7 @@ protected CalcitePlanContext createBuilderContext() { /** Creates a CalcitePlanContext with transformed config. */ private CalcitePlanContext createBuilderContext(UnaryOperator transform) { config.context(Contexts.of(transform.apply(RelBuilder.Config.DEFAULT))); - return CalcitePlanContext.create( - config.build(), settings.getSettingValue(Key.QUERY_SIZE_LIMIT), PPL); + return CalcitePlanContext.create(config.build(), SysLimit.fromSettings(settings), PPL); } /** Get the root RelNode of the given PPL query */ diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLExistsSubqueryTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLExistsSubqueryTest.java index 717ad65ce27..99e59289e9c 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLExistsSubqueryTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLExistsSubqueryTest.java @@ -5,9 +5,12 @@ package org.opensearch.sql.ppl.calcite; +import static org.mockito.Mockito.doReturn; + import org.apache.calcite.rel.RelNode; import org.apache.calcite.test.CalciteAssert; import org.junit.Test; +import org.opensearch.sql.common.setting.Settings; public class CalcitePPLExistsSubqueryTest extends CalcitePPLAbstractTest { public CalcitePPLExistsSubqueryTest() { @@ -501,4 +504,290 @@ public void testCorrelatedExistsSubqueryWithOverridingFields() { + "WHERE `t`.`DEPTNO` = `DEPTNO`)"; verifyPPLToSparkSQL(root, expectedSparkSql); } + + @Test + public void testSubsearchMaxOut1() { + doReturn(1).when(settings).getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT); + String ppl = + """ + source=EMP + | where exists [ + source=SALGRADE + | where EMP.SAL = HISAL + ] + | sort - EMPNO | fields EMPNO, ENAME + """; + RelNode root = getRelNode(ppl); + String expectedLogical = + "" + + "LogicalProject(EMPNO=[$0], ENAME=[$1])\n" + + " LogicalSort(sort0=[$0], dir0=[DESC-nulls-last])\n" + + " LogicalFilter(condition=[EXISTS({\n" + + "LogicalFilter(condition=[=($cor0.SAL, $2)])\n" + + " LogicalSystemLimit(sort0=[$0], dir0=[ASC], fetch=[1], type=[SUBSEARCH_MAXOUT])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n" + + "})], variablesSet=[[$cor0]])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `EMPNO`, `ENAME`\n" + + "FROM `scott`.`EMP`\n" + + "WHERE EXISTS (SELECT *\n" + + "FROM (SELECT `GRADE`, `LOSAL`, `HISAL`\n" + + "FROM `scott`.`SALGRADE`\n" + + "ORDER BY `GRADE` NULLS LAST\n" + + "LIMIT 1) `t`\n" + + "WHERE `EMP`.`SAL` = `HISAL`)\n" + + "ORDER BY `EMPNO` DESC"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testSubsearchMaxOut2() { + doReturn(1).when(settings).getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT); + String ppl = + """ + source=EMP + | where exists [ + source=SALGRADE + | where EMP.SAL = HISAL and LOSAL > 1000.0 + ] + | sort - EMPNO | fields EMPNO, ENAME + """; + RelNode root = getRelNode(ppl); + String expectedLogical = + "" + + "LogicalProject(EMPNO=[$0], ENAME=[$1])\n" + + " LogicalSort(sort0=[$0], dir0=[DESC-nulls-last])\n" + + " LogicalFilter(condition=[EXISTS({\n" + + "LogicalFilter(condition=[=($cor0.SAL, $2)])\n" + + " LogicalSystemLimit(sort0=[$0], dir0=[ASC], fetch=[1], type=[SUBSEARCH_MAXOUT])\n" + + " LogicalFilter(condition=[>($1, 1000.0:DECIMAL(5, 1))])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n" + + "})], variablesSet=[[$cor0]])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `EMPNO`, `ENAME`\n" + + "FROM `scott`.`EMP`\n" + + "WHERE EXISTS (SELECT *\n" + + "FROM (SELECT `GRADE`, `LOSAL`, `HISAL`\n" + + "FROM `scott`.`SALGRADE`\n" + + "WHERE `LOSAL` > 1000.0\n" + + "ORDER BY `GRADE` NULLS LAST\n" + + "LIMIT 1) `t0`\n" + + "WHERE `EMP`.`SAL` = `HISAL`)\n" + + "ORDER BY `EMPNO` DESC"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testSubsearchMaxOut3() { + doReturn(1).when(settings).getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT); + String ppl = + """ + source=EMP + | where exists [ + source=SALGRADE + | where EMP.SAL = HISAL + | eval LOSAL1 = LOSAL + | where LOSAL > 1000.0 + | sort - HISAL + ] + | sort - EMPNO | fields EMPNO, ENAME + """; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(EMPNO=[$0], ENAME=[$1])\n" + + " LogicalSort(sort0=[$0], dir0=[DESC-nulls-last])\n" + + " LogicalFilter(condition=[EXISTS({\n" + + "LogicalSort(sort0=[$2], dir0=[DESC-nulls-last])\n" + + " LogicalFilter(condition=[>($1, 1000.0:DECIMAL(5, 1))])\n" + + " LogicalProject(GRADE=[$0], LOSAL=[$1], HISAL=[$2], LOSAL1=[$1])\n" + + " LogicalFilter(condition=[=($cor0.SAL, $2)])\n" + + " LogicalSystemLimit(sort0=[$0], dir0=[ASC], fetch=[1]," + + " type=[SUBSEARCH_MAXOUT])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n" + + "})], variablesSet=[[$cor0]])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `EMPNO`, `ENAME`\n" + + "FROM `scott`.`EMP`\n" + + "WHERE EXISTS (SELECT `GRADE`, `LOSAL`, `HISAL`, `LOSAL1`\n" + + "FROM (SELECT `GRADE`, `LOSAL`, `HISAL`, `LOSAL` `LOSAL1`\n" + + "FROM (SELECT `GRADE`, `LOSAL`, `HISAL`\n" + + "FROM `scott`.`SALGRADE`\n" + + "ORDER BY `GRADE` NULLS LAST\n" + + "LIMIT 1) `t`\n" + + "WHERE `EMP`.`SAL` = `HISAL`) `t1`\n" + + "WHERE `LOSAL` > 1000.0\n" + + "ORDER BY `HISAL` DESC)\n" + + "ORDER BY `EMPNO` DESC"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testSubsearchMaxOutUncorrelated1() { + doReturn(1).when(settings).getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT); + String ppl = + """ + source=EMP + | where exists [ + source=SALGRADE + | eval LOSAL1 = LOSAL + | where LOSAL > 1000.0 + | sort - HISAL + ] + | sort - EMPNO | fields EMPNO, ENAME + """; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(EMPNO=[$0], ENAME=[$1])\n" + + " LogicalSort(sort0=[$0], dir0=[DESC-nulls-last])\n" + + " LogicalFilter(condition=[EXISTS({\n" + + "LogicalSystemLimit(sort0=[$2], dir0=[DESC-nulls-last], fetch=[1]," + + " type=[SUBSEARCH_MAXOUT])\n" + + " LogicalSort(sort0=[$2], dir0=[DESC-nulls-last])\n" + + " LogicalFilter(condition=[>($1, 1000.0:DECIMAL(5, 1))])\n" + + " LogicalProject(GRADE=[$0], LOSAL=[$1], HISAL=[$2], LOSAL1=[$1])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n" + + "})], variablesSet=[[$cor0]])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `EMPNO`, `ENAME`\n" + + "FROM `scott`.`EMP`\n" + + "WHERE EXISTS (SELECT `GRADE`, `LOSAL`, `HISAL`, `LOSAL1`\n" + + "FROM (SELECT `GRADE`, `LOSAL`, `HISAL`, `LOSAL1`\n" + + "FROM (SELECT `GRADE`, `LOSAL`, `HISAL`, `LOSAL` `LOSAL1`\n" + + "FROM `scott`.`SALGRADE`) `t`\n" + + "WHERE `LOSAL` > 1000.0\n" + + "ORDER BY `HISAL` DESC) `t1`\n" + + "ORDER BY `HISAL` DESC\n" + + "LIMIT 1)\n" + + "ORDER BY `EMPNO` DESC"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testSubsearchMaxOutUncorrelated2() { + doReturn(1).when(settings).getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT); + String ppl = + """ + source=EMP + | where exists [ + source=SALGRADE + | join type=left LOSAL SALGRADE + | eval LOSAL1 = LOSAL + | where LOSAL > 1000.0 + | sort - HISAL + ] + | sort - EMPNO | fields EMPNO, ENAME + """; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(EMPNO=[$0], ENAME=[$1])\n" + + " LogicalSort(sort0=[$0], dir0=[DESC-nulls-last])\n" + + " LogicalFilter(condition=[EXISTS({\n" + + "LogicalSystemLimit(sort0=[$2], dir0=[DESC-nulls-last], fetch=[1]," + + " type=[SUBSEARCH_MAXOUT])\n" + + " LogicalSort(sort0=[$2], dir0=[DESC-nulls-last])\n" + + " LogicalFilter(condition=[>($1, 1000.0:DECIMAL(5, 1))])\n" + + " LogicalProject(GRADE=[$3], LOSAL=[$4], HISAL=[$5], LOSAL1=[$4])\n" + + " LogicalJoin(condition=[=($1, $4)], joinType=[left])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n" + + "})], variablesSet=[[$cor0]])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `EMPNO`, `ENAME`\n" + + "FROM `scott`.`EMP`\n" + + "WHERE EXISTS (SELECT `GRADE`, `LOSAL`, `HISAL`, `LOSAL1`\n" + + "FROM (SELECT `GRADE`, `LOSAL`, `HISAL`, `LOSAL1`\n" + + "FROM (SELECT `SALGRADE0`.`GRADE`, `SALGRADE0`.`LOSAL`, `SALGRADE0`.`HISAL`," + + " `SALGRADE0`.`LOSAL` `LOSAL1`\n" + + "FROM `scott`.`SALGRADE`\n" + + "LEFT JOIN `scott`.`SALGRADE` `SALGRADE0` ON `SALGRADE`.`LOSAL` =" + + " `SALGRADE0`.`LOSAL`) `t`\n" + + "WHERE `t`.`LOSAL` > 1000.0\n" + + "ORDER BY `HISAL` DESC) `t1`\n" + + "ORDER BY `HISAL` DESC\n" + + "LIMIT 1)\n" + + "ORDER BY `EMPNO` DESC"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testSubsearchMaxOutZero() { + doReturn(0).when(settings).getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT); + String ppl = + """ + source=EMP + | where exists [ + source=SALGRADE + | where EMP.SAL = HISAL and LOSAL > 1000.0 + ] + | sort - EMPNO | fields EMPNO, ENAME + """; + RelNode root = getRelNode(ppl); + String expectedLogical = + "" + + "LogicalProject(EMPNO=[$0], ENAME=[$1])\n" + + " LogicalSort(sort0=[$0], dir0=[DESC-nulls-last])\n" + + " LogicalFilter(condition=[EXISTS({\n" + + "LogicalValues(tuples=[[]])\n" + + "})], variablesSet=[[$cor0]])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `EMPNO`, `ENAME`\n" + + "FROM `scott`.`EMP`\n" + + "WHERE EXISTS (SELECT *\n" + + "FROM (VALUES (NULL, NULL, NULL)) `t` (`GRADE`, `LOSAL`, `HISAL`)\n" + + "WHERE 1 = 0)\n" + + "ORDER BY `EMPNO` DESC"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testSubsearchMaxOutUnlimited() { + doReturn(-1).when(settings).getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT); + String ppl = + """ + source=EMP + | where exists [ + source=SALGRADE + | where EMP.SAL = HISAL and LOSAL > 1000.0 + ] + | sort - EMPNO | fields EMPNO, ENAME + """; + RelNode root = getRelNode(ppl); + String expectedLogical = + "" + + "LogicalProject(EMPNO=[$0], ENAME=[$1])\n" + + " LogicalSort(sort0=[$0], dir0=[DESC-nulls-last])\n" + + " LogicalFilter(condition=[EXISTS({\n" + + "LogicalFilter(condition=[AND(=($cor0.SAL, $2), >($1, 1000.0:DECIMAL(5, 1)))])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n" + + "})], variablesSet=[[$cor0]])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `EMPNO`, `ENAME`\n" + + "FROM `scott`.`EMP`\n" + + "WHERE EXISTS (SELECT *\n" + + "FROM `scott`.`SALGRADE`\n" + + "WHERE `EMP`.`SAL` = `HISAL` AND `LOSAL` > 1000.0)\n" + + "ORDER BY `EMPNO` DESC"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLInSubqueryTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLInSubqueryTest.java index 99731859e28..3b4c2b72a27 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLInSubqueryTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLInSubqueryTest.java @@ -6,10 +6,12 @@ package org.opensearch.sql.ppl.calcite; import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.doReturn; import org.apache.calcite.rel.RelNode; import org.apache.calcite.test.CalciteAssert; import org.junit.Test; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.exception.SemanticCheckException; public class CalcitePPLInSubqueryTest extends CalcitePPLAbstractTest { @@ -255,4 +257,101 @@ public void failWhenNumOfColumnsNotMatchOutputOfSubquery() { """; assertThrows(SemanticCheckException.class, () -> getRelNode(more)); } + + @Test + public void testInCorrelatedSubquery() { + String ppl = + """ + source=EMP | where ENAME in [ + source=DEPT | where EMP.DEPTNO = DEPTNO and LOC = 'BOSTON'| fields DNAME + ] + | fields EMPNO, ENAME, DEPTNO + """; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(EMPNO=[$0], ENAME=[$1], DEPTNO=[$7])\n" + + " LogicalFilter(condition=[IN($1, {\n" + + "LogicalProject(DNAME=[$1])\n" + + " LogicalFilter(condition=[AND(=($cor0.DEPTNO, $0), =($2, 'BOSTON':VARCHAR))])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n" + + "})], variablesSet=[[$cor0]])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `EMPNO`, `ENAME`, `DEPTNO`\n" + + "FROM `scott`.`EMP`\n" + + "WHERE `ENAME` IN (SELECT `DNAME`\n" + + "FROM `scott`.`DEPT`\n" + + "WHERE `EMP`.`DEPTNO` = `DEPTNO` AND `LOC` = 'BOSTON')"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testSubsearchMaxOut() { + doReturn(1).when(settings).getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT); + String ppl = + """ + source=EMP | where DEPTNO in [ source=DEPT | fields DEPTNO ] + | sort - EMPNO | fields EMPNO, ENAME + """; + RelNode root = getRelNode(ppl); + String expectedLogical = + "" + + "LogicalProject(EMPNO=[$0], ENAME=[$1])\n" + + " LogicalSort(sort0=[$0], dir0=[DESC-nulls-last])\n" + + " LogicalFilter(condition=[IN($7, {\n" + + "LogicalSystemLimit(sort0=[$0], dir0=[ASC], fetch=[1], type=[SUBSEARCH_MAXOUT])\n" + + " LogicalProject(DEPTNO=[$0])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n" + + "})], variablesSet=[[$cor0]])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `EMPNO`, `ENAME`\n" + + "FROM `scott`.`EMP`\n" + + "WHERE `DEPTNO` IN (SELECT `DEPTNO`\n" + + "FROM `scott`.`DEPT`\n" + + "ORDER BY `DEPTNO` NULLS LAST\n" + + "LIMIT 1)\n" + + "ORDER BY `EMPNO` DESC"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testInCorrelatedSubqueryMaxOut() { + doReturn(1).when(settings).getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT); + String ppl = + """ + source=EMP | where ENAME in [ + source=DEPT | where EMP.DEPTNO = DEPTNO and LOC = 'BOSTON'| fields DNAME + ] + | fields EMPNO, ENAME, DEPTNO + """; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(EMPNO=[$0], ENAME=[$1], DEPTNO=[$7])\n" + + " LogicalFilter(condition=[IN($1, {\n" + + "LogicalProject(DNAME=[$1])\n" + + " LogicalFilter(condition=[=($cor0.DEPTNO, $0)])\n" + + " LogicalSystemLimit(sort0=[$0], dir0=[ASC], fetch=[1], type=[SUBSEARCH_MAXOUT])\n" + + " LogicalFilter(condition=[=($2, 'BOSTON':VARCHAR)])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n" + + "})], variablesSet=[[$cor0]])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `EMPNO`, `ENAME`, `DEPTNO`\n" + + "FROM `scott`.`EMP`\n" + + "WHERE `ENAME` IN (SELECT `DNAME`\n" + + "FROM (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "WHERE `LOC` = 'BOSTON'\n" + + "ORDER BY `DEPTNO` NULLS LAST\n" + + "LIMIT 1) `t0`\n" + + "WHERE `EMP`.`DEPTNO` = `DEPTNO`)"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLJoinTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLJoinTest.java index 773c020dc46..ff230540c93 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLJoinTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLJoinTest.java @@ -1070,6 +1070,41 @@ public void testJoinWithMaxEqualsZero() { verifyPPLToSparkSQL(root, expectedSparkSql); } + @Test + public void testJoinSubsearchMaxOut() { + String ppl1 = "source=EMP | join type=inner max=0 DEPTNO DEPT"; + RelNode root1 = getRelNode(ppl1); + verifyResultCount(root1, 14); // no limit + String ppl2 = "source=EMP | inner join left=l right=r on l.DEPTNO=r.DEPTNO DEPT"; + RelNode root2 = getRelNode(ppl2); + verifyResultCount(root1, 14); // no limit for sql-like syntax + + doReturn(1).when(settings).getSettingValue(Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT); + root1 = getRelNode(ppl1); + verifyResultCount(root1, 3); // set maxout of subsesarch to 1 + root2 = getRelNode(ppl2); + verifyResultCount(root2, 3); // set maxout to 1 for sql-like syntax + String expectedLogical = + "LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5]," + + " COMM=[$6], DEPTNO=[$8], DNAME=[$9], LOC=[$10])\n" + + " LogicalJoin(condition=[=($7, $8)], joinType=[inner])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalSystemLimit(sort0=[$0], dir0=[ASC], fetch=[1]," + + " type=[JOIN_SUBSEARCH_MAXOUT])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; + verifyLogical(root1, expectedLogical); + + String expectedSparkSql = + "SELECT `EMP`.`EMPNO`, `EMP`.`ENAME`, `EMP`.`JOB`, `EMP`.`MGR`, `EMP`.`HIREDATE`," + + " `EMP`.`SAL`, `EMP`.`COMM`, `t`.`DEPTNO`, `t`.`DNAME`, `t`.`LOC`\n" + + "FROM `scott`.`EMP`\n" + + "INNER JOIN (SELECT `DEPTNO`, `DNAME`, `LOC`\n" + + "FROM `scott`.`DEPT`\n" + + "ORDER BY `DEPTNO` NULLS LAST\n" + + "LIMIT 1) `t` ON `EMP`.`DEPTNO` = `t`.`DEPTNO`"; + verifyPPLToSparkSQL(root1, expectedSparkSql); + } + @Test public void testJoinWithMaxLessThanZero() { String ppl = "source=EMP | join type=outer max=-1 DEPTNO DEPT"; diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLScalarSubqueryTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLScalarSubqueryTest.java index a0029e21d64..bd9384cff0c 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLScalarSubqueryTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLScalarSubqueryTest.java @@ -339,11 +339,34 @@ public void testNestedScalarSubquery() { verifyPPLToSparkSQL(root, expectedSparkSql); } - // TODO: With Calcite, we can add more complex scalar subquery, such as - // stats by a scalar subquery: - // | eval count_a = [ - // source=.. - // ] - // | stats .. by count_a - // But currently, statsBy an expression is unsupported in PPL. + @Test + public void testCorrelatedScalarSubqueryInWhereMaxOut() { + String ppl = + """ + source=EMP + | where SAL > [ + source=SALGRADE | where SAL = HISAL | stats AVG(SAL) + ] + """; + RelNode root = getRelNode(ppl); + String expectedLogical = + "" + + "LogicalFilter(condition=[>($5, $SCALAR_QUERY({\n" + + "LogicalAggregate(group=[{}], AVG(SAL)=[AVG($0)])\n" + + " LogicalProject($f0=[$cor0.SAL])\n" + + " LogicalFilter(condition=[=($cor0.SAL, $2)])\n" + + " LogicalTableScan(table=[[scott, SALGRADE]])\n" + + "}))], variablesSet=[[$cor0]])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "" + + "SELECT *\n" + + "FROM `scott`.`EMP`\n" + + "WHERE `SAL` > (((SELECT AVG(`EMP`.`SAL`) `AVG(SAL)`\n" + + "FROM `scott`.`SALGRADE`\n" + + "WHERE `EMP`.`SAL` = `HISAL`)))"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } }