[SPARK-32307][SQL] ScalaUDF's canonicalized expression should exclude inputEncoders #29106
Conversation
dongjoon-hyun
left a comment
Hi, @Ngone51 .
The JIRA description and this PR description are misleading, because the issue is not reproducible in vanilla Apache Spark. Could you be more precise about the contribution of this PR?
```scala
scala> spark.version
res0: String = 3.0.0

scala> spark.udf.register("key", udf((m: Map[String, String]) => m.keys.head.toInt))
res1: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$1954/1561881364@8937f62,IntegerType,List(Some(class[value[0]: map<string,string>])),None,false,true)

scala> Seq(Map("1" -> "one", "2" -> "two")).toDF("a").createOrReplaceTempView("t")

scala> sql("SELECT key(a) AS k FROM t GROUP BY key(a)").collect()
res3: Array[org.apache.spark.sql.Row] = Array([1])
```
If this exists on
dongjoon-hyun
left a comment
+1, LGTM. Thank you, @Ngone51 and @cloud-fan . I revised the JIRA and the PR description here.
Merged to master/3.0.
… inputEncoders
### What changes were proposed in this pull request?
Override `canonicalized` to empty the `inputEncoders` for the canonicalized `ScalaUDF`.
### Why are the changes needed?
The following fails on `branch-3.0` currently, but not on the Apache Spark 3.0.0 release.
```scala
spark.udf.register("key", udf((m: Map[String, String]) => m.keys.head.toInt))
Seq(Map("1" -> "one", "2" -> "two")).toDF("a").createOrReplaceTempView("t")
checkAnswer(sql("SELECT key(a) AS k FROM t GROUP BY key(a)"), Row(1) :: Nil)
[info] org.apache.spark.sql.AnalysisException: expression 't.`a`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
[info] Aggregate [UDF(a#6)], [UDF(a#6) AS k#8]
[info] +- SubqueryAlias t
[info] +- Project [value#3 AS a#6]
[info] +- LocalRelation [value#3]
[info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:49)
[info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis$(CheckAnalysis.scala:48)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:130)
[info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkValidAggregateExpression$1(CheckAnalysis.scala:257)
[info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$10(CheckAnalysis.scala:259)
[info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$10$adapted(CheckAnalysis.scala:259)
[info] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
[info] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
[info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
[info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkValidAggregateExpression$1(CheckAnalysis.scala:259)
[info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$10(CheckAnalysis.scala:259)
[info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$10$adapted(CheckAnalysis.scala:259)
[info] at scala.collection.immutable.List.foreach(List.scala:392)
[info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkValidAggregateExpression$1(CheckAnalysis.scala:259)
...
```
We use the rule `ResolveEncodersInUDF` to resolve `inputEncoders`, and at the end the original `ScalaUDF` instance is replaced by a new `ScalaUDF` instance with the resolved encoders. Note that during encoder resolution, types like `map` and `array` are resolved to new expressions (e.g. `MapObjects`, `CatalystToExternalMap`).
However, `ExpressionEncoder` can't be canonicalized. Thus, the canonicalized `ScalaUDF`s become different even when their original `ScalaUDF`s are the same. As a result, `checkValidAggregateExpression` fails when such a `ScalaUDF` is used as a grouping expression.
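The core of the bug can be illustrated with a minimal sketch (the `Udf` class below is a hypothetical stand-in, not Spark's real `ScalaUDF`): two structurally identical instances carrying freshly resolved, non-comparable encoder objects no longer compare equal, and dropping the encoder field from the canonical form restores the equality the analyzer relies on.

```scala
// Hypothetical stand-in for ScalaUDF: `inputEncoders` plays the role of
// the per-instance ExpressionEncoder list that cannot be canonicalized.
case class Udf(children: Seq[String], inputEncoders: Seq[AnyRef]) {
  // Mirrors the fix: the canonical form simply drops the encoders.
  def canonicalized: Udf = copy(inputEncoders = Nil)
}

// Two "resolved" UDFs over the same child: logically the same expression,
// but each carries a distinct encoder instance after resolution.
val a = Udf(Seq("a#6"), Seq(new Object))
val b = Udf(Seq("a#6"), Seq(new Object))

assert(a != b)                             // distinct encoder objects break equality
assert(a.canonicalized == b.canonicalized) // excluding them restores it
```

This is why the GROUP BY check passes after the fix: the grouping expression and the aggregate output compare equal once both are canonicalized without encoders.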
### Does this PR introduce _any_ user-facing change?
Yes, users will not hit the exception after this fix.
### How was this patch tested?
Added tests.
Closes #29106 from Ngone51/spark-32307.
Authored-by: yi.wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit a47b69a)
Signed-off-by: Dongjoon Hyun <[email protected]>
BTW, @cloud-fan and @HyukjinKwon. I merged this because
Test build #125846 has finished for PR 29106 at commit
Late LGTM, thanks, @Ngone51
thanks all!!
Hi, @Ngone51. Could you make a backporting PR for branch-3.0 again?
Has it been merged to 3.0 or not?
It has been reverted at 4ef535f.
This PR fixes an issue that was introduced by SPARK-31826. So, we may not need to backport it to
OK, then let's not backport. @Ngone51 can you update the PR description to make it clear?
Sure
Thank you for the clarification!
```scala
override lazy val canonicalized: Expression = {
  // SPARK-32307: `ExpressionEncoder` can't be canonicalized, and technically we don't
  // need it to identify a `ScalaUDF`.
  Canonicalize.execute(copy(children = children.map(_.canonicalized), inputEncoders = Nil))
}
```
@Ngone51 shall we do the same for `outputEncoder`?
Makes sense. I'll do a follow-up.
…preCanonicalized

### What changes were proposed in this pull request?
This PR proposes to set `outputEncoder` to `None` for `ScalaUDF.preCanonicalized`.

### Why are the changes needed?
We once did the same thing to `inputEncoders` in #29106 to fix a bug where the canonicalized `ScalaUDF`s for the same `ScalaUDF` become different after resolving `inputEncoders`. So this PR applies the same fix to `outputEncoder` to avoid hitting the same issue in the future. Note that we don't have the issue caused by `outputEncoder` now, since we don't resolve `outputEncoder` yet.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass existing tests.

Closes #34937 from Ngone51/SPARK-32307-followup.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
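The follow-up extends the same pattern to the output side. As a rough sketch (simplified types, hypothetical `Udf` class rather than Spark's real `ScalaUDF`), the pre-canonicalized form drops both encoder fields, since neither is needed to identify the UDF:

```scala
// Simplified stand-in for ScalaUDF: neither encoder field participates
// in identifying the UDF, so the canonical form drops both.
case class Udf(
    children: Seq[String],
    inputEncoders: Seq[AnyRef],
    outputEncoder: Option[AnyRef]) {
  def preCanonicalized: Udf = copy(inputEncoders = Nil, outputEncoder = None)
}

val x = Udf(Seq("a#6"), Seq(new Object), Some(new Object))
val y = Udf(Seq("a#6"), Seq(new Object), Some(new Object))

assert(x != y)                                   // distinct encoder instances
assert(x.preCanonicalized == y.preCanonicalized) // equal once both are dropped
```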