From 25ba3693b746dd10ea8984cfde568fa0c572c222 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 15 Jan 2024 16:32:02 +0800 Subject: [PATCH 1/6] str_to_map --- python/pyspark/sql/functions/builtin.py | 78 +++++++++++++++++++++---- 1 file changed, 66 insertions(+), 12 deletions(-) diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index f1422d17b071..0faf9aac5c09 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -17250,8 +17250,8 @@ def str_to_map( keyValueDelim: Optional["ColumnOrName"] = None, ) -> Column: """ - Creates a map after splitting the text into key/value pairs using delimiters. - Both `pairDelim` and `keyValueDelim` are treated as regular expressions. + Map function: Converts a string into a map after splitting the text into key/value pairs + using delimiters. Both `pairDelim` and `keyValueDelim` are treated as regular expressions. .. versionadded:: 3.5.0 @@ -17260,23 +17260,77 @@ def str_to_map( text : :class:`~pyspark.sql.Column` or str Input column or strings. pairDelim : :class:`~pyspark.sql.Column` or str, optional - delimiter to use to split pair. + Delimiter to use to split pairs. Default is comma (,). keyValueDelim : :class:`~pyspark.sql.Column` or str, optional - delimiter to use to split key/value. + Delimiter to use to split key/value. Default is colon (:). + + Returns + ------- + :class:`~pyspark.sql.Column` + A new column of map type where each string in the original column is converted into a map. Examples -------- - >>> df = spark.createDataFrame([("a:1,b:2,c:3",)], ["e"]) - >>> df.select(str_to_map(df.e, lit(","), lit(":")).alias('r')).collect() - [Row(r={'a': '1', 'b': '2', 'c': '3'})] + Example 1: Using default delimiters + >>> from pyspark.sql import functions as sf >>> df = spark.createDataFrame([("a:1,b:2,c:3",)], ["e"]) - >>> df.select(str_to_map(df.e, lit(",")).alias('r')).collect() - [Row(r={'a': '1', 'b': '2', 'c': '3'})] + >>> df.select(sf.str_to_map(df.e)).show(truncate=False) + +------------------------+ + |str_to_map(e, ,, :) | + +------------------------+ + |{a -> 1, b -> 2, c -> 3}| + +------------------------+ - >>> df = spark.createDataFrame([("a:1,b:2,c:3",)], ["e"]) - >>> df.select(str_to_map(df.e).alias('r')).collect() - [Row(r={'a': '1', 'b': '2', 'c': '3'})] + Example 2: Using custom delimiters + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([("a=1;b=2;c=3",)], ["e"]) + >>> df.select(sf.str_to_map(df.e, sf.lit(";"), sf.lit("="))).show(truncate=False) + +------------------------+ + |str_to_map(e, ;, =) | + +------------------------+ + |{a -> 1, b -> 2, c -> 3}| + +------------------------+ + + Example 3: Using different delimiters for different rows + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([("a:1,b:2,c:3",), ("d=4;e=5;f=6",)], ["e"]) + >>> df.select(sf.str_to_map(df.e, + ... sf.when(df.e.contains(";"), sf.lit(";")).otherwise(sf.lit(",")), + ... sf.when(df.e.contains("="), sf.lit("=")).otherwise(sf.lit(":"))).alias("str_to_map") + ... 
).show(truncate=False) + +------------------------+ + |str_to_map | + +------------------------+ + |{a -> 1, b -> 2, c -> 3}| + |{d -> 4, e -> 5, f -> 6}| + +------------------------+ + + Example 4: Using a column of delimiters + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([("a:1,b:2,c:3", ","), ("d=4;e=5;f=6", ";")], ["e", "delim"]) + >>> df.select(sf.str_to_map(df.e, df.delim, sf.lit(":"))).show(truncate=False) + +---------------------------------------+ + |str_to_map(e, delim, :) | + +---------------------------------------+ + |{a -> 1, b -> 2, c -> 3} | + |{d=4 -> NULL, e=5 -> NULL, f=6 -> NULL}| + +---------------------------------------+ + + Example 5: Using a column of key/value delimiters + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([("a:1,b:2,c:3", ":"), ("d=4;e=5;f=6", "=")], ["e", "delim"]) + >>> df.select(sf.str_to_map(df.e, sf.lit(","), df.delim)).show(truncate=False) + +------------------------+ + |str_to_map(e, ,, delim) | + +------------------------+ + |{a -> 1, b -> 2, c -> 3}| + |{d -> 4;e=5;f=6} | + +------------------------+ """ if pairDelim is None: pairDelim = lit(",") From c966bba1f95a68d021cbd134fcd0ef66f05b8384 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 15 Jan 2024 19:31:52 +0800 Subject: [PATCH 2/6] map_zip_with and map_filter --- python/pyspark/sql/functions/builtin.py | 122 +++++++++++++++++++----- 1 file changed, 99 insertions(+), 23 deletions(-) diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index 0faf9aac5c09..909c049b42dc 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -17160,7 +17160,8 @@ def transform_values(col: "ColumnOrName", f: Callable[[Column, Column], Column]) @_try_remote_functions def map_filter(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Column: """ - Returns a map whose key-value pairs satisfy a predicate. + Collection function: Returns a new map column whose key-value pairs satisfy a given + predicate function. .. versionadded:: 3.1.0 @@ -17170,9 +17171,10 @@ def map_filter(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Co Parameters ---------- col : :class:`~pyspark.sql.Column` or str - name of column or expression + The name of the column or a column expression representing the map to be filtered. f : function - a binary function ``(k: Column, v: Column) -> Column...`` + A binary function ``(k: Column, v: Column) -> Column...`` that defines the predicate. + This function should return a boolean column that will be used to filter the input map. Can use methods of :class:`~pyspark.sql.Column`, functions defined in :py:mod:`pyspark.sql.functions` and Scala ``UserDefinedFunctions``. Python ``UserDefinedFunctions`` are not supported @@ -17181,16 +17183,48 @@ def map_filter(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Co Returns ------- :class:`~pyspark.sql.Column` - filtered map. + A new map column containing only the key-value pairs that satisfy the predicate. Examples -------- + Example 1: Filtering a map with a simple condition + + >>> from pyspark.sql import functions as sf >>> df = spark.createDataFrame([(1, {"foo": 42.0, "bar": 1.0, "baz": 32.0})], ("id", "data")) - >>> row = df.select(map_filter( - ... "data", lambda _, v: v > 30.0).alias("data_filtered") - ... ).head() - >>> sorted(row["data_filtered"].items()) - [('baz', 32.0), ('foo', 42.0)] + >>> df.select( + ... 
sf.map_filter("data", lambda k, v: v > 30.0).alias("data_filtered") + ... ).show(truncate=False) + +--------------------------+ + |data_filtered | + +--------------------------+ + |{baz -> 32.0, foo -> 42.0}| + +--------------------------+ + + Example 2: Filtering a map with a condition on keys + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([(1, {"foo": 42.0, "bar": 1.0, "baz": 32.0})], ("id", "data")) + >>> df.select( + ... sf.map_filter("data", lambda k, v: k.startswith("b")).alias("data_filtered") + ... ).show(truncate=False) + +-------------------------+ + |data_filtered | + +-------------------------+ + |{bar -> 1.0, baz -> 32.0}| + +-------------------------+ + + Example 3: Filtering a map with a complex condition + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([(1, {"foo": 42.0, "bar": 1.0, "baz": 32.0})], ("id", "data")) + >>> df.select( + ... sf.map_filter("data", lambda k, v: k.startswith("b") & (v > 1.0)).alias("data_filtered") + ... ).show() + +-------------+ + |data_filtered| + +-------------+ + |{baz -> 32.0}| + +-------------+ """ return _invoke_higher_order_function("MapFilter", [col], [f]) @@ -17202,7 +17236,8 @@ def map_zip_with( f: Callable[[Column, Column, Column], Column], ) -> Column: """ - Merge two given maps, key-wise into a single map using a function. + Collection: Merges two given maps into a single map by applying a function to + the key-value pairs. .. versionadded:: 3.1.0 @@ -17212,11 +17247,13 @@ def map_zip_with( Parameters ---------- col1 : :class:`~pyspark.sql.Column` or str - name of the first column or expression + The name of the first column or a column expression representing the first map. col2 : :class:`~pyspark.sql.Column` or str - name of the second column or expression + The name of the second column or a column expression representing the second map. f : function - a ternary function ``(k: Column, v1: Column, v2: Column) -> Column...`` + A ternary function ``(k: Column, v1: Column, v2: Column) -> Column...`` that defines + how to merge the values from the two maps. This function should return a column that + will be used as the value in the resulting map. Can use methods of :class:`~pyspark.sql.Column`, functions defined in :py:mod:`pyspark.sql.functions` and Scala ``UserDefinedFunctions``. Python ``UserDefinedFunctions`` are not supported @@ -17225,20 +17262,59 @@ def map_zip_with( Returns ------- :class:`~pyspark.sql.Column` - zipped map where entries are calculated by applying given function to each - pair of arguments. + A new map column where each key-value pair is the result of applying the function to + the corresponding key-value pairs in the input maps. Examples -------- + Example 1: Merging two maps with a simple function + + >>> from pyspark.sql import functions as sf >>> df = spark.createDataFrame([ - ... (1, {"IT": 24.0, "SALES": 12.00}, {"IT": 2.0, "SALES": 1.4})], - ... ("id", "base", "ratio") - ... ) - >>> row = df.select(map_zip_with( - ... "base", "ratio", lambda k, v1, v2: round(v1 * v2, 2)).alias("updated_data") - ... ).head() - >>> sorted(row["updated_data"].items()) - [('IT', 48.0), ('SALES', 16.8)] + ... (1, {"A": 1, "B": 2}, {"A": 3, "B": 4})], + ... ("id", "map1", "map2")) + >>> df.select( + ... sf.map_zip_with("map1", "map2", lambda k, v1, v2: v1 + v2).alias("updated_data") + ... 
).show() + +----------------+ + | updated_data| + +----------------+ + |{A -> 4, B -> 6}| + +----------------+ + + Example 2: Merging two maps with a complex function + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([ + ... (1, {"A": 1, "B": 2}, {"A": 3, "B": 4})], + ... ("id", "map1", "map2")) + >>> df.select( + ... sf.map_zip_with("map1", "map2", + ... lambda k, v1, v2: sf.when(k == "A", v1 + v2).otherwise(v1 - v2) + ... ).alias("updated_data") + ... ).show() + +-----------------+ + | updated_data| + +-----------------+ + |{A -> 4, B -> -2}| + +-----------------+ + + Example 3: Merging two maps with mismatched keys + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([ + ... (1, {"A": 1, "B": 2}, {"B": 3, "C": 4})], + ... ("id", "map1", "map2")) + >>> df.select( + ... sf.map_zip_with("map1", "map2", + ... lambda k, v1, v2: sf.when(v2.isNull(), v1).otherwise(v1 + v2) + ... ).alias("updated_data") + ... ).show(truncate=False) + +---------------------------+ + |updated_data | + +---------------------------+ + |{A -> 1, B -> 5, C -> NULL}| + +---------------------------+ """ return _invoke_higher_order_function("MapZipWith", [col1, col2], [f]) From 2dc48b46ac9cd1b4a5e46d3e8bb5a584610497f5 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 16 Jan 2024 11:50:17 +0800 Subject: [PATCH 3/6] sort result --- python/pyspark/sql/functions/builtin.py | 66 +++++++++---------------- 1 file changed, 24 insertions(+), 42 deletions(-) diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index 909c049b42dc..2e29ec5d8222 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -17191,40 +17191,31 @@ def map_filter(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Co >>> from pyspark.sql import functions as sf >>> df = spark.createDataFrame([(1, {"foo": 42.0, "bar": 1.0, "baz": 32.0})], ("id", "data")) - >>> df.select( + >>> row = df.select( ... sf.map_filter("data", lambda k, v: v > 30.0).alias("data_filtered") - ... ).show(truncate=False) - +--------------------------+ - |data_filtered | - +--------------------------+ - |{baz -> 32.0, foo -> 42.0}| - +--------------------------+ + ... ).head() + >>> sorted(row["data_filtered"].items()) + [('baz', 32.0), ('foo', 42.0)] Example 2: Filtering a map with a condition on keys >>> from pyspark.sql import functions as sf >>> df = spark.createDataFrame([(1, {"foo": 42.0, "bar": 1.0, "baz": 32.0})], ("id", "data")) - >>> df.select( + >>> row = df.select( ... sf.map_filter("data", lambda k, v: k.startswith("b")).alias("data_filtered") - ... ).show(truncate=False) - +-------------------------+ - |data_filtered | - +-------------------------+ - |{bar -> 1.0, baz -> 32.0}| - +-------------------------+ + ... ).head() + >>> sorted(row["data_filtered"].items()) + [('bar', 1.0), ('baz', 32.0)] Example 3: Filtering a map with a complex condition >>> from pyspark.sql import functions as sf >>> df = spark.createDataFrame([(1, {"foo": 42.0, "bar": 1.0, "baz": 32.0})], ("id", "data")) - >>> df.select( + >>> row = df.select( ... sf.map_filter("data", lambda k, v: k.startswith("b") & (v > 1.0)).alias("data_filtered") - ... ).show() - +-------------+ - |data_filtered| - +-------------+ - |{baz -> 32.0}| - +-------------+ + ... 
).head() + >>> sorted(row["data_filtered"].items()) + [('baz', 32.0)] """ return _invoke_higher_order_function("MapFilter", [col], [f]) @@ -17273,14 +17264,11 @@ def map_zip_with( >>> df = spark.createDataFrame([ ... (1, {"A": 1, "B": 2}, {"A": 3, "B": 4})], ... ("id", "map1", "map2")) - >>> df.select( + >>> row = df.select( ... sf.map_zip_with("map1", "map2", lambda k, v1, v2: v1 + v2).alias("updated_data") - ... ).show() - +----------------+ - | updated_data| - +----------------+ - |{A -> 4, B -> 6}| - +----------------+ + ... ).head() + >>> sorted(row["updated_data"].items()) + [('A', 4), ('B', 6)] Example 2: Merging two maps with a complex function @@ -17288,16 +17276,13 @@ def map_zip_with( >>> df = spark.createDataFrame([ ... (1, {"A": 1, "B": 2}, {"A": 3, "B": 4})], ... ("id", "map1", "map2")) - >>> df.select( + >>> row = df.select( ... sf.map_zip_with("map1", "map2", ... lambda k, v1, v2: sf.when(k == "A", v1 + v2).otherwise(v1 - v2) ... ).alias("updated_data") - ... ).show() - +-----------------+ - | updated_data| - +-----------------+ - |{A -> 4, B -> -2}| - +-----------------+ + ... ).head() + >>> sorted(row["updated_data"].items()) + [('A', 4), ('B', -2)] Example 3: Merging two maps with mismatched keys @@ -17305,16 +17290,13 @@ def map_zip_with( >>> df = spark.createDataFrame([ ... (1, {"A": 1, "B": 2}, {"B": 3, "C": 4})], ... ("id", "map1", "map2")) - >>> df.select( + >>> row = df.select( ... sf.map_zip_with("map1", "map2", ... lambda k, v1, v2: sf.when(v2.isNull(), v1).otherwise(v1 + v2) ... ).alias("updated_data") - ... ).show(truncate=False) - +---------------------------+ - |updated_data | - +---------------------------+ - |{A -> 1, B -> 5, C -> NULL}| - +---------------------------+ + ... ).head() + >>> sorted(row["updated_data"].items()) + [('A', 1), ('B', 5), ('C', None)] """ return _invoke_higher_order_function("MapZipWith", [col1, col2], [f]) From 6617478b7b735bfc17a70643e4c3e2ec883945ab Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 16 Jan 2024 11:56:01 +0800 Subject: [PATCH 4/6] remove unused p --- python/pyspark/sql/functions/builtin.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index 2e29ec5d8222..1f6d86de28dc 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -17192,7 +17192,7 @@ def map_filter(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Co >>> from pyspark.sql import functions as sf >>> df = spark.createDataFrame([(1, {"foo": 42.0, "bar": 1.0, "baz": 32.0})], ("id", "data")) >>> row = df.select( - ... sf.map_filter("data", lambda k, v: v > 30.0).alias("data_filtered") + ... sf.map_filter("data", lambda _, v: v > 30.0).alias("data_filtered") ... ).head() >>> sorted(row["data_filtered"].items()) [('baz', 32.0), ('foo', 42.0)] @@ -17202,7 +17202,7 @@ def map_filter(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Co >>> from pyspark.sql import functions as sf >>> df = spark.createDataFrame([(1, {"foo": 42.0, "bar": 1.0, "baz": 32.0})], ("id", "data")) >>> row = df.select( - ... sf.map_filter("data", lambda k, v: k.startswith("b")).alias("data_filtered") + ... sf.map_filter("data", lambda k, _: k.startswith("b")).alias("data_filtered") ... ).head() >>> sorted(row["data_filtered"].items()) [('bar', 1.0), ('baz', 32.0)] @@ -17265,7 +17265,7 @@ def map_zip_with( ... (1, {"A": 1, "B": 2}, {"A": 3, "B": 4})], ... ("id", "map1", "map2")) >>> row = df.select( - ... 
sf.map_zip_with("map1", "map2", lambda k, v1, v2: v1 + v2).alias("updated_data") + ... sf.map_zip_with("map1", "map2", lambda _, v1, v2: v1 + v2).alias("updated_data") ... ).head() >>> sorted(row["updated_data"].items()) [('A', 4), ('B', 6)] @@ -17292,7 +17292,7 @@ def map_zip_with( ... ("id", "map1", "map2")) >>> row = df.select( ... sf.map_zip_with("map1", "map2", - ... lambda k, v1, v2: sf.when(v2.isNull(), v1).otherwise(v1 + v2) + ... lambda _, v1, v2: sf.when(v2.isNull(), v1).otherwise(v1 + v2) ... ).alias("updated_data") ... ).head() >>> sorted(row["updated_data"].items()) From 67f4181a58ccad2a5fb68cce223ca30f5a20990b Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 16 Jan 2024 12:03:22 +0800 Subject: [PATCH 5/6] test SPARK_ANSI_SQL_MODE true --- .github/workflows/build_and_test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 40bcf734c6af..d2e235016209 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -377,6 +377,7 @@ jobs: SKIP_PACKAGING: true METASPACE_SIZE: 1g BRANCH: ${{ inputs.branch }} + SPARK_ANSI_SQL_MODE: true steps: - name: Checkout Spark repository uses: actions/checkout@v4 From 4fc99f05a18df5f7c3b616ed1c5fcbb82c9eafe8 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 16 Jan 2024 14:32:17 +0800 Subject: [PATCH 6/6] test SPARK_ANSI_SQL_MODE false --- .github/workflows/build_and_test.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index d2e235016209..40bcf734c6af 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -377,7 +377,6 @@ jobs: SKIP_PACKAGING: true METASPACE_SIZE: 1g BRANCH: ${{ inputs.branch }} - SPARK_ANSI_SQL_MODE: true steps: - name: Checkout Spark repository uses: actions/checkout@v4