[SPARK-23290][SQL][PYTHON] Use datetime.date for date type when converting Spark DataFrame to Pandas DataFrame. #20506
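In short: after this change, DataFrame.toPandas() returns datetime.date objects for DateType columns, so the resulting Pandas column has dtype object instead of datetime64[ns]. A minimal sketch of the new behavior (this example is illustrative, not from the PR, and assumes an active SparkSession named `spark`):

    import datetime

    from pyspark.sql.types import DateType, StructField, StructType

    schema = StructType([StructField("d", DateType(), True)])
    df = spark.createDataFrame([(datetime.date(2012, 2, 2),)], schema=schema)

    pdf = df.toPandas()
    print(pdf["d"].dtype)          # object (previously datetime64[ns])
    print(type(pdf["d"].iloc[0]))  # <class 'datetime.date'>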
Changes from all commits: 223d0a0, 57ab41b, ebdbd8c, 8823043, f151cdf
@@ -2816,7 +2816,7 @@ def test_to_pandas(self):
         self.assertEquals(types[1], np.object)
         self.assertEquals(types[2], np.bool)
         self.assertEquals(types[3], np.float32)
-        self.assertEquals(types[4], 'datetime64[ns]')
+        self.assertEquals(types[4], np.object)  # datetime.date
         self.assertEquals(types[5], 'datetime64[ns]')

     @unittest.skipIf(not _have_old_pandas, "Old Pandas not installed")
@@ -3388,7 +3388,7 @@ class ArrowTests(ReusedSQLTestCase):

     @classmethod
     def setUpClass(cls):
-        from datetime import datetime
+        from datetime import date, datetime
         from decimal import Decimal
         ReusedSQLTestCase.setUpClass()
@@ -3410,11 +3410,11 @@ def setUpClass(cls):
             StructField("7_date_t", DateType(), True),
             StructField("8_timestamp_t", TimestampType(), True)])
         cls.data = [(u"a", 1, 10, 0.2, 2.0, Decimal("2.0"),
-                     datetime(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1)),
+                     date(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1)),
                     (u"b", 2, 20, 0.4, 4.0, Decimal("4.0"),
-                     datetime(2012, 2, 2), datetime(2012, 2, 2, 2, 2, 2)),
+                     date(2012, 2, 2), datetime(2012, 2, 2, 2, 2, 2)),
                     (u"c", 3, 30, 0.8, 6.0, Decimal("6.0"),
-                     datetime(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3))]
+                     date(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3))]

     @classmethod
     def tearDownClass(cls):
@@ -3461,7 +3461,9 @@ def _toPandas_arrow_toggle(self, df):
     def test_toPandas_arrow_toggle(self):
         df = self.spark.createDataFrame(self.data, schema=self.schema)
         pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
-        self.assertPandasEqual(pdf_arrow, pdf)
+        expected = self.create_pandas_data_frame()
+        self.assertPandasEqual(expected, pdf)
+        self.assertPandasEqual(expected, pdf_arrow)

     def test_toPandas_respect_session_timezone(self):
         df = self.spark.createDataFrame(self.data, schema=self.schema)
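For reference, a sketch of what the create_pandas_data_frame helper used above might look like; this is an assumption based on how the test calls it, not the PR's exact code. It builds the expected Pandas DataFrame directly from the class-level test data, so the date column keeps datetime.date values (dtype object) while the timestamp column is inferred as datetime64[ns]:

    # Hypothetical helper; in the suite this would be a classmethod on ArrowTests.
    import pandas as pd

    def create_pandas_data_frame(cls):
        data_dict = {}
        for j, name in enumerate(cls.schema.names):
            # Collect column j of cls.data as a plain Python list.
            data_dict[name] = [cls.data[i][j] for i in range(len(cls.data))]
        # "7_date_t" stays as datetime.date objects (dtype object);
        # "8_timestamp_t" is inferred by pandas as datetime64[ns].
        return pd.DataFrame(data=data_dict)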
@@ -4062,18 +4064,42 @@ def test_vectorized_udf_unsupported_types(self):
         with self.assertRaisesRegexp(Exception, 'Unsupported data type'):
             df.select(f(col('map'))).collect()

-    def test_vectorized_udf_null_date(self):
+    def test_vectorized_udf_dates(self):
Contributor comment: shall we have a new test to directly verify the …

Author reply: Maybe spark/python/pyspark/sql/tests.py, lines 3461 to 3464 in ebdbd8c:

    def test_toPandas_arrow_toggle(self):
        df = self.spark.createDataFrame(self.data, schema=self.schema)
        pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
        self.assertPandasEqual(pdf_arrow, pdf)

? In addition, I'll modify it to check against its expected Pandas DataFrame.
         from pyspark.sql.functions import pandas_udf, col
         from datetime import date
-        schema = StructType().add("date", DateType())
-        data = [(date(1969, 1, 1),),
-                (date(2012, 2, 2),),
-                (None,),
-                (date(2100, 4, 4),)]
+        schema = StructType().add("idx", LongType()).add("date", DateType())
+        data = [(0, date(1969, 1, 1),),
+                (1, date(2012, 2, 2),),
+                (2, None,),
+                (3, date(2100, 4, 4),)]
         df = self.spark.createDataFrame(data, schema=schema)
-        date_f = pandas_udf(lambda t: t, returnType=DateType())
-        res = df.select(date_f(col("date")))
-        self.assertEquals(df.collect(), res.collect())
+
+        date_copy = pandas_udf(lambda t: t, returnType=DateType())
+        df = df.withColumn("date_copy", date_copy(col("date")))
+
+        @pandas_udf(returnType=StringType())
+        def check_data(idx, date, date_copy):
+            import pandas as pd
+            msgs = []
+            is_equal = date.isnull()
+            for i in range(len(idx)):
+                if (is_equal[i] and data[idx[i]][1] is None) or \
+                        date[i] == data[idx[i]][1]:
+                    msgs.append(None)
+                else:
+                    msgs.append(
+                        "date values are not equal (date='%s': data[%d][1]='%s')"
+                        % (date[i], idx[i], data[idx[i]][1]))
+            return pd.Series(msgs)
+
+        result = df.withColumn("check_data",
+                               check_data(col("idx"), col("date"),
+                                          col("date_copy"))).collect()
+
+        self.assertEquals(len(data), len(result))
+        for i in range(len(result)):
+            self.assertEquals(data[i][1], result[i][1])  # "date" col
+            self.assertEquals(data[i][1], result[i][2])  # "date_copy" col
+            self.assertIsNone(result[i][3])  # "check_data" col

     def test_vectorized_udf_timestamps(self):
         from pyspark.sql.functions import pandas_udf, col
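As a usage note on the test above: inside a scalar pandas_udf, a DateType column now arrives as a Pandas Series of datetime.date objects (with None for nulls), which is what check_data relies on. A small hedged sketch of that behavior, assuming an active SparkSession named `spark` with Arrow enabled; the names here are illustrative:

    import datetime

    import pandas as pd
    from pyspark.sql.functions import pandas_udf, col
    from pyspark.sql.types import StringType

    @pandas_udf(returnType=StringType())
    def type_names(dates):
        # Each element of the input Series is a datetime.date, or None for nulls.
        return pd.Series([type(d).__name__ if d is not None else None
                          for d in dates])

    df = spark.createDataFrame([(datetime.date(2012, 2, 2),), (None,)], ["d"])
    df.select(type_names(col("d"))).show()  # rows: "date", null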
@@ -4114,6 +4140,7 @@ def check_data(idx, timestamp, timestamp_copy):
         self.assertEquals(len(data), len(result))
         for i in range(len(result)):
             self.assertEquals(data[i][1], result[i][1])  # "timestamp" col
+            self.assertEquals(data[i][1], result[i][2])  # "timestamp_copy" col
             self.assertIsNone(result[i][3])  # "check_data" col

     def test_vectorized_udf_return_timestamp_tz(self):
Reviewer comment: I thought we were considering the interpretation of DateType as object as a bug, similar to how FloatType was being interpreted as float64?

Reviewer comment: +1, I feel it was a bug. Maybe we can merge this to branch-2.3 only and update the migration guide in the master branch?