From f7720200933d32eba8fc89ceb9ca7ba00b514d84 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Tue, 11 Jun 2019 17:00:43 +0200 Subject: [PATCH 1/5] [SPARK-28002][SQL] Support WITH clause column aliases --- .../spark/sql/catalyst/parser/SqlBase.g4 | 2 +- .../sql/catalyst/parser/AstBuilder.scala | 11 ++++++-- .../org/apache/spark/sql/SQLQuerySuite.scala | 28 +++++++++++++++++++ 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index d58fb7cb20bf..ea06bc0f074b 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -319,7 +319,7 @@ ctes ; namedQuery - : name=identifier AS? '(' query ')' + : name=identifier (columnAliases=identifierList)? AS? '(' query ')' ; tableProvider diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 0048088f3ede..7378d10cc957 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -184,9 +184,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * * This is only used for Common Table Expressions. */ - override def visitNamedQuery(ctx: NamedQueryContext): SubqueryAlias = withOrigin(ctx) { - SubqueryAlias(ctx.name.getText, plan(ctx.query)) - } + override def visitNamedQuery(ctx: NamedQueryContext): SubqueryAlias = + withOrigin(ctx) { + val subQuery: LogicalPlan = plan(ctx.query).optionalMap(ctx.columnAliases)( + (columnAliases, plan) => + UnresolvedSubqueryColumnAliases(visitIdentifierList(columnAliases), plan) + ) + SubqueryAlias(ctx.name.getText, subQuery) + } /** * Create a logical plan which allows for multiple inserts using one 'from' statement. These diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 5d77309c2ed4..7107ace32adf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -545,6 +545,34 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } + test("SPARK-28002: CTE with column alias") { + checkAnswer( + sql(""" + |WITH t(x) AS (SELECT 1) + |SELECT * FROM t WHERE x = 1 + """.stripMargin), + Row(1) :: Nil + ) + } + + test("SPARK-28002: CTE with non-existing column alias") { + intercept[AnalysisException] { + sql(""" + |WITH t(x) AS (SELECT 1) + |SELECT * FROM t WHERE y = 1 + """.stripMargin) + } + } + + test("SPARK-28002: CTE with non-matching column alias") { + intercept[AnalysisException] { + sql(""" + |WITH t(x, y) AS (SELECT 1) + |SELECT * FROM t WHERE x = 1 + """.stripMargin) + } + } + test("date row") { checkAnswer(sql( """select cast("2015-01-28" as date) from testData limit 1"""), From f80011198337e90cad82c72460f1fd4e0bfa5b1d Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 12 Jun 2019 08:53:05 +0200 Subject: [PATCH 2/5] move analysis error tests to AnalysisSuite --- .../sql/catalyst/analysis/AnalysisSuite.scala | 13 +++++++++++++ .../org/apache/spark/sql/SQLQuerySuite.scala | 18 ------------------ 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 80387336e164..b3f1cc31bb16 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan import org.apache.spark.sql.catalyst.plans.{Cross, Inner} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, @@ -633,4 +634,16 @@ class AnalysisSuite extends AnalysisTest with Matchers { val res = ViewAnalyzer.execute(view) comparePlans(res, expected) } + + test("SPARK-28002: CTE with non-existing column alias") { + assertAnalysisError(parsePlan("WITH t(x) AS (SELECT 1) SELECT * FROM t WHERE y = 1"), + Seq("cannot resolve '`y`' given input columns: [x]")) + } + + test("SPARK-28002: CTE with non-matching column alias") { + assertAnalysisError(parsePlan("WITH t(x, y) AS (SELECT 1) SELECT * FROM t WHERE x = 1"), + Seq("Number of column aliases does not match number of columns. Number of column aliases: " + + "2; number of columns: 1.")) + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 7107ace32adf..b44be7701ab5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -555,24 +555,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { ) } - test("SPARK-28002: CTE with non-existing column alias") { - intercept[AnalysisException] { - sql(""" - |WITH t(x) AS (SELECT 1) - |SELECT * FROM t WHERE y = 1 - """.stripMargin) - } - } - - test("SPARK-28002: CTE with non-matching column alias") { - intercept[AnalysisException] { - sql(""" - |WITH t(x, y) AS (SELECT 1) - |SELECT * FROM t WHERE x = 1 - """.stripMargin) - } - } - test("date row") { checkAnswer(sql( """select cast("2015-01-28" as date) from testData limit 1"""), From 6ae91c5b9a0f99dad6eb6164c921751147139fd1 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 12 Jun 2019 21:23:43 +0200 Subject: [PATCH 3/5] fix review findings --- .../sql/catalyst/parser/AstBuilder.scala | 15 +++++------ .../sql/catalyst/analysis/AnalysisSuite.scala | 1 - .../sql/catalyst/parser/PlanParserSuite.scala | 27 ++++++++++++++++--- .../test/resources/sql-tests/inputs/cte.sql | 4 +++ .../resources/sql-tests/results/cte.sql.out | 19 +++++++++---- .../org/apache/spark/sql/SQLQuerySuite.scala | 10 ------- 6 files changed, 48 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 7378d10cc957..59d0aa49262d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -184,14 +184,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * * This is only used for Common Table Expressions. */ - override def visitNamedQuery(ctx: NamedQueryContext): SubqueryAlias = - withOrigin(ctx) { - val subQuery: LogicalPlan = plan(ctx.query).optionalMap(ctx.columnAliases)( - (columnAliases, plan) => - UnresolvedSubqueryColumnAliases(visitIdentifierList(columnAliases), plan) - ) - SubqueryAlias(ctx.name.getText, subQuery) - } + override def visitNamedQuery(ctx: NamedQueryContext): SubqueryAlias = withOrigin(ctx) { + val subQuery: LogicalPlan = plan(ctx.query).optionalMap(ctx.columnAliases)( + (columnAliases, plan) => + UnresolvedSubqueryColumnAliases(visitIdentifierList(columnAliases), plan) + ) + SubqueryAlias(ctx.name.getText, subQuery) + } /** * Create a logical plan which allows for multiple inserts using one 'from' statement. These diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index b3f1cc31bb16..ac2557064ad5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -645,5 +645,4 @@ class AnalysisSuite extends AnalysisTest with Matchers { Seq("Number of column aliases does not match number of columns. Number of column aliases: " + "2; number of columns: 1.")) } - } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index fba2a28c3fc3..c86b4bf95279 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -81,18 +81,31 @@ class PlanParserSuite extends AnalysisTest { assertEqual("select * from a intersect all select * from b", a.intersect(b, isAll = true)) } + private def cte(plan: LogicalPlan, namedPlans: (String, (LogicalPlan, Seq[String]))*): With = { + val ctes = namedPlans.map { + case (name, (cte, columnAliases)) => + val subquery = if (columnAliases.isEmpty) { + cte + } else { + UnresolvedSubqueryColumnAliases(columnAliases, cte) + } + name -> SubqueryAlias(name, subquery) + } + With(plan, ctes) + } + test("common table expressions") { assertEqual( "with cte1 as (select * from a) select * from cte1", - cte(table("cte1").select(star()), "cte1" -> table("a").select(star()))) + cte(table("cte1").select(star()), "cte1" -> ((table("a").select(star()), Seq.empty)))) assertEqual( "with cte1 (select 1) select * from cte1", - cte(table("cte1").select(star()), "cte1" -> OneRowRelation().select(1))) + cte(table("cte1").select(star()), "cte1" -> ((OneRowRelation().select(1), Seq.empty)))) assertEqual( "with cte1 (select 1), cte2 as (select * from cte1) select * from cte2", cte(table("cte2").select(star()), - "cte1" -> OneRowRelation().select(1), - "cte2" -> table("cte1").select(star()))) + "cte1" -> ((OneRowRelation().select(1), Seq.empty)), + "cte2" -> ((table("cte1").select(star()), Seq.empty)))) intercept( "with cte1 (select 1), cte1 as (select 1 from cte1) select * from cte1", "Found duplicate keys 'cte1'") @@ -818,4 +831,10 @@ class PlanParserSuite extends AnalysisTest { "SELECT /*+ BROADCAST(tab) */ * FROM testcat.db.tab", table("testcat", "db", "tab").select(star()).hint("BROADCAST", $"tab")) } + + test("SPARK-28002: CTE with column alias") { + assertEqual( + "WITH t(x) AS (SELECT c FROM a) SELECT * FROM t", + cte(table("t").select(star()), "t" -> ((table("a").select('c), Seq("x"))))) + } } diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte.sql b/sql/core/src/test/resources/sql-tests/inputs/cte.sql index d34d89f23575..434f5d321878 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/cte.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/cte.sql @@ -24,6 +24,10 @@ SELECT t1.id AS c1, FROM CTE1 t1 CROSS JOIN CTE1 t2; +-- SPARK-28002: CTE with column alias +WITH t(x) AS (SELECT 1) +SELECT * FROM t WHERE x = 1; + -- Clean up DROP VIEW IF EXISTS t; DROP VIEW IF EXISTS t2; diff --git a/sql/core/src/test/resources/sql-tests/results/cte.sql.out b/sql/core/src/test/resources/sql-tests/results/cte.sql.out index a446c2cd183d..f8ccecbc46f4 100644 --- a/sql/core/src/test/resources/sql-tests/results/cte.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/cte.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 9 +-- Number of queries: 10 -- !query 0 @@ -89,16 +89,25 @@ struct -- !query 7 -DROP VIEW IF EXISTS t +WITH t(x) AS (SELECT 1) +SELECT * FROM t WHERE x = 1 -- !query 7 schema -struct<> +struct -- !query 7 output - +1 -- !query 8 -DROP VIEW IF EXISTS t2 +DROP VIEW IF EXISTS t -- !query 8 schema struct<> -- !query 8 output + + +-- !query 9 +DROP VIEW IF EXISTS t2 +-- !query 9 schema +struct<> +-- !query 9 output + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index b44be7701ab5..5d77309c2ed4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -545,16 +545,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } - test("SPARK-28002: CTE with column alias") { - checkAnswer( - sql(""" - |WITH t(x) AS (SELECT 1) - |SELECT * FROM t WHERE x = 1 - """.stripMargin), - Row(1) :: Nil - ) - } - test("date row") { checkAnswer(sql( """select cast("2015-01-28" as date) from testData limit 1"""), From 510eee10c9c8b5938cec8bbc867c576ff0080103 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 13 Jun 2019 08:16:43 +0200 Subject: [PATCH 4/5] fix comments --- .../apache/spark/sql/catalyst/analysis/AnalysisSuite.scala | 4 ++-- .../apache/spark/sql/catalyst/parser/PlanParserSuite.scala | 2 +- sql/core/src/test/resources/sql-tests/inputs/cte.sql | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index ac2557064ad5..b69f25b8a5d1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -635,12 +635,12 @@ class AnalysisSuite extends AnalysisTest with Matchers { comparePlans(res, expected) } - test("SPARK-28002: CTE with non-existing column alias") { + test("CTE with non-existing column alias") { assertAnalysisError(parsePlan("WITH t(x) AS (SELECT 1) SELECT * FROM t WHERE y = 1"), Seq("cannot resolve '`y`' given input columns: [x]")) } - test("SPARK-28002: CTE with non-matching column alias") { + test("CTE with non-matching column alias") { assertAnalysisError(parsePlan("WITH t(x, y) AS (SELECT 1) SELECT * FROM t WHERE x = 1"), Seq("Number of column aliases does not match number of columns. Number of column aliases: " + "2; number of columns: 1.")) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index c86b4bf95279..d7d79e44c64d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -832,7 +832,7 @@ class PlanParserSuite extends AnalysisTest { table("testcat", "db", "tab").select(star()).hint("BROADCAST", $"tab")) } - test("SPARK-28002: CTE with column alias") { + test("CTE with column alias") { assertEqual( "WITH t(x) AS (SELECT c FROM a) SELECT * FROM t", cte(table("t").select(star()), "t" -> ((table("a").select('c), Seq("x"))))) diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte.sql b/sql/core/src/test/resources/sql-tests/inputs/cte.sql index 434f5d321878..822c5c4660e3 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/cte.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/cte.sql @@ -24,7 +24,7 @@ SELECT t1.id AS c1, FROM CTE1 t1 CROSS JOIN CTE1 t2; --- SPARK-28002: CTE with column alias +-- CTE with column alias WITH t(x) AS (SELECT 1) SELECT * FROM t WHERE x = 1; From 0a00a036148e898bd88bf3dd6e7b7ca0c67fa270 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 13 Jun 2019 08:29:01 +0200 Subject: [PATCH 5/5] fix compile error --- .../sql/catalyst/parser/PlanParserSuite.scala | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index d7d79e44c64d..1d63f1b6ca83 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -42,10 +42,15 @@ class PlanParserSuite extends AnalysisTest { private def intercept(sqlCommand: String, messages: String*): Unit = interceptParseException(parsePlan)(sqlCommand, messages: _*) - private def cte(plan: LogicalPlan, namedPlans: (String, LogicalPlan)*): With = { + private def cte(plan: LogicalPlan, namedPlans: (String, (LogicalPlan, Seq[String]))*): With = { val ctes = namedPlans.map { - case (name, cte) => - name -> SubqueryAlias(name, cte) + case (name, (cte, columnAliases)) => + val subquery = if (columnAliases.isEmpty) { + cte + } else { + UnresolvedSubqueryColumnAliases(columnAliases, cte) + } + name -> SubqueryAlias(name, subquery) } With(plan, ctes) } @@ -81,19 +86,6 @@ class PlanParserSuite extends AnalysisTest { assertEqual("select * from a intersect all select * from b", a.intersect(b, isAll = true)) } - private def cte(plan: LogicalPlan, namedPlans: (String, (LogicalPlan, Seq[String]))*): With = { - val ctes = namedPlans.map { - case (name, (cte, columnAliases)) => - val subquery = if (columnAliases.isEmpty) { - cte - } else { - UnresolvedSubqueryColumnAliases(columnAliases, cte) - } - name -> SubqueryAlias(name, subquery) - } - With(plan, ctes) - } - test("common table expressions") { assertEqual( "with cte1 as (select * from a) select * from cte1", @@ -825,7 +817,8 @@ class PlanParserSuite extends AnalysisTest { |WITH cte1 AS (SELECT * FROM testcat.db.tab) |SELECT * FROM cte1 """.stripMargin, - cte(table("cte1").select(star()), "cte1" -> table("testcat", "db", "tab").select(star()))) + cte(table("cte1").select(star()), + "cte1" -> ((table("testcat", "db", "tab").select(star()), Seq.empty)))) assertEqual( "SELECT /*+ BROADCAST(tab) */ * FROM testcat.db.tab",