
Conversation

@minyyy
Contributor

@minyyy minyyy commented Apr 8, 2022

What changes were proposed in this pull request?

This PR refactors the ExpressionSet class. It does two things:

  1. Changes ExpressionSet to inherit from Set and SetLike instead of Iterable, and removes many redundant implementations that simply duplicated the defaults.
  2. Removes many unnecessary accesses to canonicalized.

Why are the changes needed?

It can dramatically improve the performance of ExpressionSet.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

It is a performance improvement, so code review should be a safe way to verify it.

@github-actions github-actions bot added the SQL label Apr 8, 2022
@minyyy
Contributor Author

minyyy commented Apr 8, 2022

@sigmod @cloud-fan Could you two help review this PR? Thanks.

  if (e.deterministic) {
-   baseSet --= baseSet.filter(_ == e.canonicalized)
-   originals --= originals.filter(_.canonicalized == e.canonicalized)
+   baseSet.retain(_ != e.canonicalized)
Contributor Author

baseSet.retain(_ != e.canonicalized) only needs to traverse the set once, which is much better than the previous implementation.
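
A tiny self-contained contrast of the two approaches (plain Strings stand in for canonicalized expressions here; this is an illustration, not Spark code):

```
import scala.collection.mutable

object RetainDemo extends App {
  val canonical = "a"

  // Old shape: filter materializes a copy of the matching elements (pass 1),
  // then --= removes them from the set (pass 2).
  val twoPass = mutable.HashSet("a", "b", "c")
  twoPass --= twoPass.filter(_ == canonical)

  // New shape: retain keeps only the entries satisfying the predicate,
  // removing the rest in a single in-place traversal.
  val onePass = mutable.HashSet("a", "b", "c")
  onePass.retain(_ != canonical)

  println(twoPass == onePass) // true: same result, fewer passes
}
```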

-   baseSet --= baseSet.filter(_ == e.canonicalized)
-   originals --= originals.filter(_.canonicalized == e.canonicalized)
+   baseSet.retain(_ != e.canonicalized)
+   originals = originals.filter(!_.semanticEquals(e))
Contributor Author

@minyyy minyyy Apr 8, 2022


There are two changes happening in this line.

  1. Changes to originals = originals.filter(...). This is O(n), whereas the previous implementation is O(mn).
  2. Uses semanticEquals instead of the previous _.canonicalized == e.canonicalized as the condition. semanticEquals can short-circuit and avoid unnecessary accesses to canonicalized for non-deterministic expressions.

By invariant 2, the current implementation should not evaluate canonicalized at all.
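
A minimal runnable model of that short-circuit (Expr here is a hypothetical stand-in, and the semanticEquals body mirrors what this comment assumes rather than Spark's exact source):

```
object SemanticEqualsDemo extends App {
  final case class Expr(deterministic: Boolean, id: Int) {
    // Stand-in for the expensive Expression.canonicalized computation.
    lazy val canonicalized: Int = { println(s"canonicalizing $id"); id }
    def semanticEquals(other: Expr): Boolean =
      deterministic && other.deterministic && canonicalized == other.canonicalized
  }

  val nonDet = Expr(deterministic = false, id = 1)
  val det = Expr(deterministic = true, id = 1)
  // The deterministic checks fail first, so canonicalized is never forced.
  println(nonDet.semanticEquals(det)) // false, and "canonicalizing" never prints
}
```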


  override def filter(p: Expression => Boolean): ExpressionSet = {
-   val newBaseSet = baseSet.filter(e => p(e.canonicalized))
+   val newBaseSet = baseSet.filter(e => p(e))
Contributor Author


By invariant 1, .canonicalized is not needed here and can cause performance issues.

}

  override def filterNot(p: Expression => Boolean): ExpressionSet = {
-   val newBaseSet = baseSet.filterNot(e => p(e.canonicalized))
Contributor Author


By invariant 1, .canonicalized is not needed here and can cause performance issues.

Contributor Author

@minyyy minyyy left a comment


Added some review comments to highlight the key changes.

@minyyy minyyy changed the title [SQL][SPARK-38836] Improve the performance of ExpressionSet [SPARK-38836][SQL] Improve the performance of ExpressionSet Apr 8, 2022
@AmplabJenkins

Can one of the admins verify this patch?

@ulysses-you
Contributor

Is there a perf number? FYI, it's for semantic reasons that ExpressionSet inherits Iterable: it ensures we get the same ordering across different runs. Some history: #29598

@minyyy
Contributor Author

minyyy commented Apr 11, 2022

Is there a perf number?

Not yet, but I can create a benchmark for it.

FYI, it's for semantic reasons that ExpressionSet inherits Iterable: it ensures we get the same ordering across different runs. Some history: #29598

I actually saw that PR. From what I understood, #29598 did two things: first, it fixed the insertion-ordering stability issue of AttributeSet; second, it refactored ExpressionSet, which is unrelated to insertion order. Per the comment in that PR, the reason for the refactor was that the default implementations of some set operations return Set[Expression] instead of ExpressionSet. However, changing the base class from Set to Iterable is not the correct refactor IMO. Per https://www.scala-lang.org/api/2.12.3/scala/collection/Set.html:

Implementation note: If your additions and mutations return the same kind of set as the set you are defining, you should inherit from SetLike as well.

What we should do is also extend SetLike, instead of changing Set to Iterable.
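
To make the SetLike idiom concrete, here is a toy Scala 2.12 sketch of the shape being proposed. CanonSet, canon() and the String "expressions" are hypothetical stand-ins (lower-casing plays the role of canonicalized); this is not Spark's ExpressionSet, only an illustration of inheriting Set and SetLike so the default operations rebuild the same concrete type:

```
import scala.collection.{mutable, SetLike}
import scala.collection.mutable.ArrayBuffer

class CanonSet(
    private val baseSet: mutable.Set[String] = new mutable.HashSet,
    private val originals: mutable.Buffer[String] = new ArrayBuffer)
  extends Set[String] with SetLike[String, CanonSet] {

  private def canon(e: String): String = e.toLowerCase // stand-in for canonicalized

  override def empty: CanonSet = new CanonSet
  override def contains(e: String): Boolean = baseSet.contains(canon(e))
  override def iterator: Iterator[String] = originals.iterator // stable insertion order

  override def +(e: String): CanonSet = {
    val copy = new CanonSet(baseSet.clone(), originals.clone())
    if (!copy.baseSet.contains(canon(e))) {
      copy.baseSet += canon(e)
      copy.originals += e
    }
    copy
  }

  override def -(e: String): CanonSet = {
    val copy = new CanonSet(baseSet.clone(), originals.clone())
    copy.baseSet -= canon(e)
    copy.originals --= copy.originals.filter(o => canon(o) == canon(e))
    copy
  }
}

object CanonSetDemo extends App {
  val s = new CanonSet() + "A" + "a" + "B"
  println(s.toList)                  // List(A, B): "a" deduplicated via its canonical form
  println(s.filter(_ != "B").toList) // the SetLike defaults rebuild a CanonSet, no manual re-wrapping
}
```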

@cloud-fan
Contributor

LGTM, it's better to see some numbers from a microbenchmark though.

@minyyy
Contributor Author

minyyy commented Apr 11, 2022

I ran the TPC-DS benchmark twice locally:

```
~/Work/spark (expr_set ✔) cat /tmp/master_result | grep Filter
org.apache.spark.sql.catalyst.optimizer.PruneFilters 20420542 / 499545296 4 / 1740
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints 383832301 / 386115187 271 / 312
org.apache.spark.sql.execution.dynamicpruning.CleanupDynamicPruningFilters 0 / 41344027 0 / 312
org.apache.spark.sql.catalyst.optimizer.EliminateAggregateFilter 0 / 12488623 0 / 1428
org.apache.spark.sql.catalyst.optimizer.CombineFilters 0 / 11010749 0 / 1428
org.apache.spark.sql.catalyst.optimizer.ReplaceExceptWithFilter 0 / 2485281 0 / 352
org.apache.spark.sql.catalyst.optimizer.InjectRuntimeFilter 0 / 1394478 0 / 312
org.apache.spark.sql.catalyst.optimizer.CombineTypedFilters 0 / 1097207 0 / 312
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate 0 / 1063153 0 / 312
~/Work/spark (expr_set ✔) cat /tmp/exprset_result | grep Filter
org.apache.spark.sql.catalyst.optimizer.PruneFilters 12251268 / 475647800 4 / 1740
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints 320604026 / 322550195 271 / 312
org.apache.spark.sql.execution.dynamicpruning.CleanupDynamicPruningFilters 0 / 40531409 0 / 312
org.apache.spark.sql.catalyst.optimizer.EliminateAggregateFilter 0 / 12426889 0 / 1428
org.apache.spark.sql.catalyst.optimizer.CombineFilters 0 / 10278258 0 / 1428
org.apache.spark.sql.catalyst.optimizer.ReplaceExceptWithFilter 0 / 2384373 0 / 352
org.apache.spark.sql.catalyst.optimizer.InjectRuntimeFilter 0 / 1285621 0 / 312
org.apache.spark.sql.catalyst.optimizer.CombineTypedFilters 0 / 1065992 0 / 312
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate 0 / 1025776 0 / 312

~/Work/spark (master ✔) cat /tmp/master_result2 | grep Filter
org.apache.spark.sql.catalyst.optimizer.PruneFilters 12819415 / 505890443 4 / 1740
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints 390196686 / 392300726 271 / 312
org.apache.spark.sql.execution.dynamicpruning.CleanupDynamicPruningFilters 0 / 40328844 0 / 312
org.apache.spark.sql.catalyst.optimizer.EliminateAggregateFilter 0 / 12271679 0 / 1428
org.apache.spark.sql.catalyst.optimizer.CombineFilters 0 / 10468968 0 / 1428
org.apache.spark.sql.catalyst.optimizer.ReplaceExceptWithFilter 0 / 2274441 0 / 352
org.apache.spark.sql.catalyst.optimizer.InjectRuntimeFilter 0 / 1453612 0 / 312
org.apache.spark.sql.catalyst.optimizer.CombineTypedFilters 0 / 1008569 0 / 312
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate 0 / 970068 0 / 312
~/Work/spark (master ✔) cat /tmp/exprset_result2 | grep Filter
org.apache.spark.sql.catalyst.optimizer.PruneFilters 12891206 / 468780255 4 / 1740
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints 319539076 / 321154860 271 / 312
org.apache.spark.sql.execution.dynamicpruning.CleanupDynamicPruningFilters 0 / 47870664 0 / 312
org.apache.spark.sql.catalyst.optimizer.EliminateAggregateFilter 0 / 13480991 0 / 1428
org.apache.spark.sql.catalyst.optimizer.CombineFilters 0 / 10508080 0 / 1428
org.apache.spark.sql.catalyst.optimizer.ReplaceExceptWithFilter 0 / 2389351 0 / 352
org.apache.spark.sql.catalyst.optimizer.InjectRuntimeFilter 0 / 1501485 0 / 312
org.apache.spark.sql.catalyst.optimizer.CombineTypedFilters 0 / 1328054 0 / 312
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate 0 / 973707 0 / 312
```

The major improvement is in InferFiltersFromConstraints, where the old code does lots of unnecessary accesses to .canonicalized.

@minyyy
Contributor Author

minyyy commented Apr 12, 2022

@cloud-fan
Contributor

InferFiltersFromGenerate is a newly added rule; does that mean we don't have this performance issue in older versions of Spark?

@minyyy
Contributor Author

minyyy commented Apr 25, 2022

InferFiltersFromGenerate is a newly added rule; does that mean we don't have this performance issue in older versions of Spark?

The benchmark ran the TPC-DS queries. But the PruneFilters rule actually runs for all filters (its last branch is case f @ Filter(fc, p: LogicalPlan)). In extreme cases where a query has many filters with predicates complicated enough that canonicalized costs a significant amount of time, the optimization probably matters.

Contributor

@cloud-fan cloud-fan left a comment


LGTM, please fix test failures.

@github-actions github-actions bot added the INFRA label May 2, 2022
@minyyy
Contributor Author

minyyy commented May 2, 2022

All tests passed. CI was broken when I updated the PR last time, so I re-triggered it.

@minyyy minyyy requested a review from cloud-fan May 2, 2022 17:52
  if (e.deterministic) {
-   baseSet --= baseSet.filter(_ == e.canonicalized)
-   originals --= originals.filter(_.canonicalized == e.canonicalized)
+   baseSet.retain(_ != e.canonicalized)
Contributor

how about:
baseSet.remove(e.canonicalized)

Contributor Author

Done.


  protected def remove(e: Expression): Unit = {
    if (e.deterministic) {
      baseSet.filterInPlace(_ != e.canonicalized)
Contributor

How about:

baseSet.remove(e.canonicalized)

Contributor Author

Done.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in ea3c48d May 16, 2022
HyukjinKwon pushed a commit that referenced this pull request Oct 22, 2022
…nabled

### What changes were proposed in this pull request?
This PR fixes a bug in broadcast handling in `PythonRunner` when encryption is enabled. Due to this bug, the following PySpark script:
```
bin/pyspark --conf spark.io.encryption.enabled=true

...

bar = {"a": "aa", "b": "bb"}
foo = spark.sparkContext.broadcast(bar)
spark.udf.register("MYUDF", lambda x: foo.value[x] if x else "")
spark.sql("SELECT MYUDF('a') AS a, MYUDF('b') AS b").collect()
```
fails with:
```
22/10/21 17:14:32 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1]
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 811, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 87, in read_command
    command = serializer._read_with_length(file)
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
EOFError: Ran out of input
```
The reason for this failure is that we have multiple Python UDFs referencing the same broadcast, and in the current code:
https://github.com/apache/spark/blob/748fa2792e488a6b923b32e2898d9bb6e16fb4ca/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L385-L420
the number of broadcasts (`cnt`) is correct (1), but the broadcast id is serialized 2 times from the JVM to Python, corrupting the next item that Python expects from the JVM side.

Please note that the example above works in Spark 3.3 without this fix. That is because #36121 in Spark 3.4 modified `ExpressionSet`, and so `udfs` in `ExtractPythonUDFs`:
https://github.com/apache/spark/blob/748fa2792e488a6b923b32e2898d9bb6e16fb4ca/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala#L239-L242
changed from `Stream` to `Vector`. When `broadcastVars` (and so `idsAndFiles`) is a `Stream`, the example accidentally works because the broadcast id is written to `dataOut` only once (`oldBids.add(id)` in `idsAndFiles.foreach` is called before the 2nd item is computed in `broadcastVars.flatMap`). But that doesn't mean #36121 introduced the regression: `EncryptedPythonBroadcastServer` shouldn't serve the broadcast data 2 times (which it currently does, unnoticed), as that could break other cases where more than one broadcast is used in UDFs.
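
A self-contained illustration of the laziness point above (hedged: this is not the Spark code, just the Stream-vs-Vector behaviour it describes, using hypothetical names):

```
object StreamVsVector extends App {
  val seen = scala.collection.mutable.Set[Int]()

  // Emits an element only if it has not been marked as seen, like the oldBids check.
  def expand(xs: Seq[Int]): Seq[Int] =
    xs.flatMap(x => if (seen.contains(x)) Nil else Seq(x))

  // Stream: flatMap is lazy, so marking 1 as seen while consuming the first element
  // suppresses the duplicate before it is ever produced.
  val fromStream = expand(Stream(1, 1))
  fromStream.foreach(seen.add)
  println(fromStream.size) // 1

  seen.clear()

  // Vector: both elements are materialized up front, so the duplicate survives.
  val fromVector = expand(Vector(1, 1))
  fromVector.foreach(seen.add)
  println(fromVector.size) // 2
}
```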

### Why are the changes needed?
To fix a bug.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added new UT.

Closes #38334 from peter-toth/SPARK-40874-fix-broadcasts-in-python-udf.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Oct 24, 2022
…nabled

HyukjinKwon pushed a commit that referenced this pull request Oct 24, 2022
…nabled

HyukjinKwon pushed a commit that referenced this pull request Oct 24, 2022
…nabled

SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
…nabled

sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
…nabled

yaooqinn pushed a commit that referenced this pull request Apr 19, 2024
…cala 2.12

### What changes were proposed in this pull request?

Fix `ExpressionSet` performance regression in Scala 2.12.

### Why are the changes needed?

The `SetLike.++` method in Scala 2.12 is implemented by repeatedly calling the `+` method. The `ExpressionSet.+` method first clones a new object and then adds the element, which is very expensive.

https://github.com/scala/scala/blob/ceaf7e68ac93e9bbe8642d06164714b2de709c27/src/library/scala/collection/SetLike.scala#L186

After #36121, the `++` and `--` overrides in the Scala 2.12 `ExpressionSet` were removed, causing a performance regression.
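
A hedged toy of that mechanism (hypothetical CloningSet, not Spark's ExpressionSet): when `+` clones the whole backing set, a `++` built by folding `+` clones once per added element, whereas a bulk `++` clones once and then mutates the copy:

```
import scala.collection.mutable

object BulkAddDemo extends App {
  var clones = 0

  final class CloningSet(private val elems: mutable.Set[Int]) {
    def +(e: Int): CloningSet = { clones += 1; new CloningSet(elems.clone() += e) }
    // Default-style ++ : repeatedly apply +, cloning once per element.
    def slowAddAll(xs: Seq[Int]): CloningSet = xs.foldLeft(this)(_ + _)
    // Bulk ++ : clone once, then add everything in place.
    def fastAddAll(xs: Seq[Int]): CloningSet = { clones += 1; new CloningSet(elems.clone() ++= xs) }
  }

  val base = new CloningSet(mutable.HashSet(1 to 100: _*))

  clones = 0
  base.slowAddAll(101 to 200)
  println(s"per-element ++ : $clones clones") // 100

  clones = 0
  base.fastAddAll(101 to 200)
  println(s"bulk ++ : $clones clone") // 1
}
```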

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Benchmark code:

```
object TestBenchmark {
  def main(args: Array[String]): Unit = {
    val count = 300
    val benchmark = new Benchmark("Test ExpressionSetV2 ++ ", count)
    val aUpper = AttributeReference("A", IntegerType)(exprId = ExprId(1))

    var initialSet = ExpressionSet((0 until 300).map(i => aUpper + i))
    val setToAddWithSameDeterministicExpression = ExpressionSet((0 until 300).map(i => aUpper + i))

    benchmark.addCase("Test ++", 10) { _: Int =>
      for (_ <- 0L until count) {
        initialSet ++= setToAddWithSameDeterministicExpression
      }
    }
    benchmark.run()
  }
}
```

before this change:

```
OpenJDK 64-Bit Server VM 1.8.0_222-b10 on Linux 3.10.0-957.el7.x86_64
Intel Core Processor (Skylake, IBRS)
Test ExpressionSetV2 ++ :                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Test ++                                            1577           1691          61          0.0     5255516.0       1.0X
```

after this change:

```
OpenJDK 64-Bit Server VM 1.8.0_222-b10 on Linux 3.10.0-957.el7.x86_64
Intel Core Processor (Skylake, IBRS)
Test ExpressionSetV2 ++ :                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Test ++                                              14             14           0          0.0       45395.2       1.0X
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #46114 from wForget/SPARK-47897.

Authored-by: Zhen Wang <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
yaooqinn pushed a commit that referenced this pull request Apr 19, 2024
…cala 2.12

turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
…cala 2.12 (apache#382)
