diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 25c95bd607d27..9806a4cf4153b 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -263,7 +263,7 @@ jobs:
     - name: Install Python packages (Python 3.8)
       if: (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-')) || contains(matrix.modules, 'connect')
       run: |
-        python3.8 -m pip install 'numpy>=1.20.0' 'pyarrow==12.0.1' pandas scipy unittest-xml-reporting 'grpcio>=1.48,<1.57' 'grpcio-status>=1.48,<1.57' 'protobuf==3.20.3'
+        python3.8 -m pip install 'numpy>=1.20.0' pyarrow pandas scipy unittest-xml-reporting 'grpcio>=1.48,<1.57' 'grpcio-status>=1.48,<1.57' 'protobuf==3.20.3'
         python3.8 -m pip list
     # Run the tests.
     - name: Run tests
@@ -728,7 +728,7 @@ jobs:
         # See also https://issues.apache.org/jira/browse/SPARK-38279.
         python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme nbsphinx numpydoc 'jinja2<3.0.0' 'markupsafe==2.0.1' 'pyzmq<24.0.0'
         python3.9 -m pip install ipython_genutils # See SPARK-38517
-        python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' 'pyarrow==12.0.1' pandas 'plotly>=4.8'
+        python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8'
         python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421
         apt-get update -y
         apt-get install -y ruby ruby-dev
diff --git a/dev/infra/Dockerfile b/dev/infra/Dockerfile
index 99423ce072cd7..8e5a3cb7c0539 100644
--- a/dev/infra/Dockerfile
+++ b/dev/infra/Dockerfile
@@ -85,7 +85,7 @@ RUN Rscript -e "devtools::install_version('roxygen2', version='7.2.0', repos='ht
 ENV R_LIBS_SITE "/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library"
 
 RUN pypy3 -m pip install numpy 'pandas<=2.0.3' scipy coverage matplotlib
-RUN python3.9 -m pip install numpy 'pyarrow==12.0.1' 'pandas<=2.0.3' scipy unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 'memory-profiler==0.60.0' 'scikit-learn==1.1.*'
+RUN python3.9 -m pip install numpy pyarrow 'pandas<=2.0.3' scipy unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 'memory-profiler==0.60.0' 'scikit-learn==1.1.*'
 
 # Add Python deps for Spark Connect.
 RUN python3.9 -m pip install 'grpcio>=1.48,<1.57' 'grpcio-status>=1.48,<1.57' 'protobuf==3.20.3' 'googleapis-common-protos==1.56.4'
diff --git a/python/pyspark/pandas/typedef/typehints.py b/python/pyspark/pandas/typedef/typehints.py
index 6e41395186d3d..413ef20dae0dd 100644
--- a/python/pyspark/pandas/typedef/typehints.py
+++ b/python/pyspark/pandas/typedef/typehints.py
@@ -293,7 +293,9 @@ def spark_type_to_pandas_dtype(
         ),
     ):
         return np.dtype("object")
-    elif isinstance(spark_type, types.TimestampType):
+    elif isinstance(spark_type, types.DayTimeIntervalType):
+        return np.dtype("timedelta64[ns]")
+    elif isinstance(spark_type, (types.TimestampType, types.TimestampNTZType)):
         return np.dtype("datetime64[ns]")
     else:
         return np.dtype(to_arrow_type(spark_type).to_pandas_dtype())
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index e8d598bd0fed8..0584f18e5bbf4 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -33,6 +33,7 @@
 import urllib.parse
 import uuid
 import sys
+from distutils.version import LooseVersion
 from types import TracebackType
 from typing import (
     Iterable,
@@ -880,19 +881,30 @@ def to_pandas(self, plan: pb2.Plan) -> "pd.DataFrame":
 
         # Rename columns to avoid duplicated column names.
         renamed_table = table.rename_columns([f"col_{i}" for i in range(table.num_columns)])
+
+        pandas_options = {}
         if self_destruct:
             # Configure PyArrow to use as little memory as possible:
             # self_destruct - free columns as they are converted
             # split_blocks - create a separate Pandas block for each column
             # use_threads - convert one column at a time
-            pandas_options = {
-                "self_destruct": True,
-                "split_blocks": True,
-                "use_threads": False,
-            }
-            pdf = renamed_table.to_pandas(**pandas_options)
-        else:
-            pdf = renamed_table.to_pandas()
+            pandas_options.update(
+                {
+                    "self_destruct": True,
+                    "split_blocks": True,
+                    "use_threads": False,
+                }
+            )
+        if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
+            # A legacy option to coerce date32, date64, duration, and timestamp
+            # time units to nanoseconds when converting to pandas.
+            # This option is only available since PyArrow 13.0.0.
+            pandas_options.update(
+                {
+                    "coerce_temporal_nanoseconds": True,
+                }
+            )
+        pdf = renamed_table.to_pandas(**pandas_options)
         pdf.columns = schema.names
 
         if len(pdf.columns) > 0:
diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index 8664c4df73ed8..cc775de0b79a8 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -26,6 +26,7 @@
     TYPE_CHECKING,
 )
 from warnings import warn
+from distutils.version import LooseVersion
 
 from pyspark.errors.exceptions.captured import unwrap_spark_exception
 from pyspark.rdd import _load_from_socket
@@ -125,12 +126,12 @@ def toPandas(self) -> "PandasDataFrameLike":
         # of PyArrow is found, if 'spark.sql.execution.arrow.pyspark.enabled' is enabled.
         if use_arrow:
             try:
-                import pyarrow
+                import pyarrow as pa
 
                 self_destruct = jconf.arrowPySparkSelfDestructEnabled()
                 batches = self._collect_as_arrow(split_batches=self_destruct)
                 if len(batches) > 0:
-                    table = pyarrow.Table.from_batches(batches)
+                    table = pa.Table.from_batches(batches)
                     # Ensure only the table has a reference to the batches, so that
                     # self_destruct (if enabled) is effective
                     del batches
@@ -138,6 +139,17 @@ def toPandas(self) -> "PandasDataFrameLike":
                     # values, but we should use datetime.date to match the behavior with when
                     # Arrow optimization is disabled.
                     pandas_options = {"date_as_object": True}
+
+                    if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
+                        # A legacy option to coerce date32, date64, duration, and timestamp
+                        # time units to nanoseconds when converting to pandas.
+                        # This option is only available since PyArrow 13.0.0.
+                        pandas_options.update(
+                            {
+                                "coerce_temporal_nanoseconds": True,
+                            }
+                        )
+
                     if self_destruct:
                         # Configure PyArrow to use as little memory as possible:
                         # self_destruct - free columns as they are converted
diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 2cc3db15c9cd5..ed5884879cee2 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -185,7 +185,22 @@ def arrow_to_pandas(self, arrow_column, struct_in_pandas="dict", ndarray_as_list
         # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
         # datetime64[ns] type handling.
         # Cast dates to objects instead of datetime64[ns] dtype to avoid overflow.
-        s = arrow_column.to_pandas(date_as_object=True)
+        pandas_options = {"date_as_object": True}
+
+        import pyarrow as pa
+        from distutils.version import LooseVersion
+
+        if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
+            # A legacy option to coerce date32, date64, duration, and timestamp
+            # time units to nanoseconds when converting to pandas.
+            # This option is only available since PyArrow 13.0.0.
+            pandas_options.update(
+                {
+                    "coerce_temporal_nanoseconds": True,
+                }
+            )
+
+        s = arrow_column.to_pandas(**pandas_options)
 
         # TODO(SPARK-43579): cache the converter for reuse
         converter = _create_converter_to_pandas(
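For reference, below is a minimal, standalone sketch (not part of the patch) of the version-gated to_pandas() option handling that the hunks above apply in core.py, conversion.py, and serializers.py. The sample table and column names are illustrative only; only the option handling mirrors the changed code.

    # Illustrative sketch of the coerce_temporal_nanoseconds gating used in this patch.
    import datetime
    from distutils.version import LooseVersion

    import pyarrow as pa

    # A small Arrow table with a date column and a microsecond-precision timestamp column.
    table = pa.table(
        {
            "d": pa.array([datetime.date(2023, 1, 1)], type=pa.date32()),
            "t": pa.array([datetime.datetime(2023, 1, 1, 12, 0)], type=pa.timestamp("us")),
        }
    )

    # Keep dates as datetime.date objects, matching Spark's behavior when Arrow
    # optimization is disabled.
    pandas_options = {"date_as_object": True}
    if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
        # PyArrow 13.0.0 preserves the original time unit by default; this legacy
        # flag coerces temporal columns back to nanoseconds.
        pandas_options["coerce_temporal_nanoseconds"] = True

    pdf = table.to_pandas(**pandas_options)
    print(pdf.dtypes)  # "t" comes back as datetime64[ns]; "d" stays object (datetime.date)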