-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-34581][SQL] Don't optimize out grouping expressions from aggregate expressions without aggregate function #31913
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
ae1186f
5ab9f75
2293fd4
3de19ca
04e61c5
6e05f14
09f1a85
f46b89d
56589a3
ea95bff
7ea2306
468534f
977c0bf
c2ba804
0622444
343f35e
2e79eb9
cff9b9a
78296a8
72c173b
34f0439
fb3a19d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -66,6 +66,34 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Wrap complex grouping expression in aggregate expressions without aggregate function into | ||
|
||
| * `GroupingExpression` nodes so as to avoid further optimizations between the expression and its | ||
| * parent. | ||
| * | ||
| * This is required as further optimizations could change the grouping expression and so make the | ||
| * aggregate expression invalid. | ||
|
||
| */ | ||
| object WrapGroupingExpressions extends Rule[LogicalPlan] { | ||
| override def apply(plan: LogicalPlan): LogicalPlan = { | ||
| plan transform { | ||
| case a: Aggregate => | ||
| val complexGroupingExpressions = | ||
| ExpressionSet(a.groupingExpressions.filter(_.children.nonEmpty)) | ||
|
|
||
| def wrapGroupingExpression(e: Expression): Expression = e match { | ||
| case _: GroupingExpression => e | ||
| case _: AggregateExpression => e | ||
| case _ if complexGroupingExpressions.contains(e) => GroupingExpression(e) | ||
| case _ => e.mapChildren(wrapGroupingExpression) | ||
| } | ||
|
|
||
| a.copy(aggregateExpressions = | ||
| a.aggregateExpressions.map(wrapGroupingExpression(_).asInstanceOf[NamedExpression])) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Computes the current date and time to make sure we return the same result in a single query. | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4140,6 +4140,20 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark | |
| } | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-34581: Don't optimize out grouping expressions from aggregate expressions") { | ||
| withTempView("t") { | ||
| Seq[Integer](null, 1, 2, 3, null).toDF("id").createOrReplaceTempView("t") | ||
|
|
||
| val df = spark.sql( | ||
|
||
| """ | ||
| |SELECT not(t.id IS NULL), count(*) AS c | ||
| |FROM t | ||
| |GROUP BY t.id IS NULL | ||
| |""".stripMargin) | ||
| checkAnswer(df, Row(true, 3) :: Row(false, 2) :: Nil) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| case class Foo(bar: Option[String]) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: How about
GroupingExpression->ReferencedByGroupingExpr?