199 changes: 165 additions & 34 deletions python/pyspark/sql/functions/builtin.py
@@ -11989,8 +11989,9 @@ def create_map(

@_try_remote_functions
def map_from_arrays(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
"""Creates a new map from two arrays.

"""
Map function: Creates a new map from two arrays. This function takes two arrays of
keys and values respectively, and returns a new map column.
.. versionadded:: 2.4.0

.. versionchanged:: 3.4.0
@@ -11999,30 +12000,59 @@ def map_from_arrays(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
Parameters
----------
col1 : :class:`~pyspark.sql.Column` or str
Name of column containing a set of keys. All elements must be non-null.
col2 : :class:`~pyspark.sql.Column` or str
Name of column containing a set of values.

Returns
-------
:class:`~pyspark.sql.Column`
A column of map type.

Notes
-----
The key and value arrays must have the same length, and no element of the key array
may be null. If either condition is violated, an exception is thrown.

Examples
--------
Example 1: Basic usage of map_from_arrays

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([([2, 5], ['a', 'b'])], ['k', 'v'])
>>> df.select(sf.map_from_arrays(df.k, df.v)).show()
+---------------------+
|map_from_arrays(k, v)|
+---------------------+
|     {2 -> a, 5 -> b}|
+---------------------+

Example 2: map_from_arrays with null values

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([([1, 2], ['a', None])], ['k', 'v'])
>>> df.select(sf.map_from_arrays(df.k, df.v)).show()
+---------------------+
|map_from_arrays(k, v)|
+---------------------+
|  {1 -> a, 2 -> NULL}|
+---------------------+

Example 3: map_from_arrays with empty arrays

>>> from pyspark.sql import functions as sf
>>> from pyspark.sql.types import ArrayType, StringType, IntegerType, StructType, StructField
>>> schema = StructType([
... StructField('k', ArrayType(IntegerType())),
... StructField('v', ArrayType(StringType()))
... ])
>>> df = spark.createDataFrame([([], [])], schema=schema)
>>> df.select(sf.map_from_arrays(df.k, df.v)).show()
+---------------------+
|map_from_arrays(k, v)|
+---------------------+
|                   {}|
+---------------------+
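
Example 4: map_from_arrays with duplicate keys

Duplicate keys in the keys array are handled according to `spark.sql.mapKeyDedupPolicy`.
This sketch assumes the `LAST_WIN` policy also applies to `map_from_arrays`; under the
default `EXCEPTION` policy, duplicate keys raise an error instead.

>>> from pyspark.sql import functions as sf
>>> originalMapKeyDedupPolicy = spark.conf.get("spark.sql.mapKeyDedupPolicy")
>>> spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")  # assumption: LAST_WIN governs this builder
>>> df = spark.createDataFrame([([1, 1], ['a', 'b'])], ['k', 'v'])
>>> df.select(sf.map_from_arrays(df.k, df.v)).show()
+---------------------+
|map_from_arrays(k, v)|
+---------------------+
|             {1 -> b}|
+---------------------+
>>> spark.conf.set("spark.sql.mapKeyDedupPolicy", originalMapKeyDedupPolicy)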
"""
return _invoke_function_over_columns("map_from_arrays", col1, col2)

@@ -15557,8 +15587,9 @@ def map_entries(col: "ColumnOrName") -> Column:
@_try_remote_functions
def map_from_entries(col: "ColumnOrName") -> Column:
"""
Map function: Transforms an array of key-value pair entries (structs with two fields)
into a map. The first field of each entry is used as the key and the second field
as the value in the resulting map column.

.. versionadded:: 2.4.0

@@ -15568,23 +15599,68 @@ def map_from_entries(col: "ColumnOrName") -> Column:
Parameters
----------
col : :class:`~pyspark.sql.Column` or str
Name of column or expression.

Returns
-------
:class:`~pyspark.sql.Column`
A map created from the given array of entries.

Examples
--------
Example 1: Basic usage of map_from_entries

>>> from pyspark.sql import functions as sf
>>> df = spark.sql("SELECT array(struct(1, 'a'), struct(2, 'b')) as data")
>>> df.select(map_from_entries("data").alias("map")).show()
+----------------+
| map|
+----------------+
|{1 -> a, 2 -> b}|
+----------------+
>>> df.select(sf.map_from_entries(df.data)).show()
+----------------------+
|map_from_entries(data)|
+----------------------+
| {1 -> a, 2 -> b}|
+----------------------+

Example 2: map_from_entries with null values

>>> from pyspark.sql import functions as sf
>>> df = spark.sql("SELECT array(struct(1, null), struct(2, 'b')) as data")
>>> df.select(sf.map_from_entries(df.data)).show()
+----------------------+
|map_from_entries(data)|
+----------------------+
|   {1 -> NULL, 2 -> b}|
+----------------------+

Example 3: map_from_entries with a DataFrame

>>> from pyspark.sql import Row, functions as sf
>>> df = spark.createDataFrame([([Row(1, "a"), Row(2, "b")],), ([Row(3, "c")],)], ['data'])
>>> df.select(sf.map_from_entries(df.data)).show()
+----------------------+
|map_from_entries(data)|
+----------------------+
|      {1 -> a, 2 -> b}|
|              {3 -> c}|
+----------------------+

Example 4: map_from_entries with empty array

>>> from pyspark.sql import functions as sf
>>> from pyspark.sql.types import ArrayType, StringType, IntegerType, StructType, StructField
>>> schema = StructType([
... StructField("data", ArrayType(
... StructType([
... StructField("key", IntegerType()),
... StructField("value", StringType())
... ])
... ), True)
... ])
>>> df = spark.createDataFrame([([],)], schema=schema)
>>> df.select(sf.map_from_entries(df.data)).show()
+----------------------+
|map_from_entries(data)|
+----------------------+
|                    {}|
+----------------------+
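
Example 5: map_from_entries with entries assembled from columns

A sketch showing entries built on the fly with `sf.struct` and `sf.array`; the alias
`m` is only an illustrative name used to keep the output column header short.

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['key', 'value'])
>>> entries = sf.array(sf.struct('key', 'value'))  # one single-entry array per row
>>> df.select(sf.map_from_entries(entries).alias('m')).show()
+--------+
|       m|
+--------+
|{1 -> a}|
|{2 -> b}|
+--------+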
"""
return _invoke_function_over_columns("map_from_entries", col)

@@ -15754,7 +15830,8 @@ def map_concat(__cols: Union[List["ColumnOrName_"], Tuple["ColumnOrName_", ...]]
def map_concat(
*cols: Union["ColumnOrName", Union[List["ColumnOrName_"], Tuple["ColumnOrName_", ...]]]
) -> Column:
"""Returns the union of all the given maps.
"""
Map function: Returns the union of all given maps.

.. versionadded:: 2.4.0

@@ -15764,23 +15841,77 @@ def map_concat(
Parameters
----------
cols : :class:`~pyspark.sql.Column` or str
Column names or :class:`~pyspark.sql.Column`\\s.

Returns
-------
:class:`~pyspark.sql.Column`
A map of merged entries from the given maps.

Notes
-----
Duplicate keys across the input maps are handled according to `spark.sql.mapKeyDedupPolicy`:
the default (`EXCEPTION`) raises an error, while `LAST_WIN` keeps the value from the last map.

Examples
--------
Example 1: Basic usage of map_concat

>>> from pyspark.sql import functions as sf
>>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as map1, map(3, 'c') as map2")
>>> df.select(map_concat("map1", "map2").alias("map3")).show(truncate=False)
>>> df.select(sf.map_concat("map1", "map2")).show(truncate=False)
+------------------------+
|map3 |
|map_concat(map1, map2) |
+------------------------+
|{1 -> a, 2 -> b, 3 -> c}|
+------------------------+

Example 2: map_concat with overlapping keys

>>> from pyspark.sql import functions as sf
>>> originalMapKeyDedupPolicy = spark.conf.get("spark.sql.mapKeyDedupPolicy")
>>> spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
>>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as map1, map(2, 'c', 3, 'd') as map2")
>>> df.select(sf.map_concat("map1", "map2")).show(truncate=False)
+------------------------+
|map_concat(map1, map2)  |
+------------------------+
|{1 -> a, 2 -> c, 3 -> d}|
+------------------------+
>>> spark.conf.set("spark.sql.mapKeyDedupPolicy", originalmapKeyDedupPolicy)

Example 3: map_concat with three maps

>>> from pyspark.sql import functions as sf
>>> df = spark.sql("SELECT map(1, 'a') as map1, map(2, 'b') as map2, map(3, 'c') as map3")
>>> df.select(sf.map_concat("map1", "map2", "map3")).show(truncate=False)
+----------------------------+
|map_concat(map1, map2, map3)|
+----------------------------+
|{1 -> a, 2 -> b, 3 -> c}    |
+----------------------------+

Example 4: map_concat with empty map

>>> from pyspark.sql import functions as sf
>>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as map1, map() as map2")
>>> df.select(sf.map_concat("map1", "map2")).show(truncate=False)
+----------------------+
|map_concat(map1, map2)|
+----------------------+
|{1 -> a, 2 -> b}      |
+----------------------+

Example 5: map_concat with null values

>>> from pyspark.sql import functions as sf
>>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as map1, map(3, null) as map2")
>>> df.select(sf.map_concat("map1", "map2")).show(truncate=False)
+---------------------------+
|map_concat(map1, map2)     |
+---------------------------+
|{1 -> a, 2 -> b, 3 -> NULL}|
+---------------------------+
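
Example 6: map_concat with a list of columns

A sketch of the list form of the call: passing a single list (or set) of column
names, which this function unpacks before merging, gives the same result as passing
the names individually.

>>> from pyspark.sql import functions as sf
>>> df = spark.sql("SELECT map(1, 'a') as map1, map(2, 'b') as map2")
>>> df.select(sf.map_concat(["map1", "map2"])).show(truncate=False)  # list is unpacked to map1, map2
+----------------------+
|map_concat(map1, map2)|
+----------------------+
|{1 -> a, 2 -> b}      |
+----------------------+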
"""
if len(cols) == 1 and isinstance(cols[0], (list, set)):
cols = cols[0] # type: ignore[assignment]