-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-32308][SQL] Move by-name resolution logic of unionByName from API code to analysis phase #29107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #125845 has finished for PR 29107 at commit
|
|
Thank you, @viirya . |
|
Test build #125853 has finished for PR 29107 at commit
|
| case Union(children, byName, allowMissingCol) | ||
| if byName => | ||
| val union = children.reduceLeft { (left: LogicalPlan, right: LogicalPlan) => | ||
| unionTwoSides(left, right, allowMissingCol) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a question; the process of unionTwoSides is a type-coercion one? Seems like column resolution rather than type coercion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems not like typical type coercion case, but put here based on #28996 (comment).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm I see.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If looks not proper after rethinking, we can also move to other rule or create another rule.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea maybe a new analyzer rule.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, moved.
| case e if !e.childrenResolved => e | ||
|
|
||
| case Union(children, byName, allowMissingCol) | ||
| if byName => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: case Union(children, byName, allowMissingCol) if byName =>?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
|
|
||
| case Union(_, byName, allowMissingCol) if byName || allowMissingCol => | ||
| failAnalysis("Union should not be with true `byName` or " + | ||
| "`allowMissingCol` flags after analysis phase.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a question; users can see this error message? That's the case of an analyzer bug?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Usually not. This mainly prevents we accidentally create a Union with byName or allowMissingCol after ResolveUnion rule.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And, yes, prevent a unexpected bug during analysis.
|
Test build #126077 has finished for PR 29107 at commit
|
|
retest this please |
|
Test build #126093 has finished for PR 29107 at commit
|
|
retest this please |
|
Test build #126112 has finished for PR 29107 at commit
|
|
I just linked the JIRA to SPARK-29358 so it's easier to track the lineage. Is there already another follow up ticket to make this work for nested structs? |
Thanks. I don't see an existing follow up ticket for nested structs. |
|
Awesome, I just created https://issues.apache.org/jira/browse/SPARK-32376. We've implemented a utility to do this before; I'll sync with my team and see if it's easy to port the code over and open a PR for that unless you're already planning on tackling it. Even if so, we're happy to share our test cases. |
|
Thanks @mukulmurthy. I've not worked on it yet. Feel free to open a PR if you are ready to port the code. |
| * run before `TypeCoercion`, because `TypeCoercion` should be run on correctly resolved | ||
| * column by name. | ||
| */ | ||
| object ResolveUnion extends Rule[LogicalPlan] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we put it in a new file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure.
|
|
||
| /** | ||
| * Resolves different children of Union to a common set of columns. Note that this must be | ||
| * run before `TypeCoercion`, because `TypeCoercion` should be run on correctly resolved |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's fragile to rely on rule order. How about we skip the type coercion rule if the union is by name and the name match is not done yet?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
| } else { | ||
| left | ||
| } | ||
| Union(leftChild, rightChild) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we avoid creating intermediate unions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nvm, it was the behavior before and seems hard to get rid of it.
| s.children.forall(_.output.length == s.children.head.output.length) && !s.resolved => | ||
| val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(s.children) | ||
| s.makeCopy(Array(newChildren)) | ||
| Union(newChildren, s.byName, s.allowMissingCol) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: s.copy(children = newChildren)
| case LocalLimit(exp, Union(children)) => | ||
| LocalLimit(exp, Union(children.map(maybePushLocalLimit(exp, _)))) | ||
| case LocalLimit(exp, Union(children, byName, allowMissingCol)) => | ||
| LocalLimit(exp, Union(children.map(maybePushLocalLimit(exp, _)), byName, allowMissingCol)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use copy would be better, as it preserves the tree node tag
| Project(projectList.map(pushToRight(_, rewrites)), child) | ||
| } | ||
| Union(newFirstChild +: newOtherChildren) | ||
| Union(newFirstChild +: newOtherChildren, byName, allowMissingCol) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
| } | ||
| } | ||
| Union(flattened.toSeq) | ||
| Union(flattened, topByName, topAllowMissingCol) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto: use copy
| val expected2 = Union(table1 :: projected2 :: Nil) | ||
| comparePlans(analyzed2, expected2) | ||
|
|
||
| // By name + Allow missing column |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
by position?
| // By name + Allow missing column | ||
| val union3 = Union(union1 :: union2 :: Nil) | ||
| val analyzed3 = analyzer.execute(union3) | ||
| val expected3 = Union(expected1 :: expected2 :: Nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this test prove anything? union1 and union2 have exactly the same schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, let me change it.
| sparkSession.sessionState.conf.caseSensitiveAnalysis) | ||
| SchemaUtils.checkColumnNameDuplication( | ||
| rightOutputAttrs.map(_.name), | ||
| "in the right attributes", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we move the check to the analyzer rule as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. moved.
maropu
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except for the minor comments.
| Alias(Literal(null, lattr.dataType), lattr.name)() | ||
| } else { | ||
| throw new AnalysisException( | ||
| s"""Cannot resolve column name "${lattr.name}" among """ + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about making it consistent (using ` for wrapping a column name)?
throw new AnalysisException(
s"Cannot resolve column name `${lattr.name}` among " +
s"(${rightOutputAttrs.map(_.name).mkString(", ")})")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok.
| case e if !e.childrenResolved => e | ||
|
|
||
| case Union(children, byName, allowMissingCol) if byName => | ||
| val union = children.reduceLeft { (left: LogicalPlan, right: LogicalPlan) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: { (left: LogicalPlan, right: LogicalPlan) => -> { (left, right) =>
| case s: Union if s.childrenResolved && | ||
| s.children.forall(_.output.length == s.children.head.output.length) && !s.resolved => | ||
| s.children.forall(_.output.length == s.children.head.output.length) && !s.resolved && | ||
| !s.byName => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Since s.byName is constant, how about evaluating it first?
case s @ Union(_, false, _) if s.childrenResolved && !s.byName &&
s.children.forall(_.output.length == s.children.head.output.length) && !s.resolved =>
|
Test build #126277 has finished for PR 29107 at commit
|
|
Test build #126294 has finished for PR 29107 at commit
|
|
retest this please |
| case e @ Except(left, right, _) if !e.duplicateResolved => | ||
| e.copy(right = dedupRight(left, right)) | ||
| case u @ Union(children) if !u.duplicateResolved => | ||
| case u @ Union(children, _, _) if !u.duplicateResolved => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should do this only when the by-name resolution is done?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicateResolved checks attribute sets from children that look at exprId actually. By-name resolution is only for attribute name. If you think it is safer, I can add it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's safer to add it, to explicitly define the rule order (by-name resolution should happen before this rule)
...catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
Outdated
Show resolved
Hide resolved
| case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr) | ||
|
|
||
| case Union(_, byName, allowMissingCol) if byName || allowMissingCol => | ||
| failAnalysis("Union should not be with true `byName` or " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be safe, shall we also override Union.resolved to include this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I add !(byName || allowMissingCol) to the condition at Union.resolved now. Then do we still need to check them at CheckAnalysis like above? Or just follow other nodes with special check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is not something users can hit(only bug can trigger this), maybe we can just remove this check here.
| case logical.GlobalLimit(IntegerLiteral(limit), child) => | ||
| execution.GlobalLimitExec(limit, planLater(child)) :: Nil | ||
| case logical.Union(unionChildren) => | ||
| case logical.Union(unionChildren, _, _) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this pattern appears many times (Union(children, _, _)), and actually we can rewrite to
case union: logical.Union =>
execution.UnionExec(union.children.map(planLater)) :: Nil
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh ok.
| df1.unionByName(df2) | ||
| }.getMessage | ||
| assert(errMsg.contains("""Cannot resolve column name "b" among (a, c, d)""")) | ||
| assert(errMsg.contains("""Cannot resolve column name `b` among (a, c, d)""")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we keep the previous error message? In general we should only use backticks when necessary(has special chars)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. let me revert to previous one.
|
Test build #126311 has finished for PR 29107 at commit
|
|
Test build #126348 has finished for PR 29107 at commit
|
|
Test build #126355 has finished for PR 29107 at commit
|
| e.copy(right = dedupRight(left, right)) | ||
| case u @ Union(children) if !u.duplicateResolved => | ||
| // Only after we finish by-name resolution for Union | ||
| case u: logical.Union if !u.duplicateResolved && !u.byName => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: logical.Union -> Union.
!u.duplicateResolved && !u.byName -> !u.byName && !u.duplicateResolved
| case (l, r) => l.dataType.sameType(r.dataType) | ||
| }) | ||
| children.length > 1 && childrenResolved && allChildrenCompatible | ||
| children.length > 1 && childrenResolved && allChildrenCompatible && !(byName || allowMissingCol) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: !(byName || allowMissingCol) should be checked before childrenResolved, as it's cheaper.
cloud-fan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except a few minor comments
|
Test build #126388 has finished for PR 29107 at commit
|
|
retest this please |
|
Test build #126417 has finished for PR 29107 at commit
|
|
thanks, merging to master! |
|
Thanks for review. |
What changes were proposed in this pull request?
Currently the by-name resolution logic of
unionByNameis put in API code. This patch moves the logic to analysis phase.See #28996 (comment).
Why are the changes needed?
Logically we should do resolution in analysis phase. This refactoring cleans up API method and makes consistent resolution.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests.