@@ -34,7 +34,7 @@ import org.apache.spark.sql.types.{DataType, LongType}
*/
@ExpressionDescription(
usage = """
-    _FUNC_() - Returns monotonically increasing 64-bit integers. The generated ID is guaranteed
+    _FUNC_() - Returns monotonically increasing 64-bit integers. The generated IDs are guaranteed
to be monotonically increasing and unique, but not consecutive. The current implementation
puts the partition ID in the upper 31 bits, and the lower 33 bits represent the record number
within each partition. The assumption is that the data frame has less than 1 billion
@@ -80,7 +80,5 @@ case class MonotonicallyIncreasingID() extends LeafExpression with Stateful {

override def prettyName: String = "monotonically_increasing_id"

-  override def sql: String = s"$prettyName()"
Member:
Why this change?

Contributor Author:
It's the default, so there's no need for the override, is there?
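The claim checks out against how the default is built: Spark's `Expression.sql` renders `prettyName` plus the comma-separated `sql` of the children, so a leaf expression with no children already renders as `prettyName()`. A minimal self-contained sketch of that behaviour (paraphrasing the idea, not the verbatim Spark source):

```scala
// Sketch: why the override is redundant. Mirrors the shape of Spark's
// default Expression.sql (paraphrased): a leaf expression has no children,
// so the default already yields "prettyName()".
object SqlDefaultDemo extends App {
  trait Expr {
    def prettyName: String
    def children: Seq[Expr] = Nil
    def sql: String = s"$prettyName(${children.map(_.sql).mkString(", ")})"
  }

  object MidExpr extends Expr {
    override def prettyName: String = "monotonically_increasing_id"
  }

  println(MidExpr.sql) // monotonically_increasing_id()
  assert(MidExpr.sql == "monotonically_increasing_id()")
}
```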


override def freshCopy(): MonotonicallyIncreasingID = MonotonicallyIncreasingID()
}
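The bit layout described in the usage string above can be made concrete with a small standalone sketch (`makeId` and its parameter names are illustrative, not Spark internals):

```scala
// Sketch of the ID layout: upper 31 bits hold the partition ID, lower 33 bits
// hold the record number within the partition. Names are illustrative only.
object IdLayoutDemo extends App {
  def makeId(partitionId: Long, recordNumber: Long): Long =
    (partitionId << 33) | recordNumber

  val id = makeId(partitionId = 2L, recordNumber = 5L)
  println(id)                    // 17179869189 = 2 * (1L << 33) + 5

  // Recovering both parts from the ID:
  println(id >>> 33)             // 2  (the partition ID)
  println(id & ((1L << 33) - 1)) // 5  (the record number)
}
```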
sql/core/src/main/scala/org/apache/spark/sql/functions.scala (42 changes: 37 additions & 5 deletions)
@@ -1150,16 +1150,48 @@ object functions {
/**
* A column expression that generates monotonically increasing 64-bit integers.
*
-  * The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
+  * The generated IDs are guaranteed to be monotonically increasing and unique, but not
+  * consecutive (unless all rows are in a single partition, which you rarely want due to
+  * the volume of the data).
* The current implementation puts the partition ID in the upper 31 bits, and the record number
* within each partition in the lower 33 bits. The assumption is that the data frame has
* less than 1 billion partitions, and each partition has less than 8 billion records.
*
* As an example, consider a `DataFrame` with four partitions, each with two records.
* This expression would return the following IDs:
*
* {{{
-  * 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
+  * // Create a dataset with four partitions, each with two rows.
+  * val q = spark.range(start = 0, end = 8, step = 1, numPartitions = 4)
+  *
+  * // Make sure that every partition has the same number of rows
+  * q.mapPartitions(rows => Iterator(rows.size)).foreachPartition(rows => assert(rows.next == 2))
+  * q.select(monotonically_increasing_id).show
Member:
eh @jaceklaskowski, wouldn't this one be enough as an example?

Contributor Author:

I thought about explaining the "internals" of the operator through a more involved example, and I actually thought about removing line 1166 (but forgot). I think the following lines make for a very in-depth explanation and show other operators in use.

In other words, I'm in favour of removing line 1166 and leaving the others unchanged. Possible?

Member:

I know you're exploring the internals, but to be honest I was wondering whether users are usually interested in such an in-depth explanation, since I guess most of them wouldn't care about the details.


IMHO it's enough to add that rows are consecutive in each partition, but not between partitions, and that values are shifted left by 33. Written in words, not code, it will be much shorter and more concise.

Member:

I personally would simplify the example to not focus on the particular shift; yeah that behavior ought not change but it's not really something a caller would ever rely on. And I think you don't need to make a new variable to subtract 1 from row number, etc. Something simply showing the two properties -- increasing within partition, not between partitions -- is enough.

+  *
+  * // Assign consecutive IDs to rows per partition
+  * import org.apache.spark.sql.expressions.Window
+  * // "count" is the name of the internal counter of MonotonicallyIncreasingID that counts rows
+  * // Could also be "id" since it is unique and consecutive within a partition
+  * import org.apache.spark.sql.functions.{row_number, shiftLeft, spark_partition_id}
+  * val rowNumber = row_number over Window.partitionBy(spark_partition_id).orderBy("id")
+  * // row_number is a sequential number starting at 1 within a window partition
+  * val count = rowNumber - 1 as "count"
+  * val partitionMask = shiftLeft(spark_partition_id cast "long", 33) as "partitionMask"
+  * val demo = q.select(
+  *   $"id",
+  *   partitionMask,
+  *   count,
+  *   monotonically_increasing_id)
+  * scala> demo.orderBy("id").show
+  * +---+-------------+-----+-----------------------------+
+  * | id|partitionMask|count|monotonically_increasing_id()|
+  * +---+-------------+-----+-----------------------------+
+  * |  0|            0|    0|                            0|
+  * |  1|            0|    1|                            1|
+  * |  2|   8589934592|    0|                   8589934592|
+  * |  3|   8589934592|    1|                   8589934593|
+  * |  4|  17179869184|    0|                  17179869184|
+  * |  5|  17179869184|    1|                  17179869185|
+  * |  6|  25769803776|    0|                  25769803776|
+  * |  7|  25769803776|    1|                  25769803777|
+  * +---+-------------+-----+-----------------------------+
+  * }}}
*
* @group normal_funcs
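Along the lines the reviewers suggest above, a pared-down example that demonstrates just the two guarantees (IDs increase within a partition and jump, non-consecutively, across partitions) might look like the following sketch; it assumes a local SparkSession and is not part of the PR:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{monotonically_increasing_id, spark_partition_id}

object SimplerMidDemo extends App {
  val spark = SparkSession.builder().master("local[2]").appName("mid-demo").getOrCreate()

  // Two partitions, three rows each.
  val df = spark.range(start = 0, end = 6, step = 1, numPartitions = 2)

  df.select(spark_partition_id().as("pid"), monotonically_increasing_id().as("mid"))
    .show()
  // IDs increase within each partition but jump by 1L << 33 across partitions:
  // pid 0 -> 0, 1, 2
  // pid 1 -> 8589934592, 8589934593, 8589934594

  spark.stop()
}
```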