Skip to content

Commit c98725a

Browse files
Yang Liu authored and wangyum committed
[SPARK-32268][SQL][FOLLOWUP] Add ColumnPruning in injectBloomFilter
### What changes were proposed in this pull request? Add `ColumnPruning` in `InjectRuntimeFilter.injectBloomFilter` to optimize the BoomFilter creation query. ### Why are the changes needed? It seems BloomFilter subqueries injected by `InjectRuntimeFilter` will read as many columns as filterCreationSidePlan. This does not match "Only scan the required columns" as the design said. We can check this by a simple case in `InjectRuntimeFilterSuite`: ```scala withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true", SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") { val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 where bf2.a2 = 62" sql(query).explain() } ``` The reason is subqueries have not been optimized by `ColumnPruning`, and this pr will fix it. ### Does this PR introduce _any_ user-facing change? No, not released ### How was this patch tested? Improve the test by adding `columnPruningTakesEffect` to check the optimizedPlan of bloom filter join. Closes #36047 from Flyangz/SPARK-32268-FOllOWUP. Authored-by: Yang Liu <[email protected]> Signed-off-by: Yuming Wang <[email protected]>
1 parent 41a8249 commit c98725a

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
8585
}
8686
val aggExp = AggregateExpression(bloomFilterAgg, Complete, isDistinct = false, None)
8787
val alias = Alias(aggExp, "bloomFilter")()
88-
val aggregate = ConstantFolding(Aggregate(Nil, Seq(alias), filterCreationSidePlan))
88+
val aggregate =
89+
ConstantFolding(ColumnPruning(Aggregate(Nil, Seq(alias), filterCreationSidePlan)))
8990
val bloomFilterSubquery = ScalarSubquery(aggregate, Nil)
9091
val filter = BloomFilterMightContain(bloomFilterSubquery,
9192
new XxHash64(Seq(filterApplicationSideExp)))

sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
255255
planEnabled = sql(query).queryExecution.optimizedPlan
256256
checkAnswer(sql(query), expectedAnswer)
257257
if (shouldReplace) {
258+
assert(!columnPruningTakesEffect(planEnabled))
258259
assert(getNumBloomFilters(planEnabled) > getNumBloomFilters(planDisabled))
259260
} else {
260261
assert(getNumBloomFilters(planEnabled) == getNumBloomFilters(planDisabled))
@@ -288,6 +289,20 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
288289
numMightContains
289290
}
290291

292+
def columnPruningTakesEffect(plan: LogicalPlan): Boolean = {
293+
def takesEffect(plan: LogicalPlan): Boolean = {
294+
val result = org.apache.spark.sql.catalyst.optimizer.ColumnPruning.apply(plan)
295+
!result.fastEquals(plan)
296+
}
297+
298+
plan.collectFirst {
299+
case Filter(condition, _) if condition.collectFirst {
300+
case subquery: org.apache.spark.sql.catalyst.expressions.ScalarSubquery
301+
if takesEffect(subquery.plan) => true
302+
}.nonEmpty => true
303+
}.nonEmpty
304+
}
305+
291306
def assertRewroteSemiJoin(query: String): Unit = {
292307
checkWithAndWithoutFeatureEnabled(query, testSemiJoin = true, shouldReplace = true)
293308
}

0 commit comments

Comments (0)