-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-8641][SPARK-12455][SQL] Native Spark Window functions - Follow-up (docs & tests) #10402
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
e120562
Add documentation to window functions.
hvanhovell 2b806bb
Add NULL tests
hvanhovell cf58954
Further Docs for NTile and acknowledging Hive and Presto projects.
hvanhovell 767305a
Fix OffsetWindowFunction foldable and nullable. Use raw strings inste…
hvanhovell 13f9c95
Fix rank strings.
hvanhovell File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -314,8 +314,8 @@ abstract class OffsetWindowFunction | |
| val offset: Expression | ||
|
|
||
| /** | ||
| * Direction (above = 1/below = -1) of the number of rows between the current row and the row | ||
| * where the input expression is evaluated. | ||
| * Direction of the number of rows between the current row and the row where the input expression | ||
| * is evaluated. | ||
| */ | ||
| val direction: SortDirection | ||
|
|
||
|
|
@@ -327,9 +327,9 @@ abstract class OffsetWindowFunction | |
| * both the input and the default expression are foldable, the result is still not foldable due to | ||
| * the frame. | ||
| */ | ||
| override def foldable: Boolean = input.foldable && (default == null || default.foldable) | ||
| override def foldable: Boolean = false | ||
|
|
||
| override def nullable: Boolean = input.nullable && (default == null || default.nullable) | ||
| override def nullable: Boolean = default == null || default.nullable | ||
|
|
||
| override lazy val frame = { | ||
| // This will be triggered by the Analyzer. | ||
|
|
@@ -353,6 +353,21 @@ abstract class OffsetWindowFunction | |
| override def toString: String = s"$prettyName($input, $offset, $default)" | ||
| } | ||
|
|
||
| /** | ||
| * The Lead function returns the value of 'x' at 'offset' rows after the current row in the window. | ||
| * Offsets start at 0, which is the current row. The offset must be a constant integer value. | ||
| * The default offset is 1. When the value of 'x' is null at the offset, or when the offset is | ||
| * larger than the window, the default expression is evaluated. | ||
| * | ||
| * This documentation has been based upon similar documentation for the Hive and Presto projects. | ||
| * | ||
| * @param input expression to evaluate 'offset' rows after the current row. | ||
| * @param offset rows to jump ahead in the partition. | ||
| * @param default to use when the input value is null or when the offset is larger than the window. | ||
| */ | ||
| @ExpressionDescription(usage = | ||
| """_FUNC_(input, offset, default) - LEAD returns the value of 'x' at 'offset' rows after the | ||
| current row in the window""") | ||
| case class Lead(input: Expression, offset: Expression, default: Expression) | ||
| extends OffsetWindowFunction { | ||
|
|
||
|
|
@@ -365,6 +380,21 @@ case class Lead(input: Expression, offset: Expression, default: Expression) | |
| override val direction = Ascending | ||
| } | ||
|
|
||
| /** | ||
| * The Lag function returns the value of 'x' at 'offset' rows before the current row in the window. | ||
| * Offsets start at 0, which is the current row. The offset must be a constant integer value. | ||
| * The default offset is 1. When the value of 'x' is null at the offset, or when the offset is | ||
| * smaller than the window, the default expression is evaluated. | ||
| * | ||
| * This documentation has been based upon similar documentation for the Hive and Presto projects. | ||
| * | ||
| * @param input expression to evaluate 'offset' rows before the current row. | ||
| * @param offset rows to jump back in the partition. | ||
| * @param default to use when the input value is null or when the offset is smaller than the window. | ||
| */ | ||
| @ExpressionDescription(usage = | ||
| """_FUNC_(input, offset, default) - LAG returns the value of 'x' at 'offset' rows before the | ||
| current row in the window""") | ||
| case class Lag(input: Expression, offset: Expression, default: Expression) | ||
| extends OffsetWindowFunction { | ||
|
|
||
|
|
@@ -409,10 +439,31 @@ object SizeBasedWindowFunction { | |
| val n = AttributeReference("window__partition__size", IntegerType, nullable = false)() | ||
| } | ||
|
|
||
| /** | ||
| * The RowNumber function assigns a unique, sequential number to each row, starting with one, | ||
| * according to the ordering of rows within the window partition. | ||
| * | ||
| * This documentation has been based upon similar documentation for the Hive and Presto projects. | ||
| */ | ||
| @ExpressionDescription(usage = | ||
| """_FUNC_() - The ROW_NUMBER() function assigns a unique, sequential | ||
| number to each row, starting with one, according to the ordering of rows within the window | ||
| partition.""") | ||
| case class RowNumber() extends RowNumberLike { | ||
| override val evaluateExpression = rowNumber | ||
| } | ||
|
|
||
| /** | ||
| * The CumeDist function computes the position of a value relative to all values in the partition. | ||
| * The result is the number of rows preceding or equal to the current row in the ordering of the | ||
| * partition divided by the total number of rows in the window partition. Any tie values in the | ||
| * ordering will evaluate to the same position. | ||
| * | ||
| * This documentation has been based upon similar documentation for the Hive and Presto projects. | ||
| */ | ||
| @ExpressionDescription(usage = | ||
| """_FUNC_() - The CUME_DIST() function computes the position of a value relative to a all values | ||
| in the partition.""") | ||
| case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction { | ||
| override def dataType: DataType = DoubleType | ||
| // The frame for CUME_DIST is Range based instead of Row based, because CUME_DIST must | ||
|
|
@@ -421,6 +472,30 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction { | |
| override val evaluateExpression = Divide(Cast(rowNumber, DoubleType), Cast(n, DoubleType)) | ||
| } | ||
|
|
||
| /** | ||
| * The NTile function divides the rows for each window partition into 'n' buckets ranging from 1 to | ||
| * at most 'n'. Bucket values will differ by at most 1. If the number of rows in the partition does | ||
| * not divide evenly into the number of buckets, then the remainder values are distributed one per | ||
| * bucket, starting with the first bucket. | ||
| * | ||
| * The NTile function is particularly useful for the calculation of tertiles, quartiles, deciles and | ||
| * other common summary statistics. | ||
| * | ||
| * The function calculates two variables during initialization: The size of a regular bucket, and | ||
| * the number of buckets that will have one extra row added to it (when the rows do not evenly fit | ||
| * into the number of buckets); both variables are based on the size of the current partition. | ||
| * During the calculation process the function keeps track of the current row number, the current | ||
| * bucket number, and the row number at which the bucket will change (bucketThreshold). When the | ||
| * current row number reaches the bucket threshold, the bucket value is increased by one and the | ||
| * threshold is increased by the bucket size (plus one extra if the current bucket is padded). | ||
| * | ||
| * This documentation has been based upon similar documentation for the Hive and Presto projects. | ||
| * | ||
| * @param buckets number of buckets to divide the rows in. Default value is 1. | ||
| */ | ||
| @ExpressionDescription(usage = | ||
| """_FUNC_(x) - The NTILE(n) function divides the rows for each window partition into 'n' buckets | ||
| ranging from 1 to at most 'n'.""") | ||
| case class NTile(buckets: Expression) extends RowNumberLike with SizeBasedWindowFunction { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add some comments to explain the implementation of NTile? |
||
| def this() = this(Literal(1)) | ||
|
|
||
|
|
@@ -474,6 +549,8 @@ case class NTile(buckets: Expression) extends RowNumberLike with SizeBasedWindow | |
| * the order of the window in which it is processed. For instance, when the value of 'x' changes in | ||
| * a window ordered by 'x' the rank function also changes. The size of the change of the rank | ||
| * function is (typically) not dependent on the size of the change in 'x'. | ||
| * | ||
| * This documentation has been based upon similar documentation for the Hive and Presto projects. | ||
| */ | ||
| abstract class RankLike extends AggregateWindowFunction { | ||
| override def inputTypes: Seq[AbstractDataType] = children.map(_ => AnyDataType) | ||
|
|
@@ -513,11 +590,41 @@ abstract class RankLike extends AggregateWindowFunction { | |
| def withOrder(order: Seq[Expression]): RankLike | ||
| } | ||
|
|
||
| /** | ||
| * The Rank function computes the rank of a value in a group of values. The result is one plus the | ||
| * number of rows preceding or equal to the current row in the ordering of the partition. Tie values | ||
| * will produce gaps in the sequence. | ||
| * | ||
| * This documentation has been based upon similar documentation for the Hive and Presto projects. | ||
| * | ||
| * @param children to base the rank on; a change in the value of one of the children will trigger | ||
| * a change in rank. This is an internal parameter and will be assigned by the | ||
| * Analyzer. | ||
| */ | ||
| @ExpressionDescription(usage = | ||
| """_FUNC_() - RANK() computes the rank of a value in a group of values. The result is one plus | ||
| the number of rows preceding or equal to the current row in the ordering of the partition. Tie | ||
| values will produce gaps in the sequence.""") | ||
| case class Rank(children: Seq[Expression]) extends RankLike { | ||
| def this() = this(Nil) | ||
| override def withOrder(order: Seq[Expression]): Rank = Rank(order) | ||
| } | ||
|
|
||
| /** | ||
| * The DenseRank function computes the rank of a value in a group of values. The result is one plus | ||
| * the previously assigned rank value. Unlike Rank, DenseRank will not produce gaps in the ranking | ||
| * sequence. | ||
| * | ||
| * This documentation has been based upon similar documentation for the Hive and Presto projects. | ||
| * | ||
| * @param children to base the rank on; a change in the value of one of the children will trigger | ||
| * a change in rank. This is an internal parameter and will be assigned by the | ||
| * Analyzer. | ||
| */ | ||
| @ExpressionDescription(usage = | ||
| """_FUNC_() - The DENSE_RANK() function computes the rank of a value in a group of values. The | ||
| result is one plus the previously assigned rank value. Unlike Rank, DenseRank will not produce | ||
| gaps in the ranking sequence.""") | ||
| case class DenseRank(children: Seq[Expression]) extends RankLike { | ||
| def this() = this(Nil) | ||
| override def withOrder(order: Seq[Expression]): DenseRank = DenseRank(order) | ||
|
|
@@ -527,6 +634,23 @@ case class DenseRank(children: Seq[Expression]) extends RankLike { | |
| override val initialValues = zero +: orderInit | ||
| } | ||
|
|
||
| /** | ||
| * The PercentRank function computes the percentage ranking of a value in a group of values. The | ||
| * result is the rank minus one divided by the total number of rows in the partition minus one: | ||
| * (r - 1) / (n - 1). If a partition only contains one row, the function will return 0. | ||
| * | ||
| * The PercentRank function is similar to the CumeDist function, but it uses rank values instead of | ||
| * row counts in its numerator. | ||
| * | ||
| * This documentation has been based upon similar documentation for the Hive and Presto projects. | ||
| * | ||
| * @param children to base the rank on; a change in the value of one of the children will trigger | ||
| * a change in rank. This is an internal parameter and will be assigned by the | ||
| * Analyzer. | ||
| */ | ||
| @ExpressionDescription(usage = | ||
| """_FUNC_() - PERCENT_RANK() The PercentRank function computes the percentage ranking of a value | ||
| in a group of values.""") | ||
| case class PercentRank(children: Seq[Expression]) extends RankLike with SizeBasedWindowFunction { | ||
| def this() = this(Nil) | ||
| override def withOrder(order: Seq[Expression]): PercentRank = PercentRank(order) | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a clarification question. For our current code, `default` will never be `null`, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We currently always pass a `literal(null)` if no `default` is specified. However, a user can pass a `null` value if (s)he wants to; there is no check to prevent this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW: This is not a problem; all code should be able to deal with a `null` default expression.