diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala index f1da592a7684..090ea205da66 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.types.{DataType, LongType} */ @ExpressionDescription( usage = """ - _FUNC_() - Returns monotonically increasing 64-bit integers. The generated ID is guaranteed + _FUNC_() - Returns monotonically increasing 64-bit integers. The generated IDs are guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits represent the record number within each partition. The assumption is that the data frame has less than 1 billion @@ -80,7 +80,5 @@ case class MonotonicallyIncreasingID() extends LeafExpression with Stateful { override def prettyName: String = "monotonically_increasing_id" - override def sql: String = s"$prettyName()" - override def freshCopy(): MonotonicallyIncreasingID = MonotonicallyIncreasingID() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index de1d422856ba..bd161a8daa7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -1150,16 +1150,48 @@ object functions { /** * A column expression that generates monotonically increasing 64-bit integers. * - * The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. 
+ * The generated IDs are guaranteed to be monotonically increasing and unique, but not + * consecutive (unless all rows are in a single partition, which is rarely desirable given + * the volume of the data). * The current implementation puts the partition ID in the upper 31 bits, and the record number * within each partition in the lower 33 bits. The assumption is that the data frame has * less than 1 billion partitions, and each partition has less than 8 billion records. * - * As an example, consider a `DataFrame` with two partitions, each with 3 records. - * This expression would return the following IDs: - * * {{{ - * 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594. + * // Create a dataset with four partitions, each with two rows. + * val q = spark.range(start = 0, end = 8, step = 1, numPartitions = 4) + * + * // Verify that every partition has exactly two rows + * q.mapPartitions(rows => Iterator(rows.size)).foreachPartition(rows => assert(rows.next == 2)) + * q.select(monotonically_increasing_id).show + * + * // Assign consecutive IDs for rows per partition + * import org.apache.spark.sql.expressions.Window + * // count is the name of the internal counter MonotonicallyIncreasingID uses to count rows + * // Could also be "id" since it is unique and consecutive in a partition + * import org.apache.spark.sql.functions.{row_number, shiftLeft, spark_partition_id} + * val rowNumber = row_number over Window.partitionBy(spark_partition_id).orderBy("id") + * // row_number is a sequential number starting at 1 within a window partition + * val count = rowNumber - 1 as "count" + * val partitionMask = shiftLeft(spark_partition_id cast "long", 33) as "partitionMask" + * val demo = q.select( + * $"id", + * partitionMask, + * count, + * monotonically_increasing_id) + * scala> demo.orderBy("id").show + * +---+-------------+-----+-----------------------------+ + * | id|partitionMask|count|monotonically_increasing_id()| + * 
+---+-------------+-----+-----------------------------+ + * | 0| 0| 0| 0| + * | 1| 0| 1| 1| + * | 2| 8589934592| 0| 8589934592| + * | 3| 8589934592| 1| 8589934593| + * | 4| 17179869184| 0| 17179869184| + * | 5| 17179869184| 1| 17179869185| + * | 6| 25769803776| 0| 25769803776| + * | 7| 25769803776| 1| 25769803777| + * +---+-------------+-----+-----------------------------+ * }}} * * @group normal_funcs