Skip to content

Conversation

@cloud-fan
Copy link
Contributor

What changes were proposed in this pull request?

This is a regression introduced by #22104 at Spark 2.4.0.

When we have Python UDF in subquery, we will hit an exception

Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.AttributeReference cannot be cast to org.apache.spark.sql.catalyst.expressions.PythonUDF
	at scala.collection.immutable.Stream.map(Stream.scala:414)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:98)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:815)
...

#22104 turned ExtractPythonUDFs from a physical rule to optimizer rule. However, there is a difference between a physical rule and optimizer rule. A physical rule always runs once, an optimizer rule may be applied twice on a query tree even the rule is located in a batch that only runs once.

For a subquery, the OptimizeSubqueries rule will execute the entire optimizer on the query plan inside subquery. Later on subquery will be turned to joins, and the optimizer rules will be applied to it again.

Unfortunately, the ExtractPythonUDFs rule is not idempotent. When it's applied twice on a query plan inside subquery, it will produce a malformed plan. It extracts Python UDF from Python exec plans.

This PR proposes 2 changes to be double safe:

  1. ExtractPythonUDFs should skip python exec plans, to make the rule idempotent
  2. ExtractPythonUDFs should skip subquery

How was this patch tested?

a new test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add the import here, as a lof of tests use it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yea. It's okay and I think it's good timing to clean up while we are here, and while it's broken down into multiple test files now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a different but related fix, to make the missingAttributes calculated correctly.

@cloud-fan
Copy link
Contributor Author

@SparkQA
Copy link

SparkQA commented Dec 6, 2018

