[SPARK-19451][SQL] rangeBetween method should accept Long value as boundary#18540
[SPARK-19451][SQL] rangeBetween method should accept Long value as boundary#18540jiangxb1987 wants to merge 12 commits intoapache:masterfrom
Conversation
hvanhovell
left a comment
There was a problem hiding this comment.
Thanks for taking this over. I left a few comments.
There was a problem hiding this comment.
I made lower and upper AnyRef, this was to allow the use of both of (foldable) Expressions and SpecialFrameBoundary. This works rather well with things like constant folding. The reason for not making SpecialFrameBoundary an Expression is that this cannot have a type (unless you make it a case class I suppose) and that it showed some weird behavior during analysis/optimization.
There was a problem hiding this comment.
Both DateType and TimestampType expressions are going to need a time zone. I was wondering if we can use a GMT because these are just offset calculation? cc @ueshin
If we can't then we either need to thread through the session local timezone, or it might be easier to put the entire offset calculation in the frame.
There was a problem hiding this comment.
We should also create API's that allow for other types of literals, at least one for CalendarIntervals.
There was a problem hiding this comment.
I totally agree that's what we definitely should do, but I'd suggest we address this in a follow-up work, and focus on resolving the overflow issue on Long frame boundaries in rangeBetween in this PR.
One major concern is the WindowSpec API is marked Stable, so I'm wondering what's the proper procedure to make a change to this interface?
There was a problem hiding this comment.
It might be the easiest to make it take a Column, so rangeBetween(begin: Column, end: Column), only downside to this is that we need some way to express the special boundaries (current row, unbounded). Also cc @rxin
There was a problem hiding this comment.
Maybe we can use Literal(0) to represent CurrentRow? And a sufficient large number(like Literal(Long.MaxValue)) for Unbounded?
There was a problem hiding this comment.
i was trying to avoid introducing a special value, but maybe you can do that.
How important is it to fix this?
There was a problem hiding this comment.
Let's rule it out of the scope of this PR and address this in a follow-up.
There was a problem hiding this comment.
We should also add tests for dates/doubles/timestamps.
There was a problem hiding this comment.
Will do this tomorrow.
There was a problem hiding this comment.
We can only add the test cases after we have finalized the API change.
|
Test build #79210 has finished for PR 18540 at commit
|
|
Test build #79231 has finished for PR 18540 at commit
|
There was a problem hiding this comment.
do we really need this? can we just require strict types?
There was a problem hiding this comment.
Hmmm that would be kind of weird. So a user will get type coercion in its select but not in the range clause.
|
Test build #79531 has finished for PR 18540 at commit
|
| } | ||
| /** | ||
| * A specified Window Frame. The val lower/uppper can be either a foldable [[Expression]] or a | ||
| * [[SpecialFrameBoundary]]. |
There was a problem hiding this comment.
can we make SpecialFrameBoundary an expression?
There was a problem hiding this comment.
I tried that. The problem is that you will need to make them have a proper data type. I tried to make them case object .. {} with data type null, but I ended with these replaced with a null literal.
All I am saying that this will require a little bit more coding. Since you need to resolve the data type of the boundary.
There was a problem hiding this comment.
can we do this in WindowFrameCoercion?
|
|
||
| private def isValidFrameType(ft: DataType): Boolean = (orderSpec.head.dataType, ft) match { | ||
| case (DateType, IntegerType) => true | ||
| case (TimestampType, CalendarIntervalType) => true |
There was a problem hiding this comment.
shall we support DateType, TimestampType in follow-up PR? Let's focus on refactor/cleanup in this PR.
|
Test build #79835 has finished for PR 18540 at commit
|
|
Test build #79841 has finished for PR 18540 at commit
|
| } | ||
|
|
||
| override def children: Seq[Expression] = partitionSpec ++ orderSpec | ||
| override def children: Seq[Expression] = partitionSpec ++ orderSpec ++ Seq(frameSpecification) |
There was a problem hiding this comment.
nit: partitionSpec ++ orderSpec :+ frameSpecification
| * Represents a window frame. | ||
| */ | ||
| sealed trait WindowFrame | ||
| def isValueBound: Boolean = valueBoundary.nonEmpty |
There was a problem hiding this comment.
nit: def isValueBound: Boolean = !isUnbounded
There was a problem hiding this comment.
oh, is it possible that both lower and upper are current row?
There was a problem hiding this comment.
yea we can have ROWS CURRENT ROW
There was a problem hiding this comment.
Having rows between current row and current row is kinda dumb, the aggregate should contain only 1 value. However range between current row and current row can be very useful because you can aggregate over all the observations with the same ordering value.
| } | ||
|
|
||
| private def checkBoundary(b: Expression, location: String): TypeCheckResult = b match { | ||
| case e: Expression if !e.foldable && !e.isInstanceOf[SpecialFrameBoundary] => |
There was a problem hiding this comment.
nit: we can mark SpecialFrameBoundary as foldable.
There was a problem hiding this comment.
I don't think you should, with what are you going to replace the boundary during optimization?
| Seq(UnresolvedAttribute("a")), | ||
| Seq(SortOrder(Literal.default(DateType), Ascending)), | ||
| SpecifiedWindowFrame(RangeFrame, Literal(10.0), Literal(2147483648L))) | ||
| ) |
There was a problem hiding this comment.
can we add some more test cases with special window frame boundary?
| ("between unbounded preceding and current row", UnboundedPreceding, CurrentRow), | ||
| ("10 preceding", -Literal(10), CurrentRow), | ||
| ("2147483648 preceding", -Literal(2147483648L), CurrentRow), | ||
| ("3 + 1 following", Add(Literal(3), Literal(1)), CurrentRow), // Will fail during analysis |
There was a problem hiding this comment.
why this will fail during analysis?
There was a problem hiding this comment.
The lower boundary would be higher than the upper boundary, previously it would fail, but we have removed this check, should add it.
There was a problem hiding this comment.
Do you think that will improve the UX?
| ("2147483648 preceding", -Literal(2147483648L), CurrentRow), | ||
| ("3 + 1 following", Add(Literal(3), Literal(1)), CurrentRow), // Will fail during analysis | ||
| ("unbounded preceding", Unbounded, CurrentRow), | ||
| ("unbounded following", Unbounded, CurrentRow), // Will fail during analysis |
There was a problem hiding this comment.
In fact this is problematic, we would generate the same result for both unbounded preceding and unbounded following. @hvanhovell any idea on resolving this?
There was a problem hiding this comment.
Well the idea was that the unboundedness was tied to the location in which it was used, so for example unbounded in the first position would mean unbounded preceding. However this is completely opposite to how we interpret literal bounds, it might be better to reintroduce special boundaries for unbounded preceding and unbounded following.
| "num-children" -> 2, | ||
| "frameType" -> JObject("object" -> JString(RowFrame.getClass.getName)), | ||
| "lower" -> 0, | ||
| "upper" -> 1), |
There was a problem hiding this comment.
why lower and upper is 0 and 1?
There was a problem hiding this comment.
After this PR, SpecialFrameBoundary and WindowFrame are made Expressions, thus they are TreeNodes, so the field values are made value index in the TreeNode.children.
|
LGTM except some minor comments |
|
Test build #79905 has finished for PR 18540 at commit
|
| TypeCheckFailure( | ||
| "Cannot use an UnspecifiedFrame. This should have been converted during analysis. " + | ||
| "Please file a bug report.") | ||
| case f: SpecifiedWindowFrame if f.frameType == RangeFrame && !f.isUnbounded && |
There was a problem hiding this comment.
I think this should be !f.isValueBound? basically current row and current row is not unbound but should be allowed here.
|
Test build #80006 has finished for PR 18540 at commit
|
| "Frame bound value must be a constant integer.", | ||
| ctx) | ||
| e.eval().asInstanceOf[Int] | ||
| validate(e.resolved && e.foldable, "Frame bound value must be a literal.", ctx) |
There was a problem hiding this comment.
is it necessary? I think analyzer can detect and report this failure too?
There was a problem hiding this comment.
How about keep this so we can fail earlier?
|
LGTM, pending tests |
|
Test build #80020 has finished for PR 18540 at commit
|
|
Test build #80021 has finished for PR 18540 at commit
|
|
You still missed resolving this comment |
| FrameBoundary(l), | ||
| FrameBoundary(h)))) | ||
| if order.isEmpty || frame != RowFrame || l != h => | ||
| frame: SpecifiedWindowFrame)) |
There was a problem hiding this comment.
Nit: Cutting in the middle looks weird. Please keep them in the same line.
WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame))
| failAnalysis(s"Expression '$e' not supported within a window function.") | ||
| } | ||
| // Make sure the window specification is valid. | ||
| s.validate match { |
There was a problem hiding this comment.
The verification is moved to checkInputDataTypes, right?
| private def createBoundaryCast(boundary: Expression, dt: DataType): Expression = { | ||
| boundary match { | ||
| case e: Expression if e.dataType != dt && Cast.canCast(e.dataType, dt) && | ||
| !e.isInstanceOf[SpecialFrameBoundary] => |
There was a problem hiding this comment.
Splitting if in the middle looks weird. How about?
case e: SpecialFrameBoundary => e
case e: Expression if e.dataType != dt && Cast.canCast(e.dataType, dt) => Cast(e, dt)
case o => o| object WindowFrameCoercion extends Rule[LogicalPlan] { | ||
| def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { | ||
| case s @ WindowSpecDefinition(_, Seq(order), SpecifiedWindowFrame(RangeFrame, lower, upper)) | ||
| if order.resolved => |
There was a problem hiding this comment.
Nit: add two more spaces before if
| s"A range window frame with value boundaries cannot be used in a window specification " + | ||
| s"with multiple order by expressions: ${orderSpec.mkString(",")}") | ||
| case f: SpecifiedWindowFrame if f.frameType == RangeFrame && f.isValueBound && | ||
| !isValidFrameType(f.valueBoundary.head.dataType) => |
There was a problem hiding this comment.
Personally, I do not like many long if conditions. Let us add extra two spaces in line 71, 66, and 62.
| TypeCheckFailure( | ||
| s"The data type '${orderSpec.head.dataType}' used in the order specification does " + | ||
| s"not match the data type '${f.valueBoundary.head.dataType}' which is used in the " + | ||
| "range frame.") |
There was a problem hiding this comment.
Just want to confirm whether we have at least four negatives test cases to respectively cover these cases?
There was a problem hiding this comment.
The first case is for defensive guard, I'll add test sql for the two negative cases related to orderBy in RangeFrame.
| sealed trait FrameBoundary { | ||
| def notFollows(other: FrameBoundary): Boolean | ||
| case object RangeFrame extends FrameType { | ||
| override def inputType: AbstractDataType = TypeCollection.NumericAndInterval |
There was a problem hiding this comment.
uh, we also support CalendarInterval. Do we have a test case to verify it works on CalendarInterval?
| s"'${l.dataType.catalogString}' <> '${u.dataType.catalogString}'") | ||
| case (l: Expression, u: Expression) if isGreaterThan(l, u) => | ||
| TypeCheckFailure( | ||
| "The lower bound of a window frame must less than or equal to the upper bound") |
There was a problem hiding this comment.
Another question. Do we have test cases for the above three negative cases?
There was a problem hiding this comment.
The second is defensive check, added test sql for the rest.
| if (check.isFailure) { | ||
| check | ||
| } else if (!offset.foldable) { | ||
| TypeCheckFailure(s"Offset expression '$offset' must be a literal.") |
There was a problem hiding this comment.
Currently it's only used by lead()/lag() functions, that both checked the input types, so we're not able to test this from sql.
| "Frame bound value must be a constant integer.", | ||
| ctx) | ||
| e.eval().asInstanceOf[Int] | ||
| validate(e.resolved && e.foldable, "Frame bound value must be a literal.", ctx) |
| UnboundedFollowing | ||
| case SqlBaseParser.FOLLOWING => | ||
| ValueFollowing(value) | ||
| value |
There was a problem hiding this comment.
It should be an unsigned-integer based on ANSI SQL
There was a problem hiding this comment.
May I ask how should we parse it into an unsigned-integer?
There was a problem hiding this comment.
It sounds like we already allowed it in the previous release. Thus, we need to follow what we have now.
| UnboundedPreceding | ||
| case SqlBaseParser.PRECEDING => | ||
| ValuePreceding(value) | ||
| UnaryMinus(value) |
There was a problem hiding this comment.
The same here. Do we allow users assign a negative value?
| @@ -174,28 +191,22 @@ class WindowSpec private[sql]( | |||
| */ | |||
| // Note: when updating the doc for this method, also update Window.rangeBetween. | |||
| def rangeBetween(start: Long, end: Long): WindowSpec = { | |||
There was a problem hiding this comment.
What happens if start is larger than end?
There was a problem hiding this comment.
We'll get and compute empty frames.
|
It looks pretty solid. Thanks! |
|
Test build #80041 has finished for PR 18540 at commit
|
|
LGTM |
|
Thanks! Merging to master |
…undary ## What changes were proposed in this pull request? Long values can be passed to `rangeBetween` as range frame boundaries, but we silently convert it to Int values, this can cause wrong results and we should fix this. Further more, we should accept any legal literal values as range frame boundaries. In this PR, we make it possible for Long values, and make accepting other DataTypes really easy to add. This PR is mostly based on Herman's previous amazing work: hvanhovell@596f53c After this been merged, we can close #16818 . ## How was this patch tested? Add new tests in `DataFrameWindowFunctionsSuite` and `TypeCoercionSuite`. Author: Xingbo Jiang <xingbo.jiang@databricks.com> Closes #18540 from jiangxb1987/rangeFrame. (cherry picked from commit 92d8563) Signed-off-by: gatorsmile <gatorsmile@gmail.com>
…undary ## What changes were proposed in this pull request? Long values can be passed to `rangeBetween` as range frame boundaries, but we silently convert it to Int values, this can cause wrong results and we should fix this. Further more, we should accept any legal literal values as range frame boundaries. In this PR, we make it possible for Long values, and make accepting other DataTypes really easy to add. This PR is mostly based on Herman's previous amazing work: hvanhovell@596f53c After this been merged, we can close apache#16818 . ## How was this patch tested? Add new tests in `DataFrameWindowFunctionsSuite` and `TypeCoercionSuite`. Author: Xingbo Jiang <xingbo.jiang@databricks.com> Closes apache#18540 from jiangxb1987/rangeFrame. (cherry picked from commit 92d8563) Signed-off-by: gatorsmile <gatorsmile@gmail.com>
What changes were proposed in this pull request?
Long values can be passed to
rangeBetweenas range frame boundaries, but we silently convert it to Int values, this can cause wrong results and we should fix this.Further more, we should accept any legal literal values as range frame boundaries. In this PR, we make it possible for Long values, and make accepting other DataTypes really easy to add.
This PR is mostly based on Herman's previous amazing work: hvanhovell@596f53c
After this been merged, we can close #16818 .
How was this patch tested?
Add new tests in
DataFrameWindowFunctionsSuiteandTypeCoercionSuite.