diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 69bdd988c243..421b843d28e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1391,11 +1391,39 @@ class Analyzer(
         notMatchedActions = newNotMatchedActions)
     }
 
+    // When the filter condition comes from a HAVING clause, its columns have not been
+    // handled by TypeCoercion yet, so the condition can stay unresolved because an
+    // aggregate function's checkInputDataTypes fails. ResolveAggregateFunctions then
+    // cannot handle the condition, which finally causes a column resolution error.
+    // In this situation, skip resolving the condition's references here.
+    case f @ Filter(cond, _: Aggregate) if containsAggregate(cond) => f
+
     case q: LogicalPlan =>
       logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
       q.mapExpressions(resolveExpressionTopDown(_, q))
   }
 
+  def containsAggregate(e: Expression): Boolean = {
+    e.find {
+      // At this point the function may still be unresolved, so look it up
+      // in the catalog to decide whether it is an aggregate function.
+      case func: UnresolvedFunction =>
+        try {
+          v1SessionCatalog.lookupFunction(func.name, func.arguments)
+            .isInstanceOf[AggregateFunction]
+        } catch {
+          // When the UnresolvedFunction is a UDF, the lookup can fail because its
+          // arguments are still unresolved. If the lookup throws, assume the
+          // condition contains an aggregate and skip it for now; in the next
+          // iteration the arguments will be resolved and we can decide whether
+          // the function is an aggregate function.
+          case _: Exception => true
+        }
+      case _ =>
+        false
+    }.isDefined || e.find(_.isInstanceOf[AggregateExpression]).isDefined
+  }
+
   def resolveAssignments(
       assignments: Seq[Assignment],
       mergeInto: MergeIntoTable,
@@ -1679,7 +1707,13 @@ class Analyzer(
         Project(child.output, newSort)
       }
 
-    case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved =>
+    // When the filter condition comes from a HAVING clause, its columns have not been
+    // handled by TypeCoercion yet, so the condition can stay unresolved because an
+    // aggregate function's checkInputDataTypes fails. ResolveAggregateFunctions then
+    // cannot handle the condition, which finally causes a column resolution error.
+    // In this situation, skip resolving the condition's references here.
+    case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved
+      && (!child.isInstanceOf[Aggregate] || !ResolveReferences.containsAggregate(cond)) =>
       val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child)
       if (child.output == newChild.output) {
         f.copy(condition = newCond.head)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 17602e14f04f..5fe4bd42fab4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3494,6 +3494,27 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       Seq(Row(Map[Int, Int]()), Row(Map(1 -> 2))))
   }
 
+  test("SPARK-31334: skip ResolveReferences/ResolveMissingReferences when " +
+    "the Filter condition contains an aggregate expression") {
+    Seq(
+      ("1", 3),
+      ("2", 3),
+      ("3", 6),
+      ("4", 7),
+      ("5", 9),
+      ("6", 9)
+    ).toDF("a", "b").createOrReplaceTempView("testData2")
+
+    checkAnswer(sql(
+      """
+        | SELECT b, sum(a) AS a
+        | FROM testData2
+        | GROUP BY b
+        | HAVING sum(a) > 3
+      """.stripMargin),
+      Row(7, 4.0) :: Row(9, 11.0) :: Nil)
+  }
+
   test("SPARK-31242: clone SparkSession should respect sessionInitWithConfigDefaults") {
     // Note, only the conf explicitly set in SparkConf(e.g. in SharedSparkSessionBase) would cause
     // problem before the fix.
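For context, a minimal standalone reproduction sketch, not part of the patch: on builds without this change, the HAVING clause below was reported to fail with a column resolution error, because sum(a) over the string-typed column a stays unresolved until TypeCoercion inserts a cast. The object name Spark31334Repro and the local master setting are illustrative, not taken from the patch:

import org.apache.spark.sql.SparkSession

object Spark31334Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")          // illustrative; any master works
      .appName("SPARK-31334 repro")
      .getOrCreate()
    import spark.implicits._

    // Same shape as the new test above: column `a` is a string,
    // so sum(a) needs TypeCoercion before it can resolve.
    Seq(("1", 3), ("2", 3), ("4", 7)).toDF("a", "b")
      .createOrReplaceTempView("testData2")

    // With the patch applied, this filters out the b = 3 group (sum = 3.0)
    // and keeps the b = 7 group, printing [7, 4.0]. Without the patch, the
    // analysis of the HAVING condition could fail as described above.
    spark.sql(
      """
        |SELECT b, sum(a) AS a
        |FROM testData2
        |GROUP BY b
        |HAVING sum(a) > 3
      """.stripMargin).show()

    spark.stop()
  }
}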