From 2af656a6b8ddae00555b04ecdbc7873adc6fc0b6 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 16 Nov 2018 13:27:35 +0100 Subject: [PATCH 01/10] [SPARK-26078][SQL] Dedup self-join attributes on subqueries --- .../sql/catalyst/optimizer/subquery.scala | 36 ++++++++++++++----- .../org/apache/spark/sql/SubquerySuite.scala | 31 ++++++++++++++++ 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index e9b7a8b76e68..c11470a22908 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -54,10 +54,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { val aliasMap = AttributeMap(duplicates.map { dup => dup -> Alias(dup, dup.toString)() }.toSeq) - val aliasedExpressions = right.output.map { ref => - aliasMap.getOrElse(ref, ref) - } - val newRight = Project(aliasedExpressions, right) + val newRight = rewriteDedupPlan(right, aliasMap) val newJoinCond = joinCond.map { condExpr => condExpr transform { case a: Attribute => aliasMap.getOrElse(a, a).toAttribute @@ -70,6 +67,27 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { case _ => joinPlan } + private def rewriteDedupPlan(plan: LogicalPlan, rewrites: AttributeMap[Alias]): LogicalPlan = { + val aliasedExpressions = plan.output.map { ref => + rewrites.getOrElse(ref, ref) + } + Project(aliasedExpressions, plan) + } + + private def dedupSubqueryOnSelfJoin(values: Seq[Expression], sub: LogicalPlan): LogicalPlan = { + val leftRefs = AttributeSet.fromAttributeSets(values.map(_.references)) + val rightRefs = AttributeSet(sub.output) + val duplicates = leftRefs.intersect(rightRefs) + if (duplicates.isEmpty) { + sub + } else { + val aliasMap = AttributeMap(duplicates.map { dup => + dup -> Alias(dup, dup.toString)() + }.toSeq) + rewriteDedupPlan(sub, aliasMap) + } + } + def apply(plan: LogicalPlan): LogicalPlan = plan transform { case Filter(condition, child) => val (withSubquery, withoutSubquery) = @@ -92,10 +110,11 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Deduplicate conflicting attributes if any. dedupJoin(Join(outerPlan, sub, LeftAnti, joinCond)) case (p, InSubquery(values, ListQuery(sub, conditions, _, _))) => - val inConditions = values.zip(sub.output).map(EqualTo.tupled) + val newSub = dedupSubqueryOnSelfJoin(values, sub) + val inConditions = values.zip(newSub.output).map(EqualTo.tupled) val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p) // Deduplicate conflicting attributes if any. - dedupJoin(Join(outerPlan, sub, LeftSemi, joinCond)) + dedupJoin(Join(outerPlan, newSub, LeftSemi, joinCond)) case (p, Not(InSubquery(values, ListQuery(sub, conditions, _, _)))) => // This is a NULL-aware (left) anti join (NAAJ) e.g. col NOT IN expr // Construct the condition. A NULL in one of the conditions is regarded as a positive @@ -103,7 +122,8 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Note that will almost certainly be planned as a Broadcast Nested Loop join. // Use EXISTS if performance matters to you. - val inConditions = values.zip(sub.output).map(EqualTo.tupled) + val newSub = dedupSubqueryOnSelfJoin(values, sub) + val inConditions = values.zip(newSub.output).map(EqualTo.tupled) val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions, p) // Expand the NOT IN expression with the NULL-aware semantic // to its full form. That is from: @@ -119,7 +139,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // (A.A1 = B.B1 OR ISNULL(A.A1 = B.B1)) AND (B.B2 = A.A2) AND B.B3 > 1 val finalJoinCond = (nullAwareJoinConds ++ conditions).reduceLeft(And) // Deduplicate conflicting attributes if any. - dedupJoin(Join(outerPlan, sub, LeftAnti, Option(finalJoinCond))) + dedupJoin(Join(outerPlan, newSub, LeftAnti, Option(finalJoinCond))) case (p, predicate) => val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p) Project(p.output, Filter(newCond.get, inputPlan)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 5088821ad736..7cbebfb252ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort} +import org.apache.spark.sql.types._ import org.apache.spark.sql.test.SharedSQLContext class SubquerySuite extends QueryTest with SharedSQLContext { @@ -1280,4 +1281,34 @@ class SubquerySuite extends QueryTest with SharedSQLContext { assert(subqueries.length == 1) } } + + test("SPARK-26078: deduplicate fake self joins for IN subqueries") { + withTempView("a", "b") { + val a = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("a", 2), Row("b", 1))), + StructType(Seq(StructField("id", StringType), StructField("num", IntegerType)))) + val b = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("a", 2), Row("b", 1))), + StructType(Seq(StructField("id", StringType), StructField("num", IntegerType)))) + a.createOrReplaceTempView("a") + b.createOrReplaceTempView("b") + + val df1 = spark.sql( + """ + |SELECT id,num,source FROM ( + | SELECT id, num, 'a' as source FROM a + | UNION ALL + | SELECT id, num, 'b' as source FROM b + |) AS c WHERE c.id IN (SELECT id FROM b WHERE num = 2) + """.stripMargin) + checkAnswer(df1, Seq(Row("a", 2, "a"), Row("a", 2, "b"))) + val df2 = spark.sql( + """ + |SELECT id,num,source FROM ( + | SELECT id, num, 'a' as source FROM a + | UNION ALL + | SELECT id, num, 'b' as source FROM b + |) AS c WHERE c.id NOT IN (SELECT id FROM b WHERE num = 2) + """.stripMargin) + checkAnswer(df2, Seq(Row("b", 1, "a"), Row("b", 1, "b"))) + } + } } From a71b1c6abd566e52063b3fb0343db5178ac67c8f Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 16 Nov 2018 13:44:51 +0100 Subject: [PATCH 02/10] fix scalastyle --- .../src/test/scala/org/apache/spark/sql/SubquerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 7cbebfb252ee..5a525a6d4ab2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -21,8 +21,8 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort} -import org.apache.spark.sql.types._ import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ class SubquerySuite extends QueryTest with SharedSQLContext { import testImplicits._ From 86106fadcaed6c1a4768138b3d72e8c892b7cd7f Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sat, 17 Nov 2018 15:36:45 +0100 Subject: [PATCH 03/10] address comments --- .../spark/sql/catalyst/optimizer/subquery.scala | 3 +-- .../scala/org/apache/spark/sql/SubquerySuite.scala | 14 ++++++++------ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index c11470a22908..ea658ad461f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -76,8 +76,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { private def dedupSubqueryOnSelfJoin(values: Seq[Expression], sub: LogicalPlan): LogicalPlan = { val leftRefs = AttributeSet.fromAttributeSets(values.map(_.references)) - val rightRefs = AttributeSet(sub.output) - val duplicates = leftRefs.intersect(rightRefs) + val duplicates = leftRefs.intersect(sub.outputSet) if (duplicates.isEmpty) { sub } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 5a525a6d4ab2..007a063d071c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1284,12 +1284,14 @@ class SubquerySuite extends QueryTest with SharedSQLContext { test("SPARK-26078: deduplicate fake self joins for IN subqueries") { withTempView("a", "b") { - val a = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("a", 2), Row("b", 1))), - StructType(Seq(StructField("id", StringType), StructField("num", IntegerType)))) - val b = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("a", 2), Row("b", 1))), - StructType(Seq(StructField("id", StringType), StructField("num", IntegerType)))) - a.createOrReplaceTempView("a") - b.createOrReplaceTempView("b") + def genTestViewWithName(name: String): Unit = { + val df = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row("a", 2), Row("b", 1))), + StructType(Seq(StructField("id", StringType), StructField("num", IntegerType)))) + df.createOrReplaceTempView(name) + } + genTestViewWithName("a") + genTestViewWithName("b") val df1 = spark.sql( """ From 3d010fd911a9b07d1a9d13c79d52be186d023556 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 19 Nov 2018 10:33:47 +0100 Subject: [PATCH 04/10] dedup also rewriteExistentialExpr --- .../apache/spark/sql/catalyst/optimizer/subquery.scala | 5 +++-- .../scala/org/apache/spark/sql/SubquerySuite.scala | 10 ++++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index ea658ad461f7..bc7c658a72f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -165,10 +165,11 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { exists case InSubquery(values, ListQuery(sub, conditions, _, _)) => val exists = AttributeReference("exists", BooleanType, nullable = false)() - val inConditions = values.zip(sub.output).map(EqualTo.tupled) + val newSub = dedupSubqueryOnSelfJoin(values, sub) + val inConditions = values.zip(newSub.output).map(EqualTo.tupled) val newConditions = (inConditions ++ conditions).reduceLeftOption(And) // Deduplicate conflicting attributes if any. - newPlan = dedupJoin(Join(newPlan, sub, ExistenceJoin(exists), newConditions)) + newPlan = dedupJoin(Join(newPlan, newSub, ExistenceJoin(exists), newConditions)) exists } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 007a063d071c..393c279311ef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1311,6 +1311,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext { |) AS c WHERE c.id NOT IN (SELECT id FROM b WHERE num = 2) """.stripMargin) checkAnswer(df2, Seq(Row("b", 1, "a"), Row("b", 1, "b"))) + val df3 = spark.sql( + """ + |SELECT id,num,source FROM ( + | SELECT id, num, 'a' as source FROM a + | UNION ALL + | SELECT id, num, 'b' as source FROM b + |) AS c WHERE c.id IN (SELECT id FROM b WHERE num = 2) OR + |c.id IN (SELECT id FROM b WHERE num = 3) + """.stripMargin) + checkAnswer(df3, Seq(Row("a", 2, "a"), Row("a", 2, "b"))) } } } From 65fca4f8b5ae347fc3f1baf3069b92aaab1b00db Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 21 Nov 2018 10:09:37 +0100 Subject: [PATCH 05/10] add comment --- .../org/apache/spark/sql/catalyst/optimizer/subquery.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index bc7c658a72f7..11fd5e40b246 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -75,6 +75,11 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { } private def dedupSubqueryOnSelfJoin(values: Seq[Expression], sub: LogicalPlan): LogicalPlan = { + // SPARK-26078: it may happen that the subquery has conflicting attributes with the outer + // values. In this case, the resulting join would contain trivially true conditions (eg. + // id#3 = id#3) which cannot be de-duplicated after. In this method, if there are conflicting + // attributes in the join condition, the subquery's conflicting attributes are changed using + // a projection which aliases them and resolves the problem. val leftRefs = AttributeSet.fromAttributeSets(values.map(_.references)) val duplicates = leftRefs.intersect(sub.outputSet) if (duplicates.isEmpty) { From 1beb40ce0aa21f20d46ce34ce227b0e444ace80b Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 11 Dec 2018 11:15:47 +0100 Subject: [PATCH 06/10] address comments --- .../spark/sql/catalyst/optimizer/subquery.scala | 13 +++++++------ .../scala/org/apache/spark/sql/SubquerySuite.scala | 10 ++-------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 11fd5e40b246..1eeed95b3b56 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -114,11 +114,11 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Deduplicate conflicting attributes if any. dedupJoin(Join(outerPlan, sub, LeftAnti, joinCond)) case (p, InSubquery(values, ListQuery(sub, conditions, _, _))) => + // Deduplicate conflicting attributes if any. val newSub = dedupSubqueryOnSelfJoin(values, sub) val inConditions = values.zip(newSub.output).map(EqualTo.tupled) val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p) - // Deduplicate conflicting attributes if any. - dedupJoin(Join(outerPlan, newSub, LeftSemi, joinCond)) + Join(outerPlan, newSub, LeftSemi, joinCond) case (p, Not(InSubquery(values, ListQuery(sub, conditions, _, _)))) => // This is a NULL-aware (left) anti join (NAAJ) e.g. col NOT IN expr // Construct the condition. A NULL in one of the conditions is regarded as a positive @@ -126,6 +126,8 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Note that will almost certainly be planned as a Broadcast Nested Loop join. // Use EXISTS if performance matters to you. + + // Deduplicate conflicting attributes if any. val newSub = dedupSubqueryOnSelfJoin(values, sub) val inConditions = values.zip(newSub.output).map(EqualTo.tupled) val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions, p) @@ -142,8 +144,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // will have the final conditions in the LEFT ANTI as // (A.A1 = B.B1 OR ISNULL(A.A1 = B.B1)) AND (B.B2 = A.A2) AND B.B3 > 1 val finalJoinCond = (nullAwareJoinConds ++ conditions).reduceLeft(And) - // Deduplicate conflicting attributes if any. - dedupJoin(Join(outerPlan, newSub, LeftAnti, Option(finalJoinCond))) + Join(outerPlan, newSub, LeftAnti, Option(finalJoinCond)) case (p, predicate) => val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p) Project(p.output, Filter(newCond.get, inputPlan)) @@ -170,11 +171,11 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { exists case InSubquery(values, ListQuery(sub, conditions, _, _)) => val exists = AttributeReference("exists", BooleanType, nullable = false)() + // Deduplicate conflicting attributes if any. val newSub = dedupSubqueryOnSelfJoin(values, sub) val inConditions = values.zip(newSub.output).map(EqualTo.tupled) val newConditions = (inConditions ++ conditions).reduceLeftOption(And) - // Deduplicate conflicting attributes if any. - newPlan = dedupJoin(Join(newPlan, newSub, ExistenceJoin(exists), newConditions)) + newPlan = Join(newPlan, newSub, ExistenceJoin(exists), newConditions) exists } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 393c279311ef..c95c52f1d3a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1284,14 +1284,8 @@ class SubquerySuite extends QueryTest with SharedSQLContext { test("SPARK-26078: deduplicate fake self joins for IN subqueries") { withTempView("a", "b") { - def genTestViewWithName(name: String): Unit = { - val df = spark.createDataFrame( - spark.sparkContext.parallelize(Seq(Row("a", 2), Row("b", 1))), - StructType(Seq(StructField("id", StringType), StructField("num", IntegerType)))) - df.createOrReplaceTempView(name) - } - genTestViewWithName("a") - genTestViewWithName("b") + Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("a") + Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("b") val df1 = spark.sql( """ From 0312558ce201a9866c5e79df5dfd50b21d2dc6c5 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 12 Dec 2018 10:32:07 +0100 Subject: [PATCH 07/10] fix failures --- .../spark/sql/catalyst/optimizer/subquery.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 1eeed95b3b56..cbb7d2b77ccf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -74,13 +74,16 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { Project(aliasedExpressions, plan) } - private def dedupSubqueryOnSelfJoin(values: Seq[Expression], sub: LogicalPlan): LogicalPlan = { + private def dedupSubqueryOnSelfJoin( + values: Seq[Expression], + outerPlan: LogicalPlan, + sub: LogicalPlan): LogicalPlan = { // SPARK-26078: it may happen that the subquery has conflicting attributes with the outer // values. In this case, the resulting join would contain trivially true conditions (eg. // id#3 = id#3) which cannot be de-duplicated after. In this method, if there are conflicting // attributes in the join condition, the subquery's conflicting attributes are changed using // a projection which aliases them and resolves the problem. - val leftRefs = AttributeSet.fromAttributeSets(values.map(_.references)) + val leftRefs = AttributeSet.fromAttributeSets(values.map(_.references)) ++ outerPlan.outputSet val duplicates = leftRefs.intersect(sub.outputSet) if (duplicates.isEmpty) { sub @@ -115,7 +118,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { dedupJoin(Join(outerPlan, sub, LeftAnti, joinCond)) case (p, InSubquery(values, ListQuery(sub, conditions, _, _))) => // Deduplicate conflicting attributes if any. - val newSub = dedupSubqueryOnSelfJoin(values, sub) + val newSub = dedupSubqueryOnSelfJoin(values, p, sub) val inConditions = values.zip(newSub.output).map(EqualTo.tupled) val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p) Join(outerPlan, newSub, LeftSemi, joinCond) @@ -128,7 +131,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Use EXISTS if performance matters to you. // Deduplicate conflicting attributes if any. - val newSub = dedupSubqueryOnSelfJoin(values, sub) + val newSub = dedupSubqueryOnSelfJoin(values, p, sub) val inConditions = values.zip(newSub.output).map(EqualTo.tupled) val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions, p) // Expand the NOT IN expression with the NULL-aware semantic @@ -172,7 +175,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { case InSubquery(values, ListQuery(sub, conditions, _, _)) => val exists = AttributeReference("exists", BooleanType, nullable = false)() // Deduplicate conflicting attributes if any. - val newSub = dedupSubqueryOnSelfJoin(values, sub) + val newSub = dedupSubqueryOnSelfJoin(values, newPlan, sub) val inConditions = values.zip(newSub.output).map(EqualTo.tupled) val newConditions = (inConditions ++ conditions).reduceLeftOption(And) newPlan = Join(newPlan, newSub, ExistenceJoin(exists), newConditions) From 652858213e941f865b61c3d0c542d0db66ba4a57 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 13 Dec 2018 10:48:13 +0100 Subject: [PATCH 08/10] address comment --- .../sql/catalyst/optimizer/subquery.scala | 81 ++++++++----------- 1 file changed, 33 insertions(+), 48 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index cbb7d2b77ccf..0b97e629f7ad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -43,55 +43,43 @@ import org.apache.spark.sql.types._ * condition. */ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { - private def dedupJoin(joinPlan: LogicalPlan): LogicalPlan = joinPlan match { - // SPARK-21835: It is possibly that the two sides of the join have conflicting attributes, - // the produced join then becomes unresolved and break structural integrity. We should - // de-duplicate conflicting attributes. We don't use transformation here because we only - // care about the most top join converted from correlated predicate subquery. - case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti | ExistenceJoin(_)), joinCond) => - val duplicates = right.outputSet.intersect(left.outputSet) - if (duplicates.nonEmpty) { - val aliasMap = AttributeMap(duplicates.map { dup => - dup -> Alias(dup, dup.toString)() - }.toSeq) - val newRight = rewriteDedupPlan(right, aliasMap) - val newJoinCond = joinCond.map { condExpr => - condExpr transform { - case a: Attribute => aliasMap.getOrElse(a, a).toAttribute - } - } - Join(left, newRight, joinType, newJoinCond) - } else { - j - } - case _ => joinPlan - } - private def rewriteDedupPlan(plan: LogicalPlan, rewrites: AttributeMap[Alias]): LogicalPlan = { - val aliasedExpressions = plan.output.map { ref => - rewrites.getOrElse(ref, ref) - } - Project(aliasedExpressions, plan) + private def buildJoin( + outerPlan: LogicalPlan, + subplan: LogicalPlan, + joinType: JoinType, + condition: Option[Expression]): Join = { + // Deduplicate conflicting attributes if any. + val dedupSubplan = dedupSubqueryOnSelfJoin(outerPlan, subplan) + Join(outerPlan, dedupSubplan, joinType, condition) } private def dedupSubqueryOnSelfJoin( - values: Seq[Expression], outerPlan: LogicalPlan, - sub: LogicalPlan): LogicalPlan = { - // SPARK-26078: it may happen that the subquery has conflicting attributes with the outer + subplan: LogicalPlan, + valuesOpt: Option[Seq[Expression]] = None): LogicalPlan = { + // SPARK-21835: It is possibly that the two sides of the join have conflicting attributes, + // the produced join then becomes unresolved and break structural integrity. We should + // de-duplicate conflicting attributes. + // SPARK-26078: it may also happen that the subquery has conflicting attributes with the outer // values. In this case, the resulting join would contain trivially true conditions (eg. // id#3 = id#3) which cannot be de-duplicated after. In this method, if there are conflicting // attributes in the join condition, the subquery's conflicting attributes are changed using // a projection which aliases them and resolves the problem. - val leftRefs = AttributeSet.fromAttributeSets(values.map(_.references)) ++ outerPlan.outputSet - val duplicates = leftRefs.intersect(sub.outputSet) - if (duplicates.isEmpty) { - sub - } else { - val aliasMap = AttributeMap(duplicates.map { dup => + val outerReferences = valuesOpt.map(values => + AttributeSet.fromAttributeSets(values.map(_.references))).getOrElse(AttributeSet.empty) + val outerRefs = outerPlan.outputSet ++ outerReferences + val duplicates = outerRefs.intersect(subplan.outputSet) + if (duplicates.nonEmpty) { + val rewrites = AttributeMap(duplicates.map { dup => dup -> Alias(dup, dup.toString)() }.toSeq) - rewriteDedupPlan(sub, aliasMap) + val aliasedExpressions = subplan.output.map { ref => + rewrites.getOrElse(ref, ref) + } + Project(aliasedExpressions, subplan) + } else { + subplan } } @@ -110,15 +98,13 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { withSubquery.foldLeft(newFilter) { case (p, Exists(sub, conditions, _)) => val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) - // Deduplicate conflicting attributes if any. - dedupJoin(Join(outerPlan, sub, LeftSemi, joinCond)) + buildJoin(outerPlan, sub, LeftSemi, joinCond) case (p, Not(Exists(sub, conditions, _))) => val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) - // Deduplicate conflicting attributes if any. - dedupJoin(Join(outerPlan, sub, LeftAnti, joinCond)) + buildJoin(outerPlan, sub, LeftAnti, joinCond) case (p, InSubquery(values, ListQuery(sub, conditions, _, _))) => // Deduplicate conflicting attributes if any. - val newSub = dedupSubqueryOnSelfJoin(values, p, sub) + val newSub = dedupSubqueryOnSelfJoin(p, sub, Some(values)) val inConditions = values.zip(newSub.output).map(EqualTo.tupled) val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p) Join(outerPlan, newSub, LeftSemi, joinCond) @@ -131,7 +117,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Use EXISTS if performance matters to you. // Deduplicate conflicting attributes if any. - val newSub = dedupSubqueryOnSelfJoin(values, p, sub) + val newSub = dedupSubqueryOnSelfJoin(p, sub, Some(values)) val inConditions = values.zip(newSub.output).map(EqualTo.tupled) val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions, p) // Expand the NOT IN expression with the NULL-aware semantic @@ -168,14 +154,13 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { e transformUp { case Exists(sub, conditions, _) => val exists = AttributeReference("exists", BooleanType, nullable = false)() - // Deduplicate conflicting attributes if any. - newPlan = dedupJoin( - Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))) + newPlan = + buildJoin(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)) exists case InSubquery(values, ListQuery(sub, conditions, _, _)) => val exists = AttributeReference("exists", BooleanType, nullable = false)() // Deduplicate conflicting attributes if any. - val newSub = dedupSubqueryOnSelfJoin(values, newPlan, sub) + val newSub = dedupSubqueryOnSelfJoin(newPlan, sub, Some(values)) val inConditions = values.zip(newSub.output).map(EqualTo.tupled) val newConditions = (inConditions ++ conditions).reduceLeftOption(And) newPlan = Join(newPlan, newSub, ExistenceJoin(exists), newConditions) From 6172f52b55de78c3b34aa12966e7fc3726b5be07 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 13 Dec 2018 21:16:16 +0100 Subject: [PATCH 09/10] address comment --- .../spark/sql/catalyst/optimizer/subquery.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 0b97e629f7ad..a7bfe389888b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.mutable.ArrayBuffer -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.SubExprUtils._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -50,14 +50,15 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { joinType: JoinType, condition: Option[Expression]): Join = { // Deduplicate conflicting attributes if any. - val dedupSubplan = dedupSubqueryOnSelfJoin(outerPlan, subplan) + val dedupSubplan = dedupSubqueryOnSelfJoin(outerPlan, subplan, None, condition) Join(outerPlan, dedupSubplan, joinType, condition) } private def dedupSubqueryOnSelfJoin( outerPlan: LogicalPlan, subplan: LogicalPlan, - valuesOpt: Option[Seq[Expression]] = None): LogicalPlan = { + valuesOpt: Option[Seq[Expression]], + condition: Option[Expression] = None): LogicalPlan = { // SPARK-21835: It is possibly that the two sides of the join have conflicting attributes, // the produced join then becomes unresolved and break structural integrity. We should // de-duplicate conflicting attributes. @@ -71,6 +72,12 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { val outerRefs = outerPlan.outputSet ++ outerReferences val duplicates = outerRefs.intersect(subplan.outputSet) if (duplicates.nonEmpty) { + condition.foreach { + case e if e.references.intersect(duplicates).nonEmpty => + throw new AnalysisException(s"Found conflicting attributes ${duplicates.mkString(",")} " + + s"in nodes:\n $outerPlan\n $subplan") + case _ => + } val rewrites = AttributeMap(duplicates.map { dup => dup -> Alias(dup, dup.toString)() }.toSeq) From ec710d718d821d5c610f64f79331a0c4e14f0bfd Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 13 Dec 2018 21:19:28 +0100 Subject: [PATCH 10/10] address comment --- .../spark/sql/catalyst/optimizer/subquery.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index a7bfe389888b..34840c6c977a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -72,11 +72,13 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { val outerRefs = outerPlan.outputSet ++ outerReferences val duplicates = outerRefs.intersect(subplan.outputSet) if (duplicates.nonEmpty) { - condition.foreach { - case e if e.references.intersect(duplicates).nonEmpty => - throw new AnalysisException(s"Found conflicting attributes ${duplicates.mkString(",")} " + - s"in nodes:\n $outerPlan\n $subplan") - case _ => + condition.foreach { e => + val conflictingAttrs = e.references.intersect(duplicates) + if (conflictingAttrs.nonEmpty) { + throw new AnalysisException("Found conflicting attributes " + + s"${conflictingAttrs.mkString(",")} in the condition joining outer plan:\n " + + s"$outerPlan\nand subplan:\n $subplan") + } } val rewrites = AttributeMap(duplicates.map { dup => dup -> Alias(dup, dup.toString)()