python/pyspark/sql/pandas/conversion.py (1 change: 1 addition & 0 deletions)
@@ -630,6 +630,7 @@ def _create_from_pandas_with_arrow(

         # Slice the DataFrame to be batched
         step = self._jconf.arrowMaxRecordsPerBatch()
+        step = step if step > 0 else len(pdf)
         pdf_slices = (pdf.iloc[start : start + step] for start in range(0, len(pdf), step))

         # Create list of Arrow (columns, arrow_type, spark_type) for serializer dump_stream
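For context, a minimal standalone sketch of what the new fallback does (plain pandas, no Spark; the helper name make_slices is invented for illustration): with a non-positive arrowMaxRecordsPerBatch, the slicing generator now yields a single batch covering the whole DataFrame, whereas the old range(0, len(pdf), 0) call raised ValueError and a negative step produced no batches at all.

import pandas as pd

def make_slices(pdf: pd.DataFrame, max_records_per_batch: int):
    # Mirror of the fixed logic: non-positive values mean "one batch for the whole frame".
    step = max_records_per_batch if max_records_per_batch > 0 else len(pdf)
    return [pdf.iloc[start : start + step] for start in range(0, len(pdf), step)]

pdf = pd.DataFrame({"a": range(10)})
print(len(make_slices(pdf, 3)))   # 4 batches of at most 3 rows each
print(len(make_slices(pdf, 0)))   # 1 batch; previously range(0, 10, 0) raised ValueError
print(len(make_slices(pdf, -1)))  # 1 batch; previously range(0, 10, -1) yielded nothing
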
python/pyspark/sql/tests/test_arrow.py (10 changes: 10 additions & 0 deletions)
@@ -1144,6 +1144,16 @@ class MyInheritedTuple(MyTuple):
         df = self.spark.createDataFrame([MyInheritedTuple(1, 2, MyInheritedTuple(1, 2, 3))])
         self.assertEqual(df.first(), Row(a=1, b=2, c=Row(a=1, b=2, c=3)))

+    def test_negative_and_zero_batch_size(self):
+        # SPARK-47068: Negative and zero values should work as an unlimited batch size.
+        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 0}):
+            pdf = pd.DataFrame({"a": [123]})
+            assert_frame_equal(pdf, self.spark.createDataFrame(pdf).toPandas())
+
+        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": -1}):
+            pdf = pd.DataFrame({"a": [123]})
+            assert_frame_equal(pdf, self.spark.createDataFrame(pdf).toPandas())


@unittest.skipIf(
not have_pandas or not have_pyarrow,
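Outside the test harness, the equivalent runtime usage looks roughly like the sketch below (assumes an already-created SparkSession bound to spark; the assertion simply mirrors the new test rather than documenting guaranteed dtype behavior).

import pandas as pd
from pandas.testing import assert_frame_equal

# A non-positive value now means "no per-batch row limit" on the Arrow conversion path.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "0")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

pdf = pd.DataFrame({"a": [123]})
assert_frame_equal(pdf, spark.createDataFrame(pdf).toPandas())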