4 changes: 2 additions & 2 deletions .github/workflows/build_and_test.yml
@@ -263,7 +263,7 @@ jobs:
- name: Install Python packages (Python 3.8)
if: (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-')) || contains(matrix.modules, 'connect')
run: |
- python3.8 -m pip install 'numpy>=1.20.0' 'pyarrow==12.0.1' pandas scipy unittest-xml-reporting 'grpcio>=1.48,<1.57' 'grpcio-status>=1.48,<1.57' 'protobuf==3.20.3'
+ python3.8 -m pip install 'numpy>=1.20.0' pyarrow pandas scipy unittest-xml-reporting 'grpcio>=1.48,<1.57' 'grpcio-status>=1.48,<1.57' 'protobuf==3.20.3'
python3.8 -m pip list
# Run the tests.
- name: Run tests
@@ -728,7 +728,7 @@ jobs:
# See also https://issues.apache.org/jira/browse/SPARK-38279.
python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme nbsphinx numpydoc 'jinja2<3.0.0' 'markupsafe==2.0.1' 'pyzmq<24.0.0'
python3.9 -m pip install ipython_genutils # See SPARK-38517
- python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' 'pyarrow==12.0.1' pandas 'plotly>=4.8'
+ python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8'
python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421
apt-get update -y
apt-get install -y ruby ruby-dev
2 changes: 1 addition & 1 deletion dev/infra/Dockerfile
@@ -85,7 +85,7 @@ RUN Rscript -e "devtools::install_version('roxygen2', version='7.2.0', repos='ht
ENV R_LIBS_SITE "/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library"

RUN pypy3 -m pip install numpy 'pandas<=2.0.3' scipy coverage matplotlib
- RUN python3.9 -m pip install numpy 'pyarrow==12.0.1' 'pandas<=2.0.3' scipy unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 'memory-profiler==0.60.0' 'scikit-learn==1.1.*'
+ RUN python3.9 -m pip install numpy pyarrow 'pandas<=2.0.3' scipy unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 'memory-profiler==0.60.0' 'scikit-learn==1.1.*'
Member: Let's also upgrade the mlflow version.


# Add Python deps for Spark Connect.
RUN python3.9 -m pip install 'grpcio>=1.48,<1.57' 'grpcio-status>=1.48,<1.57' 'protobuf==3.20.3' 'googleapis-common-protos==1.56.4'
4 changes: 3 additions & 1 deletion python/pyspark/pandas/typedef/typehints.py
@@ -293,7 +293,9 @@ def spark_type_to_pandas_dtype(
),
):
return np.dtype("object")
- elif isinstance(spark_type, types.TimestampType):
+ elif isinstance(spark_type, types.DayTimeIntervalType):
+ return np.dtype("timedelta64[ns]")
+ elif isinstance(spark_type, (types.TimestampType, types.TimestampNTZType)):
return np.dtype("datetime64[ns]")
else:
return np.dtype(to_arrow_type(spark_type).to_pandas_dtype())
Contributor (author): The behavior of to_arrow_type(spark_type).to_pandas_dtype() changed, e.g. to_arrow_type(TimestampType) -> pa.timestamp("us", tz="UTC") -> datetime64[us, UTC] in 13.0.0, but datetime64[ns, UTC] in 12.0.1.

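For context, a minimal sketch of the drift described above (not part of the patch; it assumes only that PyArrow is importable, and the "UTC" timezone is just an illustrative value):

```python
import pyarrow as pa

# An Arrow type like the one to_arrow_type() produces for a timezone-aware timestamp.
arrow_ts = pa.timestamp("us", tz="UTC")

# PyArrow 12.0.1 reports datetime64[ns, UTC]; PyArrow 13.0.0 reports datetime64[us, UTC],
# which is why spark_type_to_pandas_dtype() now special-cases the temporal types above
# instead of relying on to_pandas_dtype().
print(pa.__version__, arrow_ts.to_pandas_dtype())
```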
28 changes: 20 additions & 8 deletions python/pyspark/sql/connect/client/core.py
@@ -33,6 +33,7 @@
import urllib.parse
import uuid
import sys
+ from distutils.version import LooseVersion
from types import TracebackType
from typing import (
Iterable,
@@ -880,19 +881,30 @@ def to_pandas(self, plan: pb2.Plan) -> "pd.DataFrame":

# Rename columns to avoid duplicated column names.
renamed_table = table.rename_columns([f"col_{i}" for i in range(table.num_columns)])

+ pandas_options = {}
if self_destruct:
# Configure PyArrow to use as little memory as possible:
# self_destruct - free columns as they are converted
# split_blocks - create a separate Pandas block for each column
# use_threads - convert one column at a time
- pandas_options = {
- "self_destruct": True,
- "split_blocks": True,
- "use_threads": False,
- }
- pdf = renamed_table.to_pandas(**pandas_options)
- else:
- pdf = renamed_table.to_pandas()
+ pandas_options.update(
+ {
+ "self_destruct": True,
+ "split_blocks": True,
+ "use_threads": False,
+ }
+ )
+ if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
+ # A legacy option that coerces date32, date64, duration, and timestamp
+ # time units to nanoseconds when converting to pandas.
+ # The keyword only exists since PyArrow 13.0.0.
+ pandas_options.update(
+ {
+ "coerce_temporal_nanoseconds": True,
+ }
+ )
+ pdf = renamed_table.to_pandas(**pandas_options)
pdf.columns = schema.names

if len(pdf.columns) > 0:
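As a standalone illustration of the option-building pattern above, here is a sketch that is not Spark code: the table and the column name "ts" are invented, and it assumes only that PyArrow and pandas are installed.

```python
import pyarrow as pa
from distutils.version import LooseVersion

# A tiny stand-in for the Arrow table that Spark Connect assembles from result batches.
table = pa.table({"ts": pa.array([0, 1_000_000], type=pa.timestamp("us"))})

pandas_options = {
    # Keep peak memory low: free Arrow columns as they are converted, create one
    # pandas block per column, and convert single-threaded.
    "self_destruct": True,
    "split_blocks": True,
    "use_threads": False,
}
if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
    # Older PyArrow does not accept this keyword and would raise TypeError.
    pandas_options["coerce_temporal_nanoseconds"] = True

pdf = table.to_pandas(**pandas_options)
print(pdf.dtypes)  # "ts" comes out as datetime64[ns] on both PyArrow 12.x and 13.x
```

The version gate is the important part: unconditionally passing coerce_temporal_nanoseconds would break users still on PyArrow 12.x.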
16 changes: 14 additions & 2 deletions python/pyspark/sql/pandas/conversion.py
@@ -26,6 +26,7 @@
TYPE_CHECKING,
)
from warnings import warn
+ from distutils.version import LooseVersion

from pyspark.errors.exceptions.captured import unwrap_spark_exception
from pyspark.rdd import _load_from_socket
@@ -125,19 +126,30 @@ def toPandas(self) -> "PandasDataFrameLike":
# of PyArrow is found, if 'spark.sql.execution.arrow.pyspark.enabled' is enabled.
if use_arrow:
try:
- import pyarrow
+ import pyarrow as pa

self_destruct = jconf.arrowPySparkSelfDestructEnabled()
batches = self._collect_as_arrow(split_batches=self_destruct)
if len(batches) > 0:
- table = pyarrow.Table.from_batches(batches)
+ table = pa.Table.from_batches(batches)
# Ensure only the table has a reference to the batches, so that
# self_destruct (if enabled) is effective
del batches
# Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
# values, but we should use datetime.date to match the behavior with when
# Arrow optimization is disabled.
pandas_options = {"date_as_object": True}

+ if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
+ # A legacy option that coerces date32, date64, duration, and timestamp
+ # time units to nanoseconds when converting to pandas.
+ # The keyword only exists since PyArrow 13.0.0.
+ pandas_options.update(
+ {
+ "coerce_temporal_nanoseconds": True,
+ }
+ )

if self_destruct:
# Configure PyArrow to use as little memory as possible:
# self_destruct - free columns as they are converted
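The del batches step in this hunk is what makes self-destruct effective; a rough sketch of the idea outside Spark (the column name "x" is invented):

```python
import pyarrow as pa

batches = [pa.record_batch({"x": [1, 2, 3]})]
table = pa.Table.from_batches(batches)
# Drop the extra reference: the buffers can only be released during conversion
# if the Table is their sole owner.
del batches

pdf = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
print(pdf)
```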
17 changes: 16 additions & 1 deletion python/pyspark/sql/pandas/serializers.py
@@ -185,7 +185,22 @@ def arrow_to_pandas(self, arrow_column, struct_in_pandas="dict", ndarray_as_list
# instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
# datetime64[ns] type handling.
# Cast dates to objects instead of datetime64[ns] dtype to avoid overflow.
- s = arrow_column.to_pandas(date_as_object=True)
+ pandas_options = {"date_as_object": True}
+
+ import pyarrow as pa
+ from distutils.version import LooseVersion
+
+ if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
+ # A legacy option that coerces date32, date64, duration, and timestamp
+ # time units to nanoseconds when converting to pandas.
+ # The keyword only exists since PyArrow 13.0.0.
+ pandas_options.update(
+ {
+ "coerce_temporal_nanoseconds": True,
+ }
+ )
+
+ s = arrow_column.to_pandas(**pandas_options)

# TODO(SPARK-43579): cache the converter for reuse
converter = _create_converter_to_pandas(
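A small sketch of the overflow concern behind date_as_object=True in this serializer (not Spark code; the extreme date is just an illustrative value):

```python
import datetime
import pyarrow as pa

# datetime64[ns] only covers roughly the years 1677-2262, so a date like this
# must stay a Python object instead of being coerced to nanoseconds.
arr = pa.array([datetime.date(9999, 12, 31)])
s = arr.to_pandas(date_as_object=True)
print(s[0], type(s[0]))  # 9999-12-31 <class 'datetime.date'>
```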