Skip to content
Closed
1 change: 1 addition & 0 deletions docs/sql-ref-ansi-compliance.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ Below is a list of all the keywords in Spark SQL.
|NULL|reserved|non-reserved|reserved|
|NULLS|non-reserved|non-reserved|non-reserved|
|OF|non-reserved|non-reserved|reserved|
|OFFSET|reserved|non-reserved|reserved|
|ON|reserved|strict-non-reserved|reserved|
|ONLY|reserved|non-reserved|reserved|
|OPTION|non-reserved|non-reserved|non-reserved|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ queryOrganization
(SORT BY sort+=sortItem (',' sort+=sortItem)*)?
windowClause?
(LIMIT (ALL | limit=expression))?
(OFFSET offset=expression)?
;

multiInsertQueryBody
Expand Down Expand Up @@ -1358,6 +1359,7 @@ nonReserved
| NULL
| NULLS
| OF
| OFFSET
| ONLY
| OPTION
| OPTIONS
Expand Down Expand Up @@ -1611,6 +1613,7 @@ NOT: 'NOT' | '!';
NULL: 'NULL';
NULLS: 'NULLS';
OF: 'OF';
OFFSET: 'OFFSET';
ON: 'ON';
ONLY: 'ONLY';
OPTION: 'OPTION';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,30 @@ trait CheckAnalysis extends PredicateHelper {

case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)

case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)
case LocalLimit(limitExpr, child) =>
checkLimitLikeClause("limit", limitExpr)
child match {
case Offset(offsetExpr, _) =>
val limit = limitExpr.eval().asInstanceOf[Int]
val offset = offsetExpr.eval().asInstanceOf[Int]
if (Int.MaxValue - limit < offset) {
failAnalysis(
s"""The sum of limit and offset must not be greater than Int.MaxValue,
| but found limit = $limit, offset = $offset.""".stripMargin)
}
case _ =>
}

case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)

case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)

case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
&& o.children.exists(_.isInstanceOf[Offset]) =>
failAnalysis(
s"""Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET
| clause found in: ${o.nodeName}.""".stripMargin)

case _: Union | _: SetOperation if operator.children.length > 1 =>
def dataTypes(plan: LogicalPlan): Seq[DataType] = plan.output.map(_.dataType)
def ordinalNumber(i: Int): String = i match {
Expand Down Expand Up @@ -661,6 +681,7 @@ trait CheckAnalysis extends PredicateHelper {
}
}
checkCollectedMetrics(plan)
checkOutermostOffset(plan)
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
case o if !o.resolved =>
Expand Down Expand Up @@ -786,6 +807,20 @@ trait CheckAnalysis extends PredicateHelper {
check(plan)
}

/**
* Validate that the root node of query or subquery is [[Offset]].
*/
private def checkOutermostOffset(plan: LogicalPlan): Unit = {
plan match {
case Offset(offsetExpr, _) =>
checkLimitLikeClause("offset", offsetExpr)
failAnalysis(
s"""Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET
| clause is found to be the outermost node.""".stripMargin)
case _ =>
}
}

/**
* Validates to make sure the outer references appearing inside the subquery
* are allowed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,8 @@ object UnsupportedOperationChecker extends Logging {
throwError("Limits are not supported on streaming DataFrames/Datasets in Update " +
"output mode")

case Offset(_, _) => throwError("Offset is not supported on streaming DataFrames/Datasets")

case Sort(_, _, _) if !containsCompleteData(subPlan) =>
throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " +
"aggregated DataFrame/Dataset in Complete output mode")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ package object dsl {

def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, logicalPlan)

def offset(offsetExpr: Expression): LogicalPlan = Offset(offsetExpr, logicalPlan)

def join(
otherPlan: LogicalPlan,
joinType: JoinType = Inner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
CollapseWindow,
CombineFilters,
CombineLimits,
RewriteOffsets,
CombineUnions,
// Constant folding and strength reduction
TransposeWindow,
Expand Down Expand Up @@ -1449,6 +1450,19 @@ object CombineLimits extends Rule[LogicalPlan] {
}
}

/**
* Rewrite [[Offset]] as [[Limit]] or combines two adjacent [[Offset]] operators into one,
* merging the expressions into one single expression.
*/
object RewriteOffsets extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case GlobalLimit(le, Offset(oe, grandChild)) =>
GlobalLimitAndOffset(le, oe, grandChild)
case LocalLimit(le, Offset(oe, grandChild)) =>
Offset(oe, LocalLimit(Add(le, oe), grandChild))
}
}

