From 1723a1acaa53c0de75b43bad86baffe73fa5336d Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Fri, 26 Feb 2016 00:56:29 +0900 Subject: [PATCH 1/7] Avoid illegal NULL propagation --- .../spark/sql/catalyst/analysis/Analyzer.scala | 15 ++++++++++++++- .../org/apache/spark/sql/DataFrameJoinSuite.scala | 11 +++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b7884f9b60f31..a31b86f6cb4b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -620,7 +620,7 @@ class Analyzer( case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString}") - q transformExpressionsUp { + val resolvedPlan = q transformExpressionsUp { case u @ UnresolvedAttribute(nameParts) => // Leave unchanged if resolution fails. Hopefully will be resolved next round. val result = @@ -630,6 +630,19 @@ class Analyzer( case UnresolvedExtractValue(child, fieldExpr) if child.resolved => ExtractValue(child, fieldExpr, resolver) } + + resolvedPlan.transform { + case f @ Filter(filterCondition, j @ Join(_, _, _, _)) => + val joinOutput = new ArrayBuffer[(Attribute, Attribute)] + j.output.map { + case a: AttributeReference => joinOutput += ((a, a)) + } + val joinOutputMap = AttributeMap(joinOutput) + val newFilterCond = filterCondition.transform { + case a: AttributeReference => joinOutputMap.get(a).getOrElse(a) + } + Filter(newFilterCond, j) + } } def newAliases(expressions: Seq[NamedExpression]): Seq[NamedExpression] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 031e66b57cbcb..d99921c48d050 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -204,4 +204,15 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { leftJoin2Inner, Row(1, 2, "1", 1, 3, "1") :: Nil) } + + test("filter outer join results using a non-nullable column from a right table") { + val df1 = Seq((0, 0), (1, 0), (2, 0), (3, 0), (4, 0)).toDF("id", "count") + val df2 = Seq(Tuple1(0), Tuple1(1)).toDF("id").groupBy("id").count + checkAnswer( + df1.join(df2, df1("id") === df2("id"), "left_outer").filter(df2("count").isNull), + Row(2, 0, null, null) :: + Row(3, 0, null, null) :: + Row(4, 0, null, null) :: Nil + ) + } } From bc3a3a7d9c0bf5aaaa535ed49d35a71f9a1ecb3d Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Fri, 26 Feb 2016 16:48:11 +0900 Subject: [PATCH 2/7] Add comments --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a31b86f6cb4b8..2fa6faed5b977 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -631,6 +631,10 @@ class Analyzer( ExtractValue(child, fieldExpr, resolver) } + // Replaces attribute references in a filter if it has a join as a child and it references + // some columns on the base relations of the join. This is because outer joins change + // nullability on columns and this could cause wrong NULL propagation in Optimizer. + // See SPARK-13484 for the concrete query of this case. resolvedPlan.transform { case f @ Filter(filterCondition, j @ Join(_, _, _, _)) => val joinOutput = new ArrayBuffer[(Attribute, Attribute)] From b56da9f41ed3c62ec17bd79df6bb27728936026e Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Sun, 28 Feb 2016 00:16:54 +0900 Subject: [PATCH 3/7] Add a new rule to solve illegal references --- .../sql/catalyst/analysis/Analyzer.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2fa6faed5b977..13f75d5f461ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -106,6 +106,8 @@ class Analyzer( TimeWindowing :: HiveTypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), + Batch("Solve", Once, + SolveIllegalReferences), Batch("Nondeterministic", Once, PullOutNondeterministic), Batch("UDF", Once, @@ -1459,6 +1461,30 @@ class Analyzer( } } + /** + * Replaces attribute references in a filter if it has a join as a child and it references some + * columns on the base relations of the join. This is because outer joins change nullability on + * columns and this could cause wrong NULL propagation in Optimizer. + * See SPARK-13484 for the concrete query of this case. + */ + object SolveIllegalReferences extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case q: LogicalPlan => + q.transform { + case f @ Filter(filterCondition, j @ Join(_, _, _, _)) => + val joinOutput = new ArrayBuffer[(Attribute, Attribute)] + j.output.map { + case a: AttributeReference => joinOutput += ((a, a)) + } + val joinOutputMap = AttributeMap(joinOutput) + val newFilterCond = filterCondition.transform { + case a: AttributeReference => joinOutputMap.get(a).getOrElse(a) + } + Filter(newFilterCond, j) + } + } + } + /** * Extracts [[WindowExpression]]s from the projectList of a [[Project]] operator and * aggregateExpressions of an [[Aggregate]] operator and creates individual [[Window]] From 274c542be7d0019afe705d517d09cf3fe666389f Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Wed, 2 Mar 2016 15:19:59 +0900 Subject: [PATCH 4/7] Use foreach not map --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 13f75d5f461ae..446d6df6c1455 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1473,7 +1473,7 @@ class Analyzer( q.transform { case f @ Filter(filterCondition, j @ Join(_, _, _, _)) => val joinOutput = new ArrayBuffer[(Attribute, Attribute)] - j.output.map { + j.output.foreach { case a: AttributeReference => joinOutput += ((a, a)) } val joinOutputMap = AttributeMap(joinOutput) From 4a7121e92ecf6f8688125be572aa3d859a97deb4 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Fri, 15 Apr 2016 11:17:01 +0900 Subject: [PATCH 5/7] Solve illegal references in Projects --- .../sql/catalyst/analysis/Analyzer.scala | 35 ++++++++++--------- .../sql/catalyst/planning/patterns.scala | 17 +++++++++ 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 446d6df6c1455..b94ff4c6e42e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification -import org.apache.spark.sql.catalyst.planning.IntegerIndex +import org.apache.spark.sql.catalyst.planning.{ExtractJoinOutputAttributes, IntegerIndex} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _} import org.apache.spark.sql.catalyst.rules._ @@ -1462,25 +1462,27 @@ class Analyzer( } /** - * Replaces attribute references in a filter if it has a join as a child and it references some - * columns on the base relations of the join. This is because outer joins change nullability on - * columns and this could cause wrong NULL propagation in Optimizer. - * See SPARK-13484 for the concrete query of this case. + * Corrects attribute references in an expression tree of some operators (e.g., filters and + * projects) if these operators have a join as a child and the references point to columns on the + * input relation of the join. This is because some joins change the nullability of input columns + * and this could cause illegal optimization (e.g., NULL propagation) and wrong answers. + * See SPARK-13484 and SPARK-13801 for the concrete queries of this case. */ object SolveIllegalReferences extends Rule[LogicalPlan] { + + private def replaceReferences(e: Expression, attrMap: AttributeMap[Attribute]) = e.transform { + case a: AttributeReference => attrMap.get(a).getOrElse(a) + } + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case q: LogicalPlan => + case q: LogicalPlan => q.transform { - case f @ Filter(filterCondition, j @ Join(_, _, _, _)) => - val joinOutput = new ArrayBuffer[(Attribute, Attribute)] - j.output.foreach { - case a: AttributeReference => joinOutput += ((a, a)) - } - val joinOutputMap = AttributeMap(joinOutput) - val newFilterCond = filterCondition.transform { - case a: AttributeReference => joinOutputMap.get(a).getOrElse(a) - } - Filter(newFilterCond, j) + case f @ Filter(filterCondition, ExtractJoinOutputAttributes(join, joinOutputMap)) => + f.copy(condition = replaceReferences(filterCondition, joinOutputMap)) + case p @ Project(projectList, ExtractJoinOutputAttributes(join, joinOutputMap)) => + p.copy(projectList = projectList.map { e => + replaceReferences(e, joinOutputMap).asInstanceOf[NamedExpression] + }) } } } @@ -2165,4 +2167,3 @@ object TimeWindowing extends Rule[LogicalPlan] { } } } - diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 00656191354f2..d617c41408b88 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -181,6 +181,23 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper { } } +/** + * An extractor for join output attributes directly under a given operator. + */ +object ExtractJoinOutputAttributes { + + def unapply(plan: LogicalPlan): Option[(Join, AttributeMap[Attribute])] = { + plan.collectFirst { + case j: Join => j + }.map { join => + val joinOutput = new mutable.ArrayBuffer[(Attribute, Attribute)] + join.output.foreach { + case a: AttributeReference => joinOutput += ((a, a)) + } + (join, AttributeMap(joinOutput)) + } + } +} /** * A pattern that collects all adjacent unions and returns their children as a Seq. From a4903b646a63f9e9f7ce054138f88029018404e2 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Fri, 15 Apr 2016 13:28:38 +0900 Subject: [PATCH 6/7] Add tests in DataFrameJoinSuite --- .../sql/catalyst/analysis/Analyzer.scala | 19 +------------------ .../apache/spark/sql/DataFrameJoinSuite.scala | 12 +++++++++++- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b94ff4c6e42e6..347bd9619cf98 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -622,7 +622,7 @@ class Analyzer( case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString}") - val resolvedPlan = q transformExpressionsUp { + q transformExpressionsUp { case u @ UnresolvedAttribute(nameParts) => // Leave unchanged if resolution fails. Hopefully will be resolved next round. val result = @@ -632,23 +632,6 @@ class Analyzer( case UnresolvedExtractValue(child, fieldExpr) if child.resolved => ExtractValue(child, fieldExpr, resolver) } - - // Replaces attribute references in a filter if it has a join as a child and it references - // some columns on the base relations of the join. This is because outer joins change - // nullability on columns and this could cause wrong NULL propagation in Optimizer. - // See SPARK-13484 for the concrete query of this case. - resolvedPlan.transform { - case f @ Filter(filterCondition, j @ Join(_, _, _, _)) => - val joinOutput = new ArrayBuffer[(Attribute, Attribute)] - j.output.map { - case a: AttributeReference => joinOutput += ((a, a)) - } - val joinOutputMap = AttributeMap(joinOutput) - val newFilterCond = filterCondition.transform { - case a: AttributeReference => joinOutputMap.get(a).getOrElse(a) - } - Filter(newFilterCond, j) - } } def newAliases(expressions: Seq[NamedExpression]): Seq[NamedExpression] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index d99921c48d050..4342c039aefc8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -205,7 +205,8 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { Row(1, 2, "1", 1, 3, "1") :: Nil) } - test("filter outer join results using a non-nullable column from a right table") { + test("process outer join results using the non-nullable columns in the join input") { + // Filter data using a non-nullable column from a right table val df1 = Seq((0, 0), (1, 0), (2, 0), (3, 0), (4, 0)).toDF("id", "count") val df2 = Seq(Tuple1(0), Tuple1(1)).toDF("id").groupBy("id").count checkAnswer( @@ -214,5 +215,14 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { Row(3, 0, null, null) :: Row(4, 0, null, null) :: Nil ) + + // Coalesce data using non-nullable columns in input tables + val df3 = Seq((1, 1)).toDF("a", "b") + val df4 = Seq((2, 2)).toDF("a", "b") + checkAnswer( + df3.join(df4, df3("a") === df4("a"), "outer") + .select(coalesce(df3("a"), df3("b")), coalesce(df4("a"), df4("b"))), + Row(1, null) :: Row(null, 2) :: Nil + ) } } From bd13652961de5dd7ae0968b3b8c60bb0a191b87f Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Fri, 15 Apr 2016 17:44:34 +0900 Subject: [PATCH 7/7] Fix test codes in ResolveNaturalJoinSuite --- .../spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala index 1423a8705af27..748579df41580 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala @@ -100,7 +100,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest { val naturalPlan = r3.join(r4, NaturalJoin(FullOuter), None) val usingPlan = r3.join(r4, UsingJoin(FullOuter, Seq(UnresolvedAttribute("b"))), None) val expected = r3.join(r4, FullOuter, Some(EqualTo(bNotNull, bNotNull))).select( - Alias(Coalesce(Seq(bNotNull, bNotNull)), "b")(), a, c) + Alias(Coalesce(Seq(b, b)), "b")(), a, c) checkAnalysis(naturalPlan, expected) checkAnalysis(usingPlan, expected) }