[SPARK-21966][SQL]ResolveMissingReference rule should not ignore Union #19178
Conversation
Can one of the admins verify this patch?
Could you trigger the tests? I think this needs to be fixed. @gatorsmile @HyukjinKwon
case d: Distinct =>
  throw new AnalysisException(s"Can't add $missingAttrs to $d")
case u: Union =>
  u.withNewChildren(u.children.map(addMissingAttr(_, missingAttrs)))
This is not the only issue: I think binary operators have the same problem, e.g.,
scala> df3.join(df4).filter("grouping_id()=0").show()
org.apache.spark.sql.AnalysisException: cannot resolve '`spark_grouping_id`' given input columns: [a, sum(b), a, sum(b)];;
'Filter ('spark_grouping_id = 0)
+- Join Inner
:- Aggregate [a#27, spark_grouping_id#25], [a#27, sum(cast(b#6 as bigint)) AS sum(b)#24L]
: +- Expand [List(a#5, b#6, a#26, 0), List(a#5, b#6, null, 1)], [a#5, b#6, a#27, spark_grouping_id#25]
: +- Project [a#5, b#6, a#5 AS a#26]
: +- Project [_1#0 AS a#5, _2#1 AS b#6]
: +- LocalRelation [_1#0, _2#1]
+- Aggregate [a#38, spark_grouping_id#36], [a#38, sum(cast(b#16 as bigint)) AS sum(b)#35L]
+- Expand [List(a#15, b#16, a#37, 0), List(a#15, b#16, null, 1)], [a#15, b#16, a#38, spark_grouping_id#36]
+- Project [a#15, b#16, a#15 AS a#37]
+- Project [_1#10 AS a#15, _2#11 AS b#16]
+- LocalRelation [_1#10, _2#11]
So we need a more general solution for this case, I think.
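(For reference, the plan above can be reproduced end to end with the DataFrames from the PR description below; a sketch for spark-shell:)

val df1 = spark.createDataFrame(Seq((1, 1), (2, 1), (2, 2))).toDF("a", "b")
val df2 = spark.createDataFrame(Seq((1, 1), (1, 2), (2, 3))).toDF("a", "b")
val df3 = df1.cube("a").sum("b")
val df4 = df2.cube("a").sum("b")

// grouping_id() resolves to spark_grouping_id, which exists on both join
// sides with different expression IDs, so a per-child fix is not enough here.
df3.join(df4).filter("grouping_id()=0").show()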
Yeah, I agree with you. The current implementation only handles UnaryNode; it needs to take all node types into consideration.
Thanks for the suggestion, I will work on a general solution.
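As a first cut, the naive generalization would just recurse into every child the way the new Union case does; a sketch only, not the PR's code:

case n if n.children.nonEmpty =>
  n.withNewChildren(n.children.map(addMissingAttr(_, missingAttrs)))

But as the Join plan above shows, the missing attribute can appear on both sides with different expression IDs, so a correct general fix would also need to decide which child should supply each attribute.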
gatorsmile left a comment
The rule ResolveMissingReference was mainly added for sort, since traditional RDBMS systems support it.
When we wrote this rule, we did not plan to support binary nodes; complete support could make the rule very complex. We still prefer that users add the missing references in the query themselves instead of having Spark SQL add them.
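Concretely, that user-side workaround is to materialize the grouping ID as a regular column inside each aggregate, so the later filter no longer needs a missing reference. A minimal sketch against the repro in the PR description below (the column name gid is made up):

import org.apache.spark.sql.functions.{grouping_id, sum}

val df3 = df1.cube("a").agg(grouping_id().as("gid"), sum("b"))
val df4 = df2.cube("a").agg(grouping_id().as("gid"), sum("b"))
df3.union(df4).filter("gid = 0").show()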
Aha, you mean this is an expected design and we won't change this logic in the future?
Yes. We do not plan to introduce extra complexity for improving this rule.
yea, thanks!
Thanks, I will give it a try in our private repository, as there are several such cases and the users want to migrate in a seamless way. But I found that general support is really complicated.
yea, also could you close the jira ticket as Won't Fix?
What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-21966
The problem can be reproduced by the following example.
val df1 = spark.createDataFrame(Seq((1, 1), (2, 1), (2, 2))).toDF("a", "b")
val df2 = spark.createDataFrame(Seq((1, 1), (1, 2), (2, 3))).toDF("a", "b")
val df3 = df1.cube("a").sum("b")
val df4 = df2.cube("a").sum("b")
val df5 = df3.union(df4).filter("grouping_id()=0").show()

The org.apache.spark.sql.AnalysisException: cannot resolve 'spark_grouping_id' given input columns is thrown because the ResolveMissingReference rule ignores the Union operator. This PR fixes the issue.
How was this patch tested?
unit tests
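A hypothetical shape for such a test, assuming a suite where checkAnswer and the toDF implicits are available (a sketch, not necessarily the test added by this PR):

test("SPARK-21966: resolve missing references through Union") {
  val df1 = Seq((1, 1), (2, 1), (2, 2)).toDF("a", "b")
  val df2 = Seq((1, 1), (1, 2), (2, 3)).toDF("a", "b")
  val union = df1.cube("a").sum("b").union(df2.cube("a").sum("b"))
  // Before the fix this threw: cannot resolve 'spark_grouping_id'.
  checkAnswer(
    union.filter("grouping_id() = 0"),
    Row(1, 1L) :: Row(2, 3L) :: Row(1, 3L) :: Row(2, 3L) :: Nil)
}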