[SPARK-43240][SQL][3.3] Fix the wrong result issue when calling df.describe() method. #40914
Conversation
@cloud-fan Please help to review. Thanks.
Hm, we're taking the head row anyway. Do you have any e2e example that produces a wrong result?
+1, I think we'd better add a test for it
We encountered this issue when running the Gluten UTs, because Gluten rewrites ColumnarToRow in a way that releases the row after it is used. The issue seems hard to reproduce on Apache Spark, because Apache Spark's ColumnarToRow uses on-heap memory that is reclaimed by GC. Do you have any suggestions for reproducing this issue on Apache Spark? @HyukjinKwon @cloud-fan @zhengruifeng
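To make the failure mode concrete, here is a minimal sketch in plain Scala (MutableRow and rows() are hypothetical stand-ins, not Spark classes) of what happens when an operator reuses a row buffer and the consumer stores references without copying:

```scala
// Hypothetical illustration of the row-reuse hazard; not Spark code.
final class MutableRow(var value: Int) {
  def copy(): MutableRow = new MutableRow(value)
}

object RowReuseDemo {
  // Simulates an operator that reuses one row object per iteration,
  // the way UnsafeRow-producing operators can.
  def rows(values: Seq[Int]): Iterator[MutableRow] = {
    val shared = new MutableRow(0)
    values.iterator.map { v => shared.value = v; shared }
  }

  def main(args: Array[String]): Unit = {
    // Buggy: every array slot holds the same shared object, so all entries
    // end up showing the last value written into it.
    val buggy = rows(Seq(1, 2, 3)).toArray
    println(buggy.map(_.value).mkString(",")) // prints 3,3,3

    // Fixed: copy each row before storing it, which is the shape of this fix.
    val fixed = rows(Seq(1, 2, 3)).map(_.copy()).toArray
    println(fixed.map(_.value).mkString(",")) // prints 1,2,3
  }
}
```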
This is only a bug if Spark installs third-party physical operators that release memory eagerly.
According to the doc of QueryExecution.toRdd, I think adding a copy is the right thing to do:
```scala
/**
 * Internal version of the RDD. Avoids copies and has no schema.
 * Note for callers: Spark may apply various optimization including reusing object: this means
 * the row is valid only for the iteration it is retrieved. You should avoid storing row and
 * accessing after iteration. (Calling `collect()` is one of known bad usage.)
 * If you want to store these rows into collection, please apply some converter or copy row
 * which produces new object per iteration.
 * Given QueryExecution is not a public class, end users are discouraged to use this: please
 * use `Dataset.rdd` instead where conversion will be applied.
 */
lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(
  executedPlan.execute(), sparkSession.sessionState.conf)
```
It happens to work here because the result only has one row (it's a global aggregate). I'm fine without testing it, as this follows the guidance of the QueryExecution.toRdd doc.
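For illustration, a minimal runnable sketch of the pattern the scaladoc recommends; the DataFrame here is an arbitrary example, and the added `.map(_.copy())` is the essence of this PR's change:

```scala
import org.apache.spark.sql.SparkSession

object ToRddCopyDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("toRdd-copy").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

    // Risky per the scaladoc: collected InternalRows may alias buffers that an
    // operator reuses (or, with third-party operators, frees) during iteration.
    // val rows = df.queryExecution.toRdd.collect()

    // Safe: copy each InternalRow so every collected object owns its own data.
    val rows = df.queryExecution.toRdd.map(_.copy()).collect()
    rows.foreach(r => println(r.numFields)) // prints 2 twice

    spark.stop()
  }
}
```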
Thanks @cloud-fan for the valuable information. @HyukjinKwon @zhengruifeng, can we follow Wenchen's suggestion and merge this PR without a test?
SGTM
@HyukjinKwon Do you have any further comment? Thanks.
Do 3.3/3.4/master have the same issue?
Spark 3.3 has this issue. Spark 3.4 and master don't seem to have it, because StatFunctions.scala was reimplemented there and no longer calls the rdd.collect() method.
Oh, you're fixing branch-3.2. It reached EOL, and there won't be more releases in 3.2.x.
I am fine if we can land this to branch-3.3 alone but would need to fix the JIRA's affected version.
Sure. I have changed the version to branch-3.3. Please help to review again. Thanks.
Thanks for your review. @cloud-fan Can you help to merge?
thanks, merging to 3.3! |
Merged as: [SPARK-43240][SQL][3.3] Fix the wrong result issue when calling df.describe() method. Closes #40914 from JkSelf/describe. Authored-by: Jia Ke <[email protected]>. Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
The df.describe() method caches the collected rows of the RDD. If the RDD is an RDD[UnsafeRow], the rows may be released after they are used, and the cached result will be wrong. We need to copy the rows before caching them, as the TakeOrderedAndProjectExec operator does. See the sketch below.
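As a hedged illustration (a simplified sketch, not the actual StatFunctions diff), here is how the fix applies to a describe()-style global aggregate:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object DescribeFixSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("describe-fix").getOrCreate()
    import spark.implicits._

    val df = Seq(1.0, 2.0, 3.0).toDF("v")
    // A describe()-like global aggregate that yields a single statistics row.
    val agg = df.select(count($"v"), mean($"v"), stddev($"v"), min($"v"), max($"v"))

    // The defensive copy: without .map(_.copy()), the collected head row could
    // alias a buffer that a third-party operator (e.g. Gluten's ColumnarToRow)
    // has already released by the time the statistics are read.
    val head = agg.queryExecution.toRdd.map(_.copy()).collect().head
    println(head.numFields) // prints 5

    spark.stop()
  }
}
```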
Why are the changes needed?
bug fix
Does this PR introduce any user-facing change?
no
How was this patch tested?