diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index 66161a112b105..7b1122a24312f 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -494,6 +494,7 @@ Below is a list of all the keywords in Spark SQL. |NULL|reserved|non-reserved|reserved| |NULLS|non-reserved|non-reserved|non-reserved| |OF|non-reserved|non-reserved|reserved| +|OFFSET|reserved|non-reserved|reserved| |ON|reserved|strict-non-reserved|reserved| |ONLY|reserved|non-reserved|reserved| |OPTION|non-reserved|non-reserved|non-reserved| diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index e84d4fa45eb99..9f0f3415d83b5 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -242,6 +242,7 @@ NOT: 'NOT' | '!'; NULL: 'NULL'; NULLS: 'NULLS'; OF: 'OF'; +OFFSET: 'OFFSET'; ON: 'ON'; ONLY: 'ONLY'; OPTION: 'OPTION'; diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 872ea534ec330..657c519695b76 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -439,6 +439,7 @@ queryOrganization (SORT BY sort+=sortItem (COMMA sort+=sortItem)*)? windowClause? (LIMIT (ALL | limit=expression))? + (OFFSET offset=expression)? ; multiInsertQueryBody @@ -1450,6 +1451,7 @@ nonReserved | NULL | NULLS | OF + | OFFSET | ONLY | OPTION | OPTIONS diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 7928aad1e6421..2883ab1e993f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -411,7 +411,24 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr) - case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr) + case LocalLimit(limitExpr, child) => + checkLimitLikeClause("limit", limitExpr) + child match { + case Offset(offsetExpr, _) => + val limit = limitExpr.eval().asInstanceOf[Int] + val offset = offsetExpr.eval().asInstanceOf[Int] + if (Int.MaxValue - limit < offset) { + failAnalysis( + s""" + |The sum of the LIMIT clause and the OFFSET clause must not be greater than + |the maximum 32-bit integer value (2,147,483,647), + |but found limit = $limit, offset = $offset. + |""".stripMargin.replace("\n", " ")) + } + case _ => + } + + case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr) case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr) @@ -588,6 +605,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { } } checkCollectedMetrics(plan) + checkOffsetOperator(plan) extendedCheckRules.foreach(_(plan)) plan.foreachUp { case o if !o.resolved => @@ -830,6 +848,30 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { check(plan) } + /** + * Validate whether the [[Offset]] is valid. 
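+   * A rough illustration of what this check accepts and rejects (t is a placeholder relation;
+   * the exact error messages are defined in the match arms below):
+   *   SELECT * FROM t LIMIT 10 OFFSET 5   -- accepted: [[Offset]] sits directly under a limit node
+   *   SELECT * FROM t OFFSET 5            -- rejected: [[Offset]] would be the outermost node
+   * An [[Offset]] that appears under any other operator (for example a Filter) is rejected as well.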
+ */ + private def checkOffsetOperator(plan: LogicalPlan): Unit = { + plan.foreachUp { + case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit] + && o.children.exists(_.isInstanceOf[Offset]) => + failAnalysis( + s""" + |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET + |clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " ")) + case _ => + } + plan match { + case Offset(offsetExpr, _) => + checkLimitLikeClause("offset", offsetExpr) + failAnalysis( + s""" + |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET + |clause is found to be the outermost node.""".stripMargin.replace("\n", " ")) + case _ => + } + } + /** * Validates to make sure the outer references appearing inside the subquery * are allowed. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index b7c017c1e57d3..c11ce7d3b90f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -386,6 +386,8 @@ object UnsupportedOperationChecker extends Logging { throwError("Limits are not supported on streaming DataFrames/Datasets in Update " + "output mode") + case Offset(_, _) => throwError("Offset is not supported on streaming DataFrames/Datasets") + case Sort(_, _, _) if !containsCompleteData(subPlan) => throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " + "aggregated DataFrame/Dataset in Complete output mode") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index b2ddd5057977a..840a38cc8694d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -402,6 +402,8 @@ package object dsl { def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, logicalPlan) + def offset(offsetExpr: Expression): LogicalPlan = Offset(offsetExpr, logicalPlan) + def join( otherPlan: LogicalPlan, joinType: JoinType = Inner, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 66c2ad84ccee8..c93bd7b731097 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -95,6 +95,7 @@ abstract class Optimizer(catalogManager: CatalogManager) CollapseWindow, CombineFilters, EliminateLimits, + RewriteOffsets, CombineUnions, // Constant folding and strength reduction OptimizeRepartition, @@ -1845,6 +1846,25 @@ object EliminateLimits extends Rule[LogicalPlan] { } } +/** + * Rewrite [[Offset]] as [[GlobalLimitAndOffset]] or [[LocalLimit]], + * merging the expressions into one single expression. See [[Limit]] for more information + * about the difference between [[LocalLimit]] and [[GlobalLimit]]. 
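+ *
+ * Roughly, the rewrites performed here are (a sketch using literal limit/offset values for
+ * readability):
+ *   GlobalLimit(10, Offset(5, child))  ==>  GlobalLimitAndOffset(10, 5, child)
+ *   LocalLimit(10, Offset(5, child))   ==>  Offset(5, LocalLimit(10 + 5, child))
+ *   LocalLimit(10, Offset(0, child))   ==>  LocalLimit(10, child)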
+ */ +object RewriteOffsets extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case GlobalLimit(le, Offset(oe, grandChild)) => + GlobalLimitAndOffset(le, oe, grandChild) + case localLimit @ LocalLimit(le, Offset(oe, grandChild)) => + val offset = oe.eval().asInstanceOf[Int] + if (offset == 0) { + localLimit.withNewChildren(Seq(grandChild)) + } else { + Offset(oe, LocalLimit(Add(le, oe), grandChild)) + } + } +} + /** * Check if there any cartesian products between joins of any type in the optimized plan tree. * Throw an error if a cartesian product is found without an explicit cross join specified. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 2c964fa6da3db..894fd0d704223 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -136,6 +136,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup case _: Sort => empty(p) case _: GlobalLimit if !p.isStreaming => empty(p) case _: LocalLimit if !p.isStreaming => empty(p) + case _: Offset => empty(p) case _: Repartition => empty(p) case _: RepartitionByExpression => empty(p) case _: RebalancePartitions => empty(p) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 5aa134a0c1109..7060a1ff1d04e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -970,6 +970,7 @@ object FoldablePropagation extends Rule[LogicalPlan] { case _: Sample => true case _: GlobalLimit => true case _: LocalLimit => true + case _: Offset => true case _: Generate => true case _: Distinct => true case _: AppendColumns => true diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index c435e319637bf..659b568bc6a2f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -575,10 +575,16 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit // WINDOWS val withWindow = withOrder.optionalMap(windowClause)(withWindowClause) + // OFFSET + // - OFFSET 0 is the same as omitting the OFFSET clause + val withOffset = withWindow.optional(offset) { + Offset(typedVisit(offset), withWindow) + } + // LIMIT // - LIMIT ALL is the same as omitting the LIMIT clause - withWindow.optional(limit) { - Limit(typedVisit(limit), withWindow) + withOffset.optional(limit) { + Limit(typedVisit(limit), withOffset) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala index 726c52592887f..5b25a326831d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala @@ -91,6 +91,13 @@ object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] { } } + override def visitOffset(p: Offset): Set[ExpressionSet] = { + p.maxRows match { + case Some(value) if value <= 1 => p.output.map(attr => ExpressionSet(Seq(attr))).toSet + case _ => p.child.distinctKeys + } + } + override def visitIntersect(p: Intersect): Set[ExpressionSet] = { if (!p.isAll) Set(ExpressionSet(p.output)) else default(p) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala index fd5f9051719dc..ee0fd7f8ced61 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala @@ -30,6 +30,7 @@ trait LogicalPlanVisitor[T] { case p: Filter => visitFilter(p) case p: Generate => visitGenerate(p) case p: GlobalLimit => visitGlobalLimit(p) + case p: Offset => visitOffset(p) case p: Intersect => visitIntersect(p) case p: Join => visitJoin(p) case p: LocalLimit => visitLocalLimit(p) @@ -64,6 +65,8 @@ trait LogicalPlanVisitor[T] { def visitGlobalLimit(p: GlobalLimit): T + def visitOffset(p: Offset): T + def visitIntersect(p: Intersect): T def visitJoin(p: Join): T diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 895eeb772075d..39b203bceb9f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -1145,6 +1145,23 @@ case class Expand( copy(child = newChild) } +/** + * A logical offset, which may removing a specified number of rows from the beginning of the + * output of child logical plan. + */ +case class Offset(offsetExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode { + override def output: Seq[Attribute] = child.output + override def maxRows: Option[Long] = { + import scala.math.max + offsetExpr match { + case IntegerLiteral(offset) => child.maxRows.map { x => max(x - offset, 0) } + case _ => None + } + } + override protected def withNewChildInternal(newChild: LogicalPlan): Offset = + copy(child = newChild) +} + /** * A constructor for creating a pivot, which will later be converted to a [[Project]] * or an [[Aggregate]] during the query analysis. @@ -1256,6 +1273,37 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr copy(child = newChild) } +object LimitAndOffset { + def unapply(p: GlobalLimitAndOffset): Option[(Expression, Expression, LogicalPlan)] = { + p match { + case GlobalLimitAndOffset(le1, le2, LocalLimit(le3, child)) if le1.eval().asInstanceOf[Int] + + le2.eval().asInstanceOf[Int] == le3.eval().asInstanceOf[Int] => + Some((le1, le2, child)) + case _ => None + } + } +} + +/** + * A global (coordinated) limit with offset. This operator can skip at most `offsetExpr` number and + * emit at most `limitExpr` number in total. For example, if we have LIMIT 10 OFFSET 5, we impose a + * total limit of 10 + 5 = 15 rows and then discard the first 5, leaving 10 rows remaining. 
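+ *
+ * After [[RewriteOffsets]], a query with LIMIT 10 OFFSET 5 is expected to look roughly like
+ * GlobalLimitAndOffset(10, 5, LocalLimit(15, child)): each partition keeps at most 15 rows
+ * locally, and this node then skips the first 5 rows of the single remaining partition and
+ * returns at most 10 rows.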
+ */ +case class GlobalLimitAndOffset( + limitExpr: Expression, + offsetExpr: Expression, + child: LogicalPlan) extends OrderPreservingUnaryNode { + override def output: Seq[Attribute] = child.output + override def maxRows: Option[Long] = { + limitExpr match { + case IntegerLiteral(limit) => Some(limit) + case _ => None + } + } + override protected def withNewChildInternal(newChild: LogicalPlan): GlobalLimitAndOffset = + copy(child = newChild) +} + /** * This is similar with [[Limit]] except: * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala index 0f09022fb9c2f..59a302b1af900 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala @@ -60,6 +60,8 @@ object BasicStatsPlanVisitor extends LogicalPlanVisitor[Statistics] { override def visitGlobalLimit(p: GlobalLimit): Statistics = fallback(p) + override def visitOffset(p: Offset): Statistics = fallback(p) + override def visitIntersect(p: Intersect): Statistics = { val leftStats = p.left.stats val rightStats = p.right.stats diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala index 67a045fe5ec1a..311dd31a96b3b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala @@ -90,6 +90,15 @@ object SizeInBytesOnlyStatsPlanVisitor extends LogicalPlanVisitor[Statistics] { rowCount = Some(rowCount)) } + override def visitOffset(p: Offset): Statistics = { + val offset = p.offsetExpr.eval().asInstanceOf[Int] + val childStats = p.child.stats + val rowCount: BigInt = childStats.rowCount.map(c => c - offset).map(_.max(0)).getOrElse(0) + Statistics( + sizeInBytes = EstimationUtils.getOutputSize(p.output, rowCount, childStats.attributeStats), + rowCount = Some(rowCount)) + } + override def visitIntersect(p: Intersect): Statistics = { val leftSize = p.left.stats.sizeInBytes val rightSize = p.right.stats.sizeInBytes diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 0a8a9d844b8cf..486123d2a882a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String private[sql] case class GroupableData(data: Int) { def getData: Int = data @@ -531,6 +532,52 @@ class AnalysisErrorSuite extends AnalysisTest { "The limit expression must be equal to or greater than 0, but got -1" :: Nil ) + 
errorTest( + "an evaluated offset class must not be string", + testRelation.offset(Literal(UTF8String.fromString("abc"), StringType)), + "The offset expression must be integer type, but got string" :: Nil + ) + + errorTest( + "an evaluated offset class must not be long", + testRelation.offset(Literal(10L, LongType)), + "The offset expression must be integer type, but got bigint" :: Nil + ) + + errorTest( + "an evaluated offset class must not be null", + testRelation.offset(Literal(null, IntegerType)), + "The evaluated offset expression must not be null, but got " :: Nil + ) + + errorTest( + "num_rows in offset clause must be equal to or greater than 0", + testRelation.offset(-1), + "The offset expression must be equal to or greater than 0, but got -1" :: Nil + ) + + errorTest( + "OFFSET clause is outermost node", + testRelation.offset(Literal(10, IntegerType)), + "The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" + + " clause is found to be the outermost node." :: Nil + ) + + errorTest( + "OFFSET clause in other node", + testRelation2.offset(Literal(10, IntegerType)).where('b > 1), + "The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" + + " clause found in: Filter." :: Nil + ) + + errorTest( + "the sum of num_rows in limit clause and num_rows in offset clause less than Int.MaxValue", + testRelation.offset(Literal(2000000000, IntegerType)).limit(Literal(1000000000, IntegerType)), + "The sum of the LIMIT clause and the OFFSET clause must not be greater than" + + " the maximum 32-bit integer value (2,147,483,647)," + + " but found limit = 1000000000, offset = 2000000000." :: Nil + ) + errorTest( "more than one generators in SELECT", listRelation.select(Explode($"list"), Explode($"list")), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala index 80342f6dd7a78..bbf69ae622bc4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala @@ -94,6 +94,15 @@ class DistinctKeyVisitorSuite extends PlanTest { Set(ExpressionSet(Seq(a)), ExpressionSet(Seq(b)), ExpressionSet(Seq(c)))) } + test("Offset's distinct attributes") { + checkDistinctAttributes(Distinct(t1).limit(12).offset(10).limit(10), + Set(ExpressionSet(Seq(a, b, c)))) + checkDistinctAttributes(LocalLimit(10, Offset(10, LocalLimit(12, Distinct(t1)))), + Set(ExpressionSet(Seq(a, b, c)))) + checkDistinctAttributes(t1.offset(1).limit(1), + Set(ExpressionSet(Seq(a)), ExpressionSet(Seq(b)), ExpressionSet(Seq(c)))) + } + test("Intersect's distinct attributes") { checkDistinctAttributes(Intersect(t1, t2, false), Set(ExpressionSet(Seq(a, b, c)))) checkDistinctAttributes(Intersect(t1, t2, true), Set.empty) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala index 884870ed01b2d..265d095ffa448 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala @@ -185,6 +185,22 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase 
{ expectedStatsCboOff = windowsStats) } + test("offset estimation: offset < child's rowCount") { + val offset = Offset(Literal(2), plan) + checkStats(offset, Statistics(sizeInBytes = 96, rowCount = Some(8))) + } + + test("offset estimation: offset > child's rowCount") { + val offset = Offset(Literal(20), plan) + checkStats(offset, Statistics(sizeInBytes = 1, rowCount = Some(0))) + } + + test("offset estimation: offset = 0") { + val offset = Offset(Literal(0), plan) + // Offset is equal to zero, so Offset's stats is equal to its child's stats. + checkStats(offset, plan.stats.copy(attributeStats = AttributeMap(Nil))) + } + test("limit estimation: limit < child's rowCount") { val localLimit = LocalLimit(Literal(2), plan) val globalLimit = GlobalLimit(Literal(2), plan) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 675b158100394..81ac5434db926 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -91,6 +91,17 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil case Limit(IntegerLiteral(limit), child) => CollectLimitExec(limit, planLater(child)) :: Nil + case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), + Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold => + TakeOrderedAndProjectExec( + limit, order, child.output, planLater(child), Some(offset)) :: Nil + case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), + Project(projectList, Sort(order, true, child))) + if limit + offset < conf.topKSortFallbackThreshold => + TakeOrderedAndProjectExec( + limit, order, projectList, planLater(child), Some(offset)) :: Nil + case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) => + CollectLimitExec(limit, planLater(child), Some(offset)) :: Nil case Tail(IntegerLiteral(limit), child) => CollectTailExec(limit, planLater(child)) :: Nil case other => planLater(other) :: Nil @@ -101,6 +112,24 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) if limit < conf.topKSortFallbackThreshold => TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil + // This is a global LIMIT and OFFSET over a logical sorting operator, + // where the sum of specified limit and specified offset is less than a heuristic threshold. + // In this case we generate a physical top-K sorting operator, passing down + // the limit and offset values to be evaluated inline during the physical + // sorting operation for greater efficiency. 
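+    // For example (an illustrative reading of this rule, mirroring the PlannerSuite test added
+    // later in this patch): with conf.topKSortFallbackThreshold set to 1000, "SELECT value FROM t
+    // ORDER BY key LIMIT 100 OFFSET 100" can be planned as TakeOrderedAndProjectExec(limit = 100,
+    // ..., offsetOpt = Some(100)), whereas "... LIMIT 100 OFFSET 1000" exceeds the threshold and
+    // the top-K operator is not used (t is a placeholder table).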
+ case LimitAndOffset( + IntegerLiteral(limit), + IntegerLiteral(offset), + Sort(order, true, child)) + if limit + offset < conf.topKSortFallbackThreshold => + TakeOrderedAndProjectExec( + limit, order, child.output, planLater(child), Some(offset)) :: Nil + case LimitAndOffset( + IntegerLiteral(limit), + IntegerLiteral(offset), + Project(projectList, Sort(order, true, child))) + if limit + offset < conf.topKSortFallbackThreshold => + TakeOrderedAndProjectExec(limit, order, projectList, planLater(child), Some(offset)) :: Nil case _ => Nil } } @@ -817,6 +846,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.LocalLimitExec(limit, planLater(child)) :: Nil case logical.GlobalLimit(IntegerLiteral(limit), child) => execution.GlobalLimitExec(limit, planLater(child)) :: Nil + case logical.GlobalLimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) => + execution.GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil case union: logical.Union => execution.UnionExec(union.children.map(planLater)) :: Nil case g @ logical.Generate(generator, _, outer, _, _, child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 5114c075a72d0..f40245dca184a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -37,15 +37,25 @@ trait LimitExec extends UnaryExecNode { } /** - * Take the first `limit` elements and collect them to a single partition. + * Take the first `limit` + `offset` elements and collect them to a single partition and then to + * drop the first `offset` elements. * * This operator will be used when a logical `Limit` operation is the final operator in an * logical plan, which happens when the user is collecting results back to the driver. */ -case class CollectLimitExec(limit: Int, child: SparkPlan) extends LimitExec { +case class CollectLimitExec( + limit: Int, child: SparkPlan, offsetOpt: Option[Int] = None) extends LimitExec { override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = SinglePartition - override def executeCollect(): Array[InternalRow] = child.executeTake(limit) + override def executeCollect(): Array[InternalRow] = { + // Because CollectLimitExec collect all the output of child to a single partition, so we need + // collect the first `limit` + `offset` elements and then to drop the first `offset` elements. + // For example: limit is 1 and offset is 2 and the child output two partition. + // The first partition output [1, 2] and the Second partition output [3, 4, 5]. + // Then [1, 2, 3] will be taken and output [3]. 
+ offsetOpt.map(offset => child.executeTake(limit + offset).drop(offset)) + .getOrElse(child.executeTake(limit)) + } private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) private lazy val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) @@ -60,7 +70,9 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends LimitExec { val singlePartitionRDD = if (childRDD.getNumPartitions == 1) { childRDD } else { - val locallyLimited = childRDD.mapPartitionsInternal(_.take(limit)) + val locallyLimited = + offsetOpt.map(offset => childRDD.mapPartitionsInternal(_.take(limit + offset))) + .getOrElse(childRDD.mapPartitionsInternal(_.take(limit))) new ShuffledRowRDD( ShuffleExchangeExec.prepareShuffleDependency( locallyLimited, @@ -70,10 +82,18 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends LimitExec { writeMetrics), readMetrics) } - singlePartitionRDD.mapPartitionsInternal(_.take(limit)) + offsetOpt.map(offset => singlePartitionRDD.mapPartitionsInternal(_.drop(offset).take(limit))) + .getOrElse(singlePartitionRDD.mapPartitionsInternal(_.take(limit))) } } + override def stringArgs: Iterator[Any] = { + super.stringArgs.zipWithIndex.filter { + case (0, 1) => false + case _ => true + }.map(_._1) + } + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) } @@ -135,7 +155,7 @@ trait BaseLimitExec extends LimitExec with CodegenSupport { // to the parent operator. override def usedInputs: AttributeSet = AttributeSet.empty - private lazy val countTerm = BaseLimitExec.newLimitCountTerm() + protected lazy val countTerm = BaseLimitExec.newLimitCountTerm() override lazy val limitNotReachedChecks: Seq[String] = { s"$countTerm < $limit" +: super.limitNotReachedChecks @@ -182,6 +202,43 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { copy(child = newChild) } +/** + * Skip the first `offset` elements then take the first `limit` of the following elements in + * the child's single output partition. + */ +case class GlobalLimitAndOffsetExec( + limit: Int, + offset: Int, + child: SparkPlan) extends BaseLimitExec { + + override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + + override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => + iter.take(limit + offset).drop(offset) + } + + private lazy val skipTerm = BaseLimitExec.newLimitCountTerm() + + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { + // The counter name is already obtained by the upstream operators via `limitNotReachedChecks`. + // Here we have to inline it to not change its name. This is fine as we won't have many limit + // operators in one query. + ctx.addMutableState(CodeGenerator.JAVA_INT, countTerm, forceInline = true, useFreshName = false) + ctx.addMutableState(CodeGenerator.JAVA_INT, skipTerm, forceInline = true, useFreshName = false) + s""" + | if ($skipTerm < $offset) { + | $skipTerm += 1; + | } else if ($countTerm < $limit) { + | $countTerm += 1; + | ${consume(ctx, input)} + | } + """.stripMargin + } + + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = + copy(child = newChild) +} + /** * Take the first limit elements as defined by the sortOrder, and do projection if needed. 
* This is logically equivalent to having a Limit operator after a [[SortExec]] operator, @@ -193,7 +250,7 @@ case class TakeOrderedAndProjectExec( limit: Int, sortOrder: Seq[SortOrder], projectList: Seq[NamedExpression], - child: SparkPlan) extends UnaryExecNode { + child: SparkPlan, offsetOpt: Option[Int] = None) extends UnaryExecNode { override def output: Seq[Attribute] = { projectList.map(_.toAttribute) @@ -201,7 +258,9 @@ case class TakeOrderedAndProjectExec( override def executeCollect(): Array[InternalRow] = { val ord = new LazilyGeneratedOrdering(sortOrder, child.output) - val data = child.execute().map(_.copy()).takeOrdered(limit)(ord) + val data = offsetOpt + .map(offset => child.execute().map(_.copy()).takeOrdered(limit + offset)(ord).drop(offset)) + .getOrElse(child.execute().map(_.copy()).takeOrdered(limit)(ord)) if (projectList != child.output) { val proj = UnsafeProjection.create(projectList, child.output) data.map(r => proj(r).copy()) @@ -227,9 +286,11 @@ case class TakeOrderedAndProjectExec( val singlePartitionRDD = if (childRDD.getNumPartitions == 1) { childRDD } else { - val localTopK = childRDD.mapPartitions { iter => + val localTopK = offsetOpt.map(offset => childRDD.mapPartitions { iter => + Utils.takeOrdered(iter.map(_.copy()), limit + offset)(ord) + }).getOrElse(childRDD.mapPartitions { iter => Utils.takeOrdered(iter.map(_.copy()), limit)(ord) - } + }) new ShuffledRowRDD( ShuffleExchangeExec.prepareShuffleDependency( localTopK, @@ -240,7 +301,9 @@ case class TakeOrderedAndProjectExec( readMetrics) } singlePartitionRDD.mapPartitions { iter => - val topK = Utils.takeOrdered(iter.map(_.copy()), limit)(ord) + val topK = offsetOpt + .map(offset => Utils.takeOrdered(iter.map(_.copy()), limit + offset)(ord).drop(offset)) + .getOrElse(Utils.takeOrdered(iter.map(_.copy()), limit)(ord)) if (projectList != child.output) { val proj = UnsafeProjection.create(projectList, child.output) topK.map(r => proj(r)) @@ -262,6 +325,13 @@ case class TakeOrderedAndProjectExec( s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)" } + override def stringArgs: Iterator[Any] = { + super.stringArgs.zipWithIndex.filter { + case (0, 1) => false + case _ => true + }.map(_._1) + } + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) } diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql index bc0b5d6dddc52..ffaebce9796a7 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql @@ -12,25 +12,24 @@ SELECT '' AS five, unique1, unique2, stringu1 SELECT '' AS two, unique1, unique2, stringu1 FROM onek WHERE unique1 > 60 AND unique1 < 63 ORDER BY unique1 LIMIT 5; --- [SPARK-28330] ANSI SQL: Top-level in --- SELECT '' AS three, unique1, unique2, stringu1 --- FROM onek WHERE unique1 > 100 --- ORDER BY unique1 LIMIT 3 OFFSET 20; --- SELECT '' AS zero, unique1, unique2, stringu1 --- FROM onek WHERE unique1 < 50 --- ORDER BY unique1 DESC LIMIT 8 OFFSET 99; --- SELECT '' AS eleven, unique1, unique2, stringu1 --- FROM onek WHERE unique1 < 50 --- ORDER BY unique1 DESC LIMIT 20 OFFSET 39; +SELECT '' AS three, unique1, unique2, stringu1 + FROM onek WHERE unique1 > 100 + ORDER BY unique1 LIMIT 3 OFFSET 20; + SELECT '' AS zero, unique1, unique2, stringu1 + FROM onek WHERE unique1 < 50 + ORDER BY unique1 DESC LIMIT 8 OFFSET 99; + SELECT '' AS eleven, 
unique1, unique2, stringu1 + FROM onek WHERE unique1 < 50 + ORDER BY unique1 DESC LIMIT 20 OFFSET 39; -- SELECT '' AS ten, unique1, unique2, stringu1 -- FROM onek -- ORDER BY unique1 OFFSET 990; -- SELECT '' AS five, unique1, unique2, stringu1 -- FROM onek -- ORDER BY unique1 OFFSET 990 LIMIT 5; --- SELECT '' AS five, unique1, unique2, stringu1 --- FROM onek --- ORDER BY unique1 LIMIT 5 OFFSET 900; +SELECT '' AS five, unique1, unique2, stringu1 + FROM onek + ORDER BY unique1 LIMIT 5 OFFSET 900; CREATE OR REPLACE TEMPORARY VIEW INT8_TBL AS SELECT * FROM (VALUES @@ -45,8 +44,7 @@ CREATE OR REPLACE TEMPORARY VIEW INT8_TBL AS SELECT * FROM -- constant, so to ensure executor is exercised, do this: -- [SPARK-29650] Discard a NULL constant in LIMIT select * from int8_tbl limit (case when random() < 0.5 then bigint(null) end); --- [SPARK-28330] ANSI SQL: Top-level in --- select * from int8_tbl offset (case when random() < 0.5 then bigint(null) end); +select * from int8_tbl offset (case when random() < 0.5 then bigint(null) end); -- Test assorted cases involving backwards fetch from a LIMIT plan node -- [SPARK-20965] Support PREPARE/EXECUTE/DECLARE/FETCH statements @@ -90,7 +88,7 @@ DROP VIEW INT8_TBL; -- Stress test for variable LIMIT in conjunction with bounded-heap sorting --- [SPARK-28330] ANSI SQL: Top-level in +-- [SPARK-27767] Built-in function: generate_series -- SELECT -- (SELECT n -- FROM (VALUES (1)) AS x, diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql index 580fc1d4162eb..a1cc218b4ad71 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql @@ -121,3 +121,44 @@ WHERE NOT EXISTS (SELECT max(dept.dept_id) WHERE dept.dept_id > 100 GROUP BY state LIMIT 1); + +-- limit and offset in the exists subquery block. +-- TC.03.01 +SELECT * +FROM emp +WHERE EXISTS (SELECT dept.dept_name + FROM dept + WHERE dept.dept_id > 10 + LIMIT 1 + OFFSET 2); + +-- limit and offset in the exists subquery block with aggregate. +-- TC.03.02 +SELECT * +FROM emp +WHERE EXISTS (SELECT max(dept.dept_id) + FROM dept + GROUP BY state + LIMIT 1 + OFFSET 2); + +-- limit and offset in the not exists subquery block. +-- TC.03.03 +SELECT * +FROM emp +WHERE NOT EXISTS (SELECT dept.dept_name + FROM dept + WHERE dept.dept_id > 100 + LIMIT 1 + OFFSET 2); + +-- limit and offset in the not exists subquery block with aggregates. 
+-- TC.03.04 +SELECT * +FROM emp +WHERE NOT EXISTS (SELECT max(dept.dept_id) + FROM dept + WHERE dept.dept_id > 100 + GROUP BY state + LIMIT 1 + OFFSET 2); diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql index 53fc2b8be7501..b6adebeca3e80 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql @@ -100,4 +100,62 @@ WHERE t1d NOT IN (SELECT t2d LIMIT 1) GROUP BY t1b ORDER BY t1b NULLS last -LIMIT 1; \ No newline at end of file +LIMIT 1; + +-- LIMIT and OFFSET in parent side +-- TC 02.01 +SELECT * +FROM t1 +WHERE t1a IN (SELECT t2a + FROM t2 + WHERE t1d = t2d) +LIMIT 2 +OFFSET 2; + +-- TC 02.02 +SELECT * +FROM t1 +WHERE t1c IN (SELECT t2c + FROM t2 + WHERE t2b >= 8 + LIMIT 2 + OFFSET 2) +LIMIT 4 +OFFSET 2; + +-- TC 02.03 +SELECT Count(DISTINCT( t1a )), + t1b +FROM t1 +WHERE t1d IN (SELECT t2d + FROM t2 + ORDER BY t2c, t2d + LIMIT 2) +GROUP BY t1b +ORDER BY t1b DESC NULLS FIRST +LIMIT 1 +OFFSET 1; + +-- LIMIT with NOT IN +-- TC 02.04 +SELECT * +FROM t1 +WHERE t1b NOT IN (SELECT t2b + FROM t2 + WHERE t2b > 6 + LIMIT 2 + OFFSET 2); + +-- TC 02.05 +SELECT Count(DISTINCT( t1a )), + t1b +FROM t1 +WHERE t1d NOT IN (SELECT t2d + FROM t2 + ORDER BY t2b DESC nulls first, t2d + LIMIT 1 + OFFSET 1) +GROUP BY t1b +ORDER BY t1b NULLS last +LIMIT 1 +OFFSET 1; diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out index 2384010c67b4d..416bcc9931ce7 100644 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 7 +-- Number of queries: 12 -- !query @@ -38,6 +38,62 @@ struct 62 633 KCAAAA +-- !query +SELECT '' AS three, unique1, unique2, stringu1 + FROM onek WHERE unique1 > 100 + ORDER BY unique1 LIMIT 3 OFFSET 20 +-- !query schema +struct +-- !query output + 121 700 REAAAA + 122 519 SEAAAA + 123 777 TEAAAA + + +-- !query +SELECT '' AS zero, unique1, unique2, stringu1 + FROM onek WHERE unique1 < 50 + ORDER BY unique1 DESC LIMIT 8 OFFSET 99 +-- !query schema +struct +-- !query output + + + +-- !query +SELECT '' AS eleven, unique1, unique2, stringu1 + FROM onek WHERE unique1 < 50 + ORDER BY unique1 DESC LIMIT 20 OFFSET 39 +-- !query schema +struct +-- !query output + 10 520 KAAAAA + 9 49 JAAAAA + 8 653 IAAAAA + 7 647 HAAAAA + 6 978 GAAAAA + 5 541 FAAAAA + 4 833 EAAAAA + 3 431 DAAAAA + 2 326 CAAAAA + 1 214 BAAAAA + 0 998 AAAAAA + + +-- !query +SELECT '' AS five, unique1, unique2, stringu1 + FROM onek + ORDER BY unique1 LIMIT 5 OFFSET 900 +-- !query schema +struct +-- !query output + 900 913 QIAAAA + 901 931 RIAAAA + 902 702 SIAAAA + 903 641 TIAAAA + 904 793 UIAAAA + + -- !query CREATE OR REPLACE TEMPORARY VIEW INT8_TBL AS SELECT * FROM (VALUES @@ -62,6 +118,15 @@ org.apache.spark.sql.AnalysisException The limit expression must evaluate to a constant value, but got CASE WHEN (_nondeterministic < CAST(0.5BD AS DOUBLE)) THEN CAST(NULL AS BIGINT) END +-- !query +select * from int8_tbl offset (case when random() < 0.5 then bigint(null) end) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +The offset expression must evaluate to a constant value, but got CASE 
WHEN (_nondeterministic < CAST(0.5BD AS DOUBLE)) THEN CAST(NULL AS BIGINT) END + + -- !query DROP VIEW INT8_TBL -- !query schema diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out index ebd4da6ccbd5d..76c377e8ca624 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 12 +-- Number of queries: 16 -- !query @@ -220,3 +220,92 @@ struct 600 emp 6 - no dept 2001-01-01 400.0 100 700 emp 7 2010-01-01 400.0 100 800 emp 8 2016-01-01 150.0 70 + + +-- !query +SELECT * +FROM emp +WHERE EXISTS (SELECT dept.dept_name + FROM dept + WHERE dept.dept_id > 10 + LIMIT 1 + OFFSET 2) +-- !query schema +struct +-- !query output +100 emp 1 2005-01-01 100.0 10 +100 emp 1 2005-01-01 100.0 10 +200 emp 2 2003-01-01 200.0 10 +300 emp 3 2002-01-01 300.0 20 +400 emp 4 2005-01-01 400.0 30 +500 emp 5 2001-01-01 400.0 NULL +600 emp 6 - no dept 2001-01-01 400.0 100 +700 emp 7 2010-01-01 400.0 100 +800 emp 8 2016-01-01 150.0 70 + + +-- !query +SELECT * +FROM emp +WHERE EXISTS (SELECT max(dept.dept_id) + FROM dept + GROUP BY state + LIMIT 1 + OFFSET 2) +-- !query schema +struct +-- !query output +100 emp 1 2005-01-01 100.0 10 +100 emp 1 2005-01-01 100.0 10 +200 emp 2 2003-01-01 200.0 10 +300 emp 3 2002-01-01 300.0 20 +400 emp 4 2005-01-01 400.0 30 +500 emp 5 2001-01-01 400.0 NULL +600 emp 6 - no dept 2001-01-01 400.0 100 +700 emp 7 2010-01-01 400.0 100 +800 emp 8 2016-01-01 150.0 70 + + +-- !query +SELECT * +FROM emp +WHERE NOT EXISTS (SELECT dept.dept_name + FROM dept + WHERE dept.dept_id > 100 + LIMIT 1 + OFFSET 2) +-- !query schema +struct +-- !query output +100 emp 1 2005-01-01 100.0 10 +100 emp 1 2005-01-01 100.0 10 +200 emp 2 2003-01-01 200.0 10 +300 emp 3 2002-01-01 300.0 20 +400 emp 4 2005-01-01 400.0 30 +500 emp 5 2001-01-01 400.0 NULL +600 emp 6 - no dept 2001-01-01 400.0 100 +700 emp 7 2010-01-01 400.0 100 +800 emp 8 2016-01-01 150.0 70 + + +-- !query +SELECT * +FROM emp +WHERE NOT EXISTS (SELECT max(dept.dept_id) + FROM dept + WHERE dept.dept_id > 100 + GROUP BY state + LIMIT 1 + OFFSET 2) +-- !query schema +struct +-- !query output +100 emp 1 2005-01-01 100.0 10 +100 emp 1 2005-01-01 100.0 10 +200 emp 2 2003-01-01 200.0 10 +300 emp 3 2002-01-01 300.0 20 +400 emp 4 2005-01-01 400.0 30 +500 emp 5 2001-01-01 400.0 NULL +600 emp 6 - no dept 2001-01-01 400.0 100 +700 emp 7 2010-01-01 400.0 100 +800 emp 8 2016-01-01 150.0 70 diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out index e24538b9138ba..185eb9fed285b 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 8 +-- Number of queries: 13 -- !query @@ -145,3 +145,93 @@ LIMIT 1 struct -- !query output 1 6 + + +-- !query +SELECT * +FROM t1 +WHERE t1a IN (SELECT t2a + FROM t2 + WHERE t1d = t2d) +LIMIT 2 +OFFSET 2 +-- !query schema +struct +-- !query output +val1e 10 NULL 19 17.0 25.0 2600 2014-05-04 01:01:00 
2014-05-04 +val1e 10 NULL 19 17.0 25.0 2600 2014-09-04 01:02:00.001 2014-09-04 + + +-- !query +SELECT * +FROM t1 +WHERE t1c IN (SELECT t2c + FROM t2 + WHERE t2b >= 8 + LIMIT 2 + OFFSET 2) +LIMIT 4 +OFFSET 2 +-- !query schema +struct +-- !query output +val1d NULL 16 19 17.0 25.0 2600 2014-07-04 01:02:00.001 NULL +val1d NULL 16 22 17.0 25.0 2600 2014-06-04 01:01:00 NULL + + +-- !query +SELECT Count(DISTINCT( t1a )), + t1b +FROM t1 +WHERE t1d IN (SELECT t2d + FROM t2 + ORDER BY t2c, t2d + LIMIT 2) +GROUP BY t1b +ORDER BY t1b DESC NULLS FIRST +LIMIT 1 +OFFSET 1 +-- !query schema +struct +-- !query output +1 10 + + +-- !query +SELECT * +FROM t1 +WHERE t1b NOT IN (SELECT t2b + FROM t2 + WHERE t2b > 6 + LIMIT 2 + OFFSET 2) +-- !query schema +struct +-- !query output +val1a 16 12 10 15.0 20.0 2000 2014-07-04 01:01:00 2014-07-04 +val1a 16 12 21 15.0 20.0 2000 2014-06-04 01:02:00.001 2014-06-04 +val1a 6 8 10 15.0 20.0 2000 2014-04-04 01:00:00 2014-04-04 +val1a 6 8 10 15.0 20.0 2000 2014-04-04 01:02:00.001 2014-04-04 +val1d 10 NULL 12 17.0 25.0 2600 2015-05-04 01:01:00 2015-05-04 +val1e 10 NULL 19 17.0 25.0 2600 2014-05-04 01:01:00 2014-05-04 +val1e 10 NULL 19 17.0 25.0 2600 2014-09-04 01:02:00.001 2014-09-04 +val1e 10 NULL 25 17.0 25.0 2600 2014-08-04 01:01:00 2014-08-04 + + +-- !query +SELECT Count(DISTINCT( t1a )), + t1b +FROM t1 +WHERE t1d NOT IN (SELECT t2d + FROM t2 + ORDER BY t2b DESC nulls first, t2d + LIMIT 1 + OFFSET 1) +GROUP BY t1b +ORDER BY t1b NULLS last +LIMIT 1 +OFFSET 1 +-- !query schema +struct +-- !query output +2 8 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index bd2fa50bb061c..8de81b81e9ead 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -242,6 +242,21 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } } + test("TakeOrderedAndProjectExec appears only when limit + offset is below the threshold.") { + withTempView("testLimitAndOffset") { + testData.createOrReplaceTempView("testLimitAndOffset") + withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1000") { + val query0 = sql("select value from testLimitAndOffset order by key limit 100 offset 100") + val planned0 = query0.queryExecution.executedPlan + assert(planned0.exists(_.isInstanceOf[TakeOrderedAndProjectExec])) + + val query1 = sql("select value from testLimitAndOffset order by key limit 100 offset 1000") + val planned1 = query1.queryExecution.executedPlan + assert(!planned1.exists(_.isInstanceOf[TakeOrderedAndProjectExec])) + } + } + } + test("PartitioningCollection") { withTempView("normal", "small", "tiny") { testData.createOrReplaceTempView("normal")