@@ -294,7 +294,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit
   private def supportPartialAggPushDown(agg: Aggregation): Boolean = {
     // We don't know the agg buffer of `GeneralAggregateFunc`, so can't do partial agg push down.
     // If `Sum`, `Count`, `Avg` with distinct, can't do partial agg push down.
-    agg.aggregateExpressions().exists {
+    agg.aggregateExpressions().isEmpty || agg.aggregateExpressions().exists {
       case sum: Sum => !sum.isDistinct
       case count: Count => !count.isDistinct
       case avg: Avg => !avg.isDistinct
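
Note: a GROUP BY that selects only grouping columns (e.g. `SELECT name FROM t GROUP BY name`) reaches this rule as an `Aggregation` whose `aggregateExpressions()` array is empty, so the old `exists` check rejected it and blocked push-down. Below is a minimal standalone sketch of the fixed guard; the wrapper function, the simplified fallback case, and the sample `Aggregation` construction are illustrative only, and assume a Spark version whose `Aggregation` takes group-by `Expression`s.

import org.apache.spark.sql.connector.expressions.{Expression, FieldReference}
import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation, Avg, Count, Sum}

// Standalone rendering of the fixed predicate (simplified: the real rule also
// handles GeneralAggregateFunc and other aggregate functions).
def supportPartialAggPushDown(agg: Aggregation): Boolean = {
  agg.aggregateExpressions().isEmpty || agg.aggregateExpressions().exists {
    case sum: Sum => !sum.isDistinct
    case count: Count => !count.isDistinct
    case avg: Avg => !avg.isDistinct
    case _ => false // simplified fallback for this sketch
  }
}

// GROUP BY name with no aggregate functions: empty aggregate list, one grouping expression.
val groupByOnly = new Aggregation(Array.empty[AggregateFunc], Array[Expression](FieldReference("name")))
assert(supportPartialAggPushDown(groupByOnly)) // true solely because of the new isEmpty guard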
@@ -727,6 +727,57 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkAnswer(df, Seq(Row(5)))
   }
 
+  test("scan with aggregate push-down: GROUP BY without aggregate functions") {
+    val df = sql("select name FROM h2.test.employee GROUP BY name")
+    checkAggregateRemoved(df)
+    checkPushedInfo(df,
+      "PushedAggregates: [], PushedFilters: [], PushedGroupByExpressions: [NAME],")
+    checkAnswer(df, Seq(Row("alex"), Row("amy"), Row("cathy"), Row("david"), Row("jen")))
+
+    val df2 = spark.read
+      .option("partitionColumn", "dept")
+      .option("lowerBound", "0")
+      .option("upperBound", "2")
+      .option("numPartitions", "2")
+      .table("h2.test.employee")
+      .groupBy($"name")
+      .agg(Map.empty[String, String])
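+    // With numPartitions = 2 the same group key can be produced by both JDBC partitions,
+    // so only a partial aggregate is pushed down and the final aggregate must be kept.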
+    checkAggregateRemoved(df2, false)
+    checkPushedInfo(df2,
+      "PushedAggregates: [], PushedFilters: [], PushedGroupByExpressions: [NAME],")
+    checkAnswer(df2, Seq(Row("alex"), Row("amy"), Row("cathy"), Row("david"), Row("jen")))
+
+    val df3 = sql("SELECT CASE WHEN SALARY > 8000 AND SALARY < 10000 THEN SALARY ELSE 0 END as" +
+      " key FROM h2.test.employee GROUP BY key")
+    checkAggregateRemoved(df3)
+    checkPushedInfo(df3,
+      """
+        |PushedAggregates: [],
+        |PushedFilters: [],
+        |PushedGroupByExpressions:
+        |[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END],
+        |""".stripMargin.replaceAll("\n", " "))
+    checkAnswer(df3, Seq(Row(0), Row(9000)))
+
+    val df4 = spark.read
+      .option("partitionColumn", "dept")
+      .option("lowerBound", "0")
+      .option("upperBound", "2")
+      .option("numPartitions", "2")
+      .table("h2.test.employee")
+      .groupBy(when(($"SALARY" > 8000).and($"SALARY" < 10000), $"SALARY").otherwise(0).as("key"))
+      .agg(Map.empty[String, String])
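+    // As with df2, the partitioned read keeps the final aggregate on the Spark side.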
+    checkAggregateRemoved(df4, false)
+    checkPushedInfo(df4,
+      """
+        |PushedAggregates: [],
+        |PushedFilters: [],
+        |PushedGroupByExpressions:
+        |[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END],
+        |""".stripMargin.replaceAll("\n", " "))
+    checkAnswer(df4, Seq(Row(0), Row(9000)))
+  }
+
   test("scan with aggregate push-down: COUNT(col)") {
     val df = sql("select COUNT(DEPT) FROM h2.test.employee")
     checkAggregateRemoved(df)