Test build #99765 has finished for PR 23248 at commit 9477fb0.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ArrowEvalPython(
  • case class BatchEvalPython(

@HyukjinKwon
Copy link
Member

Thanks, @cloud-fan. I will take a look within tomorrow - don't block by me.

@SparkQA
Copy link

SparkQA commented Dec 6, 2018

Test build #99767 has finished for PR 23248 at commit d28089f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ArrowEvalPython(
  • case class BatchEvalPython(

@cloud-fan
Copy link
Contributor Author

retest this please

def apply(plan: LogicalPlan): LogicalPlan = plan match {
// SPARK-26293: A subquery will be rewritten into join later, and will go through this rule
// eventually. Here we skip subquery, as Python UDF only needs to be extracted once.
case _: Subquery => plan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I found it a bit confusing when two seeming unrelated things are put together (Subquery and ExtractPythonUDFs).

I wonder if it's sufficient to make ExtractPythonUDFs idempotent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it's a bit confusing, but that's how Subquery is designed to work. See how RemoveRedundantAliases catches Subquery.

It's sufficient to make ExtractPythonUDFs idempotent, skip Subquery is just for double safe, and may have a little bit perf improvement, since this rule will be run less.

In general, I think we should skip Subquery here. This is why we create Subquery: we expect rules that don't want to be executed on subquery to skip it. I'll check more rules and see if they need to skip Subquery later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. If it's common to skip Subquery in other rules, I guess it's ok to put it in here as well. But it would definitely be helpful to establish some kind of guidance, maybe sth like "All optimizer rule should skip Subquery because OptimizeSubqueries will execute them anyway"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you have a point here. If subquery will be converted to join, why do we need to optimize subquery ahead?

Anyway, that's something we need to discuss later. cc @dilipbiswal for the subquery question.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it is totally ok to skip Subquery for all optimizer rules.

For ExtractPythonUDFs I think it is ok because ExtractPythonUDFs is performed after the rules in RewriteSubquery. So we can skip ExtractPythonUDFs here and extract Python UDF after the subqueries are rewritten into join.

But for the rules which perform before RewriteSubquery, if we skip it on Subquery, we have no chance to do the rules after the subqueries are rewritten into join.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, we want to ensure this rule is running once and only once. In the future, if we have another rule/function that calls Optimizer.this.execute(plan), this rule needs to be fixed again... We have a very strong hidden assumption in the implementation. This looks risky in the long term.

The current fix is fine for backporting to 2.4.

@SparkQA
Copy link

SparkQA commented Dec 6, 2018

Test build #99773 has finished for PR 23248 at commit d28089f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ArrowEvalPython(
  • case class BatchEvalPython(

@gatorsmile
Copy link
Member

LGTM to the surgical fix for backporting.

We need to fix this rule with the other rules for avoiding making such a strong and hidden assumption.

@cloud-fan
Copy link
Contributor Author

If it's fine for 2.4, I think it's also fine for master as a temporary fix? We can create another ticket to clean up the subquery optimization hack. IIUC #23211 may help with it.

@AdolphKK
Copy link

looks good for me, +1 👍

@asfgit asfgit closed this in 7d5f6e8 Dec 11, 2018
@cloud-fan
Copy link
Contributor Author

thanks, merging to master/2.4!

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

late LGTM as well

@HyukjinKwon
Copy link
Member

BTW, @cloud-fan, I think it's going to be a considerable conflict against branch-2.4 ... If the conflict is considerable, might better to open a PR.

@cloud-fan
Copy link
Contributor Author

@HyukjinKwon the conflict is only the test. I just moved the test (without those cleanups) to the giant tests.py in 2.4.

@HyukjinKwon
Copy link
Member

Ah, sounds good!

jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?

This is a regression introduced by apache#22104 at Spark 2.4.0.

When we have Python UDF in subquery, we will hit an exception
```
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.AttributeReference cannot be cast to org.apache.spark.sql.catalyst.expressions.PythonUDF
	at scala.collection.immutable.Stream.map(Stream.scala:414)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:98)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:815)
...
```

apache#22104 turned `ExtractPythonUDFs` from a physical rule to optimizer rule. However, there is a difference between a physical rule and optimizer rule. A physical rule always runs once, an optimizer rule may be applied twice on a query tree even the rule is located in a batch that only runs once.

For a subquery, the `OptimizeSubqueries` rule will execute the entire optimizer on the query plan inside subquery. Later on subquery will be turned to joins, and the optimizer rules will be applied to it again.

Unfortunately, the `ExtractPythonUDFs` rule is not idempotent. When it's applied twice on a query plan inside subquery, it will produce a malformed plan. It extracts Python UDF from Python exec plans.

This PR proposes 2 changes to be double safe:
1. `ExtractPythonUDFs` should skip python exec plans, to make the rule idempotent
2. `ExtractPythonUDFs` should skip subquery

## How was this patch tested?

a new test.

Closes apache#23248 from cloud-fan/python.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@tgravescs
Copy link
Contributor

@cloud-fan @HyukjinKwon did this go into Spark 2.4? I'm seeing this error in 2.4.5. Jira claims it went into 2.4.1 but I don't see a commit for it?

@HyukjinKwon
Copy link
Member

Indeed seems not ported back. Let me open a PR to backport.

HyukjinKwon pushed a commit to HyukjinKwon/spark that referenced this pull request Mar 19, 2020
This is a regression introduced by apache#22104 at Spark 2.4.0.

When we have Python UDF in subquery, we will hit an exception
```
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.AttributeReference cannot be cast to org.apache.spark.sql.catalyst.expressions.PythonUDF
	at scala.collection.immutable.Stream.map(Stream.scala:414)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:98)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:815)
...
```

apache#22104 turned `ExtractPythonUDFs` from a physical rule to optimizer rule. However, there is a difference between a physical rule and optimizer rule. A physical rule always runs once, an optimizer rule may be applied twice on a query tree even the rule is located in a batch that only runs once.

For a subquery, the `OptimizeSubqueries` rule will execute the entire optimizer on the query plan inside subquery. Later on subquery will be turned to joins, and the optimizer rules will be applied to it again.

Unfortunately, the `ExtractPythonUDFs` rule is not idempotent. When it's applied twice on a query plan inside subquery, it will produce a malformed plan. It extracts Python UDF from Python exec plans.

This PR proposes 2 changes to be double safe:
1. `ExtractPythonUDFs` should skip python exec plans, to make the rule idempotent
2. `ExtractPythonUDFs` should skip subquery

a new test.

Closes apache#23248 from cloud-fan/python.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@HyukjinKwon
Copy link
Member

Here #27960

HyukjinKwon added a commit that referenced this pull request Mar 20, 2020
…uery

## What changes were proposed in this pull request?

This PR backports #23248 which seems mistakenly not backported.

This is a regression introduced by #22104 at Spark 2.4.0.

When we have Python UDF in subquery, we will hit an exception
```
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.AttributeReference cannot be cast to org.apache.spark.sql.catalyst.expressions.PythonUDF
	at scala.collection.immutable.Stream.map(Stream.scala:414)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:98)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:815)
...
```

#22104 turned `ExtractPythonUDFs` from a physical rule to optimizer rule. However, there is a difference between a physical rule and optimizer rule. A physical rule always runs once, an optimizer rule may be applied twice on a query tree even the rule is located in a batch that only runs once.

For a subquery, the `OptimizeSubqueries` rule will execute the entire optimizer on the query plan inside subquery. Later on subquery will be turned to joins, and the optimizer rules will be applied to it again.

Unfortunately, the `ExtractPythonUDFs` rule is not idempotent. When it's applied twice on a query plan inside subquery, it will produce a malformed plan. It extracts Python UDF from Python exec plans.

This PR proposes 2 changes to be double safe:
1. `ExtractPythonUDFs` should skip python exec plans, to make the rule idempotent
2. `ExtractPythonUDFs` should skip subquery

## How was this patch tested?

a new test.

Closes #27960 from HyukjinKwon/backport-SPARK-26293.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants