sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -134,13 +134,6 @@ case class RowDataSourceScanExec(

def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")

- val (aggString, groupByString) = if (pushedDownOperators.aggregation.nonEmpty) {
-   (seqToString(pushedDownOperators.aggregation.get.aggregateExpressions),
-     seqToString(pushedDownOperators.aggregation.get.groupByColumns))
- } else {
-   ("[]", "[]")
- }

val markedFilters = if (filters.nonEmpty) {
for (filter <- filters) yield {
if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
@@ -151,9 +144,10 @@

Map(
"ReadSchema" -> requiredSchema.catalogString,
"PushedFilters" -> seqToString(markedFilters.toSeq),
"PushedAggregates" -> aggString,
"PushedGroupby" -> groupByString) ++
"PushedFilters" -> seqToString(markedFilters.toSeq)) ++
pushedDownOperators.aggregation.fold(Map[String, String]()) { v =>
Map("PushedAggregates" -> seqToString(v.aggregateExpressions),
"PushedGroupByColumns" -> seqToString(v.groupByColumns))} ++
pushedDownOperators.limit.map(value => "PushedLimit" -> s"LIMIT $value") ++
pushedDownOperators.sample.map(v => "PushedSample" ->
s"SAMPLE (${(v.upperBound - v.lowerBound) * 100}) ${v.withReplacement} SEED(${v.seed})"
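To make the behavior change concrete, here is a minimal standalone sketch of the fold-based construction above, with a simplified Aggregation type standing in for Spark's connector-API class; it is an illustration, not the actual RowDataSourceScanExec code. The point: when no aggregation was pushed down, the aggregate-related keys are now omitted from the metadata entirely, where the old code always printed them as "[]".

// Sketch only: simplified Aggregation type, hypothetical metadata helper.
case class Aggregation(aggregateExpressions: Seq[String], groupByColumns: Seq[String])

def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")

def metadata(aggregation: Option[Aggregation]): Map[String, String] =
  Map("PushedFilters" -> "[]") ++
    aggregation.fold(Map.empty[String, String]) { v =>
      Map(
        "PushedAggregates" -> seqToString(v.aggregateExpressions),
        "PushedGroupByColumns" -> seqToString(v.groupByColumns))
    }

// No pushed aggregation: the keys are simply absent (the old code printed
// "PushedAggregates: []" and "PushedGroupby: []" unconditionally).
assert(metadata(None) == Map("PushedFilters" -> "[]"))

// Pushed aggregation with no GROUP BY: the keys appear, group-by renders as [].
assert(metadata(Some(Aggregation(Seq("MAX(ID)", "MIN(ID)"), Nil))) == Map(
  "PushedFilters" -> "[]",
  "PushedAggregates" -> "[MAX(ID), MIN(ID)]",
  "PushedGroupByColumns" -> "[]"))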
32 changes: 16 additions & 16 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -328,7 +328,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
val expected_plan_fragment =
"PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
"PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " +
"PushedGroupby: [DEPT]"
"PushedGroupByColumns: [DEPT]"
checkKeywordsExistsInExplain(df, expected_plan_fragment)
}
checkAnswer(df, Seq(Row(10000, 1000), Row(12000, 1200), Row(12000, 1200)))
@@ -345,7 +345,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
val expected_plan_fragment =
"PushedAggregates: [MAX(ID), MIN(ID)], " +
"PushedFilters: [IsNotNull(ID), GreaterThan(ID,0)], " +
"PushedGroupby: []"
[Review thread]

Contributor: Don't we print PushedAggregates and PushedGroupby together in DataSourceScanExec.scala? Why remove PushedGroupby only here?

Contributor (author): The query is select MAX(ID), MIN(ID) FROM h2.test.people where id > 0. The aggregates MAX(ID) and MIN(ID) are pushed down. There is no group by, so the pushed group-by list is [].

Contributor: Yes, I understand PushedGroupby is empty here; I don't see why we need to remove it. We can still test that it is empty, right?

Contributor (author): OK. I see what you mean now.

"PushedGroupByColumns: []"
checkKeywordsExistsInExplain(df, expected_plan_fragment)
}
checkAnswer(df, Seq(Row(2, 1)))
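For reference, the scenario discussed in the thread can be reproduced with a sketch like the following (assuming the suite's H2 catalog is registered under the name h2 and spark is the test session, as in JDBCV2Suite):

val df = spark.sql("select MAX(ID), MIN(ID) FROM h2.test.people where id > 0")
// Both aggregates are pushed down and there is no GROUP BY, so the plan text
// is expected to contain:
//   PushedAggregates: [MAX(ID), MIN(ID)],
//   PushedFilters: [IsNotNull(ID), GreaterThan(ID,0)],
//   PushedGroupByColumns: []
df.explain()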
@@ -424,7 +424,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
val expected_plan_fragment =
"PushedAggregates: [SUM(SALARY)], " +
"PushedFilters: [], " +
"PushedGroupby: [DEPT]"
"PushedGroupByColumns: [DEPT]"
checkKeywordsExistsInExplain(df, expected_plan_fragment)
}
checkAnswer(df, Seq(Row(19000), Row(22000), Row(12000)))
@@ -437,7 +437,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
val expected_plan_fragment =
"PushedAggregates: [SUM(DISTINCT SALARY)], " +
"PushedFilters: [], " +
"PushedGroupby: [DEPT]"
"PushedGroupByColumns: [DEPT]"
checkKeywordsExistsInExplain(df, expected_plan_fragment)
}
checkAnswer(df, Seq(Row(19000), Row(22000), Row(12000)))
@@ -455,7 +455,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
val expected_plan_fragment =
"PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
"PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " +
"PushedGroupby: [DEPT, NAME]"
"PushedGroupByColumns: [DEPT, NAME]"
checkKeywordsExistsInExplain(df, expected_plan_fragment)
}
checkAnswer(df, Seq(Row(9000, 1200), Row(12000, 1200), Row(10000, 1300),
@@ -474,7 +474,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
val expected_plan_fragment =
"PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
"PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " +
"PushedGroupby: [DEPT]"
"PushedGroupByColumns: [DEPT]"
checkKeywordsExistsInExplain(df, expected_plan_fragment)
}
checkAnswer(df, Seq(Row(12000, 1200), Row(12000, 1200)))
@@ -489,7 +489,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
val expected_plan_fragment =
"PushedAggregates: [MIN(SALARY)], " +
"PushedFilters: [], " +
"PushedGroupby: [DEPT]"
"PushedGroupByColumns: [DEPT]"
checkKeywordsExistsInExplain(df, expected_plan_fragment)
}
checkAnswer(df, Seq(Row(1, 9000), Row(2, 10000), Row(6, 12000)))
@@ -512,7 +512,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
val expected_plan_fragment =
"PushedAggregates: [SUM(SALARY)], " +
"PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " +
"PushedGroupby: [DEPT]"
"PushedGroupByColumns: [DEPT]"
checkKeywordsExistsInExplain(query, expected_plan_fragment)
}
checkAnswer(query, Seq(Row(6, 12000), Row(1, 19000), Row(2, 22000)))
@@ -536,10 +536,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
val df1 = sql("select * from h2.test.employee").toDF(cols: _*)
val df2 = df1.groupBy().sum("c")
df2.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
-   val expected_plan_fragment =
-     "PushedAggregates: []" // aggregate over alias not push down
-   checkKeywordsExistsInExplain(df2, expected_plan_fragment)
+ case relation: DataSourceV2ScanRelation => relation.scan match {
+   case v1: V1ScanWrapper =>
+     assert(v1.pushedDownOperators.aggregation.isEmpty)
+ }
}
checkAnswer(df2, Seq(Row(53000.00)))
}
@@ -554,10 +554,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.filter(name($"shortName"))
.agg(sum($"SALARY").as("sum_salary"))
query.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
-   val expected_plan_fragment =
-     "PushedAggregates: []"
-   checkKeywordsExistsInExplain(query, expected_plan_fragment)
+ case relation: DataSourceV2ScanRelation => relation.scan match {
+   case v1: V1ScanWrapper =>
+     assert(v1.pushedDownOperators.aggregation.isEmpty)
+ }
}
checkAnswer(query, Seq(Row(29000.0)))
}
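A note on these last two hunks: with the fold-based metadata construction from DataSourceScanExec.scala above, the PushedAggregates key is absent from the plan text whenever nothing was pushed down, so there is no "PushedAggregates: []" string left for checkKeywordsExistsInExplain to match. The updated tests therefore reach into the scan, match the V1ScanWrapper, and assert pushedDownOperators.aggregation.isEmpty directly.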