Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -608,12 +608,20 @@ case class FileSourceScanExec(
new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}

// Filters unused DynamicPruningExpression expressions - one which has been replaced
// with DynamicPruningExpression(Literal.TrueLiteral) during Physical Planning
private def filterUnusedDynamicPruningExpressions(
predicates: Seq[Expression]): Seq[Expression] = {
predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
}

override def doCanonicalize(): FileSourceScanExec = {
FileSourceScanExec(
relation,
output.map(QueryPlan.normalizeExpressions(_, output)),
requiredSchema,
QueryPlan.normalizePredicates(partitionFilters, output),
QueryPlan.normalizePredicates(
filterUnusedDynamicPruningExpressions(partitionFilters), output),
optionalBucketSet,
optionalNumCoalescedBuckets,
QueryPlan.normalizePredicates(dataFilters, output),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,30 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
"canonicalization and exchange reuse") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
""" WITH view1 as (
| SELECT f.store_id FROM fact_stats f WHERE f.units_sold = 70
| )
|
| SELECT * FROM view1 v1 join view1 v2 WHERE v1.store_id = v2.store_id
""".stripMargin)

checkPartitionPruningPredicate(df, false, false)
val reuseExchangeNodes = df.queryExecution.executedPlan.collect {
case se: ReusedExchangeExec => se
}
assert(reuseExchangeNodes.size == 1, "Expected plan to contain 1 ReusedExchangeExec " +
s"nodes. Found ${reuseExchangeNodes.size}")

checkAnswer(df, Row(15, 15) :: Nil)
}
}
}

test("Plan broadcast pruning only when the broadcast can be reused") {
Given("dynamic pruning filter on the build side")
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
Expand Down