@peter-toth
Contributor

@peter-toth peter-toth commented Oct 21, 2022

What changes were proposed in this pull request?

This PR fixes a bug in broadcast handling in PythonRunner when encryption is enabled. Due to this bug, the following pyspark script:

bin/pyspark --conf spark.io.encryption.enabled=true

...

bar = {"a": "aa", "b": "bb"}
foo = spark.sparkContext.broadcast(bar)
spark.udf.register("MYUDF", lambda x: foo.value[x] if x else "")
spark.sql("SELECT MYUDF('a') AS a, MYUDF('b') AS b").collect()

fails with:

22/10/21 17:14:32 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1]
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 811, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 87, in read_command
    command = serializer._read_with_length(file)
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
EOFError: Ran out of input

The reason for this failure is that we have multiple Python UDFs referencing the same broadcast, and in the current code:

val oldBids = PythonRDD.getWorkerBroadcasts(worker)
val newBids = broadcastVars.map(_.id).toSet
// number of different broadcasts
val toRemove = oldBids.diff(newBids)
val addedBids = newBids.diff(oldBids)
val cnt = toRemove.size + addedBids.size
val needsDecryptionServer = env.serializerManager.encryptionEnabled && addedBids.nonEmpty
dataOut.writeBoolean(needsDecryptionServer)
dataOut.writeInt(cnt)
def sendBidsToRemove(): Unit = {
  for (bid <- toRemove) {
    // remove the broadcast from worker
    dataOut.writeLong(-bid - 1) // bid >= 0
    oldBids.remove(bid)
  }
}
if (needsDecryptionServer) {
  // if there is encryption, we setup a server which reads the encrypted files, and sends
  // the decrypted data to python
  val idsAndFiles = broadcastVars.flatMap { broadcast =>
    if (!oldBids.contains(broadcast.id)) {
      Some((broadcast.id, broadcast.value.path))
    } else {
      None
    }
  }
  val server = new EncryptedPythonBroadcastServer(env, idsAndFiles)
  dataOut.writeInt(server.port)
  logTrace(s"broadcast decryption server setup on ${server.port}")
  PythonRDD.writeUTF(server.secret, dataOut)
  sendBidsToRemove()
  idsAndFiles.foreach { case (id, _) =>
    // send new broadcast
    dataOut.writeLong(id)
    oldBids.add(id)
  }

the number of broadcasts (cnt) is correct (1), but the broadcast id is serialized 2 times from the JVM to Python, which corrupts the next item that Python expects from the JVM side.
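To make the desynchronization concrete, here is a minimal sketch in plain Python (no Spark involved). It assumes a simplified framing of just an int count followed by 8-byte ids; the real stream also carries the decryption flag, server port and secret. The helper names `write_broadcast_section`, `read_broadcast_section` and `roundtrip` are hypothetical and only serve the illustration.

```python
import io
import struct


def write_broadcast_section(out, cnt, ids):
    """Write the announced count followed by one 8-byte id per entry in `ids`."""
    out.write(struct.pack(">i", cnt))
    for bid in ids:
        out.write(struct.pack(">q", bid))


def read_broadcast_section(inp):
    """Read exactly `cnt` ids, as a reader that trusts the announced count would."""
    (cnt,) = struct.unpack(">i", inp.read(4))
    return [struct.unpack(">q", inp.read(8))[0] for _ in range(cnt)]


def roundtrip(cnt, ids, next_item=b"COMMAND"):
    """Return (ids seen by the reader, bytes the reader takes as the next item)."""
    buf = io.BytesIO()
    write_broadcast_section(buf, cnt, ids)
    buf.write(next_item)  # stands in for whatever follows the broadcast section
    buf.seek(0)
    seen = read_broadcast_section(buf)
    return seen, buf.read(len(next_item))


# Consistent stream: one id announced, one id written.
ok_ids, ok_next = roundtrip(cnt=1, ids=[7])

# Inconsistent stream: one id announced, the same id written twice.
bad_ids, bad_next = roundtrip(cnt=1, ids=[7, 7])

print(ok_ids, ok_next)
print(bad_ids, bad_next)
```

In the inconsistent case the reader still sees a single id, but the bytes it then treats as the next item are actually the leftover second copy of the id, which is the kind of mismatch that can surface later as a deserialization error.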

Please note that the example above works in Spark 3.3 without this fix. That is because #36121 in Spark 3.4 modified ExpressionSet and so udfs in ExtractPythonUDFs:

val udfs = ExpressionSet(collectEvaluableUDFsFromExpressions(plan.expressions))
  // ignore the PythonUDF that come from second/third aggregate, which is not used
  .filter(udf => udf.references.subsetOf(plan.inputSet))
  .toSeq.asInstanceOf[Seq[PythonUDF]]

changed from Stream to Vector. When broadcastVars (and so idsAndFiles) is a Stream, the example accidentally works because the broadcast id is written to dataOut only once (oldBids.add(id) in idsAndFiles.foreach is called before the 2nd item is calculated in broadcastVars.flatMap). But that doesn't mean that #36121 introduced the regression: EncryptedPythonBroadcastServer shouldn't serve the broadcast data 2 times (which it does now, although it goes unnoticed), as that could break other cases where more than 1 broadcast is used in UDFs.
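The lazy versus eager difference can be reproduced outside Spark. The sketch below is only an analogy: a Python generator stands in for the lazily evaluated Stream and a list for the eagerly built Vector, and `ids_written` is a hypothetical helper, not Spark code. A Scala Stream computes its head eagerly and only the tail lazily, so the correspondence is approximate, but the outcome for this example is the same.

```python
def ids_written(broadcast_ids, lazy):
    """Return the ids that would be written for the given evaluation strategy."""
    old_bids = set()  # ids the worker already holds (empty for a fresh worker)

    # Counterpart of broadcastVars.flatMap { ... }: keep ids not yet in old_bids.
    new_ids = (bid for bid in broadcast_ids if bid not in old_bids)
    if not lazy:
        # Eager collection (Vector-like): the filter runs to completion up front,
        # while old_bids is still empty, so duplicates are not filtered out.
        new_ids = list(new_ids)

    written = []
    for bid in new_ids:      # counterpart of idsAndFiles.foreach { ... }
        written.append(bid)  # dataOut.writeLong(id)
        old_bids.add(bid)    # oldBids.add(id)
    return written


# Two UDFs referencing the same broadcast (id 7 is an arbitrary example value).
lazy_result = ids_written([7, 7], lazy=True)
eager_result = ids_written([7, 7], lazy=False)
print(lazy_result, eager_result)
```

With lazy evaluation the second occurrence is filtered because `old_bids` was already updated by the loop body; with eager evaluation both occurrences pass the filter before any update happens.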

Why are the changes needed?

To fix a bug.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added new UT.

@HyukjinKwon
Member

I am merging this to master, but is the issue only in the master branch?

@peter-toth
Contributor Author

Thanks @HyukjinKwon for the quick review!

The bug was introduced in 58419b9#diff-ed4fb5ce30273e8eefcc7d4b0152ea7a60fb4f8f709d4da8ea1ab56aeda26001R307-R323 in Spark 3.0.

@HyukjinKwon
Member

@peter-toth I will backport to all the active branches.

HyukjinKwon pushed a commit that referenced this pull request Oct 24, 2022
…nabled

This PR fixes a bug in broadcast handling in `PythonRunner` when encryption is enabled. Due to this bug, the following pyspark script:
```
bin/pyspark --conf spark.io.encryption.enabled=true

...

bar = {"a": "aa", "b": "bb"}
foo = spark.sparkContext.broadcast(bar)
spark.udf.register("MYUDF", lambda x: foo.value[x] if x else "")
spark.sql("SELECT MYUDF('a') AS a, MYUDF('b') AS b").collect()
```
fails with:
```
22/10/21 17:14:32 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1]
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 811, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 87, in read_command
    command = serializer._read_with_length(file)
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
EOFError: Ran out of input
```
The reason for this failure is that we have multiple Python UDFs referencing the same broadcast, and in the current code:
https://github.com/apache/spark/blob/748fa2792e488a6b923b32e2898d9bb6e16fb4ca/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L385-L420
the number of broadcasts (`cnt`) is correct (1), but the broadcast id is serialized 2 times from the JVM to Python, which corrupts the next item that Python expects from the JVM side.

Please note that the example above works in Spark 3.3 without this fix. That is because #36121 in Spark 3.4 modified `ExpressionSet` and so `udfs` in `ExtractPythonUDFs`:
https://github.com/apache/spark/blob/748fa2792e488a6b923b32e2898d9bb6e16fb4ca/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala#L239-L242
changed from `Stream` to `Vector`. When `broadcastVars` (and so `idsAndFiles`) is a `Stream`, the example accidentally works because the broadcast id is written to `dataOut` only once (`oldBids.add(id)` in `idsAndFiles.foreach` is called before the 2nd item is calculated in `broadcastVars.flatMap`). But that doesn't mean that #36121 introduced the regression: `EncryptedPythonBroadcastServer` shouldn't serve the broadcast data 2 times (which it does now, although it goes unnoticed), as that could break other cases where more than 1 broadcast is used in UDFs.

Closes #38334 from peter-toth/SPARK-40874-fix-broadcasts-in-python-udf.

Authored-by: Peter Toth
Signed-off-by: Hyukjin Kwon
HyukjinKwon pushed a commit that referenced this pull request Oct 24, 2022
…nabled

Closes #38334 from peter-toth/SPARK-40874-fix-broadcasts-in-python-udf.

Authored-by: Peter Toth
Signed-off-by: Hyukjin Kwon
(cherry picked from commit 8a96f69)
Signed-off-by: Hyukjin Kwon
HyukjinKwon pushed a commit that referenced this pull request Oct 24, 2022
…nabled

Closes #38334 from peter-toth/SPARK-40874-fix-broadcasts-in-python-udf.

Authored-by: Peter Toth
Signed-off-by: Hyukjin Kwon
(cherry picked from commit 8a96f69)
Signed-off-by: Hyukjin Kwon
@HyukjinKwon
Member

Merged to branch-3.3, branch-3.2 and branch-3.1.

SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
…nabled

Closes apache#38334 from peter-toth/SPARK-40874-fix-broadcasts-in-python-udf.

Authored-by: Peter Toth
Signed-off-by: Hyukjin Kwon
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
…nabled

Closes apache#38334 from peter-toth/SPARK-40874-fix-broadcasts-in-python-udf.

Authored-by: Peter Toth
Signed-off-by: Hyukjin Kwon
(cherry picked from commit 8a96f69)
Signed-off-by: Hyukjin Kwon
(cherry picked from commit b6b4945)