Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
case e @ Except(left, right, false) if isEligible(left, right) =>
val newCondition = transformCondition(left, skipProject(right))
newCondition.map { c =>
Distinct(Filter(Not(c), left))
// Treat a NULL condition as false: the Filter in the right plan of the Except drops rows
// whose condition evaluates to NULL, so those rows must be returned here. Without the
// Coalesce, Not(NULL) evaluates to NULL and those rows would be wrongly filtered out.
Distinct(Filter(Not(Coalesce(Seq(c, Literal.FalseLiteral))), left))
}.getOrElse {
e
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, Not}
import org.apache.spark.sql.catalyst.expressions.{Alias, Coalesce, If, Literal, Not}
import org.apache.spark.sql.catalyst.expressions.aggregate.First
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.types.BooleanType

class ReplaceOperatorSuite extends PlanTest {

Expand Down Expand Up @@ -229,4 +230,17 @@ class ReplaceOperatorSuite extends PlanTest {

comparePlans(optimized, query)
}

test("SPARK-26366: ReplaceExceptWithFilter should handle properly NULL") {
  // Two-column integer relation used as the left side of the Except.
  val relation = LocalRelation(Seq('a.int, 'b.int))
  // Right side: the same relation restricted by a predicate containing an empty IN list.
  val filtered = relation.where('a.in(1, 2) || 'b.in())

  val optimized = OptimizeIn(Optimize.execute(Except(relation, filtered, false).analyze))

  // In the expected plan the empty `'b.in()` appears as an If that is false for non-null `b`
  // and NULL otherwise (presumably the form OptimizeIn produces for an empty list).
  val emptyInList = If('b.isNotNull, Literal.FalseLiteral, Literal(null, BooleanType))
  // The whole condition is wrapped in Coalesce(_, false) before being negated.
  val nullSafeCondition = Coalesce(Seq('a.in(1, 2) || emptyInList, Literal.FalseLiteral))
  val expected =
    Aggregate(relation.output, relation.output, Filter(!nullSafeCondition, relation)).analyze

  comparePlans(optimized, expected)
}
}
13 changes: 13 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1656,6 +1656,19 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
checkAnswer(df.groupBy(col("a")).agg(first(col("b"))),
Seq(Row("0", BigDecimal.valueOf(0.1111)), Row("1", BigDecimal.valueOf(1.1111))))
}

test("SPARK-26366: return nulls which are not filtered in except") {
val inputDF = sqlContext.createDataFrame(
sparkContext.parallelize(Seq(Row("0", "a"), Row("1", null))),
StructType(Seq(
StructField("a", StringType, nullable = true),
StructField("b", StringType, nullable = true))))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seq("0" -> "a", "1" -> null).toDF("a", "b")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With your suggestion, the test always passes, even without the patch, because it adds extra Project operators to rename the fields, so I cannot do this.


val exceptDF = inputDF.filter(
col("a").isin(Seq("0"): _*) or col("b").isin())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: `isin(Seq("0"): _*)` => `isin("0")`

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this bug only reproducible with `In`?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it is also reproducible with other comparisons (`>`, for instance). I am using `>` now; is that OK?


checkAnswer(inputDF.except(exceptDF), Seq(Row("1", null)))
}
}

/**
 * Plain data holder with three integer fields.
 *
 * NOTE(review): the name suggests it is test data for union-related Dataset tests, but its
 * usages are not visible here -- confirm.
 *
 * @param x first integer field
 * @param y second integer field
 * @param z third integer field
 */
case class TestDataUnion(
    x: Int,
    y: Int,
    z: Int)
Expand Down