Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -1391,11 +1391,39 @@ class Analyzer(
notMatchedActions = newNotMatchedActions)
}

// When the filter condition is a HAVING condition, its columns have not yet been handled by
// TypeCoercion, so cond stays unresolved because the aggregate function's
// checkInputDataTypes check fails. ResolveAggregateFunctions then cannot handle it, which
// finally causes a column resolution error.
// In this situation, we do not resolve cond's references here.
case f @ Filter(cond, agg @ Aggregate(_, _, _)) if containsAggregate(cond) => f

case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
q.mapExpressions(resolveExpressionTopDown(_, q))
}

/**
 * Returns true if `e` contains an aggregate: either an [[AggregateExpression]], or an
 * [[UnresolvedFunction]] that the session catalog resolves to an [[AggregateFunction]].
 * An [[UnresolvedFunction]] whose catalog lookup throws is also reported as an aggregate
 * (see the catch branch below).
 */
def containsAggregate(e: Expression): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we reuse ResolveAggregateFunctions.containsAggregate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we reuse ResolveAggregateFunctions.containsAggregate?

Since here function is still UnresolvedFunction, we can't just reuse this.

e.find {
// Functions may still be unresolved at this point of the analysis loop, so look them up
// in the catalog to decide whether they are aggregate functions.
case func: UnresolvedFunction =>
try {
v1SessionCatalog.lookupFunction(func.name, func.arguments)
.isInstanceOf[AggregateFunction]
} catch {
// When the UnresolvedFunction is a UDF, the lookup can fail because its arguments are
// still unresolved. If the lookup throws, report the function as an aggregate for now;
// once its arguments are resolved in a later iteration, the lookup can decide whether
// it really is an aggregate function.
// NOTE(review): this catches every Exception and treats it as "is an aggregate";
// consider narrowing it (e.g. NonFatal or the specific analysis exception) - TODO confirm
// which failures are actually expected here.
case _: Exception => true
}
case _ =>
false
}.isDefined || e.find(_.isInstanceOf[AggregateExpression]).isDefined
}

def resolveAssignments(
assignments: Seq[Assignment],
mergeInto: MergeIntoTable,
Expand Down Expand Up @@ -1679,7 +1707,13 @@ class Analyzer(
Project(child.output, newSort)
}

case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved =>
// When the filter condition is a HAVING condition, its columns have not yet been handled by
// TypeCoercion, so cond stays unresolved because the aggregate function's
// checkInputDataTypes check fails. ResolveAggregateFunctions then cannot handle it, which
// finally causes a column resolution error.
// In this situation, we do not resolve cond's references here.
case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved
&& (!child.isInstanceOf[Aggregate] || !ResolveReferences.containsAggregate(cond)) =>
val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child)
if (child.output == newChild.output) {
f.copy(condition = newCond.head)
Expand Down
40 changes: 40 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3494,6 +3494,46 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
Seq(Row(Map[Int, Int]()), Row(Map(1 -> 2))))
}

// Regression test for SPARK-31334: runs the same GROUP BY ... HAVING sum(a) > 3 query against
// two temp views that differ only in the type of column `a`.
test("SPARK-31334: Don't ResolveReference/ResolveMissingReference when " +
"Filter condition with aggregate expression") {
// First data set: both columns hold integers. Per the review thread below, this query is
// only a baseline for comparison and does not reproduce the bug.
Seq(
(1, 3),
(2, 3),
(3, 6),
(4, 7),
(5, 9),
(6, 9)
).toDF("a", "b").createOrReplaceTempView("testData1")

checkAnswer(sql(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this test fail before your patch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this test fail before your patch?

No, it's won't failed, here is for contrast.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this test is not qualified to reproduce the bug?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this test is not qualified to reproduce the bug?

The first SQL is used for comparison, and the second can reproduce bugs.
If don't need, we can just delete first one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's delete

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's delete

Done

"""
| SELECT b, sum(a) as a
| FROM testData1
| GROUP BY b
| HAVING sum(a) > 3
""".stripMargin),
Row(7, 4) :: Row(9, 11) :: Nil)

// Second data set: column `a` holds strings while `b` stays an integer. The expected sums
// below are doubles (4.0, 11.0). Per the review thread above, this is the query that
// reproduces the bug.
Seq(
("1", 3),
("2", 3),
("3", 6),
("4", 7),
("5", 9),
("6", 9)
).toDF("a", "b").createOrReplaceTempView("testData2")

checkAnswer(sql(
"""
| SELECT b, sum(a) as a
| FROM testData2
| GROUP BY b
| HAVING sum(a) > 3
""".stripMargin),
Row(7, 4.0) :: Row(9, 11.0) :: Nil)
}


test("SPARK-31242: clone SparkSession should respect sessionInitWithConfigDefaults") {
// Note, only the conf explicitly set in SparkConf(e.g. in SharedSparkSessionBase) would cause
// problem before the fix.
Expand Down