Conversation

@HyukjinKwon (Member) commented May 26, 2022

What changes were proposed in this pull request?

This PR proposes to use LocalRelation instead of LogicalRDD when creating a (small) DataFrame with Arrow optimization, passing the data as local data on the driver side (consistent with the Scala code path).

Namely:

import pandas as pd
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
spark.createDataFrame(pd.DataFrame({'a': [1, 2, 3, 4]})).explain(True)

Before

== Parsed Logical Plan ==
LogicalRDD [a#0L], false

== Analyzed Logical Plan ==
a: bigint
LogicalRDD [a#0L], false

== Optimized Logical Plan ==
LogicalRDD [a#0L], false

== Physical Plan ==
*(1) Scan ExistingRDD arrow[a#0L]

After

== Parsed Logical Plan ==
LocalRelation [a#0L]

== Analyzed Logical Plan ==
a: bigint
LocalRelation [a#0L]

== Optimized Logical Plan ==
LocalRelation [a#0L]

== Physical Plan ==
LocalTableScan [a#0L]

This is controlled by a new configuration, spark.sql.execution.arrow.localRelationThreshold, defaulting to 48MB. This default was picked based on the benchmark I ran below.

In addition, this PR fixes createDataFrame to respect the spark.sql.execution.arrow.maxRecordsPerBatch configuration when creating Arrow batches. Previously, we divided the input pandas DataFrame by the default partition number, which forced users to increase spark.rpc.message.maxSize when the input pandas DataFrame was too large. See the benchmark performed below.
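
For reference, a rough sketch of how the two settings interact when creating a DataFrame from pandas (the values below are only illustrative, and "spark" is an existing SparkSession):

import pandas as pd

# Enable Arrow optimization for createDataFrame.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
# Inputs whose serialized Arrow data stays below this threshold become a
# LocalRelation on the driver; larger inputs still go through LogicalRDD.
spark.conf.set("spark.sql.execution.arrow.localRelationThreshold", "48MB")
# Each Arrow batch sent to the JVM is capped at this many records, so no single
# RPC message has to exceed spark.rpc.message.maxSize.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)

df = spark.createDataFrame(pd.DataFrame({'a': range(10)}))
df.explain(True)  # small input: expect LocalRelation / LocalTableScan in the plan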

Why are the changes needed?

We have some nice optimizations for LocalRelation (e.g., ConvertToLocalRelation). For example, the stats are fully known when you use LocalRelation, whereas many optimizations cannot be applied with LogicalRDD. In some cases (e.g., executeCollect), we can also avoid creating RDDs entirely.
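
As a quick illustration (a sketch; the exact plan text can vary by Spark version), a filter and projection over a small Arrow-created DataFrame can now be folded into the LocalRelation at optimization time:

import pandas as pd
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

small = spark.createDataFrame(pd.DataFrame({'a': [1, 2, 3, 4]}))
# With LocalRelation, ConvertToLocalRelation can evaluate the filter/projection
# during optimization; with LogicalRDD they stay as operators over an RDD scan.
small.where("a > 2").selectExpr("a * 2 AS b").explain(True)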

For respecting spark.sql.execution.arrow.maxRecordsPerBatch: 1) we can avoid forcing users to set spark.rpc.message.maxSize, and 2) I believe the configuration is supposed to be respected by all code paths that create Arrow batches where possible.

Does this PR introduce any user-facing change?

No, it is an optimization. The number of partitions can be different, but that should be internal.

How was this patch tested?

  • Manually tested.
  • Added a unittest.
  • I did two benchmark tests with 1 Driver & 4 Workers (i3.xlarge), see below.

Benchmark 1 (best cases)

import time
import random
import string

import pandas as pd

names = [random.choice(list(string.ascii_lowercase)) for i in range(1000)]
ages = [random.randint(0, 100) for i in range(1000)]
l = list(zip(names, ages))
d = [{'name': a_name, 'age': an_age} for a_name, an_age in l]
pdf = pd.DataFrame({'name': names, 'age': ages})
spark.range(1).count()  # warm up

start = time.time()
for _ in range(100):
    _ = spark.createDataFrame(pdf)

end = time.time()
print(end - start)

Before

10.250491698582968

After

6.004616181055705

Benchmark 2 (worst cases)

curl -O https://eforexcel.com/wp/wp-content/uploads/2020/09/HR2m.zip
unzip HR2m.zip
import pandas as pd
pdf = pd.read_csv("HR2m.csv")
pdf23 = pdf.iloc[:int(len(pdf)/32)]
pdf45 = pdf.iloc[:int(len(pdf)/16)]
pdf90 = pdf.iloc[:int(len(pdf)/8)]
pdf175 = pdf.iloc[:int(len(pdf)/4)]
pdf350 = pdf.iloc[:int(len(pdf)/2)]
pdf700 = pdf.iloc[:int(len(pdf))]
pdf2gb = pd.concat([pdf, pdf, pdf])
pdf5gb = pd.concat([pdf2gb, pdf2gb])

spark.createDataFrame(pdf23)._jdf.rdd().count()  # explicitly create RDD.
...

Before

23MB: 1.02 seconds
45MB: 1.69 seconds
90MB: 2.38 seconds
175MB: 3.19 seconds
350MB: 6.10 seconds
2GB: 43.21 seconds
5GB: X (threw an exception saying to set 'spark.rpc.message.maxSize' higher)

After

23MB: 1.31 seconds (local collection is used)
45MB: 2.47 seconds (local collection is used)
90MB: 1.79 seconds
175MB: 3.22 seconds
350MB: 6.41 seconds
2GB: 47.12 seconds
5GB: 1.29 minutes

NOTE that the performance varies depending on network stability, and the numbers above are from the second run (not an average).

@HyukjinKwon HyukjinKwon marked this pull request as draft May 26, 2022 06:51
@HyukjinKwon HyukjinKwon marked this pull request as ready for review May 26, 2022 09:59
@HyukjinKwon HyukjinKwon force-pushed the SPARK-39301 branch 4 times, most recently from c943c4d to 1f25a6b Compare May 26, 2022 10:10
@HyukjinKwon (Member Author)

cc @ueshin @viirya @BryanCutler FYI

Member Author

I intentionally used an Iterator to avoid Py4J copying the Array to the Python driver side.

@HyukjinKwon HyukjinKwon changed the title [SPARK-39301][PYTHON] Leverage LocalRelation in createDataFrame with Arrow optimization [SPARK-39301][SQL][PYTHON] Leverage LocalRelation in createDataFrame with Arrow optimization May 26, 2022
@HyukjinKwon
Copy link
Member Author

Wow, thanks for the reviews, guys. Let me think a bit more and push some changes soon.

@HyukjinKwon HyukjinKwon marked this pull request as draft May 30, 2022 08:53
@HyukjinKwon HyukjinKwon force-pushed the SPARK-39301 branch 3 times, most recently from 449e7c3 to 357d9b2 Compare May 31, 2022 08:23
Member Author

The reason for doing this is to avoid reconfiguring spark.rpc.message.maxSize. When a batch is too large, it throws an exception complaining that spark.rpc.message.maxSize is too small.
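
Previously, the workaround was to bump that limit when building the session, something like the sketch below (512 is in MiB and just an example value):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.rpc.message.maxSize", 512)  # default is 128 MiB
    .getOrCreate()
)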

Member

I thought this was to control how many partitions were in the rdd? Each partition could have multiple batches, and probably should be capped at arrowMaxRecordsPerBatch, but since it was coming from a local Pandas DataFrame already in memory, that didn't seem to be a big deal.

Member Author

Yeah, that's true ... the perf diff seems trivial in any event, and it seems to work around the spark.rpc.message.maxSize issue.

Member

I believe it was like this to create the same number of partitions as when arrow is disabled, although that might have changed since. If the DataFrame is split with arrowMaxRecordsPerBatch and a user wanted to create a certain number of partitions, then would they have to look at the size of the input and then adjust arrowMaxRecordsPerBatch accordingly?

Member Author

Yeah, that's true .. but I wonder if the default number of partitions is something we should consider, given that it wasn't configurable before, and SparkSession.createDataFrame does not expose the number of partitions either.

If they really need to, users might want to create an RDD with explicit parallelism .. we don't support this now, though (see also #29719).
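
If a specific partition count really matters, a simple workaround is to repartition right after creation (a sketch, assuming pdf is the input pandas DataFrame; repartition does add a shuffle):

df = spark.createDataFrame(pdf).repartition(16)  # pick the parallelism explicitly
df.rdd.getNumPartitions()  # 16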

@HyukjinKwon (Member Author) commented Jun 10, 2022

BTW, just to clarify further: when the pandas DataFrame is small (below the threshold), the number of partitions remains the same (configured by spark.sql.leafNodeDefaultParallelism, which falls back to sparkContext.defaultParallelism if not set).

The number of partitions is only different when the input DataFrame is large, which I think makes more sense in general.
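
For example (a sketch, assuming pdf is a pandas DataFrame below the localRelationThreshold), the resulting parallelism can be checked directly:

spark.conf.set("spark.sql.leafNodeDefaultParallelism", 8)
small_df = spark.createDataFrame(pdf)
small_df.rdd.getNumPartitions()  # expected to follow leafNodeDefaultParallelism (8 here)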

@HyukjinKwon HyukjinKwon force-pushed the SPARK-39301 branch 4 times, most recently from 7bc08ef to a88eef8 Compare June 1, 2022 05:07
@HyukjinKwon HyukjinKwon marked this pull request as ready for review June 1, 2022 07:42
@HyukjinKwon (Member Author) commented Jun 1, 2022

This PR is ready for a look now. TL;DR:

  • I added a new configuration, spark.sql.execution.arrow.localRelationThreshold, as a threshold. The default value was picked based on the benchmark.
  • I fixed the code path to respect spark.sql.execution.arrow.maxRecordsPerBatch when creating Arrow batches to avoid forcing users to set a larger spark.rpc.message.maxSize.

@HyukjinKwon HyukjinKwon changed the title [SPARK-39301][SQL][PYTHON] Leverage LocalRelation in createDataFrame with Arrow optimization [SPARK-39301][SQL][PYTHON] Leverage LocalRelation and respect Arrow batch size in createDataFrame with Arrow optimization Jun 1, 2022
Member Author

Maybe 32 MB? Don't have a strong preference.

Member

Is this the max size of each batch or all batches together?

Member Author

It's all batches together ... so pretty small.

@HyukjinKwon (Member Author)

Gentle ping for a review :-). I know it has some trade-offs, but I believe this addresses more common cases and benefits more users.

@BryanCutler (Member) left a comment

Looks like it's a good optimization for smaller files, thanks @HyukjinKwon !

@HyukjinKwon (Member Author)

cc @mengxr and @WeichenXu123 in case you guys have some comments.

@HyukjinKwon (Member Author)

Let me merge this in a few days ... assuming that we're all good. Hopefully my benchmark is good enough.

@HyukjinKwon (Member Author)

Rebased

@HyukjinKwon (Member Author)

Let me get this in. It's the early stage of Spark 3.4, so it should be a good time to merge this kind of change.

Merged to master.

@HyukjinKwon HyukjinKwon deleted the SPARK-39301 branch January 15, 2024 00:53
HyukjinKwon added a commit that referenced this pull request Feb 16, 2024
…ution.arrow.maxRecordsPerBatch

### What changes were proposed in this pull request?

This PR fixes the regression introduced by #36683.

```python
import pandas as pd
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 0)
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", False)
spark.createDataFrame(pd.DataFrame({'a': [123]})).toPandas()

spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", -1)
spark.createDataFrame(pd.DataFrame({'a': [123]})).toPandas()
```

**Before**

```
/.../spark/python/pyspark/sql/pandas/conversion.py:371: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and will not continue because automatic fallback with 'spark.sql.execution.arrow.pyspark.fallback.enabled' has been set to false.
  range() arg 3 must not be zero
  warn(msg)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 1483, in createDataFrame
    return super(SparkSession, self).createDataFrame(  # type: ignore[call-overload]
  File "/.../spark/python/pyspark/sql/pandas/conversion.py", line 351, in createDataFrame
    return self._create_from_pandas_with_arrow(data, schema, timezone)
  File "/.../spark/python/pyspark/sql/pandas/conversion.py", line 633, in _create_from_pandas_with_arrow
    pdf_slices = (pdf.iloc[start : start + step] for start in range(0, len(pdf), step))
ValueError: range() arg 3 must not be zero
```
```
Empty DataFrame
Columns: [a]
Index: []
```

**After**

```
     a
0  123
```

```
     a
0  123
```

### Why are the changes needed?

It fixes a regression. This is a documented behaviour. It should be backported to branch-3.4 and branch-3.5.
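
A minimal sketch of the documented semantics (illustrative only; `slice_pdf` is a hypothetical helper, not the actual patch): a zero or negative `maxRecordsPerBatch` means "no limit", so the whole pandas DataFrame should become a single slice instead of reaching `range()` with a zero step.

```python
def slice_pdf(pdf, max_records_per_batch):
    # Zero or negative is documented as "no limit": emit the input as one batch.
    step = max_records_per_batch if max_records_per_batch > 0 else len(pdf)
    step = max(step, 1)  # also guards the empty-DataFrame case
    return [pdf.iloc[start : start + step] for start in range(0, len(pdf), step)]
```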

### Does this PR introduce _any_ user-facing change?

Yes, it fixes a regression as described above.

### How was this patch tested?

Unittest was added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45132 from HyukjinKwon/SPARK-47068.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon added a commit that referenced this pull request Feb 16, 2024
…ution.arrow.maxRecordsPerBatch

(cherry picked from commit 3bb762d)
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon added a commit that referenced this pull request Feb 16, 2024
…ution.arrow.maxRecordsPerBatch

(cherry picked from commit 3bb762d)
Signed-off-by: Hyukjin Kwon <[email protected]>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Mar 26, 2024
…ution.arrow.maxRecordsPerBatch

(cherry picked from commit 3bb762d)
Signed-off-by: Hyukjin Kwon <[email protected]>