@@ -608,7 +608,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
}
}
checkCollectedMetrics(plan)
checkOffsetOperator(plan)
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
case o if !o.resolved =>
@@ -851,30 +850,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
check(plan)
}

/**
* Validate whether the [[Offset]] is valid.
*/
private def checkOffsetOperator(plan: LogicalPlan): Unit = {
plan.foreachUp {
case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
&& o.children.exists(_.isInstanceOf[Offset]) =>
failAnalysis(
s"""
|The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
|clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " "))
case _ =>
}
plan match {
case Offset(offsetExpr, _) =>
checkLimitLikeClause("offset", offsetExpr)
failAnalysis(
s"""
|The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
|clause is found to be the outermost node.""".stripMargin.replace("\n", " "))
case _ =>
}
}

/**
* Validates to make sure the outer references appearing inside the subquery
* are allowed.
@@ -556,20 +556,6 @@ class AnalysisErrorSuite extends AnalysisTest {
"The offset expression must be equal to or greater than 0, but got -1" :: Nil
)

errorTest(
"OFFSET clause is outermost node",
testRelation.offset(Literal(10, IntegerType)),
"The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
" clause is found to be the outermost node." :: Nil
)

errorTest(
"OFFSET clause in other node",
testRelation2.offset(Literal(10, IntegerType)).where('b > 1),
"The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
" clause found in: Filter." :: Nil
)

errorTest(
"the sum of num_rows in limit clause and num_rows in offset clause less than Int.MaxValue",
testRelation.offset(Literal(2000000000, IntegerType)).limit(Literal(1000000000, IntegerType)),
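
With the analyzer check and its error tests removed, a standalone OFFSET (without a
LIMIT) now passes analysis. A minimal sketch of the user-visible effect, assuming a
build that includes this patch and an illustrative temp view `t`:

    spark.range(5).createOrReplaceTempView("t")
    // Previously rejected with "The OFFSET clause is only allowed in the LIMIT clause ...";
    // with this patch the query returns the rows with id 2, 3 and 4.
    spark.sql("SELECT id FROM t ORDER BY id OFFSET 2").show()
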
@@ -101,6 +101,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
limit, order, projectList, planLater(child), Some(offset)) :: Nil
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
CollectLimitExec(limit, planLater(child), Some(offset)) :: Nil
case logical.Offset(IntegerLiteral(offset), child) =>
CollectOffsetExec(offset, planLater(child)) :: Nil
case Tail(IntegerLiteral(limit), child) =>
CollectTailExec(limit, planLater(child)) :: Nil
case other => planLater(other) :: Nil
@@ -820,6 +822,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.GlobalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
execution.GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil
case logical.Offset(IntegerLiteral(offset), child) =>
execution.GlobalOffsetExec(offset, planLater(child)) :: Nil
case union: logical.Union =>
execution.UnionExec(union.children.map(planLater)) :: Nil
case g @ logical.Generate(generator, _, outer, _, _, child) =>
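
Together, the two new planner rules split the work by position: a logical Offset at the
root of the plan, where results are collected back to the driver, becomes
CollectOffsetExec, while an Offset below other operators becomes GlobalOffsetExec. A
sketch of queries that would likely exercise each path, assuming the same illustrative
view `t` as above:

    spark.sql("SELECT id FROM t OFFSET 2").collect()
    // terminal Offset, planned as CollectOffsetExec
    spark.sql("SELECT count(*) FROM (SELECT id FROM t OFFSET 2)").collect()
    // Offset under an aggregate, planned as GlobalOffsetExec
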
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala (96 additions, 0 deletions)
@@ -123,6 +123,55 @@ case class CollectTailExec(limit: Int, child: SparkPlan) extends LimitExec {
copy(child = newChild)
}

/**
* Collects all elements into a single partition and then drops the first `offset`
* elements.
*
* This operator is used when a logical `Offset` operation is the final operator in a
* logical plan, which happens when the user is collecting results back to the driver.
*/
case class CollectOffsetExec(offset: Int, child: SparkPlan) extends UnaryExecNode {
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = SinglePartition
override def executeCollect(): Array[InternalRow] = {
// CollectOffsetExec collects all of the child's output into a single partition, so we
// gather every element and then drop the first `offset` of them.
// For example: with offset 2 and a child producing two partitions, where the first
// partition outputs [1, 2] and the second outputs [3, 4, 5], the collected
// [1, 2, 3, 4, 5] yields [3, 4, 5].
child.executeCollect().drop(offset)
}
private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
private lazy val writeMetrics =
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
private lazy val readMetrics =
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics = readMetrics ++ writeMetrics
protected override def doExecute(): RDD[InternalRow] = {
val childRDD = child.execute()
if (childRDD.getNumPartitions == 0) {
new ParallelCollectionRDD(sparkContext, Seq.empty[InternalRow], 1, Map.empty)
} else {
val singlePartitionRDD = if (childRDD.getNumPartitions == 1) {
childRDD
} else {
new ShuffledRowRDD(
ShuffleExchangeExec.prepareShuffleDependency(
childRDD,
child.output,
SinglePartition,
serializer,
writeMetrics),
readMetrics)
}
singlePartitionRDD.mapPartitionsInternal(_.drop(offset))
}
}

override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
}
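
// The doExecute path above in miniature (plain Scala sketch, illustrative only):
// bring the child's partitions down to one, then drop `offset` rows from that
// single iterator. Assumes an existing SparkContext `sc`.
//   val child = sc.parallelize(Seq(1, 2, 3, 4, 5), numSlices = 2)
//   val result = child.coalesce(1).mapPartitions(_.drop(2)).collect()
//   // result: Array(3, 4, 5)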

object BaseLimitExec {
private val curId = new java.util.concurrent.atomic.AtomicInteger()

@@ -202,6 +251,53 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
copy(child = newChild)
}

/**
* Skip the first `offset` elements of the child's single output partition.
*/
case class GlobalOffsetExec(offset: Int, child: SparkPlan)
extends UnaryExecNode with CodegenSupport {

override def output: Seq[Attribute] = child.output

override def outputPartitioning: Partitioning = child.outputPartitioning

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil

protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
iter.drop(offset)
}

override def inputRDDs(): Seq[RDD[InternalRow]] = {
child.asInstanceOf[CodegenSupport].inputRDDs()
}

// Mark this as empty. This plan doesn't need to evaluate any inputs and can defer the evaluation
// to the parent operator.
override def usedInputs: AttributeSet = AttributeSet.empty

private lazy val skipTerm = BaseLimitExec.newLimitCountTerm()

protected override def doProduce(ctx: CodegenContext): String = {
child.asInstanceOf[CodegenSupport].produce(ctx, this)
}

override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
ctx.addMutableState(CodeGenerator.JAVA_INT, skipTerm, forceInline = true, useFreshName = false)
s"""
| if ($skipTerm < $offset) {
| $skipTerm += 1;
| } else {
| ${consume(ctx, input)}
| }
""".stripMargin
}

override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
}
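
// An interpreter-mode equivalent of the generated code above (illustrative sketch,
// hypothetical helper): keep one mutable counter per partition and forward a row
// only once `offset` rows have been skipped.
//   def dropFirst[T](iter: Iterator[T], offset: Int): Iterator[T] = {
//     var skipped = 0
//     iter.filter { _ => if (skipped < offset) { skipped += 1; false } else true }
//   }
//   dropFirst(Iterator(1, 2, 3, 4, 5), 2).toList  // List(3, 4, 5)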

/**
* Skip the first `offset` elements then take the first `limit` of the following elements in
* the child's single output partition.
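
For a single partition iterator, the skip-then-take contract described in that comment
reduces to a slice; a minimal plain-Scala sketch with a hypothetical helper name:

    def limitAndOffset[T](iter: Iterator[T], limit: Int, offset: Int): Iterator[T] =
      iter.slice(offset, offset + limit)  // drop `offset` rows, then take `limit`
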
@@ -21,9 +21,9 @@ SELECT '' AS three, unique1, unique2, stringu1
SELECT '' AS eleven, unique1, unique2, stringu1
FROM onek WHERE unique1 < 50
ORDER BY unique1 DESC LIMIT 20 OFFSET 39;
-- SELECT '' AS ten, unique1, unique2, stringu1
-- FROM onek
-- ORDER BY unique1 OFFSET 990;
SELECT '' AS ten, unique1, unique2, stringu1
FROM onek
ORDER BY unique1 OFFSET 990;
-- SELECT '' AS five, unique1, unique2, stringu1
-- FROM onek
-- ORDER BY unique1 OFFSET 990 LIMIT 5;
@@ -162,3 +162,40 @@ WHERE NOT EXISTS (SELECT max(dept.dept_id)
GROUP BY state
LIMIT 1
OFFSET 2);

-- offset in the exists subquery block.
-- TC.04.01
SELECT *
FROM emp
WHERE EXISTS (SELECT dept.dept_name
FROM dept
WHERE dept.dept_id > 10
OFFSET 2);

-- offset in the exists subquery block with aggregate.
-- TC.04.02
SELECT *
FROM emp
WHERE EXISTS (SELECT max(dept.dept_id)
FROM dept
GROUP BY state
OFFSET 2);

-- offset in the not exists subquery block.
-- TC.04.03
SELECT *
FROM emp
WHERE NOT EXISTS (SELECT dept.dept_name
FROM dept
WHERE dept.dept_id > 100
OFFSET 2);

-- offset in the not exists subquery block with aggregates.
-- TC.04.04
SELECT *
FROM emp
WHERE NOT EXISTS (SELECT max(dept.dept_id)
FROM dept
WHERE dept.dept_id > 100
GROUP BY state
OFFSET 2);
@@ -159,3 +159,54 @@ GROUP BY t1b
ORDER BY t1b NULLS last
LIMIT 1
OFFSET 1;

-- OFFSET in parent side
-- TC 03.01
SELECT *
FROM t1
WHERE t1a IN (SELECT t2a
FROM t2
WHERE t1d = t2d)
OFFSET 2;

-- TC 03.02
SELECT *
FROM t1
WHERE t1c IN (SELECT t2c
FROM t2
WHERE t2b >= 8
OFFSET 2)
OFFSET 4;

-- TC 03.03
SELECT Count(DISTINCT( t1a )),
t1b
FROM t1
WHERE t1d IN (SELECT t2d
FROM t2
ORDER BY t2c, t2d
OFFSET 2)
GROUP BY t1b
ORDER BY t1b DESC NULLS FIRST
OFFSET 1;

-- OFFSET with NOT IN
-- TC 04.04
SELECT *
FROM t1
WHERE t1b NOT IN (SELECT t2b
FROM t2
WHERE t2b > 6
OFFSET 2);

-- TC 04.05
SELECT Count(DISTINCT( t1a )),
t1b
FROM t1
WHERE t1d NOT IN (SELECT t2d
FROM t2
ORDER BY t2b DESC nulls first, t2d
OFFSET 1)
GROUP BY t1b
ORDER BY t1b NULLS last
OFFSET 1;
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 12
-- Number of queries: 13


-- !query
@@ -80,6 +80,25 @@ struct<eleven:string,unique1:int,unique2:int,stringu1:string>
0 998 AAAAAA


-- !query
SELECT '' AS ten, unique1, unique2, stringu1
FROM onek
ORDER BY unique1 OFFSET 990
-- !query schema
struct<ten:string,unique1:int,unique2:int,stringu1:string>
-- !query output
990 369 CMAAAA
991 426 DMAAAA
992 363 EMAAAA
993 661 FMAAAA
994 695 GMAAAA
995 144 HMAAAA
996 258 IMAAAA
997 21 JMAAAA
998 549 KMAAAA
999 152 LMAAAA


-- !query
SELECT '' AS five, unique1, unique2, stringu1
FROM onek