[SPARK-41049][SQL] Revisit stateful expression handling #39248
cloud-fan wants to merge 2 commits into apache:master
This is a pre-existing bug. The final expressions we use are exprs, not expressions.
The old usage of .transform here contained a subtle bug related to how fastEquals works.
Let's say that we have a tree which looks like this:
Outer(Middle(Stateful()))
where Outer and Middle are non-Stateful expressions.
When the .transform is applied to Stateful() and .freshCopy() is called, the returned value will be == to the original Stateful expression but will have a different object identity (because it's a fresh object). Internally, .transform will use fastEquals to check whether the transformation modified the node. Stateful overrides fastEquals so that it only considers object identity, so the transform will return the freshCopy() result.
At the next level up, Middle will check whether any of its children have been changed in the recursive bottom-up transformation (see childrenFastEquals() in withNewChildren(), which is called from mapChildren()). It will detect that its children have changed, so the transform will return a new Middle node.
Finally, at the top level, Outer will perform the same check to see if any of its children have changed. This time, however, it will be calling Middle.fastEquals instead of Stateful.fastEquals. Middle's fastEquals method is the regular implementation which also considers object equality. Both the original and new Middle nodes will be ==, so fastEquals will be true and Outer will conclude that its children have not been changed by the transformation and the original Outer will be returned (losing the copy of the stateful expression).
In other words, the old .transform and copying logic here was incorrect if the Stateful expression was nested more than a single level deep.
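The failure mode described above can be reproduced with a toy tree. The following is a minimal sketch, not Spark's actual TreeNode code: Expr, Outer, Middle, Stateful and this simplified transformUp are stand-ins written for illustration, and only the fastEquals rules are modeled on the description above.

```scala
object FastEqualsDemo {
  // Toy stand-in for a Catalyst tree node (assumption: not Spark's real classes).
  sealed trait Expr {
    def children: Seq[Expr]
    def withNewChildren(newChildren: Seq[Expr]): Expr

    // Regular rule: same object, or structurally equal.
    def fastEquals(other: Expr): Boolean = (this eq other) || this == other

    // Simplified bottom-up transform that relies on fastEquals to detect changes.
    def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
      val newChildren = children.map(_.transformUp(rule))
      val childrenChanged =
        children.zip(newChildren).exists { case (a, b) => !a.fastEquals(b) }
      val afterChildren = if (childrenChanged) withNewChildren(newChildren) else this
      val afterRule = rule.applyOrElse(afterChildren, (e: Expr) => e)
      if (afterChildren.fastEquals(afterRule)) afterChildren else afterRule
    }
  }

  case class Outer(child: Expr) extends Expr {
    def children: Seq[Expr] = Seq(child)
    def withNewChildren(newChildren: Seq[Expr]): Expr = copy(child = newChildren.head)
  }

  case class Middle(child: Expr) extends Expr {
    def children: Seq[Expr] = Seq(child)
    def withNewChildren(newChildren: Seq[Expr]): Expr = copy(child = newChildren.head)
  }

  case class Stateful() extends Expr {
    def children: Seq[Expr] = Nil
    def withNewChildren(newChildren: Seq[Expr]): Expr = this
    // Stateful nodes compare by object identity only.
    override def fastEquals(other: Expr): Boolean = this eq other
    // A fresh copy is == to the original but is a different object.
    def freshCopy(): Stateful = new Stateful()
  }

  // One level of nesting: the copy survives.
  val shallow: Expr = Middle(Stateful())
  val shallowCopied: Expr = shallow.transformUp { case s: Stateful => s.freshCopy() }

  // Two levels of nesting: Outer compares Middle nodes with ==, sees no change,
  // and returns itself, so the fresh copy is dropped.
  val deep: Expr = Outer(Middle(Stateful()))
  val deepCopied: Expr = deep.transformUp { case s: Stateful => s.freshCopy() }

  def main(args: Array[String]): Unit = {
    println(s"shallow tree replaced: ${!(shallowCopied eq shallow)}")
    println(s"deep tree replaced:    ${!(deepCopied eq deep)}")
  }
}
```

In this model the shallow tree comes back as a new object while the deep tree comes back as the original instance, which matches the nesting-depth condition stated above.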
In this PR I chose to fix this by adding a freshCopyIfContainsStatefulExpression() method to Expression, which implements a custom tree traversal that considers only object identity when determining whether the transform has changed a node or a node's children.
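An identity-only traversal of this kind could look roughly like the sketch below. It is an illustration under assumptions, not the method added by the PR: the toy Expr hierarchy, the Literal node, the helper innermost, and the name freshCopyIfContainsStateful are all hypothetical, and stateful nodes are treated as leaves for brevity.

```scala
object IdentityCopyDemo {
  // Toy stand-in for a Catalyst expression tree (assumption: not Spark's real classes).
  sealed trait Expr {
    def children: Seq[Expr]
    def withNewChildren(newChildren: Seq[Expr]): Expr

    // Hypothetical sketch: rebuild a node whenever any child was replaced,
    // judging "replaced" by object identity (eq) rather than by ==.
    def freshCopyIfContainsStateful(): Expr = this match {
      case s: Stateful => s.freshCopy()
      case _ =>
        val newChildren = children.map(_.freshCopyIfContainsStateful())
        val anyChildReplaced =
          children.zip(newChildren).exists { case (a, b) => !(a eq b) }
        if (anyChildReplaced) withNewChildren(newChildren) else this
    }
  }

  case class Outer(child: Expr) extends Expr {
    def children: Seq[Expr] = Seq(child)
    def withNewChildren(newChildren: Seq[Expr]): Expr = copy(child = newChildren.head)
  }

  case class Middle(child: Expr) extends Expr {
    def children: Seq[Expr] = Seq(child)
    def withNewChildren(newChildren: Seq[Expr]): Expr = copy(child = newChildren.head)
  }

  case class Literal(value: Int) extends Expr {
    def children: Seq[Expr] = Nil
    def withNewChildren(newChildren: Seq[Expr]): Expr = this
  }

  case class Stateful() extends Expr {
    def children: Seq[Expr] = Nil
    def withNewChildren(newChildren: Seq[Expr]): Expr = this
    def freshCopy(): Stateful = new Stateful()
  }

  // Walk down the first child until a leaf is reached.
  def innermost(e: Expr): Expr =
    if (e.children.isEmpty) e else innermost(e.children.head)

  // A tree with a nested stateful node is rebuilt all the way to the root.
  val deep: Expr = Outer(Middle(Stateful()))
  val copied: Expr = deep.freshCopyIfContainsStateful()

  // A tree without stateful nodes is returned as-is, with no allocation.
  val pure: Expr = Outer(Middle(Literal(1)))
  val pureCopied: Expr = pure.freshCopyIfContainsStateful()

  def main(args: Array[String]): Unit = {
    println(s"stateful leaf replaced: ${innermost(copied) ne innermost(deep)}")
    println(s"pure tree untouched:    ${pureCopied eq pure}")
  }
}
```

Because the change check never falls back to ==, a replaced leaf forces every ancestor to be rebuilt, while trees without stateful nodes keep their original instances.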
Hmm, will sub-expr elimination extract common stateful expressions as common expr and break the rule (not reusing)?
2 stateful but deterministic expressions always produce the same result given the same input sequence. So it's OK to apply sub-expr elimination.
This should be called after prepareExpressions is finished, right?
or more specifically, the implementation should initialize the final expression that it actually uses.
viirya left a comment
The proposed logic looks good to me. There are some test failures that look related.
The keys in tags still refer to the original tree node. Is that okay?
How is this possible? keys are basically strings, right?
Note: the code here basically follows TreeNode.withNewChildren
Yea, you're correct. It is just a string for node name.
Thanks for the review, merging to master!
…he base class
### What changes were proposed in this pull request?
This is a followup of #39248, to add one more code cleanup. The expression initialization code is duplicated 6 times and we should put it in the base class.
### Why are the changes needed?
code cleanup
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests
Closes #39364 from cloud-fan/expr.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
What changes were proposed in this pull request?
Spark has a Stateful trait for stateful expressions. The basic idea is to have fresh copies of stateful expressions before evaluating them. This is to avoid issues caused by the flexible DataFrame APIs.
However, the handling of stateful expressions has several problems. This PR fixes all of them:
- CodegenFallback, then the expression tree will be evaluated using the interpreted mode, even with the codegen code path.
- InterpretedSafeProjection never implemented initialize() for initializing Nondeterministic expressions.
- ConvertToLocalRelation called an InterpretedMutableProjection constructor which did not implement the existing Stateful-copying logic. I fixed this by moving that logic out of a factory method and into the class's main constructor, guaranteeing that it will always run.
- ScalaUDF. I removed the Stateful trait and added a def stateful: Boolean function in Expression.
Why are the changes needed?
Fix stateful expression handling
Does this PR introduce any user-facing change?
Yes. Stateful expressions no longer share state; previously, shared state could produce wrong results.
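To make the effect of shared state concrete, here is a small sketch using a made-up stateful expression. RowCounter and project are hypothetical names used only for illustration and are not part of Spark; the point is only that one instance reused in two output columns interleaves its state, while fresh copies stay independent.

```scala
object SharedStateDemo {
  // Hypothetical stateful expression: returns 0, 1, 2, ... on successive calls.
  final class RowCounter {
    private var next = 0L
    def eval(): Long = { val v = next; next += 1; v }
    def freshCopy(): RowCounter = new RowCounter
  }

  // Evaluate a two-column projection over `rows` input rows, left column first.
  def project(left: RowCounter, right: RowCounter, rows: Int): Seq[(Long, Long)] =
    (0 until rows).map(_ => (left.eval(), right.eval()))

  // The same instance is used for both columns, so each call advances shared state.
  val shared = new RowCounter
  val sharedResult: Seq[(Long, Long)] = project(shared, shared, 3)

  // Each column gets its own fresh copy, so both columns count independently.
  val template = new RowCounter
  val freshResult: Seq[(Long, Long)] =
    project(template.freshCopy(), template.freshCopy(), 3)

  def main(args: Array[String]): Unit = {
    println(s"shared instance: $sharedResult")
    println(s"fresh copies:    $freshResult")
  }
}
```

With the shared instance the two columns of a row disagree, whereas with fresh copies both columns hold the same per-row value, which is what a user selecting the same column twice would expect.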
How was this patch tested?
new tests