diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index aa44e677e577b..29da51764cded 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -399,6 +399,7 @@ queryOrganization
       (SORT BY sort+=sortItem (',' sort+=sortItem)*)?
       windowClause?
       (LIMIT (ALL | limit=expression))?
+      (OFFSET offset=expression)?
     ;
 
 multiInsertQueryBody
@@ -1008,6 +1009,7 @@ ansiNonReserved
     | NO
     | NULLS
     | OF
+    | OFFSET
     | OPTION
     | OPTIONS
     | OUT
@@ -1259,6 +1261,7 @@ nonReserved
     | NULL
     | NULLS
     | OF
+    | OFFSET
     | ONLY
     | OPTION
     | OPTIONS
@@ -1520,6 +1523,7 @@ NOT: 'NOT' | '!';
 NULL: 'NULL';
 NULLS: 'NULLS';
 OF: 'OF';
+OFFSET: 'OFFSET';
 ON: 'ON';
 ONLY: 'ONLY';
 OPTION: 'OPTION';
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 519c558d12770..0bca672c6750f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -65,20 +65,19 @@ trait CheckAnalysis extends PredicateHelper {
     case _ => None
   }
 
-  private def checkLimitClause(limitExpr: Expression): Unit = {
-    limitExpr match {
+  private def checkLimitOrOffsetClause(expr: Expression, name: String): Unit = {
+    expr match {
       case e if !e.foldable => failAnalysis(
-        "The limit expression must evaluate to a constant value, but got " +
-          limitExpr.sql)
+        s"The $name expression must evaluate to a constant value, but got ${expr.sql}")
       case e if e.dataType != IntegerType => failAnalysis(
-        s"The limit expression must be integer type, but got " +
+        s"The $name expression must be integer type, but got " +
           e.dataType.catalogString)
       case e =>
         e.eval() match {
           case null => failAnalysis(
-            s"The evaluated limit expression must not be null, but got ${limitExpr.sql}")
+            s"The evaluated $name expression must not be null, but got ${expr.sql}")
           case v: Int if v < 0 => failAnalysis(
-            s"The limit expression must be equal to or greater than 0, but got $v")
+            s"The $name expression must be equal to or greater than 0, but got $v")
           case _ => // OK
         }
     }
@@ -264,9 +263,11 @@ trait CheckAnalysis extends PredicateHelper {
         }
       }
 
-      case GlobalLimit(limitExpr, _) => checkLimitClause(limitExpr)
+      case GlobalLimit(limitExpr, _) => checkLimitOrOffsetClause(limitExpr, "limit")
+
+      case LocalLimit(limitExpr, _) => checkLimitOrOffsetClause(limitExpr, "limit")
 
-      case LocalLimit(limitExpr, _) => checkLimitClause(limitExpr)
+      case Offset(offsetExpr, _) => checkLimitOrOffsetClause(offsetExpr, "offset")
 
       case _: Union | _: SetOperation if operator.children.length > 1 =>
         def dataTypes(plan: LogicalPlan): Seq[DataType] = plan.output.map(_.dataType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 288ff1a04737e..1420c75e31ed9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -319,6 +319,9 @@ object UnsupportedOperationChecker {
           throwError("Limits are not supported on streaming DataFrames/Datasets in Update " +
             "output mode")
 
+        case Offset(_, _) =>
+          throwError("Offset is not supported on streaming DataFrames/Datasets")
+
         case Sort(_, _, _) if !containsCompleteData(subPlan) =>
           throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " +
             "aggregated DataFrame/Dataset in Complete output mode")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 796043fff665e..3b768c45cd314 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -319,6 +319,8 @@ package object dsl {
 
     def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, logicalPlan)
 
+    def offset(offsetExpr: Expression): LogicalPlan = Offset(offsetExpr, logicalPlan)
+
     def join(
         otherPlan: LogicalPlan,
         joinType: JoinType = Inner,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index b19e13870aa65..3cab5e4a2ade6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -85,6 +85,7 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
       case _: Filter => empty(p)
       case _: Sample => empty(p)
       case _: Sort => empty(p)
+      case _: Offset => empty(p)
       case _: GlobalLimit => empty(p)
       case _: LocalLimit => empty(p)
       case _: Repartition => empty(p)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 39709529c00d3..0e0c2b2f380b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -642,6 +642,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
     case _: Aggregate => true
     case _: Window => true
     case _: Sample => true
+    case _: Offset => true
    case _: GlobalLimit => true
     case _: LocalLimit => true
     case _: Generate => true
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 7bbdd4f3c520e..ef1138535b47c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -422,10 +422,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     // WINDOWS
     val withWindow = withOrder.optionalMap(windowClause)(withWindowClause)
 
+    // OFFSET
+    // - OFFSET 0 is the same as omitting the OFFSET clause
+    val withOffset = withWindow.optional(offset) {
+      Offset(typedVisit(offset), withWindow)
+    }
+
     // LIMIT
     // - LIMIT ALL is the same as omitting the LIMIT clause
-    withWindow.optional(limit) {
-      Limit(typedVisit(limit), withWindow)
+    withOffset.optional(limit) {
+      Limit(typedVisit(limit), withOffset)
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala
index 18baced8f3d61..4257b5a47c559 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala
@@ -29,6 +29,7 @@ trait LogicalPlanVisitor[T] {
     case p: Expand => visitExpand(p)
     case p: Filter => visitFilter(p)
     case p: Generate => visitGenerate(p)
+    case p: Offset => visitOffset(p)
     case p: GlobalLimit => visitGlobalLimit(p)
     case p: Intersect => visitIntersect(p)
     case p: Join => visitJoin(p)
@@ -58,6 +59,8 @@ trait LogicalPlanVisitor[T] {
 
   def visitGenerate(p: Generate): T
 
+  def visitOffset(p: Offset): T
+
   def visitGlobalLimit(p: GlobalLimit): T
 
   def visitIntersect(p: Intersect): T
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index d9c370af47fb8..73c91c956b628 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -998,6 +998,20 @@ case class Pivot(
   }
 }
 
+/**
+ * A global (coordinated) offset.
+ */
+case class Offset(offsetExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def maxRows: Option[Long] = {
+    import scala.math.max
+    offsetExpr match {
+      case IntegerLiteral(offset) => child.maxRows.map { x => max(x - offset, 0) }
+      case _ => None
+    }
+  }
+}
+
 /**
  * A constructor for creating a logical limit, which is split into two separate logical nodes:
  * a [[LocalLimit]], which is a partition local limit, followed by a [[GlobalLimit]].
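Not part of the patch itself, but as a quick illustration of how the grammar change, the new `offset` DSL helper, and the `Offset` logical node fit together, here is a hedged Scala sketch; the `LocalRelation('a.int)` child plan is a placeholder used only for this example.

```scala
// Illustrative only, assuming the patch above is applied; not part of the diff.
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Offset}

val relation = LocalRelation('a.int)

// Shape AstBuilder produces for "SELECT a FROM t LIMIT 10 OFFSET 5":
// the Limit wraps the Offset, so the limit applies to the rows left after skipping.
val limitOverOffset = relation.offset(Literal(5)).limit(Literal(10))

// A bare OFFSET (no LIMIT) is also legal; Offset.maxRows is the child's maxRows
// minus the offset, floored at zero, when the offset is an integer literal.
val bareOffset = Offset(Literal(5), relation)
```

Because `AstBuilder` nests the `Offset` under the `Limit`, a `LIMIT n OFFSET m` query keeps at most `n` rows of whatever remains after skipping `m` rows.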
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala
index b8c652dc8f12e..6b9da224446aa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala
@@ -45,6 +45,8 @@ object BasicStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {
 
   override def visitGenerate(p: Generate): Statistics = fallback(p)
 
+  override def visitOffset(p: Offset): Statistics = fallback(p)
+
   override def visitGlobalLimit(p: GlobalLimit): Statistics = fallback(p)
 
   override def visitIntersect(p: Intersect): Statistics = fallback(p)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
index da36db7ae1f5f..803bb2a8d0da9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
@@ -79,6 +79,15 @@ object SizeInBytesOnlyStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {
 
   override def visitGenerate(p: Generate): Statistics = default(p)
 
+  override def visitOffset(p: Offset): Statistics = {
+    val offset = p.offsetExpr.eval().asInstanceOf[Int]
+    val childStats = p.child.stats
+    val rowCount: BigInt = childStats.rowCount.map(_.-(offset).max(0)).getOrElse(0)
+    Statistics(
+      sizeInBytes = EstimationUtils.getOutputSize(p.output, rowCount, childStats.attributeStats),
+      rowCount = Some(rowCount))
+  }
+
   override def visitGlobalLimit(p: GlobalLimit): Statistics = {
     val limit = p.limitExpr.eval().asInstanceOf[Int]
     val childStats = p.child.stats
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
index 16a5c2d3001a7..b07dc8ca78c02 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
@@ -56,6 +56,22 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
       expectedStatsCboOff = windowsStats)
   }
 
+  test("offset estimation: offset < child's rowCount") {
+    val offset = Offset(Literal(2), plan)
+    checkStats(offset, Statistics(sizeInBytes = 96, rowCount = Some(8)))
+  }
+
+  test("offset estimation: offset > child's rowCount") {
+    val offset = Offset(Literal(20), plan)
+    checkStats(offset, Statistics(sizeInBytes = 1, rowCount = Some(0)))
+  }
+
+  test("offset estimation: offset = 0") {
+    val offset = Offset(Literal(0), plan)
+    // The offset is zero, so Offset's stats are the same as its child's stats.
+    checkStats(offset, plan.stats.copy(attributeStats = AttributeMap(Nil)))
+  }
+
   test("limit estimation: limit < child's rowCount") {
     val localLimit = LocalLimit(Literal(2), plan)
     val globalLimit = GlobalLimit(Literal(2), plan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index dc7fb7741e7a7..8fb1250b696ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -42,6 +42,7 @@ class SparkPlanner(
       DataSourceV2Strategy ::
       FileSourceStrategy ::
       DataSourceStrategy(conf) ::
+      SpecialOffset ::
      SpecialLimits ::
       Aggregation ::
       Window ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5500941936442..f8fd4828b5400 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -77,6 +77,22 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
+  /**
+   * Plans special cases of offset operators.
+   */
+  object SpecialOffset extends Strategy {
+    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case ReturnAnswer(rootPlan) => rootPlan match {
+        case logical.Offset(IntegerLiteral(offset), child) =>
+          CollectOffsetExec(offset, planLater(child)) :: Nil
+        case _ => Nil
+      }
+      case logical.Offset(IntegerLiteral(offset), child) =>
+        OffsetExec(offset, planLater(child)) :: Nil
+      case _ => Nil
+    }
+  }
+
   /**
    * Plans special cases of limit operators.
    */
@@ -722,6 +738,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
       case logical.LocalRelation(output, data, _) =>
         LocalTableScanExec(output, data) :: Nil
+      case logical.Offset(IntegerLiteral(offset), child) =>
+        execution.OffsetExec(offset, planLater(child)) :: Nil
       case logical.LocalLimit(IntegerLiteral(limit), child) =>
         execution.LocalLimitExec(limit, planLater(child)) :: Nil
       case logical.GlobalLimit(IntegerLiteral(limit), child) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/offset.scala
new file mode 100644
index 0000000000000..5ce026277c88c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/offset.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition}
+
+
+/**
+ * Skip the first `offset` rows and collect the remaining rows to a single partition.
+ * This operator will be used when a logical `Offset` operation is the final operator in a
+ * logical plan, which happens when the user is collecting results back to the driver.
+ */
+case class CollectOffsetExec(offset: Int, child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = SinglePartition
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def executeCollect(): Array[InternalRow] = child.executeCollect.drop(offset)
+
+  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    sparkContext.parallelize(executeCollect(), 1)
+  }
+
+}
+
+/**
+ * Skip the first `offset` rows of the child's output.
+ */
+case class OffsetExec(offset: Int, child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val rdd = child.execute()
+    val arr = rdd.take(offset)
+    rdd.filter(!arr.contains(_))
+  }
+
+}
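End to end, the new strategy and physical operators would be exercised roughly as in the sketch below. This is a hedged usage example that assumes the patch is applied; the local session, the `events` view, and the row counts are illustrative only.

```scala
// Illustrative usage, assuming the OFFSET patch above is applied.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("offset-demo").getOrCreate()
spark.range(0, 100).toDF("id").createOrReplaceTempView("events")

// Offset is the root of the plan here, so SpecialOffset plans a CollectOffsetExec,
// whose executeCollect() drops the first 10 rows of the child's collected output.
val rest = spark.sql("SELECT id FROM events ORDER BY id OFFSET 10").collect()

// With a LIMIT on top, the parser builds Limit(5, Offset(10, ...)), so the inner Offset
// is planned as OffsetExec and the limit applies to what remains after the skip.
val page = spark.sql("SELECT id FROM events ORDER BY id LIMIT 5 OFFSET 10").collect()
```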