-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression #35975
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f95a4b4
c50fbf7
1dbfc95
b6f07cc
936456b
bf1b4cf
6bb5b62
8551083
58287ab
beddfcd
0616e44
235c49a
8df2c62
038a50f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -411,7 +411,24 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { | |
|
|
||
| case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr) | ||
|
|
||
| case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr) | ||
| case LocalLimit(limitExpr, child) => | ||
| checkLimitLikeClause("limit", limitExpr) | ||
| child match { | ||
| case Offset(offsetExpr, _) => | ||
| val limit = limitExpr.eval().asInstanceOf[Int] | ||
| val offset = offsetExpr.eval().asInstanceOf[Int] | ||
| if (Int.MaxValue - limit < offset) { | ||
| failAnalysis( | ||
| s""" | ||
| |The sum of the LIMIT clause and the OFFSET clause must not be greater than | ||
| |the maximum 32-bit integer value (2,147,483,647), | ||
| |but found limit = $limit, offset = $offset. | ||
| |""".stripMargin.replace("\n", " ")) | ||
| } | ||
| case _ => | ||
| } | ||
|
|
||
| case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr) | ||
|
|
||
| case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr) | ||
|
|
||
|
|
@@ -588,6 +605,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { | |
| } | ||
| } | ||
| checkCollectedMetrics(plan) | ||
| checkOffsetOperator(plan) | ||
| extendedCheckRules.foreach(_(plan)) | ||
| plan.foreachUp { | ||
| case o if !o.resolved => | ||
|
|
@@ -830,6 +848,30 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { | |
| check(plan) | ||
| } | ||
|
|
||
| /** | ||
| * Validate whether the [[Offset]] is valid. | ||
| */ | ||
| private def checkOffsetOperator(plan: LogicalPlan): Unit = { | ||
| plan.foreachUp { | ||
| case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit] | ||
| && o.children.exists(_.isInstanceOf[Offset]) => | ||
| failAnalysis( | ||
| s""" | ||
| |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET | ||
| |clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " ")) | ||
| case _ => | ||
| } | ||
| plan match { | ||
| case Offset(offsetExpr, _) => | ||
| checkLimitLikeClause("offset", offsetExpr) | ||
| failAnalysis( | ||
| s""" | ||
| |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET | ||
| |clause is found to be the outermost node.""".stripMargin.replace("\n", " ")) | ||
| case _ => | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just checking, could line 864 to line 871 be merged with line 855 to line 862? Do
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. line 864 to line 871 used to find the outermost node. line 855 to line 862 can't help do this. |
||
| } | ||
|
|
||
| /** | ||
| * Validates to make sure the outer references appearing inside the subquery | ||
| * are allowed. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -386,6 +386,8 @@ object UnsupportedOperationChecker extends Logging { | |
| throwError("Limits are not supported on streaming DataFrames/Datasets in Update " + | ||
| "output mode") | ||
|
|
||
| case Offset(_, _) => throwError("Offset is not supported on streaming DataFrames/Datasets") | ||
|
||
|
|
||
| case Sort(_, _, _) if !containsCompleteData(subPlan) => | ||
| throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " + | ||
| "aggregated DataFrame/Dataset in Complete output mode") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1145,6 +1145,23 @@ case class Expand( | |
| copy(child = newChild) | ||
| } | ||
|
|
||
| /** | ||
| * A logical offset, which may removing a specified number of rows from the beginning of the | ||
| * output of child logical plan. | ||
|
||
| */ | ||
| case class Offset(offsetExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode { | ||
| override def output: Seq[Attribute] = child.output | ||
| override def maxRows: Option[Long] = { | ||
| import scala.math.max | ||
| offsetExpr match { | ||
| case IntegerLiteral(offset) => child.maxRows.map { x => max(x - offset, 0) } | ||
| case _ => None | ||
|
||
| } | ||
| } | ||
| override protected def withNewChildInternal(newChild: LogicalPlan): Offset = | ||
| copy(child = newChild) | ||
| } | ||
|
|
||
| /** | ||
| * A constructor for creating a pivot, which will later be converted to a [[Project]] | ||
| * or an [[Aggregate]] during the query analysis. | ||
|
|
@@ -1256,6 +1273,37 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr | |
| copy(child = newChild) | ||
| } | ||
|
|
||
| object LimitAndOffset { | ||
| def unapply(p: GlobalLimitAndOffset): Option[(Expression, Expression, LogicalPlan)] = { | ||
| p match { | ||
| case GlobalLimitAndOffset(le1, le2, LocalLimit(le3, child)) if le1.eval().asInstanceOf[Int] | ||
| + le2.eval().asInstanceOf[Int] == le3.eval().asInstanceOf[Int] => | ||
| Some((le1, le2, child)) | ||
| case _ => None | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * A global (coordinated) limit with offset. This operator can skip at most `offsetExpr` number and | ||
| * emit at most `limitExpr` number in total. For example, if we have LIMIT 10 OFFSET 5, we impose a | ||
| * total limit of 10 + 5 = 15 rows and then discard the first 5, leaving 10 rows remaining. | ||
| */ | ||
| case class GlobalLimitAndOffset( | ||
| limitExpr: Expression, | ||
| offsetExpr: Expression, | ||
| child: LogicalPlan) extends OrderPreservingUnaryNode { | ||
| override def output: Seq[Attribute] = child.output | ||
| override def maxRows: Option[Long] = { | ||
| limitExpr match { | ||
| case IntegerLiteral(limit) => Some(limit) | ||
| case _ => None | ||
|
||
| } | ||
| } | ||
| override protected def withNewChildInternal(newChild: LogicalPlan): GlobalLimitAndOffset = | ||
| copy(child = newChild) | ||
| } | ||
|
|
||
| /** | ||
| * This is similar with [[Limit]] except: | ||
| * | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ | |
| import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData} | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.types._ | ||
| import org.apache.spark.unsafe.types.UTF8String | ||
|
|
||
| private[sql] case class GroupableData(data: Int) { | ||
| def getData: Int = data | ||
|
|
@@ -531,6 +532,52 @@ class AnalysisErrorSuite extends AnalysisTest { | |
| "The limit expression must be equal to or greater than 0, but got -1" :: Nil | ||
| ) | ||
|
|
||
| errorTest( | ||
| "an evaluated offset class must not be string", | ||
| testRelation.offset(Literal(UTF8String.fromString("abc"), StringType)), | ||
| "The offset expression must be integer type, but got string" :: Nil | ||
| ) | ||
|
|
||
| errorTest( | ||
| "an evaluated offset class must not be long", | ||
| testRelation.offset(Literal(10L, LongType)), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a way for SQL to specify if a number is long, thus we hit this error message? Like
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This has nothing to do with it. |
||
| "The offset expression must be integer type, but got bigint" :: Nil | ||
| ) | ||
|
|
||
| errorTest( | ||
| "an evaluated offset class must not be null", | ||
| testRelation.offset(Literal(null, IntegerType)), | ||
| "The evaluated offset expression must not be null, but got " :: Nil | ||
| ) | ||
|
|
||
| errorTest( | ||
| "num_rows in offset clause must be equal to or greater than 0", | ||
| testRelation.offset(-1), | ||
| "The offset expression must be equal to or greater than 0, but got -1" :: Nil | ||
| ) | ||
|
|
||
| errorTest( | ||
| "OFFSET clause is outermost node", | ||
| testRelation.offset(Literal(10, IntegerType)), | ||
| "The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" + | ||
| " clause is found to be the outermost node." :: Nil | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I found the second half of error message is a bit confusing. I would guess it tries to say that the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They are different. users could call |
||
| ) | ||
|
|
||
| errorTest( | ||
| "OFFSET clause in other node", | ||
| testRelation2.offset(Literal(10, IntegerType)).where('b > 1), | ||
| "The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" + | ||
| " clause found in: Filter." :: Nil | ||
| ) | ||
|
|
||
| errorTest( | ||
| "the sum of num_rows in limit clause and num_rows in offset clause less than Int.MaxValue", | ||
| testRelation.offset(Literal(2000000000, IntegerType)).limit(Literal(1000000000, IntegerType)), | ||
| "The sum of the LIMIT clause and the OFFSET clause must not be greater than" + | ||
| " the maximum 32-bit integer value (2,147,483,647)," + | ||
| " but found limit = 1000000000, offset = 2000000000." :: Nil | ||
| ) | ||
|
|
||
| errorTest( | ||
| "more than one generators in SELECT", | ||
| listRelation.select(Explode($"list"), Explode($"list")), | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the parser it looks like we accept any expression for the OFFSET, but here we call
asInstanceOf[Int]. Can we have an explicit check that this expression has integer type with an appropriate error message if not, and an accompanying test case that covers it?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, We check it with
checkLimitLikeClause("offset", offsetExpr)on line 432.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG. Will the
checkLimitLikeClauseexecute before thisasInstanceOf[Int]call here? If so, we are OK. Otherwise, we would receive an exception here, which might result in an confusing error message?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. It's OK