[SPARK-24500][SQL] Make sure streams are materialized during Tree transforms. #21539
Test build #91703 has finished for PR 21539 at commit
retest this please
👍
Test build #91708 has finished for PR 21539 at commit
if (children.nonEmpty) {
  var changed = false
  def mapChild(child: Any): Any = child match {
    case arg: TreeNode[_] if containsChild(arg) =>
Shall we reuse this code in L326?
Yeah, so was about to do that but then I noticed that they handle different cases, mapChild handles TreeNode and (TreeNode, TreeNode), whereas L326-L349 handles TreeNode, Option[TreeNode] and Map[_, TreeNode]. I am not sure if combining them is useful, and if it is then I'd rather do it in a different PR.
val df = Union(Stream(
  Range(1, 1, 1, 1),
  Range(1, 2, 1, 1)))
df.queryExecution.executedPlan.execute()
Did it throw an exception before?
Yeah, it would throw an UnsupportedOperationException before.
nice fix!
retest this please
Test build #91735 has finished for PR 21539 at commit
Interesting and great catch!
thanks, merging to master!
What changes were proposed in this pull request?
If you construct catalyst trees using `scala.collection.immutable.Stream` you can run into situations where valid transformations do not seem to have any effect. There are two causes for this behavior:

1. `Stream` is evaluated lazily. Note that the default implementation will generally only evaluate a function for the first element (this makes testing a bit tricky).
2. `TreeNode` and `QueryPlan` use side effects to detect if a tree has changed. Mapping over a stream is lazy and does not need to trigger this side effect. If this happens, the node will incorrectly assume that it did not change and return itself instead of the newly created node (this is for GC reasons).

This PR fixes this issue by forcing materialization on streams in `TreeNode` and `QueryPlan`.

How was this patch tested?
Unit tests were added to `TreeNodeSuite` and `LogicalPlanSuite`. An integration test was added to the `PlannerSuite`.
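
The interaction between a lazy `Stream` and side-effect-based change detection can be reproduced without Spark. The following is only a minimal sketch, not the code from this PR: `StreamChangeDetection`, `rewrite`, `mapChildrenLazy`, and `mapChildrenForced` are hypothetical names, and plain `Int` values stand in for tree nodes. It assumes the standard library behavior that `Stream.map` evaluates only the head eagerly and that `Stream.force` evaluates every element.

```scala
// Simplified model of the "did anything change?" pattern; not Spark's actual code.
// Note: Stream is deprecated since Scala 2.13 (in favor of LazyList) but still available.
object StreamChangeDetection {
  // Hypothetical rule that rewrites only the last child (3 becomes 30).
  def rewrite(child: Int): Int = if (child == 3) 30 else child

  // Lazy variant: the flag is read after only the head has been mapped,
  // so a change in a later element goes unnoticed.
  def mapChildrenLazy(children: Stream[Int]): (Stream[Int], Boolean) = {
    var changed = false
    val mapped = children.map { c =>
      val n = rewrite(c)
      if (n != c) changed = true
      n
    }
    (mapped, changed)
  }

  // Forced variant: .force materializes every element before the flag is read.
  def mapChildrenForced(children: Stream[Int]): (Stream[Int], Boolean) = {
    var changed = false
    val mapped = children.map { c =>
      val n = rewrite(c)
      if (n != c) changed = true
      n
    }.force
    (mapped, changed)
  }

  def main(args: Array[String]): Unit = {
    val (_, lazyChanged) = mapChildrenLazy(Stream(1, 2, 3))
    val (forced, forcedChanged) = mapChildrenForced(Stream(1, 2, 3))
    println(s"lazy variant saw a change:   $lazyChanged")   // false
    println(s"forced variant saw a change: $forcedChanged") // true
    println(s"forced result: ${forced.toList}")             // List(1, 2, 30)
  }
}
```

In the lazy variant a caller that trusts the flag would keep the original node even though the mapped children differ, which matches the "transformation has no effect" symptom described above. Because the head is always evaluated, a test that only changes the first child would not expose the problem.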