From 00bbe9d3a52ff0bac210de1144ba2121c5a52232 Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Mon, 1 Jun 2020 22:39:53 +0900 Subject: [PATCH 1/2] Check the ambiguous self-join only if there's a join in the plan --- .../execution/analysis/DetectAmbiguousSelfJoin.scala | 3 +++ .../apache/spark/sql/DataFrameSelfJoinSuite.scala | 12 ++++++++++++ 2 files changed, 15 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala index 614d6c2846bfa..b041ad1dc0578 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala @@ -137,6 +137,9 @@ class DetectAmbiguousSelfJoin(conf: SQLConf) extends Rule[LogicalPlan] { } condition.toSeq.flatMap(getAmbiguousAttrs) + case _ if plan.find(_.isInstanceOf[Join]).isEmpty => + Nil // If there's no join, there's no self-join. 
+ case _ => ambiguousColRefs.toSeq.map { ref => colRefAttrs.find(attr => toColumnReference(attr) == ref).get } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala index 250ec7dc0ba5a..fb58c9851224b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql +import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.{count, sum} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -202,4 +203,15 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession { assertAmbiguousSelfJoin(df1.join(df4).join(df2).select(df2("id"))) } } + + test("SPARK-28344: don't fail as ambiguous self join when there is no join") { + withSQLConf( + SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true") { + val df = Seq(1, 1, 2, 2).toDF("a") + val w = Window.partitionBy(df("a")) + checkAnswer( + df.select(df("a").alias("x"), sum(df("a")).over(w)), + Seq((1, 2), (1, 2), (2, 4), (2, 4)).map(Row.fromTuple)) + } + } } From 21e4241a95b7bd7aa01c0257accbf2a017e1afa3 Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Tue, 2 Jun 2020 00:25:49 +0900 Subject: [PATCH 2/2] Address comments --- .../sql/execution/analysis/DetectAmbiguousSelfJoin.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala index b041ad1dc0578..136f7c47f5341 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala @@ -76,6 
+76,8 @@ class DetectAmbiguousSelfJoin(conf: SQLConf) extends Rule[LogicalPlan] { // We always remove the special metadata from `AttributeReference` at the end of this rule, so // Dataset column reference only exists in the root node via Dataset transformations like // `Dataset#select`. + if (plan.find(_.isInstanceOf[Join]).isEmpty) return stripColumnReferenceMetadataInPlan(plan) + val colRefAttrs = plan.expressions.flatMap(_.collect { case a: AttributeReference if isColumnReference(a) => a }) @@ -137,9 +139,6 @@ class DetectAmbiguousSelfJoin(conf: SQLConf) extends Rule[LogicalPlan] { } condition.toSeq.flatMap(getAmbiguousAttrs) - case _ if plan.find(_.isInstanceOf[Join]).isEmpty => - Nil // If there's no join, there's no self-join. - case _ => ambiguousColRefs.toSeq.map { ref => colRefAttrs.find(attr => toColumnReference(attr) == ref).get } @@ -156,6 +155,10 @@ class DetectAmbiguousSelfJoin(conf: SQLConf) extends Rule[LogicalPlan] { } } + stripColumnReferenceMetadataInPlan(plan) + } + + private def stripColumnReferenceMetadataInPlan(plan: LogicalPlan): LogicalPlan = { plan.transformExpressions { case a: AttributeReference if isColumnReference(a) => // Remove the special metadata from this `AttributeReference`, as the detection is done.