diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 3c35ba9b6004..bbda9eb76b10 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -137,11 +137,22 @@ object ScanOperation extends OperationHelper {
     val alwaysInline = SQLConf.get.getConf(SQLConf.COLLAPSE_PROJECT_ALWAYS_INLINE)
     val (fields, filters, child, _) = collectProjectsAndFilters(plan, alwaysInline)
     // `collectProjectsAndFilters` transforms the plan bottom-up, so the bottom-most filter are
-    // placed at the beginning of `filters` list. According to the SQL semantic, we can only
-    // push down the bottom deterministic filters.
-    val filtersCanPushDown = filters.takeWhile(_.deterministic).flatMap(splitConjunctivePredicates)
-    val filtersStayUp = filters.dropWhile(_.deterministic)
-    Some((fields.getOrElse(child.output), filtersStayUp, filtersCanPushDown, child))
+    // placed at the beginning of `filters` list. According to the SQL semantic, we cannot merge
+    // Filters if one or more of them are nondeterministic. This means we can only push down the
+    // bottom-most Filter, or more following deterministic Filters if the bottom-most Filter is
+    // also deterministic.
+    if (filters.isEmpty) {
+      Some((fields.getOrElse(child.output), Nil, Nil, child))
+    } else if (filters.head.deterministic) {
+      val filtersCanPushDown = filters.takeWhile(_.deterministic)
+        .flatMap(splitConjunctivePredicates)
+      val filtersStayUp = filters.dropWhile(_.deterministic)
+      Some((fields.getOrElse(child.output), filtersStayUp, filtersCanPushDown, child))
+    } else {
+      val filtersCanPushDown = splitConjunctivePredicates(filters.head)
+      val filtersStayUp = filters.drop(1)
+      Some((fields.getOrElse(child.output), filtersStayUp, filtersCanPushDown, child))
+    }
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 98cb54ccbbc3..cf5f8d990f79 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -30,10 +30,10 @@ import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal}
 import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
 import org.apache.spark.sql.catalyst.plans.logical.Filter
-import org.apache.spark.sql.execution.SimpleMode
+import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
@@ -1074,6 +1074,24 @@ class FileBasedDataSourceSuite extends QueryTest
       checkAnswer(df, Row("v1", "v2"))
     }
   }
+
+  test("SPARK-41017: filter pushdown with nondeterministic predicates") {
+    withTempPath { path =>
+      val pathStr = path.getCanonicalPath
+      spark.range(10).write.parquet(pathStr)
+      Seq("parquet", "").foreach { useV1SourceList =>
+        withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1SourceList) {
+          val scan = spark.read.parquet(pathStr)
+          val df = scan.where(rand() > 0.5 && $"id" > 5)
+          val filters = df.queryExecution.executedPlan.collect {
+            case f: FileSourceScanLike => f.dataFilters
+            case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
+          }.flatten
+          assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L))))
+        }
+      }
+    }
+  }
 }
 
 object TestingUDT {