17 changes: 10 additions & 7 deletions python/pyspark/sql/pandas/serializers.py
@@ -153,13 +153,16 @@ def create_array(s, t):
          s = s.astype(s.dtypes.categories.dtype)
      try:
          array = pa.Array.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
-     except pa.ArrowException as e:
-         error_msg = "Exception thrown when converting pandas.Series (%s) to Arrow " + \
-                     "Array (%s). It can be caused by overflows or other unsafe " + \
-                     "conversions warned by Arrow. Arrow safe type check can be " + \
-                     "disabled by using SQL config " + \
-                     "`spark.sql.execution.pandas.convertToArrowArraySafely`."
-         raise RuntimeError(error_msg % (s.dtype, t), e)
+     except ValueError as e:
Member Author:
errors during safe conversion will be ArrowInvalid, which subclasses ValueError
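For reference, a minimal standalone sketch (pandas + pyarrow only, no Spark; the series and target type are illustrative) of the behavior this comment describes, that the safe-conversion error is an ArrowInvalid and is therefore caught by `except ValueError`:

    import pandas as pd
    import pyarrow as pa

    # Floats cannot be converted to int64 without truncation, so the
    # safe conversion raises pyarrow.lib.ArrowInvalid.
    s = pd.Series([1.5, 2.5])
    try:
        pa.Array.from_pandas(s, type=pa.int64(), safe=True)
    except ValueError as e:
        print(type(e).__name__)           # ArrowInvalid
        print(isinstance(e, ValueError))  # True, so `except ValueError` catches it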

+         if self._safecheck:
+             error_msg = "Exception thrown when converting pandas.Series (%s) to " + \
+                         "Arrow Array (%s). It can be caused by overflows or other " + \
+                         "unsafe conversions warned by Arrow. Arrow safe type check " + \
+                         "can be disabled by using SQL config " + \
+                         "`spark.sql.execution.pandas.convertToArrowArraySafely`."
+             raise ValueError(error_msg % (s.dtype, t)) from e
Member Author:
Now that we dropped Python 2, this seems more appropriate

Member:
In branch-3.0.

  File "/home/jenkins/workspace/spark-branch-3.0-test-sbt-hadoop-2.7-hive-1.2/python/pyspark/sql/pandas/serializers.py", line 166
    raise ValueError(error_msg % (s.dtype, t)) from e
                                              ^
SyntaxError: invalid syntax
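For context, a standalone Python 3 sketch (hypothetical messages, no Spark) of the `raise ... from e` chaining under discussion; this syntax does not parse under the Python 2 interpreter that branch-3.0 still supports, hence the build failure above:

    def convert():
        try:
            int("not a number")
        except ValueError as e:
            # Python 3 only syntax: records the original error as __cause__
            raise ValueError("conversion failed, see cause") from e

    try:
        convert()
    except ValueError as e:
        # The original error survives on the chain for debugging and tracebacks
        print(repr(e.__cause__))  # ValueError("invalid literal for int() ...")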

+         else:
+             raise e
      return array

  arrs = []
9 changes: 5 additions & 4 deletions python/pyspark/sql/tests/test_arrow.py
@@ -264,11 +264,12 @@ def test_createDataFrame_with_schema(self):
  def test_createDataFrame_with_incorrect_schema(self):
      pdf = self.create_pandas_data_frame()
      fields = list(self.schema)
-     fields[0], fields[1] = fields[1], fields[0]  # swap str with int
+     fields[5], fields[6] = fields[6], fields[5]  # swap decimal with date
      wrong_schema = StructType(fields)
-     with QuietTest(self.sc):
-         with self.assertRaisesRegexp(Exception, "integer.*required"):
-             self.spark.createDataFrame(pdf, schema=wrong_schema)
+     with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}):
Member Author:
The error here is a TypeError, but in case it gets changed to an ArrowInvalid, we do not want to have the original error chained because the assert does not include it when checking
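A standalone unittest sketch (hypothetical messages, no Spark) of why a chained cause would be invisible here: assertRaisesRegexp matches only str() of the raised exception, never its __cause__:

    import unittest

    class ChainedCauseIsNotMatched(unittest.TestCase):
        def test_regexp_checks_top_error_only(self):
            def fail():
                try:
                    raise TypeError("an integer is required")
                except TypeError as e:
                    raise ValueError("Decimal type got a date") from e

            # Passes: the regexp is checked against the ValueError message.
            with self.assertRaisesRegexp(ValueError, "[D|d]ecimal.*got.*date"):
                fail()

            # "integer" lives only in the chained cause, so this match fails.
            with self.assertRaises(AssertionError):
                with self.assertRaisesRegexp(ValueError, "integer"):
                    fail()

    if __name__ == "__main__":
        unittest.main()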

+         with QuietTest(self.sc):
+             with self.assertRaisesRegexp(Exception, "[D|d]ecimal.*got.*date"):
+                 self.spark.createDataFrame(pdf, schema=wrong_schema)

  def test_createDataFrame_with_names(self):
      pdf = self.create_pandas_data_frame()
15 changes: 8 additions & 7 deletions python/pyspark/sql/tests/test_pandas_grouped_map.py
@@ -446,15 +446,16 @@ def int_index(pdf):
  def column_name_typo(pdf):
      return pd.DataFrame({'iid': pdf.id, 'v': pdf.v})

- @pandas_udf('id long, v int', PandasUDFType.GROUPED_MAP)
+ @pandas_udf('id long, v decimal', PandasUDFType.GROUPED_MAP)
  def invalid_positional_types(pdf):
-     return pd.DataFrame([(u'a', 1.2)])
+     return pd.DataFrame([(1, datetime.date(2020, 10, 5))])

- with QuietTest(self.sc):
-     with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
-         grouped_df.apply(column_name_typo).collect()
-     with self.assertRaisesRegexp(Exception, "an integer is required"):
-         grouped_df.apply(invalid_positional_types).collect()
+ with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}):
+     with QuietTest(self.sc):
+         with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
+             grouped_df.apply(column_name_typo).collect()
+         with self.assertRaisesRegexp(Exception, "[D|d]ecimal.*got.*date"):
+             grouped_df.apply(invalid_positional_types).collect()

  def test_positional_assignment_conf(self):
      with self.sql_conf({