@@ -1150,16 +1150,48 @@ object functions {
11501150 /**
11511151 * A column expression that generates monotonically increasing 64-bit integers.
11521152 *
1153- * The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
1153+ * The generated IDs are guaranteed to be monotonically increasing and unique, but not
1154+ * consecutive (unless all rows are in the same single partition which you rarely want due to
1155+ * the volume of the data).
11541156 * The current implementation puts the partition ID in the upper 31 bits, and the record number
11551157 * within each partition in the lower 33 bits. The assumption is that the data frame has
11561158 * less than 1 billion partitions, and each partition has less than 8 billion records.
11571159 *
1158- * As an example, consider a `DataFrame` with two partitions, each with 3 records.
1159- * This expression would return the following IDs:
1160- *
11611160 * {{{
1162- * 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
1161+ * // Create a dataset with four partitions, each with two rows.
1162+ * val q = spark.range(start = 0, end = 8, step = 1, numPartitions = 4)
1163+ *
1164+ * // Make sure that every partition has the same number of rows
1165+ * q.mapPartitions(rows => Iterator(rows.size)).foreachPartition(rows => assert(rows.next == 2))
1166+ * q.select(monotonically_increasing_id).show
1167+ *
1168+ * // Assign consecutive IDs for rows per partition
1169+ * import org.apache.spark.sql.expressions.Window
1170+ * // count is the name of the internal registry of MonotonicallyIncreasingID to count rows
1171+ * // Could also be "id" since it is unique and consecutive in a partition
1172+ * import org.apache.spark.sql.functions.{row_number, shiftLeft, spark_partition_id}
1173+ * val rowNumber = row_number over Window.partitionBy(spark_partition_id).orderBy("id")
1174+ * // row_number is a sequential number starting at 1 within a window partition
1175+ * val count = rowNumber - 1 as "count"
1176+ * val partitionMask = shiftLeft(spark_partition_id cast "long", 33) as "partitionMask"
1177+ * val demo = q.select(
1178+ * $"id",
1179+ * partitionMask,
1180+ * count,
1181+ * monotonically_increasing_id)
1182+ * scala> demo.orderBy("id").show
1183+ * +---+-------------+-----+-----------------------------+
1184+ * | id|partitionMask|count|monotonically_increasing_id()|
1185+ * +---+-------------+-----+-----------------------------+
1186+ * | 0| 0| 0| 0|
1187+ * | 1| 0| 1| 1|
1188+ * | 2| 8589934592| 0| 8589934592|
1189+ * | 3| 8589934592| 1| 8589934593|
1190+ * | 4| 17179869184| 0| 17179869184|
1191+ * | 5| 17179869184| 1| 17179869185|
1192+ * | 6| 25769803776| 0| 25769803776|
1193+ * | 7| 25769803776| 1| 25769803777|
1194+ * +---+-------------+-----+-----------------------------+
11631195 * }}}
11641196 *
11651197 * @group normal_funcs
0 commit comments