172 changes: 142 additions & 30 deletions python/pyspark/sql/functions/builtin.py
@@ -17160,7 +17160,8 @@ def transform_values(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Column:
@_try_remote_functions
def map_filter(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Column:
"""
Collection function: Returns a new map column whose key-value pairs satisfy a given
predicate function.

.. versionadded:: 3.1.0

@@ -17170,9 +17171,10 @@ def map_filter(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Column:
Parameters
----------
col : :class:`~pyspark.sql.Column` or str
The name of the column or a column expression representing the map to be filtered.
f : function
A binary function ``(k: Column, v: Column) -> Column...`` that defines the predicate.
This function should return a boolean column that will be used to filter the input map.
Can use methods of :class:`~pyspark.sql.Column`, functions defined in
:py:mod:`pyspark.sql.functions` and Scala ``UserDefinedFunctions``.
Python ``UserDefinedFunctions`` are not supported
@@ -17181,16 +17183,39 @@ def map_filter(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Column:
Returns
-------
:class:`~pyspark.sql.Column`
A new map column containing only the key-value pairs that satisfy the predicate.

Examples
--------
Example 1: Filtering a map with a simple condition

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([(1, {"foo": 42.0, "bar": 1.0, "baz": 32.0})], ("id", "data"))
>>> row = df.select(
... sf.map_filter("data", lambda _, v: v > 30.0).alias("data_filtered")
... ).head()
>>> sorted(row["data_filtered"].items())
[('baz', 32.0), ('foo', 42.0)]

Example 2: Filtering a map with a condition on keys

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([(1, {"foo": 42.0, "bar": 1.0, "baz": 32.0})], ("id", "data"))
>>> row = df.select(
... sf.map_filter("data", lambda k, _: k.startswith("b")).alias("data_filtered")
... ).head()
>>> sorted(row["data_filtered"].items())
[('bar', 1.0), ('baz', 32.0)]
Contributor Author: `show` is not used in these examples because the ordering of the result data is not stable.


Example 3: Filtering a map with a complex condition

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([(1, {"foo": 42.0, "bar": 1.0, "baz": 32.0})], ("id", "data"))
>>> row = df.select(
... sf.map_filter("data", lambda k, v: k.startswith("b") & (v > 1.0)).alias("data_filtered")
... ).head()
>>> sorted(row["data_filtered"].items())
[('baz', 32.0)]
"""
return _invoke_higher_order_function("MapFilter", [col], [f])

@@ -17202,7 +17227,8 @@ def map_zip_with(
f: Callable[[Column, Column, Column], Column],
) -> Column:
"""
Collection function: Merges two given maps, key-wise, into a single map by applying
a function to each pair of corresponding values.

.. versionadded:: 3.1.0

@@ -17212,11 +17238,13 @@
Parameters
----------
col1 : :class:`~pyspark.sql.Column` or str
The name of the first column or a column expression representing the first map.
col2 : :class:`~pyspark.sql.Column` or str
The name of the second column or a column expression representing the second map.
f : function
A ternary function ``(k: Column, v1: Column, v2: Column) -> Column...`` that defines
how to merge the values from the two maps. This function should return a column that
will be used as the value in the resulting map.
Can use methods of :class:`~pyspark.sql.Column`, functions defined in
:py:mod:`pyspark.sql.functions` and Scala ``UserDefinedFunctions``.
Python ``UserDefinedFunctions`` are not supported
@@ -17225,20 +17253,50 @@
Returns
-------
:class:`~pyspark.sql.Column`
A new map column where each key-value pair is the result of applying the function to
the corresponding key-value pairs in the input maps.

Examples
--------
Example 1: Merging two maps with a simple function

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([
... (1, {"A": 1, "B": 2}, {"A": 3, "B": 4})],
... ("id", "map1", "map2"))
>>> row = df.select(
... sf.map_zip_with("map1", "map2", lambda _, v1, v2: v1 + v2).alias("updated_data")
... ).head()
>>> sorted(row["updated_data"].items())
[('A', 4), ('B', 6)]

Example 2: Merging two maps with a complex function

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([
... (1, {"A": 1, "B": 2}, {"A": 3, "B": 4})],
... ("id", "map1", "map2"))
>>> row = df.select(
... sf.map_zip_with("map1", "map2",
... lambda k, v1, v2: sf.when(k == "A", v1 + v2).otherwise(v1 - v2)
... ).alias("updated_data")
... ).head()
>>> sorted(row["updated_data"].items())
[('A', 4), ('B', -2)]
Contributor Author: `show` is not used in these examples because the ordering of the result data is not stable.


Example 3: Merging two maps with mismatched keys

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([
... (1, {"A": 1, "B": 2}, {"B": 3, "C": 4})],
... ("id", "map1", "map2"))
>>> row = df.select(
... sf.map_zip_with("map1", "map2",
... lambda _, v1, v2: sf.when(v2.isNull(), v1).otherwise(v1 + v2)
... ).alias("updated_data")
... ).head()
>>> sorted(row["updated_data"].items())
[('A', 1), ('B', 5), ('C', None)]
"""
return _invoke_higher_order_function("MapZipWith", [col1, col2], [f])

@@ -17250,8 +17308,8 @@ def str_to_map(
keyValueDelim: Optional["ColumnOrName"] = None,
) -> Column:
"""
Map function: Converts a string into a map after splitting the text into key/value pairs
using delimiters. Both `pairDelim` and `keyValueDelim` are treated as regular expressions.

.. versionadded:: 3.5.0

@@ -17260,23 +17318,77 @@
text : :class:`~pyspark.sql.Column` or str
Input column or strings.
pairDelim : :class:`~pyspark.sql.Column` or str, optional
Delimiter to use to split pairs. Default is comma (,).
keyValueDelim : :class:`~pyspark.sql.Column` or str, optional
Delimiter to use to split key/value. Default is colon (:).

Returns
-------
:class:`~pyspark.sql.Column`
A new column of map type where each string in the original column is converted into a map.

Examples
--------
Example 1: Using default delimiters

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([("a:1,b:2,c:3",)], ["e"])
>>> df.select(sf.str_to_map(df.e)).show(truncate=False)
+------------------------+
|str_to_map(e, ,, :) |
+------------------------+
|{a -> 1, b -> 2, c -> 3}|
+------------------------+

Example 2: Using custom delimiters

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([("a=1;b=2;c=3",)], ["e"])
>>> df.select(sf.str_to_map(df.e, sf.lit(";"), sf.lit("="))).show(truncate=False)
+------------------------+
|str_to_map(e, ;, =) |
+------------------------+
|{a -> 1, b -> 2, c -> 3}|
+------------------------+

Example 3: Using different delimiters for different rows

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([("a:1,b:2,c:3",), ("d=4;e=5;f=6",)], ["e"])
>>> df.select(sf.str_to_map(df.e,
... sf.when(df.e.contains(";"), sf.lit(";")).otherwise(sf.lit(",")),
... sf.when(df.e.contains("="), sf.lit("=")).otherwise(sf.lit(":"))).alias("str_to_map")
... ).show(truncate=False)
+------------------------+
|str_to_map |
+------------------------+
|{a -> 1, b -> 2, c -> 3}|
|{d -> 4, e -> 5, f -> 6}|
+------------------------+

Example 4: Using a column of delimiters

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([("a:1,b:2,c:3", ","), ("d=4;e=5;f=6", ";")], ["e", "delim"])
>>> df.select(sf.str_to_map(df.e, df.delim, sf.lit(":"))).show(truncate=False)
+---------------------------------------+
|str_to_map(e, delim, :) |
+---------------------------------------+
|{a -> 1, b -> 2, c -> 3} |
|{d=4 -> NULL, e=5 -> NULL, f=6 -> NULL}|
+---------------------------------------+

Example 5: Using a column of key/value delimiters

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([("a:1,b:2,c:3", ":"), ("d=4;e=5;f=6", "=")], ["e", "delim"])
>>> df.select(sf.str_to_map(df.e, sf.lit(","), df.delim)).show(truncate=False)
+------------------------+
|str_to_map(e, ,, delim) |
+------------------------+
|{a -> 1, b -> 2, c -> 3}|
|{d -> 4;e=5;f=6} |
+------------------------+
"""
if pairDelim is None:
pairDelim = lit(",")
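Because both delimiters are treated as regular expressions, a character class can match
several separators at once. A short sketch under the same assumptions (running ``spark``
session, illustrative data):

    from pyspark.sql import functions as sf

    # "[,;]" is a regex character class, so pairs may be separated by ',' or ';'.
    df = spark.createDataFrame([("a:1,b:2;c:3",)], ["e"])
    row = df.select(sf.str_to_map(df.e, sf.lit("[,;]"), sf.lit(":")).alias("m")).head()
    assert sorted(row["m"].items()) == [("a", "1"), ("b", "2"), ("c", "3")]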