Skip to content

Conversation

@HyukjinKwon
Copy link
Member

What changes were proposed in this pull request?

This PR backports #23248 which seems mistakenly not backported.

This is a regression introduced by #22104 at Spark 2.4.0.

When we have Python UDF in subquery, we will hit an exception

Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.AttributeReference cannot be cast to org.apache.spark.sql.catalyst.expressions.PythonUDF
	at scala.collection.immutable.Stream.map(Stream.scala:414)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:98)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:815)
...

#22104 turned ExtractPythonUDFs from a physical rule to optimizer rule. However, there is a difference between a physical rule and optimizer rule. A physical rule always runs once, an optimizer rule may be applied twice on a query tree even the rule is located in a batch that only runs once.

For a subquery, the OptimizeSubqueries rule will execute the entire optimizer on the query plan inside subquery. Later on subquery will be turned to joins, and the optimizer rules will be applied to it again.

Unfortunately, the ExtractPythonUDFs rule is not idempotent. When it's applied twice on a query plan inside subquery, it will produce a malformed plan. It extracts Python UDF from Python exec plans.

This PR proposes 2 changes to be double safe:

  1. ExtractPythonUDFs should skip python exec plans, to make the rule idempotent
  2. ExtractPythonUDFs should skip subquery

How was this patch tested?

a new test.

This is a regression introduced by apache#22104 at Spark 2.4.0.

When we have Python UDF in subquery, we will hit an exception
```
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.AttributeReference cannot be cast to org.apache.spark.sql.catalyst.expressions.PythonUDF
	at scala.collection.immutable.Stream.map(Stream.scala:414)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:98)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:815)
...
```

apache#22104 turned `ExtractPythonUDFs` from a physical rule to optimizer rule. However, there is a difference between a physical rule and optimizer rule. A physical rule always runs once, an optimizer rule may be applied twice on a query tree even the rule is located in a batch that only runs once.

For a subquery, the `OptimizeSubqueries` rule will execute the entire optimizer on the query plan inside subquery. Later on subquery will be turned to joins, and the optimizer rules will be applied to it again.

Unfortunately, the `ExtractPythonUDFs` rule is not idempotent. When it's applied twice on a query plan inside subquery, it will produce a malformed plan. It extracts Python UDF from Python exec plans.

This PR proposes 2 changes to be double safe:
1. `ExtractPythonUDFs` should skip python exec plans, to make the rule idempotent
2. `ExtractPythonUDFs` should skip subquery

a new test.

Closes apache#23248 from cloud-fan/python.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@HyukjinKwon HyukjinKwon force-pushed the backport-SPARK-26293 branch from b1d41b7 to 7a916ac Compare March 19, 2020 15:01
@HyukjinKwon
Copy link
Member Author

cc @cloud-fan and @tgravescs

@cloud-fan
Copy link
Contributor

I do remember I backported it as I manually fixed some conflicts, but ...

Maybe some network problems happened but I didn't notice. Anyway thanks for doing it!

@tgravescs
Copy link
Contributor

thanks @HyukjinKwon looks like clean merge other then test change. LGTM pending jenkins

@SparkQA
Copy link

SparkQA commented Mar 19, 2020

Test build #120060 has finished for PR 27960 at commit 7a916ac.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ArrowEvalPython(
  • case class BatchEvalPython(

@dongjoon-hyun
Copy link
Member

The failure is relevant one, test_udf_in_subquery. Could you take a look, @HyukjinKwon ?

======================================================================
ERROR: test_udf_in_subquery (pyspark.sql.tests.SQLTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/tests.py", line 3581, in test_udf_in_subquery
    with self.tempView("v"):
AttributeError: 'SQLTests' object has no attribute 'tempView'

@HyukjinKwon
Copy link
Member Author

Sure, will do.

@SparkQA
Copy link

SparkQA commented Mar 20, 2020

Test build #120075 has finished for PR 27960 at commit 423644e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member Author

Merged to branch-2.4.

HyukjinKwon added a commit that referenced this pull request Mar 20, 2020
…uery

## What changes were proposed in this pull request?

This PR backports #23248 which seems mistakenly not backported.

This is a regression introduced by #22104 at Spark 2.4.0.

When we have Python UDF in subquery, we will hit an exception
```
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.AttributeReference cannot be cast to org.apache.spark.sql.catalyst.expressions.PythonUDF
	at scala.collection.immutable.Stream.map(Stream.scala:414)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:98)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:815)
...
```

#22104 turned `ExtractPythonUDFs` from a physical rule to optimizer rule. However, there is a difference between a physical rule and optimizer rule. A physical rule always runs once, an optimizer rule may be applied twice on a query tree even the rule is located in a batch that only runs once.

For a subquery, the `OptimizeSubqueries` rule will execute the entire optimizer on the query plan inside subquery. Later on subquery will be turned to joins, and the optimizer rules will be applied to it again.

Unfortunately, the `ExtractPythonUDFs` rule is not idempotent. When it's applied twice on a query plan inside subquery, it will produce a malformed plan. It extracts Python UDF from Python exec plans.

This PR proposes 2 changes to be double safe:
1. `ExtractPythonUDFs` should skip python exec plans, to make the rule idempotent
2. `ExtractPythonUDFs` should skip subquery

## How was this patch tested?

a new test.

Closes #27960 from HyukjinKwon/backport-SPARK-26293.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
@dongjoon-hyun
Copy link
Member

Thank you, @HyukjinKwon and @cloud-fan .

@HyukjinKwon HyukjinKwon deleted the backport-SPARK-26293 branch July 27, 2020 07:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants