-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-18863][SQL] Output non-aggregate expressions without GROUP BY in a subquery does not yield an error #16572
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 22 commits
b988651
069ed8f
edca333
64184fd
29f82b0
ac43ab4
631d396
7eb9b2d
1387cf5
3faa2d5
a308634
2f463de
6e2f686
f1524b9
6dfa8e5
e9bdde6
deec874
5c36dce
98cbd60
bcae336
51f7fb9
24397cf
ced19c7
862b2b8
203ad7d
010d27a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -117,66 +117,72 @@ trait CheckAnalysis extends PredicateHelper { | |
| failAnalysis(s"Window specification $s is not valid because $m") | ||
| case None => w | ||
| } | ||
| case s @ ScalarSubquery(query, conditions, _) | ||
|
|
||
| case e @ PredicateSubquery(query, _, _, _) => | ||
| checkAnalysis(query) | ||
| e | ||
|
|
||
| case s @ ScalarSubquery(query, conditions, _) => | ||
| // If no correlation, the output must be exactly one column | ||
| if (conditions.isEmpty && query.output.size != 1) => | ||
| if (conditions.isEmpty && query.output.size != 1) { | ||
| failAnalysis( | ||
| s"Scalar subquery must return only one column, but got ${query.output.size}") | ||
|
|
||
| case s @ ScalarSubquery(query, conditions, _) if conditions.nonEmpty => | ||
|
|
||
| // Collect the columns from the subquery for further checking. | ||
| var subqueryColumns = conditions.flatMap(_.references).filter(query.output.contains) | ||
|
|
||
| def checkAggregate(agg: Aggregate): Unit = { | ||
| // Make sure correlated scalar subqueries contain one row for every outer row by | ||
| // enforcing that they are aggregates which contain exactly one aggregate expression. | ||
| // The analyzer has already checked that the subquery contained only one output column, | ||
| // and added all the grouping expressions to the aggregate. | ||
| val aggregates = agg.expressions.flatMap(_.collect { | ||
| case a: AggregateExpression => a | ||
| }) | ||
| if (aggregates.isEmpty) { | ||
| failAnalysis("The output of a correlated scalar subquery must be aggregated") | ||
| } | ||
|
|
||
| // SPARK-18504/SPARK-18814: Block cases where GROUP BY columns | ||
| // are not part of the correlated columns. | ||
| val groupByCols = AttributeSet(agg.groupingExpressions.flatMap(_.references)) | ||
| val correlatedCols = AttributeSet(subqueryColumns) | ||
| val invalidCols = groupByCols -- correlatedCols | ||
| // GROUP BY columns must be a subset of columns in the predicates | ||
| if (invalidCols.nonEmpty) { | ||
| failAnalysis( | ||
| "A GROUP BY clause in a scalar correlated subquery " + | ||
| "cannot contain non-correlated columns: " + | ||
| invalidCols.mkString(",")) | ||
| } | ||
| } | ||
| else if (conditions.nonEmpty) { | ||
| // Collect the columns from the subquery for further checking. | ||
| var subqueryColumns = conditions.flatMap(_.references).filter(query.output.contains) | ||
|
|
||
| def checkAggregate(agg: Aggregate): Unit = { | ||
| // Make sure correlated scalar subqueries contain one row for every outer row by | ||
| // enforcing that they are aggregates containing exactly one aggregate expression. | ||
| // The analyzer has already checked that the subquery contained only one output column, | ||
| // and added all the grouping expressions to the aggregate. | ||
| val aggregates = agg.expressions.flatMap(_.collect { | ||
| case a: AggregateExpression => a | ||
| }) | ||
| if (aggregates.isEmpty) { | ||
| failAnalysis("The output of a correlated scalar subquery must be aggregated") | ||
| } | ||
|
|
||
| // Skip subquery aliases added by the Analyzer and the SQLBuilder. | ||
| // For projects, do the necessary mapping and skip to its child. | ||
| def cleanQuery(p: LogicalPlan): LogicalPlan = p match { | ||
| case s: SubqueryAlias => cleanQuery(s.child) | ||
| case p: Project => | ||
| // SPARK-18814: Map any aliases to their AttributeReference children | ||
| // for the checking in the Aggregate operators below this Project. | ||
| subqueryColumns = subqueryColumns.map { | ||
| xs => p.projectList.collectFirst { | ||
| case e @ Alias(child : AttributeReference, _) if e.exprId == xs.exprId => | ||
| child | ||
| }.getOrElse(xs) | ||
| // SPARK-18504/SPARK-18814: Block cases where GROUP BY columns | ||
| // are not part of the correlated columns. | ||
| val groupByCols = AttributeSet(agg.groupingExpressions.flatMap(_.references)) | ||
| val correlatedCols = AttributeSet(subqueryColumns) | ||
| val invalidCols = groupByCols -- correlatedCols | ||
| // GROUP BY columns must be a subset of columns in the predicates | ||
| if (invalidCols.nonEmpty) { | ||
| failAnalysis( | ||
| "A GROUP BY clause in a scalar correlated subquery " + | ||
| "cannot contain non-correlated columns: " + | ||
| invalidCols.mkString(",")) | ||
| } | ||
| } | ||
|
|
||
| cleanQuery(p.child) | ||
| case child => child | ||
| } | ||
| // Skip subquery aliases added by the Analyzer and the SQLBuilder. | ||
| // For projects, do the necessary mapping and skip to its child. | ||
| def cleanQuery(p: LogicalPlan): LogicalPlan = p match { | ||
| case s: SubqueryAlias => cleanQuery(s.child) | ||
| case p: Project => | ||
| // SPARK-18814: Map any aliases to their AttributeReference children | ||
| // for the checking in the Aggregate operators below this Project. | ||
| subqueryColumns = subqueryColumns.map { | ||
| xs => p.projectList.collectFirst { | ||
| case e @ Alias(child : AttributeReference, _) if e.exprId == xs.exprId => | ||
| child | ||
| }.getOrElse(xs) | ||
| } | ||
|
|
||
| cleanQuery(p.child) | ||
| case child => child | ||
| } | ||
|
|
||
| cleanQuery(query) match { | ||
| case a: Aggregate => checkAggregate(a) | ||
| case Filter(_, a: Aggregate) => checkAggregate(a) | ||
| case fail => failAnalysis(s"Correlated scalar subqueries must be Aggregated: $fail") | ||
| cleanQuery(query) match { | ||
| case a: Aggregate => checkAggregate(a) | ||
| case Filter(_, a: Aggregate) => checkAggregate(a) | ||
| case fail => failAnalysis(s"Correlated scalar subqueries must be Aggregated: $fail") | ||
| } | ||
| } | ||
| checkAnalysis(query) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The best way to view this block of code changes is using a |
||
| s | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| -- The test file contains negative test cases | ||
| -- of invalid queries where error messages are expected. | ||
|
|
||
| create temporary view t1 as select * from values | ||
| (1, 2, 3) | ||
| as t1(t1a, t1b, t1c); | ||
|
|
||
| create temporary view t2 as select * from values | ||
| (1, 0, 1) | ||
| as t2(t2a, t2b, t2c); | ||
|
|
||
| create temporary view t3 as select * from values | ||
| (3, 1, 2) | ||
| as t3(t3a, t3b, t3c); | ||
|
|
||
| -- TC 01.01 | ||
| -- The column t2b in the SELECT of the subquery is invalid | ||
| -- because it is neither an aggregate function nor a GROUP BY column. | ||
| select t1a, t2b | ||
| from t1, t2 | ||
| where t1b = t2c | ||
| and t2b = (select max(avg) | ||
| from (select t2b, avg(t2b) avg | ||
| from t2 | ||
| where t2a = t1.t1b | ||
| ) | ||
| ) | ||
| ; | ||
|
|
||
| -- TC 01.02 | ||
| -- Invalid because the column t2b is not part of the output of table t2. | ||
| select * | ||
| from t1 | ||
| where t1a in (select min(t2a) | ||
| from t2 | ||
| group by t2c | ||
| having t2c in (select max(t3c) | ||
| from t3 | ||
| group by t3b | ||
| having t3b > t2b )) | ||
| ; | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| -- Automatically generated by SQLQueryTestSuite | ||
| -- Number of queries: 5 | ||
|
|
||
|
|
||
| -- !query 0 | ||
| create temporary view t1 as select * from values | ||
| (1, 2, 3) | ||
| as t1(t1a, t1b, t1c) | ||
| -- !query 0 schema | ||
| struct<> | ||
| -- !query 0 output | ||
|
|
||
|
|
||
|
|
||
| -- !query 1 | ||
| create temporary view t2 as select * from values | ||
| (1, 0, 1) | ||
| as t2(t2a, t2b, t2c) | ||
| -- !query 1 schema | ||
| struct<> | ||
| -- !query 1 output | ||
|
|
||
|
|
||
|
|
||
| -- !query 2 | ||
| create temporary view t3 as select * from values | ||
| (3, 1, 2) | ||
| as t3(t3a, t3b, t3c) | ||
| -- !query 2 schema | ||
| struct<> | ||
| -- !query 2 output | ||
|
|
||
|
|
||
|
|
||
| -- !query 3 | ||
| select t1a, t2b | ||
| from t1, t2 | ||
| where t1b = t2c | ||
| and t2b = (select max(avg) | ||
| from (select t2b, avg(t2b) avg | ||
| from t2 | ||
| where t2a = t1.t1b | ||
| ) | ||
| ) | ||
| -- !query 3 schema | ||
| struct<> | ||
| -- !query 3 output | ||
| org.apache.spark.sql.AnalysisException | ||
| expression 't2.`t2b`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.; | ||
|
|
||
|
|
||
| -- !query 4 | ||
| select * | ||
| from t1 | ||
| where t1a in (select min(t2a) | ||
| from t2 | ||
| group by t2c | ||
| having t2c in (select max(t3c) | ||
| from t3 | ||
| group by t3b | ||
| having t3b > t2b )) | ||
| -- !query 4 schema | ||
| struct<> | ||
| -- !query 4 output | ||
| org.apache.spark.sql.AnalysisException | ||
| resolved attribute(s) t2b### missing from min(t2a)###,t2c### in operator !Filter predicate-subquery### [(t2c### = max(t3c)###) && (t3b### > t2b###)]; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -163,7 +163,12 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext { | |
| s"-- Number of queries: ${outputs.size}\n\n\n" + | ||
| outputs.zipWithIndex.map{case (qr, i) => qr.toString(i)}.mkString("\n\n\n") + "\n" | ||
| } | ||
| stringToFile(new File(testCase.resultFile), goldenOutput) | ||
| val resultFile = new File(testCase.resultFile); | ||
| val parent = resultFile.getParentFile(); | ||
| if (!parent.exists()) { | ||
| assert(parent.mkdirs(), "Could not create directory: " + parent) | ||
| } | ||
| stringToFile(resultFile, goldenOutput) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This addition is ported from the code from PR-16467 of SPARK-19017 reviewed by @hvanhovell. This change is required after the introduction of test files in sub-directories by SPARK-18871.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This has been merged. |
||
| } | ||
|
|
||
| // Read back the golden file. | ||
|
|
@@ -223,7 +228,10 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext { | |
| } catch { | ||
| case a: AnalysisException if a.plan.nonEmpty => | ||
| // Do not output the logical plan tree which contains expression IDs. | ||
| (StructType(Seq.empty), Seq(a.getClass.getName, a.getSimpleMessage)) | ||
| // Also implement a crude way of masking expression IDs in the error message | ||
| // with a generic pattern "###". | ||
| (StructType(Seq.empty), | ||
| Seq(a.getClass.getName, a.getSimpleMessage.replaceAll("#[0-9]+", "###"))) | ||
|
||
| case NonFatal(e) => | ||
| // If there is an exception, put the exception class followed by the message. | ||
| (StructType(Seq.empty), Seq(e.getClass.getName, e.getMessage)) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be better to add a catch all
SubqueryExpressioncase after the ScalarSubquery case, instead of adding one specifically aimed at a predicate subquery.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.