Skip to content

Commit 9938252

Browse files
committed
Move handling of the case of one distinct grouping with superficially different function children to SparkStrategies
1 parent f53136d commit 9938252

File tree

2 files changed

+7
-25
lines changed

2 files changed

+7
-25
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -405,28 +405,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
405405
}
406406
Aggregate(groupByAttrs, patchedAggExpressions, firstAggregate)
407407
} else {
408-
// We may have one distinct group only because we grouped using ExpressionSet.
409-
// To prevent SparkStrategies from complaining during sanity check, we need to check whether
410-
// the original list of aggregate expressions had multiple distinct groups and, if so,
411-
// patch that list so we have only one distinct group.
412-
val funcChildren = distinctAggs.flatMap { e =>
413-
e.aggregateFunction.children.filter(!_.foldable)
414-
}
415-
val funcChildrenLookup = funcChildren.map { e =>
416-
(e, funcChildren.find(fc => e.semanticEquals(fc)).getOrElse(e))
417-
}.toMap
418-
419-
if (funcChildrenLookup.keySet.size > funcChildrenLookup.values.toSet.size) {
420-
val patchedAggExpressions = a.aggregateExpressions.map { e =>
421-
e.transformDown {
422-
case e: Expression =>
423-
funcChildrenLookup.getOrElse(e, e)
424-
}.asInstanceOf[NamedExpression]
425-
}
426-
a.copy(aggregateExpressions = patchedAggExpressions)
427-
} else {
428-
a
429-
}
408+
a
430409
}
431410
}
432411

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -527,8 +527,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
527527

528528
val (functionsWithDistinct, functionsWithoutDistinct) =
529529
aggregateExpressions.partition(_.isDistinct)
530-
if (functionsWithDistinct.map(
531-
_.aggregateFunction.children.filterNot(_.foldable).toSet).distinct.length > 1) {
530+
val distinctAggChildSets = functionsWithDistinct.map { ae =>
531+
ExpressionSet(ae.aggregateFunction.children.filterNot(_.foldable))
532+
}.distinct
533+
if (distinctAggChildSets.length > 1) {
532534
// This is a sanity check. We should not reach here when we have multiple distinct
533535
// column sets. Our `RewriteDistinctAggregates` should take care of this case.
534536
throw new IllegalStateException(
@@ -560,7 +562,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
560562
// [COUNT(DISTINCT bar), COUNT(DISTINCT foo)] is disallowed because those two distinct
561563
// aggregates have different column expressions.
562564
val distinctExpressions =
563-
functionsWithDistinct.head.aggregateFunction.children.filterNot(_.foldable)
565+
functionsWithDistinct.flatMap(
566+
_.aggregateFunction.children.filterNot(_.foldable)).distinct
564567
val normalizedNamedDistinctExpressions = distinctExpressions.map { e =>
565568
// Ideally this should be done in `NormalizeFloatingNumbers`, but we do it here
566569
// because `distinctExpressions` is not extracted during the logical phase.

0 commit comments

Comments
 (0)