1 change: 1 addition & 0 deletions .github/workflows/build_and_test.yml
@@ -377,6 +377,7 @@ jobs:
SKIP_PACKAGING: true
METASPACE_SIZE: 1g
BRANCH: ${{ inputs.branch }}
SPARK_ANSI_SQL_MODE: true
steps:
- name: Checkout Spark repository
uses: actions/checkout@v4
171 changes: 134 additions & 37 deletions python/pyspark/sql/functions/builtin.py
@@ -15224,32 +15224,73 @@ def array_sort(
@_try_remote_functions
def shuffle(col: "ColumnOrName") -> Column:
"""
Collection function: Generates a random permutation of the given array.
Array function: Generates a random permutation of the given array.

.. versionadded:: 2.4.0

.. versionchanged:: 3.4.0
Supports Spark Connect.

Notes
-----
The function is non-deterministic.

Parameters
----------
col : :class:`~pyspark.sql.Column` or str
name of column or expression
The name of the column or expression to be shuffled.

Returns
-------
:class:`~pyspark.sql.Column`
an array of elements in random order.
A new column that contains an array of elements in random order.

Notes
-----
The `shuffle` function is non-deterministic, meaning the order of the output array
can be different for each execution.

Examples
--------
>>> df = spark.createDataFrame([([1, 20, 3, 5],), ([1, 20, None, 3],)], ['data'])
>>> df.select(shuffle(df.data).alias('s')).collect() # doctest: +SKIP
[Row(s=[3, 1, 5, 20]), Row(s=[20, None, 3, 1])]
Example 1: Shuffling a simple array

>>> import pyspark.sql.functions as sf
>>> df = spark.createDataFrame([([1, 20, 3, 5],)], ['data'])
>>> df.select(sf.shuffle(df.data)).show() # doctest: +SKIP
[Review comment from the Contributor Author]
Because the execution result of shuffle is non-deterministic, only examples are shown here without actual testing. Also, I personally think that the result of shuffle should not be sorted again. If there are any other opinions, please let me know.

[Reply from a Contributor]
Sounds reasonable to me.

+-------------+
|shuffle(data)|
+-------------+
|[1, 3, 20, 5]|
+-------------+

Example 2: Shuffling an array with null values

>>> import pyspark.sql.functions as sf
>>> df = spark.createDataFrame([([1, 20, None, 3],)], ['data'])
>>> df.select(sf.shuffle(df.data)).show() # doctest: +SKIP
+----------------+
| shuffle(data)|
+----------------+
|[20, 3, NULL, 1]|
+----------------+

Example 3: Shuffling an array with duplicate values

>>> import pyspark.sql.functions as sf
>>> df = spark.createDataFrame([([1, 2, 2, 3, 3, 3],)], ['data'])
>>> df.select(sf.shuffle(df.data)).show() # doctest: +SKIP
+------------------+
| shuffle(data)|
+------------------+
|[3, 2, 1, 3, 2, 3]|
+------------------+

Example 4: Shuffling an array with different types of elements

>>> import pyspark.sql.functions as sf
>>> df = spark.createDataFrame([(['a', 'b', 'c', 1, 2, 3],)], ['data'])
>>> df.select(sf.shuffle(df.data)).show() # doctest: +SKIP
+------------------+
| shuffle(data)|
+------------------+
|[1, c, 2, a, b, 3]|
+------------------+
"""
return _invoke_function_over_columns("shuffle", col)

@@ -15289,7 +15330,7 @@ def reverse(col: "ColumnOrName") -> Column:
@_try_remote_functions
def flatten(col: "ColumnOrName") -> Column:
"""
Collection function: creates a single array from an array of arrays.
Array function: creates a single array from an array of arrays.
If a structure of nested arrays is deeper than two levels,
only one level of nesting is removed.

@@ -15301,29 +15342,57 @@ def flatten(col: "ColumnOrName") -> Column:
Parameters
----------
col : :class:`~pyspark.sql.Column` or str
name of column or expression
The name of the column or expression to be flattened.

Returns
-------
:class:`~pyspark.sql.Column`
flattened array.
A new column that contains the flattened array.

Examples
--------
>>> df = spark.createDataFrame([([[1, 2, 3], [4, 5], [6]],), ([None, [4, 5]],)], ['data'])
>>> df.show(truncate=False)
+------------------------+
|data |
+------------------------+
|[[1, 2, 3], [4, 5], [6]]|
|[NULL, [4, 5]] |
+------------------------+
>>> df.select(flatten(df.data).alias('r')).show()
Example 1: Flattening a simple nested array

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([([[1, 2, 3], [4, 5], [6]],)], ['data'])
>>> df.select(sf.flatten(df.data)).show()
+------------------+
| r|
| flatten(data)|
+------------------+
|[1, 2, 3, 4, 5, 6]|
| NULL|
+------------------+

Example 2: Flattening an array with null values

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([([None, [4, 5]],)], ['data'])
>>> df.select(sf.flatten(df.data)).show()
+-------------+
|flatten(data)|
+-------------+
| NULL|
+-------------+

Example 3: Flattening an array with more than two levels of nesting

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([([[[1, 2], [3, 4]], [[5, 6], [7, 8]]],)], ['data'])
>>> df.select(sf.flatten(df.data)).show(truncate=False)
+--------------------------------+
|flatten(data) |
+--------------------------------+
|[[1, 2], [3, 4], [5, 6], [7, 8]]|
+--------------------------------+

Example 4: Flattening an array with mixed types

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([([['a', 'b', 'c'], [1, 2, 3]],)], ['data'])
>>> df.select(sf.flatten(df.data)).show()
+------------------+
| flatten(data)|
+------------------+
|[a, b, c, 1, 2, 3]|
+------------------+
"""
return _invoke_function_over_columns("flatten", col)
@@ -15718,9 +15787,9 @@ def sequence(
start: "ColumnOrName", stop: "ColumnOrName", step: Optional["ColumnOrName"] = None
) -> Column:
"""
Generate a sequence of integers from `start` to `stop`, incrementing by `step`.
If `step` is not set, incrementing by 1 if `start` is less than or equal to `stop`,
otherwise -1.
Array function: Generates a sequence of integers from `start` to `stop`, incrementing by `step`.
If `step` is not set, the function increments by 1 if `start` is less than or equal to `stop`,
otherwise it decrements by 1.

.. versionadded:: 2.4.0

@@ -15730,25 +15799,53 @@ Parameters
Parameters
----------
start : :class:`~pyspark.sql.Column` or str
starting value (inclusive)
The starting value (inclusive) of the sequence.
stop : :class:`~pyspark.sql.Column` or str
last values (inclusive)
The last value (inclusive) of the sequence.
step : :class:`~pyspark.sql.Column` or str, optional
value to add to current to get next element (default is 1)
The value to add to the current element to get the next element in the sequence.
The default is 1 if `start` is less than or equal to `stop`, otherwise -1.

Returns
-------
:class:`~pyspark.sql.Column`
an array of sequence values
A new column that contains an array of sequence values.

Examples
--------
>>> df1 = spark.createDataFrame([(-2, 2)], ('C1', 'C2'))
>>> df1.select(sequence('C1', 'C2').alias('r')).collect()
[Row(r=[-2, -1, 0, 1, 2])]
>>> df2 = spark.createDataFrame([(4, -4, -2)], ('C1', 'C2', 'C3'))
>>> df2.select(sequence('C1', 'C2', 'C3').alias('r')).collect()
[Row(r=[4, 2, 0, -2, -4])]
Example 1: Generating a sequence with default step

>>> import pyspark.sql.functions as sf
>>> df = spark.createDataFrame([(-2, 2)], ['start', 'stop'])
>>> df.select(sf.sequence(df.start, df.stop)).show()
+---------------------+
|sequence(start, stop)|
+---------------------+
| [-2, -1, 0, 1, 2]|
+---------------------+

Example 2: Generating a sequence with a custom step

>>> import pyspark.sql.functions as sf
>>> df = spark.createDataFrame([(4, -4, -2)], ['start', 'stop', 'step'])
>>> df.select(sf.sequence(df.start, df.stop, df.step)).show()
+---------------------------+
|sequence(start, stop, step)|
+---------------------------+
| [4, 2, 0, -2, -4]|
+---------------------------+

Example 3: Generating a sequence with a negative step

>>> import pyspark.sql.functions as sf
>>> df = spark.createDataFrame([(5, 1, -1)], ['start', 'stop', 'step'])
>>> df.select(sf.sequence(df.start, df.stop, df.step)).show()
+---------------------------+
|sequence(start, stop, step)|
+---------------------------+
| [5, 4, 3, 2, 1]|
+---------------------------+
"""
if step is None:
return _invoke_function_over_columns("sequence", start, stop)