-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-32777][SQL] Aggregation support aggregate function with multiple foldable expressions. #29626
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-32777][SQL] Aggregation support aggregate function with multiple foldable expressions. #29626
Conversation
|
Test build #128196 has finished for PR 29626 at commit
|
|
retest this please |
|
cc @cloud-fan and @linhongliu-db |
|
|
||
|
|
||
| -- !query | ||
| SELECT COUNT(DISTINCT id), COUNT(DISTINCT 2,3) FILTER (WHERE dept_id = 30) FROM emp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we test WHERE dept_id > 0? to make sure that we get correct result even if there are more than one inputs after filter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
|
Test build #128198 has finished for PR 29626 at commit
|
|
Test build #128197 has finished for PR 29626 at commit
|
|
Test build #128201 has finished for PR 29626 at commit
|
|
Test build #128208 has finished for PR 29626 at commit
|
|
Test build #128219 has finished for PR 29626 at commit
|
|
retest this please |
|
cc @cloud-fan |
|
Test build #128285 has finished for PR 29626 at commit
|
|
Test build #128294 has finished for PR 29626 at commit
|
| } | ||
| } | ||
| val distinctAggGroupLookup = distinctAggGroupMap.toMap | ||
| val distinctAggGroups = distinctAggGroupMap.groupBy(_._2).map{ kv => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: mapValues
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
| val naf = patchAggregateFunctionChildren(af) { x => | ||
| val condition = if (e.filter.isDefined) { | ||
| e.filter.map(distinctAggFilterAttrLookup.get(_)).get | ||
| val condition = e.filter.map(distinctAggFilterAttrLookup.get(_)).getOrElse(None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: is .getOrElse(None) needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
Test build #128341 has finished for PR 29626 at commit
|
|
Test build #128345 has finished for PR 29626 at commit
|
| val condition = if (e.filter.isDefined) { | ||
| e.filter.map(distinctAggFilterAttrLookup.get(_)).get | ||
| val condition = e.filter.map(distinctAggFilterAttrLookup.get(_)).flatten | ||
| if (distinctAggGroupLookup(e).contains(x)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we can simplify the logic a bit. The goal is to only do the replacement for the first child if all the children are foldable. How about
val af = e.aggregateFunction
val condition = e.filter.map(distinctAggFilterAttrLookup.get(_)).flatten
val naf = if (af.children.forall(_.foldable)) {
val firstChild = evalWithinGroup(id, af.children.head, condition)
af.withNewChildren(firstChild +: af.children.drop(1)).asInstanceOf[AggregateFunction]
} else {
patchAggregateFunctionChildren(af) { x =>
distinctAggChildAttrLookup.get(x).map(evalWithinGroup(id, _, condition))
}
}
Then we don't need to change a lot of code to create distinctAggGroupLookup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
|
Test build #128429 has finished for PR 29626 at commit
|
|
retest this please |
|
Test build #128438 has finished for PR 29626 at commit
|
|
retest this please |
| None | ||
| val condition = e.filter.map(distinctAggFilterAttrLookup.get(_)).flatten | ||
| val naf = if (af.children.forall(_.foldable)) { | ||
| val firstChild = evalWithinGroup(id, af.children.head, condition) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add some comments to explain why we are doing it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
|
Test build #128479 has finished for PR 29626 at commit
|
|
retest this please |
|
Test build #128478 has finished for PR 29626 at commit
|
|
Test build #128489 has finished for PR 29626 at commit
|
|
retest this please |
|
Test build #128498 has finished for PR 29626 at commit
|
|
github action passed, merging to master, thanks! |
|
@cloud-fan Thanks |
What changes were proposed in this pull request?
Spark SQL exists a bug show below:
The first query is correct, but the second query is not.
The root reason is the second query rewrited by
RewriteDistinctAggregateswho expand the output but lost the 2.Why are the changes needed?
Fix a bug.
SELECT COUNT(DISTINCT 2), COUNT(DISTINCT 3, 2)should return1, 1Does this PR introduce any user-facing change?
Yes
How was this patch tested?
New UT