Skip to content

Commit 5ad644a

Browse files
wangyum authored and gatorsmile committed
[SPARK-25368][SQL] Incorrect predicate pushdown returns wrong result
How to reproduce:

```scala
val df1 = spark.createDataFrame(Seq(
  (1, 1)
)).toDF("a", "b").withColumn("c", lit(null).cast("int"))
val df2 = df1.union(df1).withColumn("d", spark_partition_id).filter($"c".isNotNull)
df2.show
+---+---+----+---+
|  a|  b|   c|  d|
+---+---+----+---+
|  1|  1|null|  0|
|  1|  1|null|  1|
+---+---+----+---+
```

The output above is wrong: column `c` is null in every row, so `filter($"c".isNotNull)` should return no rows.

`filter($"c".isNotNull)` was transformed to `(null <=> c#10)` before #19201, but it has been transformed to `(c#10 = null)` since #20155. This PR reverts it to `(null <=> c#10)` to fix this issue.

Testing: unit tests.

Closes #22368 from wangyum/SPARK-25368.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
(cherry picked from commit 77c9964)
Signed-off-by: gatorsmile <[email protected]>
1 parent 5b8b6b4 commit 5ad644a

File tree

3 files changed

+20
-2
lines changed

3 files changed

+20
-2
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ abstract class UnaryNode extends LogicalPlan {
248248
var allConstraints = child.constraints.asInstanceOf[Set[Expression]]
249249
projectList.foreach {
250250
case a @ Alias(l: Literal, _) =>
251-
allConstraints += EqualTo(a.toAttribute, l)
251+
allConstraints += EqualNullSafe(a.toAttribute, l)
252252
case a @ Alias(e, _) =>
253253
// For every alias in `projectList`, replace the reference in constraints by its attribute.
254254
allConstraints ++= allConstraints.map(_ transform {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
182182

183183
test("constraints should be inferred from aliased literals") {
184184
val originalLeft = testRelation.subquery('left).as("left")
185-
val optimizedLeft = testRelation.subquery('left).where(IsNotNull('a) && 'a === 2).as("left")
185+
val optimizedLeft = testRelation.subquery('left).where(IsNotNull('a) && 'a <=> 2).as("left")
186186

187187
val right = Project(Seq(Literal(2).as("two")), testRelation.subquery('right)).as("right")
188188
val condition = Some("left.a".attr === "right.two".attr)

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2306,4 +2306,22 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
23062306
val df2 = spark.range(3).selectExpr("id")
23072307
assert(df1.join(df2, Seq("id"), "left_outer").where(df2("id").isNull).collect().length == 1)
23082308
}
2309+
2310+
test("SPARK-25368 Incorrect predicate pushdown returns wrong result") {
2311+
def check(newCol: Column, filter: Column, result: Seq[Row]): Unit = {
2312+
val df1 = spark.createDataFrame(Seq(
2313+
(1, 1)
2314+
)).toDF("a", "b").withColumn("c", newCol)
2315+
2316+
val df2 = df1.union(df1).withColumn("d", spark_partition_id).filter(filter)
2317+
checkAnswer(df2, result)
2318+
}
2319+
2320+
check(lit(null).cast("int"), $"c".isNull, Seq(Row(1, 1, null, 0), Row(1, 1, null, 1)))
2321+
check(lit(null).cast("int"), $"c".isNotNull, Seq())
2322+
check(lit(2).cast("int"), $"c".isNull, Seq())
2323+
check(lit(2).cast("int"), $"c".isNotNull, Seq(Row(1, 1, 2, 0), Row(1, 1, 2, 1)))
2324+
check(lit(2).cast("int"), $"c" === 2, Seq(Row(1, 1, 2, 0), Row(1, 1, 2, 1)))
2325+
check(lit(2).cast("int"), $"c" =!= 2, Seq())
2326+
}
23092327
}

0 commit comments

Comments
 (0)