diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 51a3facf40835..36b167ba9044f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -608,7 +608,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       }
     }
     checkCollectedMetrics(plan)
-    checkOffsetOperator(plan)
     extendedCheckRules.foreach(_(plan))
     plan.foreachUp {
       case o if !o.resolved =>
@@ -851,30 +850,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
     check(plan)
   }

-  /**
-   * Validate whether the [[Offset]] is valid.
-   */
-  private def checkOffsetOperator(plan: LogicalPlan): Unit = {
-    plan.foreachUp {
-      case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
-        && o.children.exists(_.isInstanceOf[Offset]) =>
-        failAnalysis(
-          s"""
-             |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
-             |clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " "))
-      case _ =>
-    }
-    plan match {
-      case Offset(offsetExpr, _) =>
-        checkLimitLikeClause("offset", offsetExpr)
-        failAnalysis(
-          s"""
-             |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
-             |clause is found to be the outermost node.""".stripMargin.replace("\n", " "))
-      case _ =>
-    }
-  }
-
   /**
    * Validates to make sure the outer references appearing inside the subquery
    * are allowed.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 087e649704ef1..569fd57252a22 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -556,20 +556,6 @@ class AnalysisErrorSuite extends AnalysisTest {
     "The offset expression must be equal to or greater than 0, but got -1" :: Nil
   )

-  errorTest(
-    "OFFSET clause is outermost node",
-    testRelation.offset(Literal(10, IntegerType)),
-    "The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
-      " clause is found to be the outermost node." :: Nil
-  )
-
-  errorTest(
-    "OFFSET clause in other node",
-    testRelation2.offset(Literal(10, IntegerType)).where('b > 1),
-    "The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
-      " clause found in: Filter." :: Nil
-  )
-
   errorTest(
     "the sum of num_rows in limit clause and num_rows in offset clause less than Int.MaxValue",
     testRelation.offset(Literal(2000000000, IntegerType)).limit(Literal(1000000000, IntegerType)),
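With `checkOffsetOperator` and its error tests removed, OFFSET is no longer rejected when it appears outside a LIMIT clause. A minimal sketch of the queries this unblocks, assuming a local build that includes this patch (the view name `t` is illustrative):

```scala
import org.apache.spark.sql.SparkSession

object StandaloneOffsetDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("offset-demo").getOrCreate()
    spark.range(0, 5).createOrReplaceTempView("t")

    // Both queries previously failed analysis with
    // "The OFFSET clause is only allowed in the LIMIT clause ...".
    spark.sql("SELECT id FROM t ORDER BY id OFFSET 2").show()      // Offset as the outermost node
    spark.sql("SELECT id FROM t OFFSET 2").filter("id > 1").show() // Offset under a Filter

    spark.stop()
  }
}
```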
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 518b218199c69..080dbe9dba384 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -101,6 +101,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             limit, order, projectList, planLater(child), Some(offset)) :: Nil
         case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
           CollectLimitExec(limit, planLater(child), Some(offset)) :: Nil
+        case logical.Offset(IntegerLiteral(offset), child) =>
+          CollectOffsetExec(offset, planLater(child)) :: Nil
         case Tail(IntegerLiteral(limit), child) =>
           CollectTailExec(limit, planLater(child)) :: Nil
         case other => planLater(other) :: Nil
@@ -820,6 +822,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.GlobalLimitExec(limit, planLater(child)) :: Nil
       case logical.GlobalLimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
         execution.GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil
+      case logical.Offset(IntegerLiteral(offset), child) =>
+        execution.GlobalOffsetExec(offset, planLater(child)) :: Nil
       case union: logical.Union =>
         execution.UnionExec(union.children.map(planLater)) :: Nil
       case g @ logical.Generate(generator, _, outer, _, _, child) =>
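The two new strategy cases split responsibility: a root-level `Offset` whose result is collected straight to the driver plans to `CollectOffsetExec`, while an `Offset` embedded in a larger plan becomes `GlobalOffsetExec`. A hedged way to observe which operator the planner chose, continuing in the session from the sketch above (e.g. in spark-shell):

```scala
// Inspect the physical plan for a standalone OFFSET.
val df = spark.sql("SELECT id FROM t OFFSET 2")
df.explain()
// With this patch applied, the plan is expected to contain GlobalOffsetExec;
// a plain df.collect() instead routes through CollectOffsetExec picked by the
// SpecialLimits strategy for queries collected back to the driver.
println(df.collect().mkString(", "))
```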
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index f40245dca184a..1bfbd81d6c4a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -123,6 +123,55 @@ case class CollectTailExec(limit: Int, child: SparkPlan) extends LimitExec {
     copy(child = newChild)
 }

+/**
+ * Take all elements, collect them to a single partition, and then drop the
+ * first `offset` elements.
+ *
+ * This operator will be used when a logical `Offset` operation is the final operator in a
+ * logical plan, which happens when the user is collecting results back to the driver.
+ */
+case class CollectOffsetExec(offset: Int, child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = SinglePartition
+
+  override def executeCollect(): Array[InternalRow] = {
+    // Because CollectOffsetExec collects all the output of the child into a single partition,
+    // we need to collect all elements and then drop the first `offset` elements.
+    // For example: offset is 2 and the child outputs two partitions.
+    // The first partition outputs [1, 2] and the second partition outputs [3, 4, 5].
+    // Then [1, 2, 3, 4, 5] will be collected and [3, 4, 5] will be returned.
+    child.executeCollect().drop(offset)
+  }
+
+  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+
+  private lazy val writeMetrics =
+    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  private lazy val readMetrics =
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+  override lazy val metrics = readMetrics ++ writeMetrics
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val childRDD = child.execute()
+    if (childRDD.getNumPartitions == 0) {
+      new ParallelCollectionRDD(sparkContext, Seq.empty[InternalRow], 1, Map.empty)
+    } else {
+      val singlePartitionRDD = if (childRDD.getNumPartitions == 1) {
+        childRDD
+      } else {
+        new ShuffledRowRDD(
+          ShuffleExchangeExec.prepareShuffleDependency(
+            childRDD,
+            child.output,
+            SinglePartition,
+            serializer,
+            writeMetrics),
+          readMetrics)
+      }
+      singlePartitionRDD.mapPartitionsInternal(_.drop(offset))
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    copy(child = newChild)
+}
+
 object BaseLimitExec {
   private val curId = new java.util.concurrent.atomic.AtomicInteger()
@@ -202,6 +251,53 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
     copy(child = newChild)
 }

+/**
+ * Skip the first `offset` elements of the child's single output partition.
+ */
+case class GlobalOffsetExec(offset: Int, child: SparkPlan)
+  extends UnaryExecNode with CodegenSupport {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
+
+  protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
+    iter.drop(offset)
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  // Mark this as empty. This plan doesn't need to evaluate any inputs and can defer the evaluation
+  // to the parent operator.
+  override def usedInputs: AttributeSet = AttributeSet.empty
+
+  private lazy val skipTerm = BaseLimitExec.newLimitCountTerm()
+
+  protected override def doProduce(ctx: CodegenContext): String = {
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
+    ctx.addMutableState(CodeGenerator.JAVA_INT, skipTerm, forceInline = true, useFreshName = false)
+    s"""
+       | if ($skipTerm < $offset) {
+       |   $skipTerm += 1;
+       | } else {
+       |   ${consume(ctx, input)}
+       | }
+     """.stripMargin
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    copy(child = newChild)
+}
+
 /**
  * Skip the first `offset` elements then take the first `limit` of the following elements in
  * the child's single output partition.
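Both operators boil down to dropping `offset` rows from a single partition: `CollectOffsetExec` does it after collecting (or shuffling) everything into one partition, and `GlobalOffsetExec` does it with `iter.drop(offset)` or a generated skip counter. A self-contained sketch of those semantics in plain Scala, with `Int`s standing in for `InternalRow`s:

```scala
object OffsetSemanticsSketch {
  def main(args: Array[String]): Unit = {
    val offset = 2

    // Two "partitions", standing in for the child operator's output
    // (mirrors the example in the CollectOffsetExec comment above).
    val partition1 = Seq(1, 2)
    val partition2 = Seq(3, 4, 5)

    // CollectOffsetExec: gather everything into one partition, then drop.
    assert((partition1 ++ partition2).drop(offset) == Seq(3, 4, 5))

    // GlobalOffsetExec, interpreted path: iter.drop(offset) on the single partition.
    assert((partition1 ++ partition2).iterator.drop(offset).toSeq == Seq(3, 4, 5))

    // GlobalOffsetExec, whole-stage-codegen path: the generated code keeps a
    // mutable counter and silently consumes the first `offset` rows.
    var skipped = 0
    val consumed = scala.collection.mutable.ArrayBuffer.empty[Int]
    for (row <- partition1 ++ partition2) {
      if (skipped < offset) skipped += 1 // skipTerm < offset: swallow the row
      else consumed += row               // else: hand the row to the parent operator
    }
    assert(consumed.toSeq == Seq(3, 4, 5))

    println("offset semantics verified")
  }
}
```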
diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql
index ffaebce9796a7..dcac676a0f024 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql
@@ -21,9 +21,9 @@ SELECT '' AS three, unique1, unique2, stringu1
 SELECT '' AS eleven, unique1, unique2, stringu1
   FROM onek WHERE unique1 < 50
   ORDER BY unique1 DESC LIMIT 20 OFFSET 39;
--- SELECT '' AS ten, unique1, unique2, stringu1
---   FROM onek
---   ORDER BY unique1 OFFSET 990;
+SELECT '' AS ten, unique1, unique2, stringu1
+  FROM onek
+  ORDER BY unique1 OFFSET 990;
 -- SELECT '' AS five, unique1, unique2, stringu1
 --   FROM onek
 --   ORDER BY unique1 OFFSET 990 LIMIT 5;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql
index a1cc218b4ad71..8e6b49fea8f4e 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql
@@ -162,3 +162,40 @@ WHERE NOT EXISTS (SELECT max(dept.dept_id)
                    GROUP BY state
                    LIMIT 1
                    OFFSET 2);
+
+-- offset in the exists subquery block.
+-- TC.04.01
+SELECT *
+FROM emp
+WHERE EXISTS (SELECT dept.dept_name
+              FROM dept
+              WHERE dept.dept_id > 10
+              OFFSET 2);
+
+-- offset in the exists subquery block with aggregate.
+-- TC.04.02
+SELECT *
+FROM emp
+WHERE EXISTS (SELECT max(dept.dept_id)
+              FROM dept
+              GROUP BY state
+              OFFSET 2);
+
+-- offset in the not exists subquery block.
+-- TC.04.03
+SELECT *
+FROM emp
+WHERE NOT EXISTS (SELECT dept.dept_name
+                  FROM dept
+                  WHERE dept.dept_id > 100
+                  OFFSET 2);
+
+-- offset in the not exists subquery block with aggregates.
+-- TC.04.04
+SELECT *
+FROM emp
+WHERE NOT EXISTS (SELECT max(dept.dept_id)
+                  FROM dept
+                  WHERE dept.dept_id > 100
+                  GROUP BY state
+                  OFFSET 2);
diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
index b6adebeca3e80..fcfd6f075848e 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
@@ -159,3 +159,54 @@ GROUP BY t1b
 ORDER BY t1b NULLS last
 LIMIT 1
 OFFSET 1;
+
+-- OFFSET in parent side
+-- TC 03.01
+SELECT *
+FROM t1
+WHERE t1a IN (SELECT t2a
+              FROM t2
+              WHERE t1d = t2d)
+OFFSET 2;
+
+-- TC 03.02
+SELECT *
+FROM t1
+WHERE t1c IN (SELECT t2c
+              FROM t2
+              WHERE t2b >= 8
+              OFFSET 2)
+OFFSET 4;
+
+-- TC 03.03
+SELECT Count(DISTINCT( t1a )),
+       t1b
+FROM t1
+WHERE t1d IN (SELECT t2d
+              FROM t2
+              ORDER BY t2c, t2d
+              OFFSET 2)
+GROUP BY t1b
+ORDER BY t1b DESC NULLS FIRST
+OFFSET 1;
+
+-- OFFSET with NOT IN
+-- TC 04.04
+SELECT *
+FROM t1
+WHERE t1b NOT IN (SELECT t2b
+                  FROM t2
+                  WHERE t2b > 6
+                  OFFSET 2);
+
+-- TC 04.05
+SELECT Count(DISTINCT( t1a )),
+       t1b
+FROM t1
+WHERE t1d NOT IN (SELECT t2d
+                  FROM t2
+                  ORDER BY t2b DESC nulls first, t2d
+                  OFFSET 1)
+GROUP BY t1b
+ORDER BY t1b NULLS last
+OFFSET 1;
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out
index 416bcc9931ce7..8800dc82cfca7 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 12
+-- Number of queries: 13


 -- !query
@@ -80,6 +80,25 @@ struct
  0 998 AAAAAA


+-- !query
+SELECT '' AS ten, unique1, unique2, stringu1
+  FROM onek
+  ORDER BY unique1 OFFSET 990
+-- !query schema
+struct
+-- !query output
+ 990 369 CMAAAA
+ 991 426 DMAAAA
+ 992 363 EMAAAA
+ 993 661 FMAAAA
+ 994 695 GMAAAA
+ 995 144 HMAAAA
+ 996 258 IMAAAA
+ 997 21 JMAAAA
+ 998 549 KMAAAA
+ 999 152 LMAAAA
+
+
 -- !query
 SELECT '' AS five, unique1, unique2, stringu1
   FROM onek
diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out
index 76c377e8ca624..3442cfd689a2b 100644
--- a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 16
+-- Number of queries: 20


 -- !query
@@ -309,3 +309,88 @@ struct
 600 emp 6 - no dept 2001-01-01 400.0 100
 700 emp 7 2010-01-01 400.0 100
 800 emp 8 2016-01-01 150.0 70
+
+
+-- !query
+SELECT *
+FROM emp
+WHERE EXISTS (SELECT dept.dept_name
+              FROM dept
+              WHERE dept.dept_id > 10
+              OFFSET 2)
+-- !query schema
+struct
+-- !query output
+100 emp 1 2005-01-01 100.0 10
+100 emp 1 2005-01-01 100.0 10
+200 emp 2 2003-01-01 200.0 10
+300 emp 3 2002-01-01 300.0 20
+400 emp 4 2005-01-01 400.0 30
+500 emp 5 2001-01-01 400.0 NULL
+600 emp 6 - no dept 2001-01-01 400.0 100
+700 emp 7 2010-01-01 400.0 100
+800 emp 8 2016-01-01 150.0 70
+
+
+-- !query
+SELECT *
+FROM emp
+WHERE EXISTS (SELECT max(dept.dept_id)
+              FROM dept
+              GROUP BY state
+              OFFSET 2)
+-- !query schema
+struct
+-- !query output
+100 emp 1 2005-01-01 100.0 10
+100 emp 1 2005-01-01 100.0 10
+200 emp 2 2003-01-01 200.0 10
+300 emp 3 2002-01-01 300.0 20
+400 emp 4 2005-01-01 400.0 30
+500 emp 5 2001-01-01 400.0 NULL
+600 emp 6 - no dept 2001-01-01 400.0 100
+700 emp 7 2010-01-01 400.0 100
+800 emp 8 2016-01-01 150.0 70
+
+
+-- !query
+SELECT *
+FROM emp
+WHERE NOT EXISTS (SELECT dept.dept_name
+                  FROM dept
+                  WHERE dept.dept_id > 100
+                  OFFSET 2)
+-- !query schema
+struct
+-- !query output
+100 emp 1 2005-01-01 100.0 10
+100 emp 1 2005-01-01 100.0 10
+200 emp 2 2003-01-01 200.0 10
+300 emp 3 2002-01-01 300.0 20
+400 emp 4 2005-01-01 400.0 30
+500 emp 5 2001-01-01 400.0 NULL
+600 emp 6 - no dept 2001-01-01 400.0 100
+700 emp 7 2010-01-01 400.0 100
+800 emp 8 2016-01-01 150.0 70
+
+
+-- !query
+SELECT *
+FROM emp
+WHERE NOT EXISTS (SELECT max(dept.dept_id)
+                  FROM dept
+                  WHERE dept.dept_id > 100
+                  GROUP BY state
+                  OFFSET 2)
+-- !query schema
+struct
+-- !query output
+100 emp 1 2005-01-01 100.0 10
+100 emp 1 2005-01-01 100.0 10
+200 emp 2 2003-01-01 200.0 10
+300 emp 3 2002-01-01 300.0 20
+400 emp 4 2005-01-01 400.0 30
+500 emp 5 2001-01-01 400.0 NULL
+600 emp 6 - no dept 2001-01-01 400.0 100
+700 emp 7 2010-01-01 400.0 100
+800 emp 8 2016-01-01 150.0 70
diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out
index 185eb9fed285b..08f76f0936a56 100644
--- a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 13
+-- Number of queries: 18


 -- !query
@@ -235,3 +235,85 @@ OFFSET 1
 struct
 -- !query output
 2 8
+
+
+-- !query
+SELECT *
+FROM t1
+WHERE t1a IN (SELECT t2a
+              FROM t2
+              WHERE t1d = t2d)
+OFFSET 2
+-- !query schema
+struct
+-- !query output
+val1e 10 NULL 19 17.0 25.0 2600 2014-05-04 01:01:00 2014-05-04
+val1e 10 NULL 19 17.0 25.0 2600 2014-09-04 01:02:00.001 2014-09-04
+
+
+-- !query
+SELECT *
+FROM t1
+WHERE t1c IN (SELECT t2c
+              FROM t2
+              WHERE t2b >= 8
+              OFFSET 2)
+OFFSET 4
+-- !query schema
+struct
+-- !query output
+val1d NULL 16 19 17.0 25.0 2600 2014-07-04 01:02:00.001 NULL
+val1d NULL 16 22 17.0 25.0 2600 2014-06-04 01:01:00 NULL
+
+
+-- !query
+SELECT Count(DISTINCT( t1a )),
+       t1b
+FROM t1
+WHERE t1d IN (SELECT t2d
+              FROM t2
+              ORDER BY t2c, t2d
+              OFFSET 2)
+GROUP BY t1b
+ORDER BY t1b DESC NULLS FIRST
+OFFSET 1
+-- !query schema
+struct
+-- !query output
+1 10
+2 8
+
+
+-- !query
+SELECT *
+FROM t1
+WHERE t1b NOT IN (SELECT t2b
+                  FROM t2
+                  WHERE t2b > 6
+                  OFFSET 2)
+-- !query schema
+struct
+-- !query output
+val1a 16 12 10 15.0 20.0 2000 2014-07-04 01:01:00 2014-07-04
+val1a 16 12 21 15.0 20.0 2000 2014-06-04 01:02:00.001 2014-06-04
+val1a 6 8 10 15.0 20.0 2000 2014-04-04 01:00:00 2014-04-04
+val1a 6 8 10 15.0 20.0 2000 2014-04-04 01:02:00.001 2014-04-04
+
+
+-- !query
+SELECT Count(DISTINCT( t1a )),
+       t1b
+FROM t1
+WHERE t1d NOT IN (SELECT t2d
+                  FROM t2
+                  ORDER BY t2b DESC nulls first, t2d
+                  OFFSET 1)
+GROUP BY t1b
+ORDER BY t1b NULLS last
+OFFSET 1
+-- !query schema
+struct
+-- !query output
+2 10
+1 16
+1 NULL
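Taken together with the golden files above, OFFSET now works both at the top level and inside EXISTS/IN subqueries. A short end-to-end sketch against a local session (the table and data are illustrative, not the test fixtures):

```scala
import org.apache.spark.sql.SparkSession

object OffsetEndToEndDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("offset-e2e").getOrCreate()
    import spark.implicits._

    Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")).toDF("id", "name").createOrReplaceTempView("t")

    // Standalone OFFSET at the top level.
    spark.sql("SELECT * FROM t ORDER BY id OFFSET 2").show()

    // OFFSET inside an uncorrelated IN subquery, mirroring the new in-limit.sql cases.
    spark.sql("SELECT * FROM t WHERE id IN (SELECT id FROM t ORDER BY id OFFSET 1)").show()

    spark.stop()
  }
}
```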