From 93f1ce58591fc90afb40c1754ff9cdeadd2d05d3 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Fri, 16 Feb 2024 10:21:20 +0900 Subject: [PATCH] Recover -1 and 0 case for spark.sql.execution.arrow.maxRecordsPerBatch --- python/pyspark/sql/pandas/conversion.py | 1 + python/pyspark/sql/tests/test_arrow.py | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 5288f0e100bb..d958b95795b7 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -630,6 +630,7 @@ def _create_from_pandas_with_arrow( # Slice the DataFrame to be batched step = self._jconf.arrowMaxRecordsPerBatch() + step = step if step > 0 else len(pdf) pdf_slices = (pdf.iloc[start : start + step] for start in range(0, len(pdf), step)) # Create list of Arrow (columns, arrow_type, spark_type) for serializer dump_stream diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index fc979c9e8b78..c771e5db65e5 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -1144,6 +1144,16 @@ class MyInheritedTuple(MyTuple): df = self.spark.createDataFrame([MyInheritedTuple(1, 2, MyInheritedTuple(1, 2, 3))]) self.assertEqual(df.first(), Row(a=1, b=2, c=Row(a=1, b=2, c=3))) + def test_negative_and_zero_batch_size(self): + # SPARK-47068: Negative and zero value should work as unlimited batch size. + with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 0}): + pdf = pd.DataFrame({"a": [123]}) + assert_frame_equal(pdf, self.spark.createDataFrame(pdf).toPandas()) + + with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": -1}): + pdf = pd.DataFrame({"a": [123]}) + assert_frame_equal(pdf, self.spark.createDataFrame(pdf).toPandas()) + @unittest.skipIf( not have_pandas or not have_pyarrow,