From 36a7ccc37374a42a2c9cf67f3f1748df638eb937 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 4 Sep 2018 18:00:34 +0800
Subject: [PATCH 1/3] Add an example for having two columns as the grouping
 key in group aggregate pandas UDF

---
 python/pyspark/sql/functions.py | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index d58d8d10e5cd..fad317395647 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2781,14 +2781,14 @@ def pandas_udf(f=None, returnType=None, functionType=None):
        +---+-------------------+
 
        Alternatively, the user can define a function that takes two arguments.
-       In this case, the grouping key will be passed as the first argument and the data will
-       be passed as the second argument. The grouping key will be passed as a tuple of numpy
+       In this case, the grouping key(s) will be passed as the first argument and the data will
+       be passed as the second argument. The grouping key(s) will be passed as a tuple of numpy
        data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in
        as a `pandas.DataFrame` containing all columns from the original Spark DataFrame.
-       This is useful when the user does not want to hardcode grouping key in the function.
+       This is useful when the user does not want to hardcode grouping key(s) in the function.
 
->>> from pyspark.sql.functions import pandas_udf, PandasUDFType
        >>> import pandas as pd  # doctest: +SKIP
+       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
        >>> df = spark.createDataFrame(
        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ...     ("id", "v"))  # doctest: +SKIP
@@ -2804,6 +2804,20 @@ def pandas_udf(f=None, returnType=None, functionType=None):
        |  1|1.5|
        |  2|6.0|
        +---+---+
+       >>> @pandas_udf("id long, v1 double, v2 double", PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
+       >>> def sum_udf(key, pdf):
+       ...     # key is a tuple of two numpy.int64s, which is the values
+       ...     # of 'id' and 'ceil(df.v / 2)' for the current group
+       ...     return pd.DataFrame([key + (pdf.v.sum(),)])
+       >>> df.groupby(df.id, ceil(df.v / 2)).apply(sum_udf).show()  # doctest: +SKIP
+       +---+---+----+
+       | id| v1|  v2|
+       +---+---+----+
+       |  2|5.0|10.0|
+       |  1|1.0| 3.0|
+       |  2|3.0| 5.0|
+       |  2|2.0| 3.0|
+       +---+---+----+
 
        .. note:: If returning a new `pandas.DataFrame` constructed with a dictionary, it is
        recommended to explicitly index the columns by name to ensure the positions are correct,

From 2ad350c79bd2004282a43d6d189a828cad54cc60 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 5 Sep 2018 09:56:40 +0800
Subject: [PATCH 2/3] Address comments

---
 python/pyspark/sql/functions.py | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index fad317395647..eb2584e05a57 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2804,20 +2804,22 @@ def pandas_udf(f=None, returnType=None, functionType=None):
        |  1|1.5|
        |  2|6.0|
        +---+---+
-       >>> @pandas_udf("id long, v1 double, v2 double", PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
+       >>> @pandas_udf(
+       ...     "id long, additional_key double, v double",
+       ...     PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
        >>> def sum_udf(key, pdf):
        ...     # key is a tuple of two numpy.int64s, which is the values
        ...     # of 'id' and 'ceil(df.v / 2)' for the current group
        ...     return pd.DataFrame([key + (pdf.v.sum(),)])
        >>> df.groupby(df.id, ceil(df.v / 2)).apply(sum_udf).show()  # doctest: +SKIP
-       +---+---+----+
-       | id| v1|  v2|
-       +---+---+----+
-       |  2|5.0|10.0|
-       |  1|1.0| 3.0|
-       |  2|3.0| 5.0|
-       |  2|2.0| 3.0|
-       +---+---+----+
+       +---+--------------+----+
+       | id|additional_key|  v2|
+       +---+--------------+----+
+       |  2|           5.0|10.0|
+       |  1|           1.0| 3.0|
+       |  2|           3.0| 5.0|
+       |  2|           2.0| 3.0|
+       +---+--------------+----+
 
        .. note:: If returning a new `pandas.DataFrame` constructed with a dictionary, it is
        recommended to explicitly index the columns by name to ensure the positions are correct,

From 1f342aa7158bc2440f504b7cb47b692fcdcce41d Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 6 Sep 2018 10:47:10 +0800
Subject: [PATCH 3/3] Address comments

---
 python/pyspark/sql/functions.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index eb2584e05a57..e93b55d6ae12 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2805,21 +2805,21 @@ def pandas_udf(f=None, returnType=None, functionType=None):
        |  2|6.0|
        +---+---+
        >>> @pandas_udf(
-       ...     "id long, additional_key double, v double",
+       ...     "id long, `ceil(v / 2)` long, v double",
        ...     PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
        >>> def sum_udf(key, pdf):
        ...     # key is a tuple of two numpy.int64s, which is the values
        ...     # of 'id' and 'ceil(df.v / 2)' for the current group
        ...     return pd.DataFrame([key + (pdf.v.sum(),)])
        >>> df.groupby(df.id, ceil(df.v / 2)).apply(sum_udf).show()  # doctest: +SKIP
-       +---+--------------+----+
-       | id|additional_key|  v2|
-       +---+--------------+----+
-       |  2|           5.0|10.0|
-       |  1|           1.0| 3.0|
-       |  2|           3.0| 5.0|
-       |  2|           2.0| 3.0|
-       +---+--------------+----+
+       +---+-----------+----+
+       | id|ceil(v / 2)|   v|
+       +---+-----------+----+
+       |  2|          5|10.0|
+       |  1|          1| 3.0|
+       |  2|          3| 5.0|
+       |  2|          2| 3.0|
+       +---+-----------+----+
 
        .. note:: If returning a new `pandas.DataFrame` constructed with a dictionary, it is
        recommended to explicitly index the columns by name to ensure the positions are correct,