Skip to content

Commit e0c116c

Browse files
committed
Compute aggregate expression during planning instead of lazily on workers.
1 parent 41db44c commit e0c116c

File tree

1 file changed

+5
-10
lines changed

1 file changed

+5
-10
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,7 @@ case class Aggregate(
7777
resultAttribute: AttributeReference)
7878

7979
/** A list of aggregates that need to be computed for each group. */
80-
@transient
81-
private[this] lazy val computedAggregates = aggregateExpressions.flatMap { agg =>
80+
private[this] val computedAggregates = aggregateExpressions.flatMap { agg =>
8281
agg.collect {
8382
case a: AggregateExpression =>
8483
ComputedAggregate(
@@ -89,8 +88,7 @@ case class Aggregate(
8988
}.toArray
9089

9190
/** The schema of the result of all aggregate evaluations */
92-
@transient
93-
private[this] lazy val computedSchema = computedAggregates.map(_.resultAttribute)
91+
private[this] val computedSchema = computedAggregates.map(_.resultAttribute)
9492

9593
/** Creates a new aggregate buffer for a group. */
9694
private[this] def newAggregateBuffer(): Array[AggregateFunction] = {
@@ -104,8 +102,7 @@ case class Aggregate(
104102
}
105103

106104
/** Named attributes used to substitute grouping attributes into the final result. */
107-
@transient
108-
private[this] lazy val namedGroups = groupingExpressions.map {
105+
private[this] val namedGroups = groupingExpressions.map {
109106
case ne: NamedExpression => ne -> ne.toAttribute
110107
case e => e -> Alias(e, s"groupingExpr:$e")().toAttribute
111108
}
@@ -114,16 +111,14 @@ case class Aggregate(
114111
* A map of substitutions that are used to insert the aggregate expressions and grouping
115112
* expression into the final result expression.
116113
*/
117-
@transient
118-
private[this] lazy val resultMap =
114+
private[this] val resultMap =
119115
(computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ namedGroups).toMap
120116

121117
/**
122118
* Substituted version of aggregateExpressions expressions which are used to compute final
123119
* output rows given a group and the result of all aggregate computations.
124120
*/
125-
@transient
126-
private[this] lazy val resultExpressions = aggregateExpressions.map { agg =>
121+
private[this] val resultExpressions = aggregateExpressions.map { agg =>
127122
agg.transform {
128123
case e: Expression if resultMap.contains(e) => resultMap(e)
129124
}

0 commit comments

Comments
 (0)