15 changes: 3 additions & 12 deletions python/pyspark/serializers.py
@@ -216,9 +216,6 @@ class ArrowPandasSerializer(ArrowSerializer):
     Serializes Pandas.Series as Arrow data.
     """
 
-    def __init__(self):
-        super(ArrowPandasSerializer, self).__init__()
-
     def dumps(self, series):
         """
         Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
@@ -245,16 +242,10 @@ def cast_series(s, t):
 
     def loads(self, obj):
        """
-        Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series
-        followed by a dictionary containing length of the loaded batches.
+        Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series.
         """
-        import pyarrow as pa
-        reader = pa.RecordBatchFileReader(pa.BufferReader(obj))
-        batches = [reader.get_batch(i) for i in xrange(reader.num_record_batches)]
-        # NOTE: a 0-parameter pandas_udf will produce an empty batch that can have num_rows set
-        num_rows = sum((batch.num_rows for batch in batches))
-        table = pa.Table.from_batches(batches)
-        return [c.to_pandas() for c in table.itercolumns()] + [{"length": num_rows}]
+        table = super(ArrowPandasSerializer, self).loads(obj)
+        return [c.to_pandas() for c in table.itercolumns()]
 
     def __repr__(self):
         return "ArrowPandasSerializer"
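For context, a sketch of the deserialization path the simplified loads now delegates to: the parent ArrowSerializer.loads is assumed to rebuild a pyarrow.Table from the serialized record batches, much as the removed inline code did. API names are taken from the deleted lines (pyarrow 0.4-era; newer releases spell the reader as pa.ipc.open_file), and arrow_file_bytes_to_series is a hypothetical standalone helper:

import pyarrow as pa

def arrow_file_bytes_to_series(obj):
    # Read every record batch out of the Arrow file-format buffer and stack
    # them into a single Table (roughly what ArrowSerializer.loads is assumed
    # to do before ArrowPandasSerializer converts the columns).
    reader = pa.RecordBatchFileReader(pa.BufferReader(obj))
    batches = [reader.get_batch(i) for i in range(reader.num_record_batches)]
    table = pa.Table.from_batches(batches)
    # One pandas.Series per column; the trailing {"length": num_rows} entry is
    # gone now that 0-parameter pandas_udfs are rejected up front.
    return [c.to_pandas() for c in table.itercolumns()]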
32 changes: 25 additions & 7 deletions python/pyspark/sql/functions.py
@@ -2127,6 +2127,10 @@ def wrapper(*args):
 def _create_udf(f, returnType, vectorized):
 
     def _udf(f, returnType=StringType(), vectorized=vectorized):
+        if vectorized:
+            import inspect
+            if len(inspect.getargspec(f).args) == 0:
+                raise NotImplementedError("0-parameter pandas_udfs are not currently supported")
         udf_obj = UserDefinedFunction(f, returnType, vectorized=vectorized)
         return udf_obj._wrapped()
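As an aside, a minimal standalone sketch of the zero-argument detection used above. The patch calls the older inspect.getargspec; getfullargspec shown here is its Python 3 equivalent, and takes_no_positional_args is a hypothetical helper name:

import inspect

def takes_no_positional_args(f):
    # The patch checks inspect.getargspec(f).args; getfullargspec is the
    # Python 3 spelling and reports the same positional parameter names.
    return len(inspect.getfullargspec(f).args) == 0

print(takes_no_positional_args(lambda: 1))        # True  -> rejected by _udf above
print(takes_no_positional_args(lambda s: s + 1))  # False -> accepted as a pandas_udf

def zero_no_type():
    return 1

print(takes_no_positional_args(zero_no_type))     # True, matching the new tests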

@@ -2183,14 +2187,28 @@ def pandas_udf(f=None, returnType=StringType()):
     :param f: python function if used as a standalone function
     :param returnType: a :class:`pyspark.sql.types.DataType` object
 
-    # TODO: doctest
+    >>> from pyspark.sql.types import IntegerType, StringType
+    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+    >>> @pandas_udf(returnType=StringType())
Review thread:
Member: Have we installed pyarrow on Jenkins? The failed test complains "ImportError: No module named pyarrow".
Member: We could just add # doctest: +SKIP, maybe.
Member (author): Hmm, I thought the Jenkins environment for unit tests would be the same for doctests and have pyarrow installed. @holdenk or @shaneknapp, do you know if that is the case?
Contributor: Adding @JoshRosen too. The doc-building node (amp-jenkins-worker-01) doesn't have Arrow installed for the default conda Python 2.7 environment; for the Python 3 environment, we're running Arrow 0.4.0. I looked at the script and it seems to be agnostic to Python 2 vs 3; once I know which version of Python we'll be running, I can make sure the correct version of Arrow is installed.
Member (author): Cool, thanks @shaneknapp!
Member: Hm, but shouldn't we skip those doctests anyway, since these are not hard dependencies?
Member (author): That's true; I see that toPandas() also skips doctests. I'll skip this now and we can always enable it later if we decide differently. @shaneknapp, it looks like we will hold off on this, so no need to do anything to Jenkins, I believe. Sorry to bug you.

+    ... def to_upper(s):
+    ...     return s.str.upper()
+    ...
+    >>> @pandas_udf(returnType="integer")
+    ... def add_one(x):
+    ...     return x + 1
+    ...
+    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
+    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
+    ...     .show() # doctest: +SKIP
Review thread:
Member: Seems we don't skip it, actually?

Member: Looks like we actually do :). Let me test this one locally before merging just to be sure (I have PyPy installed locally, which does not have pyarrow or pandas).

Member: Yeah. It is. :)

Member: I just double-checked that it passes:

./run-tests --python-executables=pypy --modules pyspark-sql
...
Will test against the following Python executables: ['pypy']
Will test the following Python modules: ['pyspark-sql']
Starting test(pypy): pyspark.sql.functions
...
Finished test(pypy): pyspark.sql.functions (74s)
...

I also checked it without # doctest: +SKIP:

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 63e9a830bbc..3265ecc974b 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2199,7 +2199,7 @@ def pandas_udf(f=None, returnType=StringType()):
     ...
     >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
     >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
-    ...     .show() # doctest: +SKIP
+    ...     .show()
     +----------+--------------+------------+
     |slen(name)|to_upper(name)|add_one(age)|
     +----------+--------------+------------+
./run-tests --python-executables=pypy --modules pyspark-sql
...
Will test against the following Python executables: ['pypy']
Will test the following Python modules: ['pyspark-sql']
...
Starting test(pypy): pyspark.sql.functions
...
Failed example:
    df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \
        .show()
Exception raised:
    Traceback (most recent call last):
      File "/usr/local/Cellar/pypy/5.8.0/libexec/lib-python/2.7/doctest.py", line 1315, in __run
        compileflags, 1) in test.globs
      File "<doctest pyspark.sql.functions.pandas_udf[5]>", line 1, in <module>
        df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \
      File "/.../spark/python/pyspark/sql/dataframe.py", line 347, in show
        print(self._jdf.showString(n, 20, vertical))
      File "/.../spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/.../spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
        format(target_id, ".", name), value)
    Py4JJavaError: An error occurred while calling o1373.showString.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 93.0 failed 1 times, most recent failure: Lost task 0.0 in stage 93.0 (TID 1093, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 190, in main
        func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 112, in read_udfs
        arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 102, in read_single_udf
        return arg_offsets, wrap_pandas_udf(row_func, return_type)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 77, in wrap_pandas_udf
        arrow_return_type = toArrowType(return_type)
      File "/.../spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1603, in toArrowType
        import pyarrow as pa
    ImportError: No module named pyarrow

Member: (D'oh, not a big deal, but two spaces are needed before inline comments.)

Member (author): Thanks @HyukjinKwon! Sorry, I didn't notice that :( I'll make a note to fix the spacing in a related change.

+    +----------+--------------+------------+
+    |slen(name)|to_upper(name)|add_one(age)|
+    +----------+--------------+------------+
+    |         8|      JOHN DOE|          22|
+    +----------+--------------+------------+
     """
-    import inspect
-    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
-    if inspect.getargspec(f).keywords is None:
-        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
-    else:
-        return _create_udf(f, returnType=returnType, vectorized=True)
+    wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True)
+
+    return wrapped_udf
 
 
 blacklist = ['map', 'since', 'ignore_unicode_prefix']
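For illustration, a small sketch of the fail-fast behavior this change introduces. The only assumption is that pyspark is importable; no SparkSession is needed because the check runs before any JVM interaction:

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import LongType

# With this patch the 0-parameter case is rejected when the UDF is defined,
# instead of producing an empty Arrow batch at execution time.
try:
    pandas_udf(lambda: 1, LongType())
except NotImplementedError as exc:
    print(exc)  # 0-parameter pandas_udfs are not currently supported

# UDFs with one or more parameters are unaffected.
plus_one = pandas_udf(lambda x: x + 1, LongType())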
59 changes: 45 additions & 14 deletions python/pyspark/sql/tests.py
@@ -3256,11 +3256,20 @@ def test_vectorized_udf_null_string(self):
 
     def test_vectorized_udf_zero_parameter(self):
         from pyspark.sql.functions import pandas_udf
-        import pandas as pd
-        df = self.spark.range(10)
-        f0 = pandas_udf(lambda **kwargs: pd.Series(1).repeat(kwargs['length']), LongType())
-        res = df.select(f0())
-        self.assertEquals(df.select(lit(1)).collect(), res.collect())
+        error_str = '0-parameter pandas_udfs.*not.*supported'
+        with QuietTest(self.sc):
+            with self.assertRaisesRegexp(NotImplementedError, error_str):
+                pandas_udf(lambda: 1, LongType())
+
+            with self.assertRaisesRegexp(NotImplementedError, error_str):
+                @pandas_udf
+                def zero_no_type():
+                    return 1
+
+            with self.assertRaisesRegexp(NotImplementedError, error_str):
+                @pandas_udf(LongType())
+                def zero_with_type():
+                    return 1
 
     def test_vectorized_udf_datatype_string(self):
         from pyspark.sql.functions import pandas_udf, col
@@ -3308,12 +3317,12 @@ def test_vectorized_udf_invalid_length(self):
         from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
         df = self.spark.range(10)
-        raise_exception = pandas_udf(lambda: pd.Series(1), LongType())
+        raise_exception = pandas_udf(lambda _: pd.Series(1), LongType())
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
                     Exception,
                     'Result vector from pandas_udf was not the required length'):
-                df.select(raise_exception()).collect()
+                df.select(raise_exception(col('id'))).collect()
 
     def test_vectorized_udf_mix_udf(self):
         from pyspark.sql.functions import pandas_udf, udf, col
@@ -3328,22 +3337,44 @@ def test_vectorized_udf_mix_udf(self):
 
     def test_vectorized_udf_chained(self):
         from pyspark.sql.functions import pandas_udf, col
-        df = self.spark.range(10).toDF('x')
+        df = self.spark.range(10)
         f = pandas_udf(lambda x: x + 1, LongType())
         g = pandas_udf(lambda x: x - 1, LongType())
-        res = df.select(g(f(col('x'))))
+        res = df.select(g(f(col('id'))))
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_wrong_return_type(self):
         from pyspark.sql.functions import pandas_udf, col
-        df = self.spark.range(10).toDF('x')
+        df = self.spark.range(10)
         f = pandas_udf(lambda x: x * 1.0, StringType())
         with QuietTest(self.sc):
-            with self.assertRaisesRegexp(
-                    Exception,
-                    'Invalid.*type.*string'):
-                df.select(f(col('x'))).collect()
+            with self.assertRaisesRegexp(Exception, 'Invalid.*type.*string'):
+                df.select(f(col('id'))).collect()
+
+    def test_vectorized_udf_return_scalar(self):
+        from pyspark.sql.functions import pandas_udf, col
+        df = self.spark.range(10)
+        f = pandas_udf(lambda x: 1.0, DoubleType())
+        with QuietTest(self.sc):
+            with self.assertRaisesRegexp(Exception, 'Return.*type.*pandas_udf.*Series'):
+                df.select(f(col('id'))).collect()
+
+    def test_vectorized_udf_decorator(self):
+        from pyspark.sql.functions import pandas_udf, col
+        df = self.spark.range(10)
+
+        @pandas_udf(returnType=LongType())
+        def identity(x):
+            return x
+        res = df.select(identity(col('id')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_empty_partition(self):
+        from pyspark.sql.functions import pandas_udf, col
+        df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
Review thread:
Member: Maybe I am missing something, but what is this test intended to test?
Member: Oh, I see. One partition is empty, and it relates to the check added in ArrowEvalPythonExec.
Member (author): Yeah, an empty partition leads to an empty iterator, so this makes sure that code path can handle it.

+        f = pandas_udf(lambda x: x, LongType())
+        res = df.select(f(col('id')))
+        self.assertEquals(df.collect(), res.collect())
 
 if __name__ == "__main__":
     from pyspark.sql.tests import *
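To make the empty-partition scenario concrete, here is a hypothetical standalone check (the helper names and the local master setting are assumptions) that the fixture used in test_vectorized_udf_empty_partition really does leave one partition empty:

from pyspark.sql import Row, SparkSession

# One row spread across two partitions guarantees at least one empty partition,
# which is exactly what the test relies on.
spark = SparkSession.builder.master("local[2]").appName("empty-partition-check").getOrCreate()
rdd = spark.sparkContext.parallelize([Row(id=1)], 2)
print(rdd.getNumPartitions())  # 2
print(rdd.glom().collect())    # e.g. [[], [Row(id=1)]] -- the first partition is empty
spark.stop()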
25 changes: 10 additions & 15 deletions python/pyspark/worker.py
@@ -60,12 +60,9 @@ def read_command(serializer, file):
     return command
 
 
-def chain(f, g, eval_type):
+def chain(f, g):
     """chain two functions together """
-    if eval_type == PythonEvalType.SQL_PANDAS_UDF:
-        return lambda *a, **kwargs: g(f(*a, **kwargs), **kwargs)
-    else:
-        return lambda *a: g(f(*a))
+    return lambda *a: g(f(*a))
 
 
 def wrap_udf(f, return_type):
@@ -80,14 +77,14 @@ def wrap_pandas_udf(f, return_type):
     arrow_return_type = toArrowType(return_type)
 
     def verify_result_length(*a):
-        kwargs = a[-1]
-        result = f(*a[:-1], **kwargs)
-        if len(result) != kwargs["length"]:
+        result = f(*a)
+        if not hasattr(result, "__len__"):
+            raise TypeError("Return type of pandas_udf should be a Pandas.Series")
+        if len(result) != len(a[0]):
             raise RuntimeError("Result vector from pandas_udf was not the required length: "
-                               "expected %d, got %d\nUse input vector length or kwargs['length']"
-                               % (kwargs["length"], len(result)))
-        return result, arrow_return_type
-    return lambda *a: verify_result_length(*a)
+                               "expected %d, got %d" % (len(a[0]), len(result)))
+        return result
+    return lambda *a: (verify_result_length(*a), arrow_return_type)
 
 
 def read_single_udf(pickleSer, infile, eval_type):
@@ -99,11 +96,9 @@ def read_single_udf(pickleSer, infile, eval_type):
     if row_func is None:
         row_func = f
     else:
-        row_func = chain(row_func, f, eval_type)
+        row_func = chain(row_func, f)
     # the last returnType will be the return type of UDF
     if eval_type == PythonEvalType.SQL_PANDAS_UDF:
-        # A pandas_udf will take kwargs as the last argument
-        arg_offsets = arg_offsets + [-1]
         return arg_offsets, wrap_pandas_udf(row_func, return_type)
     else:
         return arg_offsets, wrap_udf(row_func, return_type)
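A minimal, pandas-only sketch of the length check that the new wrap_pandas_udf performs. check_result_length is a hypothetical standalone name; the real wrapper also carries the Arrow return type alongside the result:

import pandas as pd

def check_result_length(f):
    # Mirrors verify_result_length above: the result must be sized (a
    # pandas.Series) and match the length of the first input column, since
    # the kwargs['length'] hint no longer exists.
    def wrapped(*cols):
        result = f(*cols)
        if not hasattr(result, "__len__"):
            raise TypeError("Return type of pandas_udf should be a Pandas.Series")
        if len(result) != len(cols[0]):
            raise RuntimeError("Result vector from pandas_udf was not the required length: "
                               "expected %d, got %d" % (len(cols[0]), len(result)))
        return result
    return wrapped

add_one = check_result_length(lambda s: s + 1)
print(add_one(pd.Series([1, 2, 3])))                        # OK: 3 rows in, 3 rows out
# check_result_length(lambda s: 1.0)(pd.Series([1, 2]))     # TypeError: not a Series
# check_result_length(lambda s: s[:1])(pd.Series([1, 2]))   # RuntimeError: wrong length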
ArrowEvalPythonExec.scala
@@ -51,10 +51,12 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
       outputIterator.map(new ArrowPayload(_)), context)
 
     // Verify that the output schema is correct
-    val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
-      .map { case (attr, i) => attr.withName(s"_$i") })
-    assert(schemaOut.equals(outputRowIterator.schema),
-      s"Invalid schema from pandas_udf: expected $schemaOut, got ${outputRowIterator.schema}")
+    if (outputRowIterator.hasNext) {
+      val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
+        .map { case (attr, i) => attr.withName(s"_$i") })
+      assert(schemaOut.equals(outputRowIterator.schema),
+        s"Invalid schema from pandas_udf: expected $schemaOut, got ${outputRowIterator.schema}")
+    }
 
     outputRowIterator
   }