From a81d9e5b4074d1dc3c40d4d31ea1a4e5850eea30 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 31 May 2018 11:50:07 -0700 Subject: [PATCH 1/6] added note to doc test --- python/pyspark/sql/functions.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index cf26523b3cb45..634461810fc49 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2216,7 +2216,8 @@ def pandas_udf(f=None, returnType=None, functionType=None): A grouped map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame` The returnType should be a :class:`StructType` describing the schema of the returned `pandas.DataFrame`. - The length of the returned `pandas.DataFrame` can be arbitrary. + The length of the returned `pandas.DataFrame` can be arbitrary and the columns must be + indexed so that the positions match the corresponding field in the schema. Grouped map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`. @@ -2239,6 +2240,10 @@ def pandas_udf(f=None, returnType=None, functionType=None): | 2| 1.1094003924504583| +---+-------------------+ + .. note:: If returning a new `pandas.DataFrame` constructed with a dictionary, it is + recommended to explicitly index the columns by name to ensure the positions are correct. + For example, `pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])`. + .. seealso:: :meth:`pyspark.sql.GroupedData.apply` .. note:: The user-defined functions are considered deterministic by default. Due to From 33e6ce53b96540a0d52a5962aacb0f46f97f27f5 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 31 May 2018 12:31:10 -0700 Subject: [PATCH 2/6] forgot to indent note --- python/pyspark/sql/functions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 634461810fc49..d61326efc67e4 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2241,8 +2241,8 @@ def pandas_udf(f=None, returnType=None, functionType=None): +---+-------------------+ .. note:: If returning a new `pandas.DataFrame` constructed with a dictionary, it is - recommended to explicitly index the columns by name to ensure the positions are correct. - For example, `pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])`. + recommended to explicitly index the columns by name to ensure the positions are correct. + For example, `pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])`. .. seealso:: :meth:`pyspark.sql.GroupedData.apply` From b8010299a79390b89046c00bd2ab4833cf4a512e Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 31 May 2018 12:12:32 -0700 Subject: [PATCH 3/6] slight reword --- python/pyspark/sql/functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index d61326efc67e4..475b9fe6bfecd 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2217,7 +2217,7 @@ def pandas_udf(f=None, returnType=None, functionType=None): The returnType should be a :class:`StructType` describing the schema of the returned `pandas.DataFrame`. The length of the returned `pandas.DataFrame` can be arbitrary and the columns must be - indexed so that the positions match the corresponding field in the schema. + indexed so that their position matches the corresponding field in the schema. Grouped map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`. From 343f00a6f1aae2b16df4e078ab6b1c3f3a77904e Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 31 May 2018 15:42:34 -0700 Subject: [PATCH 4/6] also OrderedDict in pydoc --- python/pyspark/sql/functions.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 475b9fe6bfecd..9c02982e4ae22 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2241,8 +2241,10 @@ def pandas_udf(f=None, returnType=None, functionType=None): +---+-------------------+ .. note:: If returning a new `pandas.DataFrame` constructed with a dictionary, it is - recommended to explicitly index the columns by name to ensure the positions are correct. - For example, `pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])`. + recommended to explicitly index the columns by name to ensure the positions are correct, + or alternatively use an `OrderedDict`. + For example, `pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])` or + `pd.DataFrame(OrderedDict([('id', ids), ('a', data)]))`. .. seealso:: :meth:`pyspark.sql.GroupedData.apply` From 34797fadcb7f427a42c316ed1c3c396520cb720c Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 31 May 2018 12:10:09 -0700 Subject: [PATCH 5/6] added to sql guide --- docs/sql-programming-guide.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 14bc5e626771c..217df0eb70873 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1737,6 +1737,15 @@ To use `groupBy().apply()`, the user needs to define the following: * A Python function that defines the computation for each group. * A `StructType` object or a string that defines the schema of the output `DataFrame`. +The output schema will be applied to the columns of the returned `pandas.DataFrame` in order by position, +not by name. This means that the columns in the `pandas.DataFrame` must be indexed so that their +position matches the corresponding field in the schema. + +Note that when creating a new `pandas.DataFrame` using a dictionary, the actual position of the column +can differ from the order that it was placed in the dictionary. It is recommended in this case to +explicitly define the column order using the `columns` keyword, e.g. +`pandas.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])`. + Note that all data for a group will be loaded into memory before the function is applied. This can lead to out of memory exceptons, especially if the group sizes are skewed. The configuration for [maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and it is up to the user From 58e7927c770aa483837e6d8cb94800afb3c4fdf7 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 31 May 2018 14:56:24 -0700 Subject: [PATCH 6/6] alt OrderedDict --- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 217df0eb70873..461806a659965 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1744,7 +1744,7 @@ position matches the corresponding field in the schema. Note that when creating a new `pandas.DataFrame` using a dictionary, the actual position of the column can differ from the order that it was placed in the dictionary. It is recommended in this case to explicitly define the column order using the `columns` keyword, e.g. -`pandas.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])`. +`pandas.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])`, or alternatively use an `OrderedDict`. Note that all data for a group will be loaded into memory before the function is applied. This can lead to out of memory exceptons, especially if the group sizes are skewed. The configuration for