[SPARK-23290][SQL][PYTHON] Use datetime.date for date type when converting Spark DataFrame to Pandas DataFrame. #20506
python/pyspark/sql/tests.py:

```diff
@@ -2816,7 +2816,7 @@ def test_to_pandas(self):
         self.assertEquals(types[1], np.object)
         self.assertEquals(types[2], np.bool)
         self.assertEquals(types[3], np.float32)
-        self.assertEquals(types[4], 'datetime64[ns]')
+        self.assertEquals(types[4], np.object)  # datetime.date
         self.assertEquals(types[5], 'datetime64[ns]')

     @unittest.skipIf(not _have_old_pandas, "Old Pandas not installed")
```
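For context on the dtype assertion above: pandas stores `datetime.date` objects in an `object`-dtype column, while timestamp values get the native `datetime64[ns]` dtype. A minimal plain-pandas sketch (illustration only, not part of the patch):

```python
import datetime

import pandas as pd

# datetime.date values are held in an object-dtype column; pd.Timestamp
# values get datetime64[ns]. This is why types[4] above is now np.object.
pdf = pd.DataFrame({
    "date_col": [datetime.date(1969, 1, 1), datetime.date(2012, 2, 2)],
    "ts_col": pd.to_datetime(["1969-01-01 01:01:01", "2012-02-02 02:02:02"]),
})
assert pdf["date_col"].dtype == object          # the test checks np.object
assert pdf["ts_col"].dtype == "datetime64[ns]"
```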
```diff
@@ -3388,7 +3388,7 @@ class ArrowTests(ReusedSQLTestCase):

     @classmethod
     def setUpClass(cls):
-        from datetime import datetime
+        from datetime import date, datetime
         from decimal import Decimal
         ReusedSQLTestCase.setUpClass()
```
```diff
@@ -3410,11 +3410,11 @@ def setUpClass(cls):
             StructField("7_date_t", DateType(), True),
             StructField("8_timestamp_t", TimestampType(), True)])
         cls.data = [(u"a", 1, 10, 0.2, 2.0, Decimal("2.0"),
-                     datetime(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1)),
+                     date(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1)),
                     (u"b", 2, 20, 0.4, 4.0, Decimal("4.0"),
-                     datetime(2012, 2, 2), datetime(2012, 2, 2, 2, 2, 2)),
+                     date(2012, 2, 2), datetime(2012, 2, 2, 2, 2, 2)),
                     (u"c", 3, 30, 0.8, 6.0, Decimal("6.0"),
-                     datetime(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3))]
+                     date(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3))]

     @classmethod
     def tearDownClass(cls):
```
```diff
@@ -4062,18 +4062,42 @@ def test_vectorized_udf_unsupported_types(self):
         with self.assertRaisesRegexp(Exception, 'Unsupported data type'):
             df.select(f(col('map'))).collect()

-    def test_vectorized_udf_null_date(self):
+    def test_vectorized_udf_dates(self):
```
> **Contributor:** shall we have a new test to directly verify the …
>
> **Author (Member):** Maybe spark/python/pyspark/sql/tests.py, lines 3461 to 3464 in ebdbd8c? In addition, I'll modify it to check against its expected Pandas DataFrame.
```diff
         from pyspark.sql.functions import pandas_udf, col
         from datetime import date
-        schema = StructType().add("date", DateType())
-        data = [(date(1969, 1, 1),),
-                (date(2012, 2, 2),),
-                (None,),
-                (date(2100, 4, 4),)]
+        schema = StructType().add("idx", LongType()).add("date", DateType())
+        data = [(0, date(1969, 1, 1),),
+                (1, date(2012, 2, 2),),
+                (2, None,),
+                (3, date(2100, 4, 4),)]
         df = self.spark.createDataFrame(data, schema=schema)
-        date_f = pandas_udf(lambda t: t, returnType=DateType())
-        res = df.select(date_f(col("date")))
-        self.assertEquals(df.collect(), res.collect())
+
+        date_copy = pandas_udf(lambda t: t, returnType=DateType())
+        df = df.withColumn("date_copy", date_copy(col("date")))
+
+        @pandas_udf(returnType=StringType())
+        def check_data(idx, date, date_copy):
+            import pandas as pd
+            msgs = []
+            is_equal = date.isnull()
+            for i in range(len(idx)):
+                if (is_equal[i] and data[idx[i]][1] is None) or \
+                        date[i] == data[idx[i]][1]:
+                    msgs.append(None)
+                else:
+                    msgs.append(
+                        "date values are not equal (date='%s': data[%d][1]='%s')"
+                        % (date[i], idx[i], data[idx[i]][1]))
+            return pd.Series(msgs)
+
+        result = df.withColumn("check_data",
+                               check_data(col("idx"), col("date"),
+                                          col("date_copy"))).collect()
+
+        self.assertEquals(len(data), len(result))
+        for i in range(len(result)):
+            self.assertEquals(data[i][1], result[i][1])  # "date" col
+            self.assertEquals(data[i][1], result[i][2])  # "date_copy" col
+            self.assertIsNone(result[i][3])  # "check_data" col

     def test_vectorized_udf_timestamps(self):
         from pyspark.sql.functions import pandas_udf, col
```
```diff
@@ -4114,6 +4138,7 @@ def check_data(idx, timestamp, timestamp_copy):
         self.assertEquals(len(data), len(result))
         for i in range(len(result)):
             self.assertEquals(data[i][1], result[i][1])  # "timestamp" col
+            self.assertEquals(data[i][1], result[i][2])  # "timestamp_copy" col
             self.assertIsNone(result[i][3])  # "check_data" col

     def test_vectorized_udf_return_timestamp_tz(self):
```
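For readers unfamiliar with the scalar `pandas_udf` pattern the tests above rely on, here is a minimal sketch. It is an illustration only, assuming an active SparkSession bound to `spark` and PyArrow installed; the UDF receives each batch as a `pandas.Series`, and with this patch a `DateType` column round-trips as `datetime.date` with `None` preserved as null:

```python
import datetime

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import DateType

# Hypothetical illustration (assumes a SparkSession named `spark`):
# an identity pandas_udf over a DateType column, mirroring the
# date_copy UDF in the test above.
df = spark.createDataFrame([(datetime.date(2012, 2, 2),), (None,)], "d date")

identity = pandas_udf(lambda s: s, returnType=DateType())
df.select(identity(col("d")).alias("d_copy")).show()
# Expected: one row with 2012-02-02 and one null row.
```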
python/pyspark/sql/types.py:
```diff
@@ -1694,6 +1694,21 @@ def from_arrow_schema(arrow_schema):
                        for field in arrow_schema])


+def _correct_date_of_dataframe_from_arrow(pdf, schema):
+    """ Correct date type values to use datetime.date.
+
+    Pandas DataFrame created from PyArrow uses datetime64[ns] for date type values, but we should
+    use datetime.date to keep backward compatibility.
+
+    :param pdf: pandas.DataFrame
+    :param schema: a Spark schema of the pandas.DataFrame
+    """
+    for field in schema:
+        if type(field.dataType) == DateType:
+            pdf[field.name] = pdf[field.name].dt.date
+    return pdf
+
+
 def _check_dataframe_localize_timestamps(pdf, timezone):
     """
     Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone
```
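As a sanity check of the user-visible behavior this helper restores, something like the following should hold (hypothetical snippet, assuming an active SparkSession `spark`; `spark.sql.execution.arrow.enabled` toggles the Arrow-backed `toPandas()` path that `_correct_date_of_dataframe_from_arrow` fixes up):

```python
import datetime

# Hypothetical sanity check, not part of the patch: with the fix, a
# DateType column comes back from the Arrow toPandas() path as
# datetime.date, matching the non-Arrow path.
df = spark.createDataFrame([(datetime.date(2012, 2, 2),)], "d date")

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pdf = df.toPandas()

# Use type(), not isinstance(): pd.Timestamp subclasses datetime.date,
# so isinstance() alone could not distinguish the two.
assert type(pdf["d"][0]) is datetime.date
assert pdf["d"].dtype == object
```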
> **Reviewer:** I thought we were considering the interpretation of DateType as object as a bug, similar to how FloatType was being interpreted as float64?
>
> **Reviewer:** +1, I feel it was a bug. Maybe we can merge this to branch-2.3 only and update the migration guide in the master branch?