/**
* Check if there any cartesian products between joins of any type in the optimized plan tree.
* Throw an error if a cartesian product is found without an explicit cross join specified.
Expand Down Expand Up @@ -1554,7 +1568,7 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
LocalRelation(projectList.map(_.toAttribute), data.map(projection(_).copy()), isStreaming)

case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
LocalRelation(output, data.take(limit), isStreaming)
LocalRelation(output, data.take(limit), isStreaming)

case Filter(condition, LocalRelation(output, data, isStreaming))
if !hasUnevaluableExpr(condition) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
case _: Sort => empty(p)
case _: GlobalLimit if !p.isStreaming => empty(p)
case _: LocalLimit if !p.isStreaming => empty(p)
case _: Offset => empty(p)
case _: Repartition => empty(p)
case _: RepartitionByExpression => empty(p)
// An aggregate with non-empty group expression will return one output row per group when the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
case _: Sample => true
case _: GlobalLimit => true
case _: LocalLimit => true
case _: Offset => true
case _: Generate => true
case _: Distinct => true
case _: AppendColumns => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
// WINDOWS
val withWindow = withOrder.optionalMap(windowClause)(withWindowClause)

// OFFSET
// - OFFSET 0 is the same as omitting the OFFSET clause
val withOffset = withWindow.optional(offset) {
Offset(typedVisit(offset), withWindow)
}

// LIMIT
// - LIMIT ALL is the same as omitting the LIMIT clause
withWindow.optional(limit) {
Limit(typedVisit(limit), withWindow)
withOffset.optional(limit) {
Limit(typedVisit(limit), withOffset)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ trait LogicalPlanVisitor[T] {
case p: Filter => visitFilter(p)
case p: Generate => visitGenerate(p)
case p: GlobalLimit => visitGlobalLimit(p)
case p: Offset => visitOffset(p)
case p: Intersect => visitIntersect(p)
case p: Join => visitJoin(p)
case p: LocalLimit => visitLocalLimit(p)
Expand Down Expand Up @@ -60,6 +61,8 @@ trait LogicalPlanVisitor[T] {

def visitGlobalLimit(p: GlobalLimit): T

def visitOffset(p: Offset): T

def visitIntersect(p: Intersect): T

def visitJoin(p: Join): T
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,21 @@ case class GroupingSets(
override lazy val resolved: Boolean = false
}

/**
* A logical offset, which may removing a specified number of rows from the beginning of the
* output of child logical plan.
*/
case class Offset(offsetExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output
override def maxRows: Option[Long] = {
import scala.math.max
offsetExpr match {
case IntegerLiteral(offset) => child.maxRows.map { x => max(x - offset, 0) }
case _ => None
}
}
}

/**
* A constructor for creating a pivot, which will later be converted to a [[Project]]
* or an [[Aggregate]] during the query analysis.
Expand Down Expand Up @@ -843,6 +858,23 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
}
}

/**
* A global (coordinated) limit with offset. This operator can skip at most `offsetExpr` number and
* emit at most `limitExpr` number in total.
*/
case class GlobalLimitAndOffset(
limitExpr: Expression,
offsetExpr: Expression,
child: LogicalPlan) extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output
override def maxRows: Option[Long] = {
limitExpr match {
case IntegerLiteral(limit) => Some(limit)
case _ => None
}
}
}

