From b98865127a39bde885f9b1680cfe608629d59d51 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 29 Jul 2016 17:43:56 -0400 Subject: [PATCH 01/17] [SPARK-16804][SQL] Correlated subqueries containing LIMIT return incorrect results ## What changes were proposed in this pull request? This patch fixes the incorrect results in the rule ResolveSubquery in Catalyst's Analysis phase. ## How was this patch tested? ./dev/run-tests and a new unit test on the problematic pattern. --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++++++ .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 8 ++++++++ 2 files changed, 18 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2efa997ff22d..c3ee6517875c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1021,6 +1021,16 @@ class Analyzer( case e: Expand => failOnOuterReferenceInSubTree(e, "an EXPAND") e + case l @ LocalLimit(_, child) => + failOnOuterReferenceInSubTree(l, "LIMIT") + l + // Since LIMIT is represented as GlobalLimit(<n>, (LocalLimit (<n>, child)) + // and we are walking bottom up, we will fail on LocalLimit before + // reaching GlobalLimit. + // The code below is just a safety net. + case g @ GlobalLimit(_, child) => + failOnOuterReferenceInSubTree(g, "LIMIT") + g case p => failOnOuterReference(p) p diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index ff112c51697a..b78a988eddbb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -533,5 +533,13 @@ class AnalysisErrorSuite extends AnalysisTest { Exists(Union(LocalRelation(b), Filter(EqualTo(OuterReference(a), c), LocalRelation(c)))), LocalRelation(a)) assertAnalysisError(plan3, "Accessing outer query column is not allowed in" :: Nil) + + val plan4 = Filter( + Exists( + Limit(1, + Filter(EqualTo(OuterReference(a), b), LocalRelation(b))) + ), + LocalRelation(a)) + assertAnalysisError(plan4, "Accessing outer query column is not allowed in LIMIT" :: Nil) } } From 069ed8f8e5f14dca7a15701945d42fc27fe82f3c Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 29 Jul 2016 17:50:02 -0400 Subject: [PATCH 02/17] [SPARK-16804][SQL] Correlated subqueries containing LIMIT return incorrect results ## What changes were proposed in this pull request? This patch fixes the incorrect results in the rule ResolveSubquery in Catalyst's Analysis phase. ## How was this patch tested? ./dev/run-tests and a new unit test on the problematic pattern.
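For reference, a minimal sketch of the problematic pattern (illustrative only: it assumes a SparkSession named "spark", and the one-row view mirrors the new unit tests rather than the original JIRA reproduction):

    // A correlated EXISTS subquery whose body contains a LIMIT. The subquery
    // rewrite pulls the correlated predicate t1.c1 = t2.c1 up past the LIMIT,
    // which is what produced incorrect results; with this patch the analyzer
    // instead rejects the plan with
    // "Accessing outer query column is not allowed in a LIMIT".
    import spark.implicits._
    Seq(1).toDF("c1").createOrReplaceTempView("onerow")
    spark.sql(
      """select c1 from onerow t1
        |where exists (select 1 from onerow t2
        |              where t1.c1 = t2.c1 limit 1)""".stripMargin)

An uncorrelated LIMIT inside a subquery, such as exists (select 1 from onerow LIMIT 1), stays legal; the positive tests added below exercise exactly that distinction.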
--- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c3ee6517875c..357c763f5946 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1022,14 +1022,14 @@ class Analyzer( failOnOuterReferenceInSubTree(e, "an EXPAND") e case l @ LocalLimit(_, child) => - failOnOuterReferenceInSubTree(l, "LIMIT") + failOnOuterReferenceInSubTree(l, "a LIMIT") l // Since LIMIT is represented as GlobalLimit(<n>, (LocalLimit (<n>, child)) // and we are walking bottom up, we will fail on LocalLimit before // reaching GlobalLimit. // The code below is just a safety net. case g @ GlobalLimit(_, child) => - failOnOuterReferenceInSubTree(g, "LIMIT") + failOnOuterReferenceInSubTree(g, "a LIMIT") g case p => failOnOuterReference(p) From edca333c081e6d4e53a91b496fba4a3ef4ee89ac Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 29 Jul 2016 20:28:15 -0400 Subject: [PATCH 03/17] New positive test cases --- .../org/apache/spark/sql/SubquerySuite.scala | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index afed342ff8e2..52387b4b72a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -571,4 +571,33 @@ class SubquerySuite extends QueryTest with SharedSQLContext { Row(1.0, false) :: Row(1.0, false) :: Row(2.0, true) :: Row(2.0, true) :: Row(3.0, false) :: Row(5.0, true) :: Row(null, false) :: Row(null, true) :: Nil) } + + test("SPARK-16804: Correlated subqueries containing LIMIT - 1") { + withTempView("onerow") { + Seq(1).toDF("c1").createOrReplaceTempView("onerow") + + checkAnswer( + sql( + """ + | select c1 from onerow t1 + | where exists (select 1 from onerow t2 where t1.c1=t2.c1) + | and exists (select 1 from onerow LIMIT 1)""".stripMargin), + Row(1) :: Nil) + } + } + + test("SPARK-16804: Correlated subqueries containing LIMIT - 2") { + withTempView("onerow") { + Seq(1).toDF("c1").createOrReplaceTempView("onerow") + + checkAnswer( + sql( + """ + | select c1 from onerow t1 + | where exists (select 1 + | from (select 1 from onerow t2 LIMIT 1) + | where t1.c1=t2.c1)""".stripMargin), + Row(1) :: Nil) + } + } } From 64184fdb77c1a305bb2932e82582da28bb4c0e53 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Mon, 1 Aug 2016 09:20:09 -0400 Subject: [PATCH 04/17] Fix unit test case failure --- .../apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index b78a988eddbb..c08de826bd94 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -540,6 +540,6 @@ class AnalysisErrorSuite extends AnalysisTest { Filter(EqualTo(OuterReference(a), b), LocalRelation(b))) ), LocalRelation(a)) - 
assertAnalysisError(plan4, "Accessing outer query column is not allowed in LIMIT" :: Nil) + assertAnalysisError(plan4, "Accessing outer query column is not allowed in a LIMIT" :: Nil) } } From 29f82b05c9e40e7934397257c674b260a8e8a996 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 5 Aug 2016 13:42:01 -0400 Subject: [PATCH 05/17] blocking TABLESAMPLE --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 7 +++++-- .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 8 ++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 357c763f5946..9d99c4173d4a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1021,16 +1021,19 @@ class Analyzer( case e: Expand => failOnOuterReferenceInSubTree(e, "an EXPAND") e - case l @ LocalLimit(_, child) => + case l @ LocalLimit(_, _) => failOnOuterReferenceInSubTree(l, "a LIMIT") l // Since LIMIT is represented as GlobalLimit(<n>, (LocalLimit (<n>, child)) // and we are walking bottom up, we will fail on LocalLimit before // reaching GlobalLimit. // The code below is just a safety net. - case g @ GlobalLimit(_, child) => + case g @ GlobalLimit(_, _) => failOnOuterReferenceInSubTree(g, "a LIMIT") g + case s @ Sample(_, _, _, _, _) => + failOnOuterReferenceInSubTree(s, "a TABLESAMPLE") + s case p => failOnOuterReference(p) p diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index c08de826bd94..0b7d681be511 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -541,5 +541,13 @@ class AnalysisErrorSuite extends AnalysisTest { ), LocalRelation(a)) assertAnalysisError(plan4, "Accessing outer query column is not allowed in a LIMIT" :: Nil) + + val plan5 = Filter( + Exists( + Sample(0.0, 0.5, false, 1L, + Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))().select('b) + ), + LocalRelation(a)) + assertAnalysisError(plan5, "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) } } From ac43ab47907a1ccd6d22f920415fbb4de93d4720 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 5 Aug 2016 17:10:19 -0400 Subject: [PATCH 06/17] Fixing code styling --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9d99c4173d4a..29ede7048a2d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1021,17 +1021,17 @@ class Analyzer( case e: Expand => failOnOuterReferenceInSubTree(e, "an EXPAND") e - case l @ LocalLimit(_, _) => + case l : LocalLimit => failOnOuterReferenceInSubTree(l, "a LIMIT") l // Since LIMIT is represented as GlobalLimit(<n>, (LocalLimit (<n>, child)) // and we are walking bottom up, we will fail on LocalLimit before // reaching 
GlobalLimit. // The code below is just a safety net. - case g @ GlobalLimit(_, _) => + case g : GlobalLimit => failOnOuterReferenceInSubTree(g, "a LIMIT") g - case s @ Sample(_, _, _, _, _) => + case s : Sample => failOnOuterReferenceInSubTree(s, "a TABLESAMPLE") s case p => From 631d396031e8bf627eb1f4872a4d3a17c144536c Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Sun, 7 Aug 2016 14:39:44 -0400 Subject: [PATCH 07/17] Correcting Scala test style --- .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 0b7d681be511..8935d979414a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -548,6 +548,7 @@ class AnalysisErrorSuite extends AnalysisTest { Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))().select('b) ), LocalRelation(a)) - assertAnalysisError(plan5, "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) + assertAnalysisError(plan5, + "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) } } From 7eb9b2dbba3633a1958e38e0019e3ce816300514 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Sun, 7 Aug 2016 22:31:09 -0400 Subject: [PATCH 08/17] One (last) attempt to correct the Scala style tests --- .../apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 8935d979414a..6438065fb292 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -548,7 +548,7 @@ class AnalysisErrorSuite extends AnalysisTest { Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))().select('b) ), LocalRelation(a)) - assertAnalysisError(plan5, + assertAnalysisError(plan5, "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) } } From 2f463de8d4bf566e5fd59f39ddef6ceba5cfc894 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Sun, 1 Jan 2017 11:18:54 -0500 Subject: [PATCH 09/17] first fix (incomplete) --- .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index aa77a6efef34..357e2aea8f4e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -202,6 +202,11 @@ trait CheckAnalysis extends PredicateHelper { case e if PredicateSubquery.hasNullAwarePredicateWithinNot(e) => failAnalysis(s"Null-aware predicate sub-queries cannot be used in nested" + s" conditions: $e") + // @nsyca + // Incomplete fix. 
This is to address a subset of problem + // specific to "= " + case e @ EqualTo(_, x: ScalarSubquery) => + checkAnalysis(x.plan) case e => } From 6e2f686f8e516e63235e1e6ccb13bdf8a9e6e314 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Tue, 3 Jan 2017 15:06:12 -0500 Subject: [PATCH 10/17] first attempt --- .../sql/catalyst/analysis/CheckAnalysis.scala | 113 +++++++++--------- 1 file changed, 57 insertions(+), 56 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 357e2aea8f4e..9f5828a2c95c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -117,66 +117,72 @@ trait CheckAnalysis extends PredicateHelper { failAnalysis(s"Window specification $s is not valid because $m") case None => w } - case s @ ScalarSubquery(query, conditions, _) + // @nsyca + case e @ PredicateSubquery(query, _, _, _) => + checkAnalysis(query) + e + + case s @ ScalarSubquery(query, conditions, _) => // If no correlation, the output must be exactly one column - if (conditions.isEmpty && query.output.size != 1) => + if (conditions.isEmpty && query.output.size != 1) { failAnalysis( s"Scalar subquery must return only one column, but got ${query.output.size}") - - case s @ ScalarSubquery(query, conditions, _) if conditions.nonEmpty => - - // Collect the columns from the subquery for further checking. - var subqueryColumns = conditions.flatMap(_.references).filter(query.output.contains) - - def checkAggregate(agg: Aggregate): Unit = { - // Make sure correlated scalar subqueries contain one row for every outer row by - // enforcing that they are aggregates which contain exactly one aggregate expressions. - // The analyzer has already checked that subquery contained only one output column, - // and added all the grouping expressions to the aggregate. - val aggregates = agg.expressions.flatMap(_.collect { - case a: AggregateExpression => a - }) - if (aggregates.isEmpty) { - failAnalysis("The output of a correlated scalar subquery must be aggregated") - } - - // SPARK-18504/SPARK-18814: Block cases where GROUP BY columns - // are not part of the correlated columns. - val groupByCols = AttributeSet(agg.groupingExpressions.flatMap(_.references)) - val correlatedCols = AttributeSet(subqueryColumns) - val invalidCols = groupByCols -- correlatedCols - // GROUP BY columns must be a subset of columns in the predicates - if (invalidCols.nonEmpty) { - failAnalysis( - "A GROUP BY clause in a scalar correlated subquery " + - "cannot contain non-correlated columns: " + - invalidCols.mkString(",")) - } } + else if (conditions.nonEmpty) { + // Collect the columns from the subquery for further checking. + var subqueryColumns = conditions.flatMap(_.references).filter(query.output.contains) + + def checkAggregate(agg: Aggregate): Unit = { + // Make sure correlated scalar subqueries contain one row for every outer row by + // enforcing that they are aggregates containing exactly one aggregate expression. + // The analyzer has already checked that subquery contained only one output column, + // and added all the grouping expressions to the aggregate. 
+ val aggregates = agg.expressions.flatMap(_.collect { + case a: AggregateExpression => a + }) + if (aggregates.isEmpty) { + failAnalysis("The output of a correlated scalar subquery must be aggregated") + } - // Skip subquery aliases added by the Analyzer and the SQLBuilder. - // For projects, do the necessary mapping and skip to its child. - def cleanQuery(p: LogicalPlan): LogicalPlan = p match { - case s: SubqueryAlias => cleanQuery(s.child) - case p: Project => - // SPARK-18814: Map any aliases to their AttributeReference children - // for the checking in the Aggregate operators below this Project. - subqueryColumns = subqueryColumns.map { - xs => p.projectList.collectFirst { - case e @ Alias(child : AttributeReference, _) if e.exprId == xs.exprId => - child - }.getOrElse(xs) + // SPARK-18504/SPARK-18814: Block cases where GROUP BY columns + // are not part of the correlated columns. + val groupByCols = AttributeSet(agg.groupingExpressions.flatMap(_.references)) + val correlatedCols = AttributeSet(subqueryColumns) + val invalidCols = groupByCols -- correlatedCols + // GROUP BY columns must be a subset of columns in the predicates + if (invalidCols.nonEmpty) { + failAnalysis( + "A GROUP BY clause in a scalar correlated subquery " + + "cannot contain non-correlated columns: " + + invalidCols.mkString(",")) } + } - cleanQuery(p.child) - case child => child - } + // Skip subquery aliases added by the Analyzer and the SQLBuilder. + // For projects, do the necessary mapping and skip to its child. + def cleanQuery(p: LogicalPlan): LogicalPlan = p match { + case s: SubqueryAlias => cleanQuery(s.child) + case p: Project => + // SPARK-18814: Map any aliases to their AttributeReference children + // for the checking in the Aggregate operators below this Project. + subqueryColumns = subqueryColumns.map { + xs => p.projectList.collectFirst { + case e @ Alias(child : AttributeReference, _) if e.exprId == xs.exprId => + child + }.getOrElse(xs) + } - cleanQuery(query) match { - case a: Aggregate => checkAggregate(a) - case Filter(_, a: Aggregate) => checkAggregate(a) - case fail => failAnalysis(s"Correlated scalar subqueries must be Aggregated: $fail") + cleanQuery(p.child) + case child => child + } + + cleanQuery(query) match { + case a: Aggregate => checkAggregate(a) + case Filter(_, a: Aggregate) => checkAggregate(a) + case fail => failAnalysis(s"Correlated scalar subqueries must be Aggregated: $fail") + } } + checkAnalysis(query) s } @@ -202,11 +208,6 @@ trait CheckAnalysis extends PredicateHelper { case e if PredicateSubquery.hasNullAwarePredicateWithinNot(e) => failAnalysis(s"Null-aware predicate sub-queries cannot be used in nested" + s" conditions: $e") - // @nsyca - // Incomplete fix. 
This is to address a subset of problem - // specific to "= " - case e @ EqualTo(_, x: ScalarSubquery) => - checkAnalysis(x.plan) case e => } From e9bdde6e1268170ef89c2a3f402dbd766b5cad00 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Thu, 5 Jan 2017 10:31:08 -0500 Subject: [PATCH 11/17] New test cases --- .../negative-cases/invalid-correlation.sql | 58 +++++++++++++ .../invalid-correlation.sql.out | 85 +++++++++++++++++++ 2 files changed, 143 insertions(+) create mode 100644 sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql new file mode 100644 index 000000000000..7cd8acf93b15 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql @@ -0,0 +1,58 @@ +-- The test file contains negative test cases +-- of invalid queries where error messages are expected. + +create temporary view t1 as select * from values + (1, 2, 3) +as t1(t1a, t1b, t1c); + +create temporary view t2 as select * from values + (1, 0, 1) +as t2(t2a, t2b, t2c); + +create temporary view t3 as select * from values + (3, 1, 2) +as t3(t3a, t3b, t3c); + +-- TC 01.01 +-- The column t2b in the SELECT of the subquery is invalid +-- because it is neither an aggregate function nor a GROUP BY column. +select t1a, t2b +from t1, t2 +where t1b = t2c +and t2b = (select max(avg) + from (select t2b, avg(t2b) avg + from t2 + where t2a = t1.t1b + ) + ) +; + +-- TC 01.02 +-- Invalid due to the column t2b not part of the output from table t2. +select * +from t1 +where t1a in (select min(t2a) + from t2 + group by t2c + having t2c in (select max(t3c) + from t3 + group by t3b + having t3b > t2b )) +; + +-- TC 01.03 +-- The column t2c in the predicate t2c > 8 must be mapped to the t2 in its subquery scope. +-- But t2c is not part of the output of the subquery hence this is an invalid query. 
+select * +from (select * + from t2 + where t2a in (select t1a + from t1 + where t1b = t2b)) t2 +where t2a in (select t2a + from t2 + where t2a = t2a + and t2c > 1 + group by t2a + having t2c > 8) +; diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out new file mode 100644 index 000000000000..06ffc4223460 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out @@ -0,0 +1,85 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 6 + + +-- !query 0 +create temporary view t1 as select * from values + (1, 2, 3) +as t1(t1a, t1b, t1c) +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +create temporary view t2 as select * from values + (1, 0, 1) +as t2(t2a, t2b, t2c) +-- !query 1 schema +struct<> +-- !query 1 output + + + +-- !query 2 +create temporary view t3 as select * from values + (3, 1, 2) +as t3(t3a, t3b, t3c) +-- !query 2 schema +struct<> +-- !query 2 output + + + +-- !query 3 +select t1a, t2b +from t1, t2 +where t1b = t2c +and t2b = (select max(avg) + from (select t2b, avg(t2b) avg + from t2 + where t2a = t1.t1b + ) + ) +-- !query 3 schema +struct<> +-- !query 3 output +org.apache.spark.sql.AnalysisException +expression 't2.`t2b`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.; + + +-- !query 4 +select * +from t1 +where t1a in (select min(t2a) + from t2 + group by t2c + having t2c in (select max(t3c) + from t3 + group by t3b + having t3b > t2b )) +-- !query 4 schema +struct<> +-- !query 4 output +org.apache.spark.sql.AnalysisException +resolved attribute(s) t2b#4863 missing from min(t2a)#4879,t2c#4864 in operator !Filter predicate-subquery#4876 [(t2c#4864 = max(t3c)#4882) && (t3b#4867 > t2b#4863)]; + + +-- !query 5 +select * +from (select * + from t2 + where t2a in (select t1a + from t1 + where t1b = t2b)) t2 +where t2a in (select t2a + from t2 + where t2a = t2a + and t2c > 1 + group by t2a + having t2c > 8) +-- !query 5 schema +struct +-- !query 5 output + From deec874947a7028aa4a7bef0a1b5898609a6d79c Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Thu, 5 Jan 2017 11:04:36 -0500 Subject: [PATCH 12/17] Masking exprIDs --- .../negative-cases/invalid-correlation.sql | 16 ------------- .../invalid-correlation.sql.out | 23 ++----------------- .../apache/spark/sql/SQLQueryTestSuite.scala | 9 +++++--- 3 files changed, 8 insertions(+), 40 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql index 7cd8acf93b15..cf93c5a83597 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql @@ -40,19 +40,3 @@ where t1a in (select min(t2a) having t3b > t2b )) ; --- TC 01.03 --- The column t2c in the predicate t2c > 8 must be mapped to the t2 in its subquery scope. --- But t2c is not part of the output of the subquery hence this is an invalid query. 
-select * -from (select * - from t2 - where t2a in (select t1a - from t1 - where t1b = t2b)) t2 -where t2a in (select t2a - from t2 - where t2a = t2a - and t2c > 1 - group by t2a - having t2c > 8) -; diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out index 06ffc4223460..2148ef182340 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 6 +-- Number of queries: 5 -- !query 0 @@ -63,23 +63,4 @@ where t1a in (select min(t2a) struct<> -- !query 4 output org.apache.spark.sql.AnalysisException -resolved attribute(s) t2b#4863 missing from min(t2a)#4879,t2c#4864 in operator !Filter predicate-subquery#4876 [(t2c#4864 = max(t3c)#4882) && (t3b#4867 > t2b#4863)]; - - --- !query 5 -select * -from (select * - from t2 - where t2a in (select t1a - from t1 - where t1b = t2b)) t2 -where t2a in (select t2a - from t2 - where t2a = t2a - and t2c > 1 - group by t2a - having t2c > 8) --- !query 5 schema -struct --- !query 5 output - +resolved attribute(s) t2b### missing from min(t2a)###,t2c### in operator !Filter predicate-subquery### [(t2c### = max(t3c)###) && (t3b### > t2b###)]; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 1a4049fb339c..43a040505bbf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -47,9 +47,9 @@ import org.apache.spark.sql.types.StructType * * To re-generate golden files, run: * {{{ - * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/test-only *SQLQueryTestSuite" - * }}} * + * }}} + *SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/test-only *SQLQueryTestSuite" * The format for input files is simple: * 1. A list of SQL queries separated by semicolon. * 2. Lines starting with -- are treated as comments and ignored. @@ -223,7 +223,10 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext { } catch { case a: AnalysisException if a.plan.nonEmpty => // Do not output the logical plan tree which contains expression IDs. - (StructType(Seq.empty), Seq(a.getClass.getName, a.getSimpleMessage)) + // Also implement a crude way of masking expression IDs in the error message + // with a generic pattern "###". + (StructType(Seq.empty), + Seq(a.getClass.getName, a.getSimpleMessage.replaceAll("#[0-9]+", "###"))) case NonFatal(e) => // If there is an exception, put the exception class followed by the message. 
(StructType(Seq.empty), Seq(e.getClass.getName, e.getMessage)) From bcae3363db60cdf93d1bb9b741f96ec0e088cf0b Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Thu, 12 Jan 2017 20:12:55 -0500 Subject: [PATCH 13/17] reverse back accidental change --- .../test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 43a040505bbf..54f04ef4aff5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -47,9 +47,9 @@ import org.apache.spark.sql.types.StructType * * To re-generate golden files, run: * {{{ - * + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/test-only *SQLQueryTestSuite" * }}} - *SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/test-only *SQLQueryTestSuite" + * * The format for input files is simple: * 1. A list of SQL queries separated by semicolon. * 2. Lines starting with -- are treated as comments and ignored. From 51f7fb92e47e92208f4e7b2d3cd6d9745177509e Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Thu, 12 Jan 2017 20:16:05 -0500 Subject: [PATCH 14/17] port from SPARK-19017 --- .../scala/org/apache/spark/sql/SQLQueryTestSuite.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 54f04ef4aff5..051047bf5081 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -163,7 +163,12 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext { s"-- Number of queries: ${outputs.size}\n\n\n" + outputs.zipWithIndex.map{case (qr, i) => qr.toString(i)}.mkString("\n\n\n") + "\n" } - stringToFile(new File(testCase.resultFile), goldenOutput) + val resultFile = new File(testCase.resultFile); + val parent = resultFile.getParentFile(); + if (!parent.exists()) { + assert(parent.mkdirs(), "Could not create directory: " + parent) + } + stringToFile(resultFile, goldenOutput) } // Read back the golden file. 
From 24397cf6c8728b4dfff22da14dc909dbb3b0a4e5 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Thu, 12 Jan 2017 21:19:11 -0500 Subject: [PATCH 15/17] remove unrelated comment --- .../org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 9f5828a2c95c..538b2280cb44 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -117,7 +117,7 @@ trait CheckAnalysis extends PredicateHelper { failAnalysis(s"Window specification $s is not valid because $m") case None => w } - // @nsyca + case e @ PredicateSubquery(query, _, _, _) => checkAnalysis(query) e From ced19c72a19db059e6c76955f84f435d8051fe97 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Tue, 24 Jan 2017 20:38:36 -0500 Subject: [PATCH 16/17] address comment #1 --- .../spark/sql/catalyst/analysis/CheckAnalysis.scala | 9 +++++---- .../subquery/negative-cases/invalid-correlation.sql.out | 2 +- .../scala/org/apache/spark/sql/SQLQueryTestSuite.scala | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 538b2280cb44..610a33d9c55b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -118,10 +118,6 @@ trait CheckAnalysis extends PredicateHelper { case None => w } - case e @ PredicateSubquery(query, _, _, _) => - checkAnalysis(query) - e - case s @ ScalarSubquery(query, conditions, _) => // If no correlation, the output must be exactly one column if (conditions.isEmpty && query.output.size != 1) { @@ -184,6 +180,11 @@ trait CheckAnalysis extends PredicateHelper { } checkAnalysis(query) s + + case s: SubqueryExpression => + checkAnalysis(s.plan) + s + } operator match { diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out index 2148ef182340..50ae01e181bc 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out @@ -63,4 +63,4 @@ where t1a in (select min(t2a) struct<> -- !query 4 output org.apache.spark.sql.AnalysisException -resolved attribute(s) t2b### missing from min(t2a)###,t2c### in operator !Filter predicate-subquery### [(t2c### = max(t3c)###) && (t3b### > t2b###)]; +resolved attribute(s) t2b#x missing from min(t2a)#x,t2c#x in operator !Filter predicate-subquery#x [(t2c#x = max(t3c)#x) && (t3b#x > t2b#x)]; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 051047bf5081..91aecca537fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -231,7 +231,7 @@ class SQLQueryTestSuite extends QueryTest 
with SharedSQLContext { // Also implement a crude way of masking expression IDs in the error message // with a generic pattern "###". (StructType(Seq.empty), - Seq(a.getClass.getName, a.getSimpleMessage.replaceAll("#[0-9]+", "###"))) + Seq(a.getClass.getName, a.getSimpleMessage.replaceAll("#\\d+", "#x"))) case NonFatal(e) => // If there is an exception, put the exception class followed by the message. (StructType(Seq.empty), Seq(e.getClass.getName, e.getMessage)) From 010d27a79be684668012fd796d21a085308dd828 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Tue, 24 Jan 2017 20:44:47 -0500 Subject: [PATCH 17/17] remove blank line --- .../org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index d77c9fbe72f3..f13a1f6d5d2f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -184,7 +184,6 @@ trait CheckAnalysis extends PredicateHelper { case s: SubqueryExpression => checkAnalysis(s.plan) s - } operator match {
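To close, a self-contained sketch of the expression-ID masking that patches 12 and 16 converge on (plain Scala with no Spark dependency; the sample message comes from the golden file above, and the MaskExprIds wrapper is ours for illustration):

    object MaskExprIds {
      def main(args: Array[String]): Unit = {
        // Expression IDs such as "#4863" change from run to run, so the suite
        // rewrites every "#<digits>" occurrence to the stable token "#x"
        // before comparing an AnalysisException message against a golden file.
        val msg = "resolved attribute(s) t2b#4863 missing from min(t2a)#4879,t2c#4864"
        val masked = msg.replaceAll("#\\d+", "#x")
        println(masked) // resolved attribute(s) t2b#x missing from min(t2a)#x,t2c#x
      }
    }

The same call, msg.replaceAll("#\\d+", "#x"), is what SQLQueryTestSuite applies to a.getSimpleMessage, which is why the final golden file asserts t2b#x rather than a concrete ID.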