diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 78808ff21394c..bef9f4b46c628 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -608,12 +608,20 @@ case class FileSourceScanExec( new FileScanRDD(fsRelation.sparkSession, readFile, partitions) } + // Filters unused DynamicPruningExpression expressions - one which has been replaced + // with DynamicPruningExpression(Literal.TrueLiteral) during Physical Planning + private def filterUnusedDynamicPruningExpressions( + predicates: Seq[Expression]): Seq[Expression] = { + predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)) + } + override def doCanonicalize(): FileSourceScanExec = { FileSourceScanExec( relation, output.map(QueryPlan.normalizeExpressions(_, output)), requiredSchema, - QueryPlan.normalizePredicates(partitionFilters, output), + QueryPlan.normalizePredicates( + filterUnusedDynamicPruningExpressions(partitionFilters), output), optionalBucketSet, optionalNumCoalescedBuckets, QueryPlan.normalizePredicates(dataFilters, output), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index 0b754e9e3ec0b..8edfb91d15fff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -1240,6 +1240,30 @@ abstract class DynamicPartitionPruningSuiteBase } } + test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + + "canonicalization and exchange reuse") { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = sql( + """ WITH view1 as ( + | SELECT f.store_id FROM fact_stats f WHERE f.units_sold = 70 + | ) + | + | SELECT * FROM view1 v1 join view1 v2 WHERE v1.store_id = v2.store_id + """.stripMargin) + + checkPartitionPruningPredicate(df, false, false) + val reuseExchangeNodes = df.queryExecution.executedPlan.collect { + case se: ReusedExchangeExec => se + } + assert(reuseExchangeNodes.size == 1, "Expected plan to contain 1 ReusedExchangeExec " + + s"nodes. Found ${reuseExchangeNodes.size}") + + checkAnswer(df, Row(15, 15) :: Nil) + } + } + } + test("Plan broadcast pruning only when the broadcast can be reused") { Given("dynamic pruning filter on the build side") withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {