Skip to content

Commit 08efa3c

Browse files
committed
Do not push down null values
1 parent 770a4fd commit 08efa3c

File tree

2 files changed

+40
-28
lines changed

2 files changed

+40
-28
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -748,7 +748,13 @@ class ParquetFilters(
748748
makeEq.lift(fieldType).map(_(fieldNames, v))
749749
}.reduceLeftOption(FilterApi.or)
750750
} else if (canPartialPushDownConjuncts) {
751-
makeInPredicate.lift(fieldType).map(_(fieldNames, values))
751+
if (values.contains(null)) {
752+
Seq(makeEq.lift(fieldType).map(_(fieldNames, null)),
753+
makeInPredicate.lift(fieldType).map(_(fieldNames, values.filter(_ != null)))
754+
).flatten.reduceLeftOption(FilterApi.or)
755+
} else {
756+
makeInPredicate.lift(fieldType).map(_(fieldNames, values))
757+
}
752758
} else {
753759
None
754760
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 33 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -2015,33 +2015,39 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20152015
test("SPARK-38825: in and notIn filters") {
20162016
import testImplicits._
20172017
withTempPath { file =>
2018-
Seq(1, 2, 0, -1, 99, Integer.MAX_VALUE, 1000, 3, 7, Integer.MIN_VALUE, 2)
2019-
.toDF("id").coalesce(1).write.mode("overwrite")
2020-
.parquet(file.getCanonicalPath)
2021-
var df = spark.read.parquet(file.getCanonicalPath)
2022-
var in = df.filter(col("id").isin(100, 3, 11, 12, 13, Integer.MAX_VALUE, Integer.MIN_VALUE))
2023-
var notIn =
2024-
df.filter(!col("id").isin(100, 3, 11, 12, 13, Integer.MAX_VALUE, Integer.MIN_VALUE))
2025-
checkAnswer(in, Seq(Row(3), Row(-2147483648), Row(2147483647)))
2026-
checkAnswer(notIn, Seq(Row(1), Row(2), Row(0), Row(-1), Row(99), Row(1000), Row(7), Row(2)))
2027-
2028-
Seq("mary", "martin", "lucy", "alex", null, "mary", "dan").toDF("name").coalesce(1)
2029-
.write.mode("overwrite").parquet(file.getCanonicalPath)
2030-
df = spark.read.parquet(file.getCanonicalPath)
2031-
in = df.filter(col("name").isin("mary", "victor", "leo", "alex"))
2032-
notIn = df.filter(!col("name").isin("mary", "victor", "leo", "alex"))
2033-
checkAnswer(in, Seq(Row("mary"), Row("alex"), Row("mary")))
2034-
checkAnswer(notIn, Seq(Row("martin"), Row("lucy"), Row("dan")))
2035-
2036-
in = df.filter(col("name").isin("mary", "victor", "leo", "alex", null))
2037-
notIn = df.filter(!col("name").isin("mary", "victor", "leo", "alex", null))
2038-
checkAnswer(in, Seq(Row("mary"), Row("alex"), Row("mary")))
2039-
checkAnswer(notIn, Seq())
2040-
2041-
in = df.filter(col("name").isin(null))
2042-
notIn = df.filter(!col("name").isin(null))
2043-
checkAnswer(in, Seq())
2044-
checkAnswer(notIn, Seq())
2018+
Seq(3, 20).foreach { threshold =>
2019+
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> s"$threshold") {
2020+
Seq(1, 2, 0, -1, 99, Integer.MAX_VALUE, 1000, 3, 7, Integer.MIN_VALUE, 2)
2021+
.toDF("id").coalesce(1).write.mode("overwrite")
2022+
.parquet(file.getCanonicalPath)
2023+
var df = spark.read.parquet(file.getCanonicalPath)
2024+
var in = df.filter(col("id")
2025+
.isin(100, 3, 11, 12, 13, Integer.MAX_VALUE, Integer.MIN_VALUE))
2026+
var notIn = df.filter(!col("id")
2027+
.isin(100, 3, 11, 12, 13, Integer.MAX_VALUE, Integer.MIN_VALUE))
2028+
checkAnswer(in, Seq(Row(3), Row(-2147483648), Row(2147483647)))
2029+
checkAnswer(notIn,
2030+
Seq(Row(1), Row(2), Row(0), Row(-1), Row(99), Row(1000), Row(7), Row(2)))
2031+
2032+
Seq("mary", "martin", "lucy", "alex", null, "mary", "dan").toDF("name").coalesce(1)
2033+
.write.mode("overwrite").parquet(file.getCanonicalPath)
2034+
df = spark.read.parquet(file.getCanonicalPath)
2035+
in = df.filter(col("name").isin("mary", "victor", "leo", "alex"))
2036+
notIn = df.filter(!col("name").isin("mary", "victor", "leo", "alex"))
2037+
checkAnswer(in, Seq(Row("mary"), Row("alex"), Row("mary")))
2038+
checkAnswer(notIn, Seq(Row("martin"), Row("lucy"), Row("dan")))
2039+
2040+
in = df.filter(col("name").isin("mary", "victor", "leo", "alex", null))
2041+
notIn = df.filter(!col("name").isin("mary", "victor", "leo", "alex", null))
2042+
checkAnswer(in, Seq(Row("mary"), Row("alex"), Row("mary")))
2043+
checkAnswer(notIn, Seq())
2044+
2045+
in = df.filter(col("name").isin(null))
2046+
notIn = df.filter(!col("name").isin(null))
2047+
checkAnswer(in, Seq())
2048+
checkAnswer(notIn, Seq())
2049+
}
2050+
}
20452051
}
20462052
}
20472053
}

0 commit comments

Comments (0)