/**
* This is similar with [[Limit]] except:
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ object BasicStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {

override def visitGlobalLimit(p: GlobalLimit): Statistics = fallback(p)

override def visitOffset(p: Offset): Statistics = fallback(p)

override def visitIntersect(p: Intersect): Statistics = fallback(p)

override def visitJoin(p: Join): Statistics = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ object SizeInBytesOnlyStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {
rowCount = Some(rowCount))
}

override def visitOffset(p: Offset): Statistics = {
val offset = p.offsetExpr.eval().asInstanceOf[Int]
val childStats = p.child.stats
val rowCount: BigInt = childStats.rowCount.map(_.-(offset).max(0)).getOrElse(0)
Statistics(
sizeInBytes = EstimationUtils.getOutputSize(p.output, rowCount, childStats.attributeStats),
rowCount = Some(rowCount))
}

override def visitIntersect(p: Intersect): Statistics = {
val leftSize = p.left.stats.sizeInBytes
val rightSize = p.right.stats.sizeInBytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,38 @@ class AnalysisErrorSuite extends AnalysisTest {
"The limit expression must be equal to or greater than 0, but got -1" :: Nil
)

errorTest(
"an evaluated offset class must not be null",
testRelation.offset(Literal(null, IntegerType)),
"The evaluated offset expression must not be null, but got " :: Nil
)

errorTest(
"num_rows in offset clause must be equal to or greater than 0",
listRelation.offset(-1),
"The offset expression must be equal to or greater than 0, but got -1" :: Nil
)

errorTest(
"OFFSET clause is outermost node",
testRelation.offset(Literal(10, IntegerType)),
"Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET" ::
"clause is found to be the outermost node." :: Nil
)

errorTest(
"OFFSET clause in other node",
testRelation2.offset(Literal(10, IntegerType)).where('b > 1),
"Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET" ::
"clause found in: Filter." :: Nil
)

errorTest(
"the sum of num_rows in limit clause and num_rows in offset clause less than Int.MaxValue",
testRelation.offset(Literal(2000000000, IntegerType)).limit(Literal(1000000000, IntegerType)),
"The sum of limit and offset must not be greater than Int.MaxValue" :: Nil
)

errorTest(
"more than one generators in SELECT",
listRelation.select(Explode($"list"), Explode($"list")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
expectedStatsCboOff = windowsStats)
}

test("offset estimation: offset < child's rowCount") {
val offset = Offset(Literal(2), plan)
checkStats(offset, Statistics(sizeInBytes = 96, rowCount = Some(8)))
}

test("offset estimation: offset > child's rowCount") {
val offset = Offset(Literal(20), plan)
checkStats(offset, Statistics(sizeInBytes = 1, rowCount = Some(0)))
}

test("offset estimation: offset = 0") {
val offset = Offset(Literal(0), plan)
// Offset is equal to zero, so Offset's stats is equal to its child's stats.
checkStats(offset, plan.stats.copy(attributeStats = AttributeMap(Nil)))
}

test("limit estimation: limit < child's rowCount") {
val localLimit = LocalLimit(Literal(2), plan)
val globalLimit = GlobalLimit(Literal(2), plan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,48 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case ReturnAnswer(rootPlan) => rootPlan match {
case Limit(IntegerLiteral(limit), Sort(order, true, child))
if limit < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
TakeOrderedAndProjectExec(limit, 0, order, child.output, planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
if limit < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
TakeOrderedAndProjectExec(limit, 0, order, projectList, planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), child) =>
CollectLimitExec(limit, planLater(child)) :: Nil
CollectLimitExec(limit, 0, planLater(child)) :: Nil
case GlobalLimitAndOffset(
IntegerLiteral(limit),
IntegerLiteral(offset),
Sort(order, true, child))
if limit < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(limit, offset, order, child.output, planLater(child)) :: Nil
case GlobalLimitAndOffset(
IntegerLiteral(limit),
IntegerLiteral(offset),
Project(projectList, Sort(order, true, child)))
if limit < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(limit, offset, order, projectList, planLater(child)) :: Nil
case GlobalLimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
CollectLimitExec(limit, offset, planLater(child)) :: Nil
case Tail(IntegerLiteral(limit), child) =>
CollectTailExec(limit, planLater(child)) :: Nil
case other => planLater(other) :: Nil
}
case Limit(IntegerLiteral(limit), Sort(order, true, child))
if limit < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
TakeOrderedAndProjectExec(limit, 0, order, child.output, planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
if limit < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
TakeOrderedAndProjectExec(limit, 0, order, projectList, planLater(child)) :: Nil
case GlobalLimitAndOffset(
IntegerLiteral(limit),
IntegerLiteral(offset),
Sort(order, true, child))
if limit < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(limit, offset, order, child.output, planLater(child)) :: Nil
case GlobalLimitAndOffset(
IntegerLiteral(limit),
IntegerLiteral(offset),
Project(projectList, Sort(order, true, child)))
if limit < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(limit, offset, order, projectList, planLater(child)) :: Nil
case _ => Nil
}
}
Expand Down Expand Up @@ -691,6 +717,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.LocalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
execution.GlobalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
execution.GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil
case union: logical.Union =>
execution.UnionExec(union.children.map(planLater)) :: Nil
case g @ logical.Generate(generator, _, outer, _, _, child) =>
Expand Down
Loading