-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19471][SQL]AggregationIterator does not initialize the generated result projection before using it #18920
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Jenkins, test this please |
|
ok to test |
|
Test build #80582 has finished for PR 18920 at commit
|
|
Jenkins, retest this please. |
|
Test build #80591 has finished for PR 18920 at commit
|
| (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString), | ||
| (SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) { | ||
| val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y") | ||
| // HashAggregate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to check/compare the plans to ensure they are HashAggregate, ObjectHashAggregate and SortAggregate.
|
updated |
|
Jenkins, retest this please. |
|
Test build #80599 has finished for PR 18920 at commit
|
| assert(hashAggPlan.find(p => | ||
| p.isInstanceOf[WholeStageCodegenExec] && | ||
| p.asInstanceOf[WholeStageCodegenExec].child | ||
| .isInstanceOf[HashAggregateExec]).isDefined) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert(hashAggPlan.find {
case WholeStageCodegenExec(_: HashAggregateExec) => true
case _ => false
}.isDefined)| Seq( | ||
| monotonically_increasing_id(), spark_partition_id(), | ||
| rand(Random.nextLong()), randn(Random.nextLong()) | ||
| ).foreach(assertNoExceptions(_)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-> ).foreach(assertNoExceptions)
| allImperativeAggregateFunctions(i).eval(currentBuffer)) | ||
| i += 1 | ||
| } | ||
| resultProjection.initialize(partIndex) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move it to line 221
| typedImperativeAggregates(i).serializeAggregateBufferInPlace(currentBuffer) | ||
| i += 1 | ||
| } | ||
| resultProjection.initialize(partIndex) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move it to line 240
| // Grouping-only: we only output values based on grouping expressions. | ||
| val resultProjection = UnsafeProjection.create(resultExpressions, groupingAttributes) | ||
| (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => { | ||
| resultProjection.initialize(partIndex) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move it to line 261
| hashAggDF.collect() | ||
|
|
||
| // ObjectHashAggregate and SortAggregate test cases | ||
| val objHashOrSort_AggDF = df.groupBy("x").agg(c, collect_list("y")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
objHashOrSort_AggDF -> objHashAggOrSortAggDf
|
|
||
| // ObjectHashAggregate and SortAggregate test cases | ||
| val objHashOrSort_AggDF = df.groupBy("x").agg(c, collect_list("y")) | ||
| val objHashOrSort_Plan = objHashOrSort_AggDF.queryExecution.executedPlan |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
objHashOrSort_Plan -> objHashAggOrSortAggPlan
|
Updated, thanks for reviewing. |
|
Test build #80615 has finished for PR 18920 at commit
|
|
retest please |
|
Test build #80625 has finished for PR 18920 at commit
|
|
LGTM |
|
Thanks! Merged to master. |
| ).foreach(assertValuesDoNotChangeAfterCoalesceOrUnion(_)) | ||
| } | ||
|
|
||
| private def assertNoExceptions(c: Column): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you submit a follow-up PR to move this test case to DataFrameAggregateSuite? Thanks!
|
Sure, I will do it later. |
…ted result projection before using it ## What changes were proposed in this pull request? This is a follow-up PR that moves the test case in PR-18920 (#18920) to DataFrameAggregateSuit. ## How was this patch tested? unit test Author: donnyzone <[email protected]> Closes #18946 from DonnyZone/branch-19471-followingPR.
What changes were proposed in this pull request?
Recently, we have also encountered such NPE issues in our production environment as described in:
https://issues.apache.org/jira/browse/SPARK-19471
This issue can be reproduced by the following examples:
` val df = spark.createDataFrame(Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4))).toDF("x", "y")
//HashAggregate, SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key=false
df.groupBy("x").agg(rand(),sum("y")).show()
//ObjectHashAggregate, SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key=false
df.groupBy("x").agg(rand(),collect_list("y")).show()
//SortAggregate, SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key=false &&SQLConf.USE_OBJECT_HASH_AGG.key=false
df.groupBy("x").agg(rand(),collect_list("y")).show()
This PR is based on PR-16820(#16820) with test cases for all aggregation paths. We want to push it forward.
How was this patch tested?
unit test
verified in production environment