diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 51a3facf40835..cf8e72ffbec4e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -433,6 +433,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr) + case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit] + && o.children.exists(_.isInstanceOf[Offset]) => + failAnalysis( + s""" + |The OFFSET clause is allowed in the LIMIT clause or be the outermost node, + |but the OFFSET clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " ")) + case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr) case _: Union | _: SetOperation if operator.children.length > 1 => @@ -608,7 +615,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { } } checkCollectedMetrics(plan) - checkOffsetOperator(plan) extendedCheckRules.foreach(_(plan)) plan.foreachUp { case o if !o.resolved => @@ -851,30 +857,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { check(plan) } - /** - * Validate whether the [[Offset]] is valid. - */ - private def checkOffsetOperator(plan: LogicalPlan): Unit = { - plan.foreachUp { - case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit] - && o.children.exists(_.isInstanceOf[Offset]) => - failAnalysis( - s""" - |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET - |clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " ")) - case _ => - } - plan match { - case Offset(offsetExpr, _) => - checkLimitLikeClause("offset", offsetExpr) - failAnalysis( - s""" - |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET - |clause is found to be the outermost node.""".stripMargin.replace("\n", " ")) - case _ => - } - } - /** * Validates to make sure the outer references appearing inside the subquery * are allowed. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 1615ddc00e321..3a36e506f4ee0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -95,8 +95,8 @@ abstract class Optimizer(catalogManager: CatalogManager) OptimizeWindowFunctions, CollapseWindow, CombineFilters, + EliminateOffsets, EliminateLimits, - RewriteOffsets, CombineUnions, // Constant folding and strength reduction OptimizeRepartition, @@ -673,7 +673,7 @@ object RemoveNoopUnion extends Rule[LogicalPlan] { } /** - * Pushes down [[LocalLimit]] beneath UNION ALL and joins. + * Pushes down [[LocalLimit]] beneath UNION ALL, OFFSET and joins. */ object LimitPushDown extends Rule[LogicalPlan] { @@ -750,6 +750,9 @@ object LimitPushDown extends Rule[LogicalPlan] { Limit(le, Project(a.aggregateExpressions, LocalLimit(le, a.child))) case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if a.groupOnly => Limit(le, p.copy(child = Project(a.aggregateExpressions, LocalLimit(le, a.child)))) + // Merge offset value and limit value into LocalLimit and pushes down LocalLimit through Offset. + case LocalLimit(le, Offset(oe, grandChild)) => + Offset(oe, LocalLimit(Add(le, oe), grandChild)) } } @@ -1871,21 +1874,22 @@ object EliminateLimits extends Rule[LogicalPlan] { } /** - * Rewrite [[Offset]] as [[GlobalLimitAndOffset]] or [[LocalLimit]], - * merging the expressions into one single expression. See [[Limit]] for more information - * about the difference between [[LocalLimit]] and [[GlobalLimit]]. + * This rule optimizes Offset operators by: + * 1. Eliminate [[Offset]] operators if offset == 0. + * 2. Replace [[Offset]] operators to empty [[LocalRelation]] + * if [[Offset]]'s child max row <= offset. + * 3. Combines two adjacent [[Offset]] operators into one, merging the + * expressions into one single expression. */ -object RewriteOffsets extends Rule[LogicalPlan] { +object EliminateOffsets extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case GlobalLimit(le, Offset(oe, grandChild)) => - GlobalLimitAndOffset(le, oe, grandChild) - case localLimit @ LocalLimit(le, Offset(oe, grandChild)) => - val offset = oe.eval().asInstanceOf[Int] - if (offset == 0) { - localLimit.withNewChildren(Seq(grandChild)) - } else { - Offset(oe, LocalLimit(Add(le, oe), grandChild)) - } + case Offset(oe, child) if oe.foldable && oe.eval().asInstanceOf[Int] == 0 => + child + case Offset(oe, child) + if oe.foldable && child.maxRows.exists(_ <= oe.eval().asInstanceOf[Int]) => + LocalRelation(child.output, data = Seq.empty, isStreaming = child.isStreaming) + case Offset(oe1, Offset(oe2, child)) => + Offset(Add(oe1, oe2), child) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 419e28c800791..38926b63786ac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -1304,9 +1304,9 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr } object LimitAndOffset { - def unapply(p: GlobalLimitAndOffset): Option[(Expression, Expression, LogicalPlan)] = { + def unapply(p: GlobalLimit): Option[(Expression, Expression, LogicalPlan)] = { p match { - case GlobalLimitAndOffset(le1, le2, LocalLimit(le3, child)) if le1.eval().asInstanceOf[Int] + case GlobalLimit(le1, Offset(le2, LocalLimit(le3, child))) if le1.eval().asInstanceOf[Int] + le2.eval().asInstanceOf[Int] == le3.eval().asInstanceOf[Int] => Some((le1, le2, child)) case _ => None @@ -1314,26 +1314,6 @@ object LimitAndOffset { } } -/** - * A global (coordinated) limit with offset. This operator can skip at most `offsetExpr` number and - * emit at most `limitExpr` number in total. For example, if we have LIMIT 10 OFFSET 5, we impose a - * total limit of 10 + 5 = 15 rows and then discard the first 5, leaving 10 rows remaining. - */ -case class GlobalLimitAndOffset( - limitExpr: Expression, - offsetExpr: Expression, - child: LogicalPlan) extends OrderPreservingUnaryNode { - override def output: Seq[Attribute] = child.output - override def maxRows: Option[Long] = { - limitExpr match { - case IntegerLiteral(limit) => Some(limit) - case _ => None - } - } - override protected def withNewChildInternal(newChild: LogicalPlan): GlobalLimitAndOffset = - copy(child = newChild) -} - /** * This is similar with [[Limit]] except: * diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 087e649704ef1..4c8e066f1d1b8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -556,18 +556,11 @@ class AnalysisErrorSuite extends AnalysisTest { "The offset expression must be equal to or greater than 0, but got -1" :: Nil ) - errorTest( - "OFFSET clause is outermost node", - testRelation.offset(Literal(10, IntegerType)), - "The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" + - " clause is found to be the outermost node." :: Nil - ) - errorTest( "OFFSET clause in other node", testRelation2.offset(Literal(10, IntegerType)).where('b > 1), - "The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" + - " clause found in: Filter." :: Nil + "The OFFSET clause is allowed in the LIMIT clause or be the outermost node," + + " but the OFFSET clause found in: Filter." :: Nil ) errorTest( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateOffsetsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateOffsetsSuite.scala new file mode 100644 index 0000000000000..d8c0199ac37dc --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateOffsetsSuite.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Add, Literal} +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor + +class EliminateOffsetsSuite extends PlanTest { + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("Eliminate Offset", FixedPoint(10), EliminateOffsets) :: Nil + } + + val testRelation = LocalRelation.fromExternalRows( + Seq("a".attr.int, "b".attr.int, "c".attr.int), + 1.to(10).map(_ => Row(1, 2, 3)) + ) + + test("Offsets: eliminate Offset operators if offset == 0") { + val originalQuery = + testRelation + .select($"a") + .offset(0) + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = + testRelation + .select($"a") + .analyze + + comparePlans(optimized, correctAnswer) + } + + test("Offsets: cannot eliminate Offset operators if offset > 0") { + val originalQuery = + testRelation + .select($"a") + .offset(2) + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = + testRelation + .select($"a") + .offset(2) + .analyze + + comparePlans(optimized, correctAnswer) + } + + test("Replace Offset operators to empty LocalRelation if child max row <= offset") { + val child = testRelation.select($"a").analyze + val originalQuery = child.offset(10) + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = + LocalRelation(child.output, data = Seq.empty, isStreaming = child.isStreaming).analyze + + comparePlans(optimized, correctAnswer) + } + + test("Cannot replace Offset operators to empty LocalRelation if child max row > offset") { + val child = testRelation.select($"a").analyze + val originalQuery = child.offset(3) + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = originalQuery.analyze + + comparePlans(optimized, correctAnswer) + } + + test("Combines Offset operators") { + val child = testRelation.select($"a").analyze + val originalQuery = child.offset(2).offset(3) + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = child.offset(Add(Literal(3), Literal(2))).analyze + + comparePlans(optimized, correctAnswer) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala index 8373ada9882eb..59b81116bc17e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala @@ -270,4 +270,10 @@ class LimitPushdownSuite extends PlanTest { Optimize.execute(x.groupBy("x.a".attr)("x.a".attr, count("x.a".attr)).limit(1).analyze), x.groupBy("x.a".attr)("x.a".attr, count("x.a".attr)).limit(1).analyze) } + + test("Push down limit 1 through Offset") { + comparePlans( + Optimize.execute(testRelation.offset(2).limit(1).analyze), + GlobalLimit(1, Offset(2, LocalLimit(3, testRelation))).analyze) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 518b218199c69..01b295c1bea89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -93,14 +93,16 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold => TakeOrderedAndProjectExec( - limit, order, child.output, planLater(child), Some(offset)) :: Nil + limit, order, child.output, planLater(child), offset) :: Nil case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), Project(projectList, Sort(order, true, child))) if limit + offset < conf.topKSortFallbackThreshold => TakeOrderedAndProjectExec( - limit, order, projectList, planLater(child), Some(offset)) :: Nil + limit, order, projectList, planLater(child), offset) :: Nil case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) => - CollectLimitExec(limit, planLater(child), Some(offset)) :: Nil + CollectLimitExec(limit, planLater(child), offset) :: Nil + case logical.Offset(IntegerLiteral(offset), child) => + CollectLimitExec(child = planLater(child), offset = offset) :: Nil case Tail(IntegerLiteral(limit), child) => CollectTailExec(limit, planLater(child)) :: Nil case other => planLater(other) :: Nil @@ -116,20 +118,18 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // In this case we generate a physical top-K sorting operator, passing down // the limit and offset values to be evaluated inline during the physical // sorting operation for greater efficiency. - case LimitAndOffset( - IntegerLiteral(limit), - IntegerLiteral(offset), - Sort(order, true, child)) - if limit + offset < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec( - limit, order, child.output, planLater(child), Some(offset)) :: Nil - case LimitAndOffset( - IntegerLiteral(limit), - IntegerLiteral(offset), - Project(projectList, Sort(order, true, child))) + case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), + Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold => + TakeOrderedAndProjectExec( + limit, order, child.output, planLater(child), offset) :: Nil + case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), + Project(projectList, Sort(order, true, child))) if limit + offset < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec(limit, order, projectList, planLater(child), Some(offset)) :: Nil - case _ => Nil + TakeOrderedAndProjectExec(limit, order, projectList, planLater(child), offset) :: Nil + case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) => + GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil + case _ => + Nil } } @@ -818,8 +818,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.LocalLimitExec(limit, planLater(child)) :: Nil case logical.GlobalLimit(IntegerLiteral(limit), child) => execution.GlobalLimitExec(limit, planLater(child)) :: Nil - case logical.GlobalLimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) => - execution.GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil + case logical.Offset(IntegerLiteral(offset), child) => + GlobalLimitAndOffsetExec(offset = offset, child = planLater(child)) :: Nil case union: logical.Union => execution.UnionExec(union.children.map(planLater)) :: Nil case g @ logical.Generate(generator, _, outer, _, _, child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index f40245dca184a..f79361ff1c530 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -43,8 +43,9 @@ trait LimitExec extends UnaryExecNode { * This operator will be used when a logical `Limit` operation is the final operator in an * logical plan, which happens when the user is collecting results back to the driver. */ -case class CollectLimitExec( - limit: Int, child: SparkPlan, offsetOpt: Option[Int] = None) extends LimitExec { +case class CollectLimitExec(limit: Int = -1, child: SparkPlan, offset: Int = 0) extends LimitExec { + assert(limit >= 0 || (limit == -1 && offset > 0)) + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = { @@ -53,8 +54,15 @@ case class CollectLimitExec( // For example: limit is 1 and offset is 2 and the child output two partition. // The first partition output [1, 2] and the Second partition output [3, 4, 5]. // Then [1, 2, 3] will be taken and output [3]. - offsetOpt.map(offset => child.executeTake(limit + offset).drop(offset)) - .getOrElse(child.executeTake(limit)) + if (limit >= 0) { + if (offset > 0) { + child.executeTake(limit + offset).drop(offset) + } else { + child.executeTake(limit) + } + } else { + child.executeCollect().drop(offset) + } } private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) private lazy val writeMetrics = @@ -70,9 +78,15 @@ case class CollectLimitExec( val singlePartitionRDD = if (childRDD.getNumPartitions == 1) { childRDD } else { - val locallyLimited = - offsetOpt.map(offset => childRDD.mapPartitionsInternal(_.take(limit + offset))) - .getOrElse(childRDD.mapPartitionsInternal(_.take(limit))) + val locallyLimited = if (limit >= 0) { + if (offset > 0) { + childRDD.mapPartitionsInternal(_.take(limit + offset)) + } else { + childRDD.mapPartitionsInternal(_.take(limit)) + } + } else { + childRDD + } new ShuffledRowRDD( ShuffleExchangeExec.prepareShuffleDependency( locallyLimited, @@ -82,14 +96,21 @@ case class CollectLimitExec( writeMetrics), readMetrics) } - offsetOpt.map(offset => singlePartitionRDD.mapPartitionsInternal(_.drop(offset).take(limit))) - .getOrElse(singlePartitionRDD.mapPartitionsInternal(_.take(limit))) + if (limit >= 0) { + if (offset > 0) { + singlePartitionRDD.mapPartitionsInternal(_.drop(offset).take(limit)) + } else { + singlePartitionRDD.mapPartitionsInternal(_.take(limit)) + } + } else { + singlePartitionRDD.mapPartitionsInternal(_.drop(offset)) + } } } override def stringArgs: Iterator[Any] = { super.stringArgs.zipWithIndex.filter { - case (0, 1) => false + case (0, 2) => false case _ => true }.map(_._1) } @@ -157,8 +178,10 @@ trait BaseLimitExec extends LimitExec with CodegenSupport { protected lazy val countTerm = BaseLimitExec.newLimitCountTerm() - override lazy val limitNotReachedChecks: Seq[String] = { + override lazy val limitNotReachedChecks: Seq[String] = if (limit >= 0) { s"$countTerm < $limit" +: super.limitNotReachedChecks + } else { + super.limitNotReachedChecks } protected override def doProduce(ctx: CodegenContext): String = { @@ -207,32 +230,47 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { * the child's single output partition. */ case class GlobalLimitAndOffsetExec( - limit: Int, + limit: Int = -1, offset: Int, child: SparkPlan) extends BaseLimitExec { + assert(offset > 0) override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil - override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => - iter.take(limit + offset).drop(offset) + override def doExecute(): RDD[InternalRow] = if (limit >= 0) { + child.execute().mapPartitionsInternal(iter => iter.take(limit + offset).drop(offset)) + } else { + child.execute().mapPartitionsInternal(iter => iter.drop(offset)) } private lazy val skipTerm = BaseLimitExec.newLimitCountTerm() override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { - // The counter name is already obtained by the upstream operators via `limitNotReachedChecks`. - // Here we have to inline it to not change its name. This is fine as we won't have many limit - // operators in one query. - ctx.addMutableState(CodeGenerator.JAVA_INT, countTerm, forceInline = true, useFreshName = false) - ctx.addMutableState(CodeGenerator.JAVA_INT, skipTerm, forceInline = true, useFreshName = false) - s""" - | if ($skipTerm < $offset) { - | $skipTerm += 1; - | } else if ($countTerm < $limit) { - | $countTerm += 1; - | ${consume(ctx, input)} - | } - """.stripMargin + ctx.addMutableState( + CodeGenerator.JAVA_INT, skipTerm, forceInline = true, useFreshName = false) + if (limit >= 0) { + // The counter name is already obtained by the upstream operators via `limitNotReachedChecks`. + // Here we have to inline it to not change its name. This is fine as we won't have many limit + // operators in one query. + ctx.addMutableState( + CodeGenerator.JAVA_INT, countTerm, forceInline = true, useFreshName = false) + s""" + | if ($skipTerm < $offset) { + | $skipTerm += 1; + | } else if ($countTerm < $limit) { + | $countTerm += 1; + | ${consume(ctx, input)} + | } + """.stripMargin + } else { + s""" + | if ($skipTerm < $offset) { + | $skipTerm += 1; + | } else { + | ${consume(ctx, input)} + | } + """.stripMargin + } } override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = @@ -250,7 +288,8 @@ case class TakeOrderedAndProjectExec( limit: Int, sortOrder: Seq[SortOrder], projectList: Seq[NamedExpression], - child: SparkPlan, offsetOpt: Option[Int] = None) extends UnaryExecNode { + child: SparkPlan, + offset: Int = 0) extends UnaryExecNode { override def output: Seq[Attribute] = { projectList.map(_.toAttribute) @@ -258,9 +297,12 @@ case class TakeOrderedAndProjectExec( override def executeCollect(): Array[InternalRow] = { val ord = new LazilyGeneratedOrdering(sortOrder, child.output) - val data = offsetOpt - .map(offset => child.execute().map(_.copy()).takeOrdered(limit + offset)(ord).drop(offset)) - .getOrElse(child.execute().map(_.copy()).takeOrdered(limit)(ord)) + val data = if (offset > 0) { + child.execute().mapPartitionsInternal(_.map(_.copy())) + .takeOrdered(limit + offset)(ord).drop(offset) + } else { + child.execute().mapPartitionsInternal(_.map(_.copy())).takeOrdered(limit)(ord) + } if (projectList != child.output) { val proj = UnsafeProjection.create(projectList, child.output) data.map(r => proj(r).copy()) @@ -286,11 +328,15 @@ case class TakeOrderedAndProjectExec( val singlePartitionRDD = if (childRDD.getNumPartitions == 1) { childRDD } else { - val localTopK = offsetOpt.map(offset => childRDD.mapPartitions { iter => - Utils.takeOrdered(iter.map(_.copy()), limit + offset)(ord) - }).getOrElse(childRDD.mapPartitions { iter => - Utils.takeOrdered(iter.map(_.copy()), limit)(ord) - }) + val localTopK = if (offset > 0) { + childRDD.mapPartitionsInternal { iter => + Utils.takeOrdered(iter.map(_.copy()), limit + offset)(ord) + } + } else { + childRDD.mapPartitionsInternal { iter => + Utils.takeOrdered(iter.map(_.copy()), limit)(ord) + } + } new ShuffledRowRDD( ShuffleExchangeExec.prepareShuffleDependency( localTopK, @@ -300,10 +346,12 @@ case class TakeOrderedAndProjectExec( writeMetrics), readMetrics) } - singlePartitionRDD.mapPartitions { iter => - val topK = offsetOpt - .map(offset => Utils.takeOrdered(iter.map(_.copy()), limit + offset)(ord).drop(offset)) - .getOrElse(Utils.takeOrdered(iter.map(_.copy()), limit)(ord)) + singlePartitionRDD.mapPartitionsInternal { iter => + val topK = if (offset > 0) { + Utils.takeOrdered(iter.map(_.copy()), limit + offset)(ord).drop(offset) + } else { + Utils.takeOrdered(iter.map(_.copy()), limit)(ord) + } if (projectList != child.output) { val proj = UnsafeProjection.create(projectList, child.output) topK.map(r => proj(r)) @@ -327,7 +375,7 @@ case class TakeOrderedAndProjectExec( override def stringArgs: Iterator[Any] = { super.stringArgs.zipWithIndex.filter { - case (0, 1) => false + case (0, 4) => false case _ => true }.map(_._1) } diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql index ffaebce9796a7..f59575817d6b8 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql @@ -15,15 +15,15 @@ SELECT '' AS two, unique1, unique2, stringu1 SELECT '' AS three, unique1, unique2, stringu1 FROM onek WHERE unique1 > 100 ORDER BY unique1 LIMIT 3 OFFSET 20; - SELECT '' AS zero, unique1, unique2, stringu1 - FROM onek WHERE unique1 < 50 - ORDER BY unique1 DESC LIMIT 8 OFFSET 99; - SELECT '' AS eleven, unique1, unique2, stringu1 - FROM onek WHERE unique1 < 50 - ORDER BY unique1 DESC LIMIT 20 OFFSET 39; --- SELECT '' AS ten, unique1, unique2, stringu1 --- FROM onek --- ORDER BY unique1 OFFSET 990; +SELECT '' AS zero, unique1, unique2, stringu1 + FROM onek WHERE unique1 < 50 + ORDER BY unique1 DESC LIMIT 8 OFFSET 99; +SELECT '' AS eleven, unique1, unique2, stringu1 + FROM onek WHERE unique1 < 50 + ORDER BY unique1 DESC LIMIT 20 OFFSET 39; +SELECT '' AS ten, unique1, unique2, stringu1 + FROM onek + ORDER BY unique1 OFFSET 990; -- SELECT '' AS five, unique1, unique2, stringu1 -- FROM onek -- ORDER BY unique1 OFFSET 990 LIMIT 5; diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql index a1cc218b4ad71..8e6b49fea8f4e 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql @@ -162,3 +162,40 @@ WHERE NOT EXISTS (SELECT max(dept.dept_id) GROUP BY state LIMIT 1 OFFSET 2); + +-- offset in the exists subquery block. +-- TC.04.01 +SELECT * +FROM emp +WHERE EXISTS (SELECT dept.dept_name + FROM dept + WHERE dept.dept_id > 10 + OFFSET 2); + +-- offset in the exists subquery block with aggregate. +-- TC.04.02 +SELECT * +FROM emp +WHERE EXISTS (SELECT max(dept.dept_id) + FROM dept + GROUP BY state + OFFSET 2); + +-- limit in the not exists subquery block. +-- TC.04.03 +SELECT * +FROM emp +WHERE NOT EXISTS (SELECT dept.dept_name + FROM dept + WHERE dept.dept_id > 100 + OFFSET 2); + +-- limit in the not exists subquery block with aggregates. +-- TC.04.04 +SELECT * +FROM emp +WHERE NOT EXISTS (SELECT max(dept.dept_id) + FROM dept + WHERE dept.dept_id > 100 + GROUP BY state + OFFSET 2); diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql index b6adebeca3e80..b9b062a5f7a1d 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql @@ -159,3 +159,54 @@ GROUP BY t1b ORDER BY t1b NULLS last LIMIT 1 OFFSET 1; + +-- OFFSET in parent side +-- TC 03.01 +SELECT * +FROM t1 +WHERE t1a IN (SELECT t2a + FROM t2 + WHERE t1d = t2d) +OFFSET 2; + +-- TC 03.02 +SELECT * +FROM t1 +WHERE t1c IN (SELECT t2c + FROM t2 + WHERE t2b >= 8 + OFFSET 2) +OFFSET 4; + +-- TC 03.03 +SELECT Count(DISTINCT( t1a )), + t1b +FROM t1 +WHERE t1d IN (SELECT t2d + FROM t2 + ORDER BY t2c, t2d + OFFSET 2) +GROUP BY t1b +ORDER BY t1b DESC NULLS FIRST +OFFSET 1; + +-- OFFSET with NOT IN +-- TC 03.04 +SELECT * +FROM t1 +WHERE t1b NOT IN (SELECT t2b + FROM t2 + WHERE t2b > 6 + OFFSET 2); + +-- TC 03.05 +SELECT Count(DISTINCT( t1a )), + t1b +FROM t1 +WHERE t1d NOT IN (SELECT t2d + FROM t2 + ORDER BY t2b DESC nulls first, t2d + OFFSET 1) +GROUP BY t1b +ORDER BY t1b NULLS last +OFFSET 1; diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out index 416bcc9931ce7..d6526d77d3cad 100644 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 12 +-- Number of queries: 13 -- !query @@ -52,8 +52,8 @@ struct -- !query SELECT '' AS zero, unique1, unique2, stringu1 - FROM onek WHERE unique1 < 50 - ORDER BY unique1 DESC LIMIT 8 OFFSET 99 + FROM onek WHERE unique1 < 50 + ORDER BY unique1 DESC LIMIT 8 OFFSET 99 -- !query schema struct -- !query output @@ -62,8 +62,8 @@ struct -- !query SELECT '' AS eleven, unique1, unique2, stringu1 - FROM onek WHERE unique1 < 50 - ORDER BY unique1 DESC LIMIT 20 OFFSET 39 + FROM onek WHERE unique1 < 50 + ORDER BY unique1 DESC LIMIT 20 OFFSET 39 -- !query schema struct -- !query output @@ -80,6 +80,25 @@ struct 0 998 AAAAAA +-- !query +SELECT '' AS ten, unique1, unique2, stringu1 + FROM onek + ORDER BY unique1 OFFSET 990 +-- !query schema +struct +-- !query output + 990 369 CMAAAA + 991 426 DMAAAA + 992 363 EMAAAA + 993 661 FMAAAA + 994 695 GMAAAA + 995 144 HMAAAA + 996 258 IMAAAA + 997 21 JMAAAA + 998 549 KMAAAA + 999 152 LMAAAA + + -- !query SELECT '' AS five, unique1, unique2, stringu1 FROM onek diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out index 76c377e8ca624..3442cfd689a2b 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 16 +-- Number of queries: 20 -- !query @@ -309,3 +309,88 @@ struct 600 emp 6 - no dept 2001-01-01 400.0 100 700 emp 7 2010-01-01 400.0 100 800 emp 8 2016-01-01 150.0 70 + + +-- !query +SELECT * +FROM emp +WHERE EXISTS (SELECT dept.dept_name + FROM dept + WHERE dept.dept_id > 10 + OFFSET 2) +-- !query schema +struct +-- !query output +100 emp 1 2005-01-01 100.0 10 +100 emp 1 2005-01-01 100.0 10 +200 emp 2 2003-01-01 200.0 10 +300 emp 3 2002-01-01 300.0 20 +400 emp 4 2005-01-01 400.0 30 +500 emp 5 2001-01-01 400.0 NULL +600 emp 6 - no dept 2001-01-01 400.0 100 +700 emp 7 2010-01-01 400.0 100 +800 emp 8 2016-01-01 150.0 70 + + +-- !query +SELECT * +FROM emp +WHERE EXISTS (SELECT max(dept.dept_id) + FROM dept + GROUP BY state + OFFSET 2) +-- !query schema +struct +-- !query output +100 emp 1 2005-01-01 100.0 10 +100 emp 1 2005-01-01 100.0 10 +200 emp 2 2003-01-01 200.0 10 +300 emp 3 2002-01-01 300.0 20 +400 emp 4 2005-01-01 400.0 30 +500 emp 5 2001-01-01 400.0 NULL +600 emp 6 - no dept 2001-01-01 400.0 100 +700 emp 7 2010-01-01 400.0 100 +800 emp 8 2016-01-01 150.0 70 + + +-- !query +SELECT * +FROM emp +WHERE NOT EXISTS (SELECT dept.dept_name + FROM dept + WHERE dept.dept_id > 100 + OFFSET 2) +-- !query schema +struct +-- !query output +100 emp 1 2005-01-01 100.0 10 +100 emp 1 2005-01-01 100.0 10 +200 emp 2 2003-01-01 200.0 10 +300 emp 3 2002-01-01 300.0 20 +400 emp 4 2005-01-01 400.0 30 +500 emp 5 2001-01-01 400.0 NULL +600 emp 6 - no dept 2001-01-01 400.0 100 +700 emp 7 2010-01-01 400.0 100 +800 emp 8 2016-01-01 150.0 70 + + +-- !query +SELECT * +FROM emp +WHERE NOT EXISTS (SELECT max(dept.dept_id) + FROM dept + WHERE dept.dept_id > 100 + GROUP BY state + OFFSET 2) +-- !query schema +struct +-- !query output +100 emp 1 2005-01-01 100.0 10 +100 emp 1 2005-01-01 100.0 10 +200 emp 2 2003-01-01 200.0 10 +300 emp 3 2002-01-01 300.0 20 +400 emp 4 2005-01-01 400.0 30 +500 emp 5 2001-01-01 400.0 NULL +600 emp 6 - no dept 2001-01-01 400.0 100 +700 emp 7 2010-01-01 400.0 100 +800 emp 8 2016-01-01 150.0 70 diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out index 185eb9fed285b..08f76f0936a56 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 13 +-- Number of queries: 18 -- !query @@ -235,3 +235,85 @@ OFFSET 1 struct -- !query output 2 8 + + +-- !query +SELECT * +FROM t1 +WHERE t1a IN (SELECT t2a + FROM t2 + WHERE t1d = t2d) +OFFSET 2 +-- !query schema +struct +-- !query output +val1e 10 NULL 19 17.0 25.0 2600 2014-05-04 01:01:00 2014-05-04 +val1e 10 NULL 19 17.0 25.0 2600 2014-09-04 01:02:00.001 2014-09-04 + + +-- !query +SELECT * +FROM t1 +WHERE t1c IN (SELECT t2c + FROM t2 + WHERE t2b >= 8 + OFFSET 2) +OFFSET 4 +-- !query schema +struct +-- !query output +val1d NULL 16 19 17.0 25.0 2600 2014-07-04 01:02:00.001 NULL +val1d NULL 16 22 17.0 25.0 2600 2014-06-04 01:01:00 NULL + + +-- !query +SELECT Count(DISTINCT( t1a )), + t1b +FROM t1 +WHERE t1d IN (SELECT t2d + FROM t2 + ORDER BY t2c, t2d + OFFSET 2) +GROUP BY t1b +ORDER BY t1b DESC NULLS FIRST +OFFSET 1 +-- !query schema +struct +-- !query output +1 10 +2 8 + + +-- !query +SELECT * +FROM t1 +WHERE t1b NOT IN (SELECT t2b + FROM t2 + WHERE t2b > 6 + OFFSET 2) +-- !query schema +struct +-- !query output +val1a 16 12 10 15.0 20.0 2000 2014-07-04 01:01:00 2014-07-04 +val1a 16 12 21 15.0 20.0 2000 2014-06-04 01:02:00.001 2014-06-04 +val1a 6 8 10 15.0 20.0 2000 2014-04-04 01:00:00 2014-04-04 +val1a 6 8 10 15.0 20.0 2000 2014-04-04 01:02:00.001 2014-04-04 + + +-- !query +SELECT Count(DISTINCT( t1a )), + t1b +FROM t1 +WHERE t1d NOT IN (SELECT t2d + FROM t2 + ORDER BY t2b DESC nulls first, t2d + OFFSET 1) +GROUP BY t1b +ORDER BY t1b NULLS last +OFFSET 1 +-- !query schema +struct +-- !query output +2 10 +1 16 +1 NULL