From 2014ac0dda3f7bec53210985a17d0c88b5338750 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Tue, 11 Jul 2017 18:46:57 +0900 Subject: [PATCH 1/5] Support subquery column aliases in FROM clause --- .../spark/sql/catalyst/parser/SqlBase.g4 | 2 +- .../sql/catalyst/analysis/Analyzer.scala | 24 ++++++++++++-- .../sql/catalyst/optimizer/subquery.scala | 6 ++-- .../sql/catalyst/parser/AstBuilder.scala | 15 +++++++-- .../plans/logical/basicLogicalOperators.scala | 31 +++++++++++++++++-- .../sql/catalyst/analysis/AnalysisSuite.scala | 19 ++++++++++++ .../sql/catalyst/parser/PlanParserSuite.scala | 10 ++++++ .../sql-tests/inputs/table-aliases.sql | 3 ++ .../sql-tests/results/table-aliases.sql.out | 10 +++++- .../sql/hive/HiveMetastoreCatalogSuite.scala | 2 +- .../sql/hive/execution/SQLQuerySuite.scala | 2 +- 11 files changed, 109 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index ef9f88a9026c9..4534b7dcf6399 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -474,7 +474,7 @@ identifierComment relationPrimary : tableIdentifier sample? tableAlias #tableName - | '(' queryNoWith ')' sample? (AS? strictIdentifier)? #aliasedQuery + | '(' queryNoWith ')' sample? tableAlias #aliasedQuery | '(' relation ')' sample? (AS? strictIdentifier)? 
#aliasedRelation | inlineTable #inlineTableDefault2 | functionTable #tableValuedFunction diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 501e7e3c6961d..d01e996517cea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -625,7 +625,7 @@ class Analyzer( execute(child) } view.copy(child = newChild) - case p @ SubqueryAlias(_, view: View) => + case p @ SubqueryAlias(_, view: View, _) => val newChild = resolveRelation(view) p.copy(child = newChild) case _ => plan @@ -859,6 +859,22 @@ class Analyzer( // rule: ResolveDeserializer. case plan if containsDeserializer(plan.expressions) => plan + case q @ SubqueryAlias(alias, child, Some(columnNames)) if child.resolved && !q.resolved => + // Resolves output attributes if a query has alias names in its subquery: + // e.g., SELECT * FROM (SELECT 1 AS a, 1 AS b) t(col1, col2) + val outputAttrs = child.output + // Checks if the number of the aliases equals to the number of output columns + // in the subquery. + if (columnNames.size != outputAttrs.size) { + q.failAnalysis(s"Number of column aliases does not match number of columns. 
" + + s"Number of column aliases: ${columnNames.size}; " + + s"number of columns: ${outputAttrs.size}.") + } + val aliases = outputAttrs.zip(columnNames).map { case (attr, ue) => + Alias(attr, ue.name)() + } + q.copy(outputColumnNames = Some(aliases)) + case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString}") q.transformExpressionsUp { @@ -2218,7 +2234,8 @@ class Analyzer( */ object EliminateSubqueryAliases extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - case SubqueryAlias(_, child) => child + case SubqueryAlias(_, child, Some(columnNames)) => Project(columnNames, child) + case SubqueryAlias(_, child, _) => child } } @@ -2271,6 +2288,9 @@ object CleanupAliases extends Rule[LogicalPlan] { case o: ObjectProducer => o case a: AppendColumns => a + // Also, `SubqueryAlias` should never have extra aliases + case q: SubqueryAlias => q + case other => other transformExpressionsDown { case Alias(child, _) => child diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 2a3e07aebe709..e0c9cfb3d37da 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -307,7 +307,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { // and Project operators, followed by an optional Filter, followed by an // Aggregate. Traverse the operators recursively. 
def evalPlan(lp : LogicalPlan) : Map[ExprId, Option[Any]] = lp match { - case SubqueryAlias(_, child) => evalPlan(child) + case SubqueryAlias(_, child, _) => evalPlan(child) case Filter(condition, child) => val bindings = evalPlan(child) if (bindings.isEmpty) bindings @@ -365,7 +365,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { topPart += p bottomPart = child - case s @ SubqueryAlias(_, child) => + case s @ SubqueryAlias(_, child, _) => topPart += s bottomPart = child @@ -436,7 +436,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { topPart.reverse.foreach { case Project(projList, _) => subqueryRoot = Project(projList ++ havingInputs, subqueryRoot) - case s @ SubqueryAlias(alias, _) => + case s @ SubqueryAlias(alias, _, _) => subqueryRoot = SubqueryAlias(alias, subqueryRoot) case op => sys.error(s"Unexpected operator $op in corelated subquery") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 45c1d3d430e0d..2261912e88e01 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -753,17 +753,26 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * hooks. */ override def visitAliasedQuery(ctx: AliasedQueryContext): LogicalPlan = withOrigin(ctx) { - val alias = if (ctx.strictIdentifier == null) { + val alias = if (ctx.tableAlias.strictIdentifier == null) { // For un-aliased subqueries, use a default alias name that is not likely to conflict with // normal subquery names, so that parent operators can only access the columns in subquery by // unqualified names. Users can still use this special qualifier to access columns if they // know it, but that's not recommended. 
"__auto_generated_subquery_name" } else { - ctx.strictIdentifier.getText + ctx.tableAlias.strictIdentifier.getText } - SubqueryAlias(alias, plan(ctx.queryNoWith).optionalMap(ctx.sample)(withSample)) + val columnNamesOption = if (ctx.tableAlias.identifierList != null) { + Some(visitIdentifierList(ctx.tableAlias.identifierList)).map { names => + names.map(n => UnresolvedAttribute(n :: Nil)) + } + } else { + None + } + + SubqueryAlias(alias, plan(ctx.queryNoWith).optionalMap(ctx.sample)(withSample), + columnNamesOption) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 0bd3166352d35..98d4c85f19125 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -677,19 +677,44 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo } /** - * Aliased subquery. + * Aliased subquery. We could add alias names for output columns in the subquery: + * {{{ + * // Assign alias names for output columns + * SELECT col1, col2 FROM testData AS t(col1, col2); + * }}} * * @param alias the alias name for this subquery. * @param child the logical plan of this subquery. + * @param outputColumnNames the column names for this subquery. 
*/ case class SubqueryAlias( alias: String, - child: LogicalPlan) + child: LogicalPlan, + outputColumnNames: Option[Seq[NamedExpression]] = None) extends UnaryNode { override lazy val canonicalized: LogicalPlan = child.canonicalized - override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias))) + override def validConstraints: Set[Expression] = outputColumnNames.map { exprs => + child.constraints.union(getAliasedConstraints(exprs)) + }.getOrElse { + Set.empty + } + + override def output: Seq[Attribute] = { + val attrs = outputColumnNames.map { exprs => + exprs.map(_.toAttribute) + }.getOrElse { + child.output + } + attrs.map(_.withQualifier(Some(alias))) + } + + override def simpleString: String = statePrefix + outputColumnNames.map { names => + s"SubqueryAlias $alias${Utils.truncatedString(names, "(", ", ", ")")}" + }.getOrElse { + s"SubqueryAlias $alias" + } } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index be26b1b26f175..89434c33b186d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -470,4 +470,23 @@ class AnalysisSuite extends AnalysisTest with ShouldMatchers { Seq("Number of column aliases does not match number of columns. 
Table name: TaBlE3; " + "number of column aliases: 5; number of columns: 4.")) } + + test("SPARK-20962 Support subquery column aliases in FROM clause") { + def tableColumnsWithAliases(outputNames: Seq[String]): LogicalPlan = { + SubqueryAlias( + "t", + UnresolvedRelation(TableIdentifier("TaBlE3")).select(star()), + Some(outputNames.map(n => UnresolvedAttribute(n :: Nil))) + ).select(star()) + } + assertAnalysisSuccess(tableColumnsWithAliases("col1" :: "col2" :: "col3" :: "col4" :: Nil)) + assertAnalysisError( + tableColumnsWithAliases("col1" :: Nil), + Seq("Number of column aliases does not match number of columns. " + + "Number of column aliases: 1; number of columns: 4.")) + assertAnalysisError( + tableColumnsWithAliases("col1" :: "col2" :: "col3" :: "col4" :: "col5" :: Nil), + Seq("Number of column aliases does not match number of columns. " + + "Number of column aliases: 5; number of columns: 4.")) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 6dad097041a15..f299d6c72ecd7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -495,6 +495,16 @@ class PlanParserSuite extends AnalysisTest { .select(star())) } + test("SPARK-20962 Support subquery column aliases in FROM clause") { + assertEqual( + "SELECT * FROM (SELECT a AS x, b AS y FROM t) t(col1, col2)", + SubqueryAlias( + "t", + UnresolvedRelation(TableIdentifier("t")).select('a.as("x"), 'b.as("y")), + Some(Seq("col1", "col2").map(n => UnresolvedAttribute(n :: Nil))) + ).select(star())) + } + test("inline table") { assertEqual("values 1, 2, 3, 4", UnresolvedInlineTable(Seq("col1"), Seq(1, 2, 3, 4).map(x => Seq(Literal(x))))) diff --git a/sql/core/src/test/resources/sql-tests/inputs/table-aliases.sql 
b/sql/core/src/test/resources/sql-tests/inputs/table-aliases.sql index c90a9c7f85587..85481cbbf9377 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/table-aliases.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/table-aliases.sql @@ -15,3 +15,6 @@ SELECT * FROM testData AS t(col1); -- Check alias duplication SELECT a AS col1, b AS col2 FROM testData AS t(c, d); + +-- Subquery aliases in FROM clause +SELECT * FROM (SELECT 1 AS a, 1 AS b) t(col1, col2); diff --git a/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out b/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out index 7abbcd834a523..4459f3186c77b 100644 --- a/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 7 +-- Number of queries: 8 -- !query 0 @@ -61,3 +61,11 @@ struct<> -- !query 6 output org.apache.spark.sql.AnalysisException cannot resolve '`a`' given input columns: [t.c, t.d]; line 1 pos 7 + + +-- !query 7 +SELECT * FROM (SELECT 1 AS a, 1 AS b) t(col1, col2) +-- !query 7 schema +struct +-- !query 7 output +1 1 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 8140f883ee542..51b304f22f9e3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -62,7 +62,7 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton with SQLTestUtils { spark.sql("create view vw1 as select 1 as id") val plan = spark.sql("select id from vw1").queryExecution.analyzed val aliases = plan.collect { - case x @ SubqueryAlias("vw1", _) => x + case x @ SubqueryAlias("vw1", _, _) => x } assert(aliases.size == 1) } diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index a949e5e829e14..ae743f04f8823 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -948,7 +948,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") { sql("CREATE TABLE explodeTest (key bigInt)") table("explodeTest").queryExecution.analyzed match { - case SubqueryAlias(_, r: CatalogRelation) => // OK + case SubqueryAlias(_, r: CatalogRelation, _) => // OK case _ => fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation") } From d971d35d810e9284f6c1dee3f02820d10b4b5159 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 24 Jul 2017 11:18:05 +0900 Subject: [PATCH 2/5] Add a DDL syntax example --- .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 2261912e88e01..5a5ac36694d7f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -750,7 +750,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging /** * Create an alias (SubqueryAlias) for a sub-query. This is practically the same as * visitAliasedRelation and visitNamedExpression, ANTLR4 however requires us to use 3 different - * hooks. + * hooks. 
We could add alias names for output columns, for example: + * {{{ + * SELECT col1, col2 FROM testData AS t(col1, col2) + * }}} */ override def visitAliasedQuery(ctx: AliasedQueryContext): LogicalPlan = withOrigin(ctx) { val alias = if (ctx.tableAlias.strictIdentifier == null) { From 5b272e427f68be115e5aef1bc595250e6c80d900 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 24 Jul 2017 16:22:45 +0900 Subject: [PATCH 3/5] Add UnresolvedSubqueryColumnAlias --- .../sql/catalyst/analysis/Analyzer.scala | 18 +++++------ .../sql/catalyst/analysis/unresolved.scala | 22 ++++++++++++- .../sql/catalyst/optimizer/subquery.scala | 6 ++-- .../sql/catalyst/parser/AstBuilder.scala | 14 +++------ .../plans/logical/basicLogicalOperators.scala | 31 ++----------------- .../sql/catalyst/analysis/AnalysisSuite.scala | 9 +++--- .../sql/catalyst/parser/PlanParserSuite.scala | 11 ++++--- .../sql/hive/HiveMetastoreCatalogSuite.scala | 2 +- .../sql/hive/execution/SQLQuerySuite.scala | 2 +- 9 files changed, 52 insertions(+), 63 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d01e996517cea..73dd992ead351 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -625,7 +625,7 @@ class Analyzer( execute(child) } view.copy(child = newChild) - case p @ SubqueryAlias(_, view: View, _) => + case p @ SubqueryAlias(_, view: View) => val newChild = resolveRelation(view) p.copy(child = newChild) case _ => plan @@ -859,21 +859,21 @@ class Analyzer( // rule: ResolveDeserializer. 
case plan if containsDeserializer(plan.expressions) => plan - case q @ SubqueryAlias(alias, child, Some(columnNames)) if child.resolved && !q.resolved => + case u @ UnresolvedSubqueryColumnAlias(columnNames, child) if child.resolved => // Resolves output attributes if a query has alias names in its subquery: // e.g., SELECT * FROM (SELECT 1 AS a, 1 AS b) t(col1, col2) val outputAttrs = child.output // Checks if the number of the aliases equals to the number of output columns // in the subquery. if (columnNames.size != outputAttrs.size) { - q.failAnalysis(s"Number of column aliases does not match number of columns. " + + u.failAnalysis(s"Number of column aliases does not match number of columns. " + s"Number of column aliases: ${columnNames.size}; " + s"number of columns: ${outputAttrs.size}.") } - val aliases = outputAttrs.zip(columnNames).map { case (attr, ue) => - Alias(attr, ue.name)() + val aliases = outputAttrs.zip(columnNames).map { case (attr, aliasName) => + Alias(attr, aliasName)() } - q.copy(outputColumnNames = Some(aliases)) + Project(aliases, child) case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString}") @@ -2234,8 +2234,7 @@ class Analyzer( */ object EliminateSubqueryAliases extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - case SubqueryAlias(_, child, Some(columnNames)) => Project(columnNames, child) - case SubqueryAlias(_, child, _) => child + case SubqueryAlias(_, child) => child } } @@ -2288,9 +2287,6 @@ object CleanupAliases extends Rule[LogicalPlan] { case o: ObjectProducer => o case a: AppendColumns => a - // Also, `SubqueryAlias` should never have extra aliases - case q: SubqueryAlias => q - case other => other transformExpressionsDown { case Alias(child, _) => child diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index fb322697c7c68..61c3960d21759 
100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode} import org.apache.spark.sql.catalyst.parser.ParserUtils -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode} import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.types.{DataType, Metadata, StructType} @@ -422,6 +422,26 @@ case class UnresolvedAlias( override lazy val resolved = false } +/** + * Aliased column names for subquery. We could add alias names for output columns in the subquery: + * {{{ + * // Assign alias names for output columns + * SELECT col1, col2 FROM testData AS t(col1, col2); + * }}} + * + * @param outputColumnNames the column names for this subquery. + * @param child the logical plan of this subquery. + */ +case class UnresolvedSubqueryColumnAlias( + outputColumnNames: Seq[String], + child: LogicalPlan) + extends UnaryNode { + + override def output: Seq[Attribute] = Nil + + override lazy val resolved = false +} + /** * Holds the deserializer expression and the attributes that are available during the resolution * for it. 
Deserializer expression is a special kind of expression that is not always resolved by diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index e0c9cfb3d37da..2a3e07aebe709 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -307,7 +307,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { // and Project operators, followed by an optional Filter, followed by an // Aggregate. Traverse the operators recursively. def evalPlan(lp : LogicalPlan) : Map[ExprId, Option[Any]] = lp match { - case SubqueryAlias(_, child, _) => evalPlan(child) + case SubqueryAlias(_, child) => evalPlan(child) case Filter(condition, child) => val bindings = evalPlan(child) if (bindings.isEmpty) bindings @@ -365,7 +365,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { topPart += p bottomPart = child - case s @ SubqueryAlias(_, child, _) => + case s @ SubqueryAlias(_, child) => topPart += s bottomPart = child @@ -436,7 +436,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { topPart.reverse.foreach { case Project(projList, _) => subqueryRoot = Project(projList ++ havingInputs, subqueryRoot) - case s @ SubqueryAlias(alias, _, _) => + case s @ SubqueryAlias(alias, _) => subqueryRoot = SubqueryAlias(alias, subqueryRoot) case op => sys.error(s"Unexpected operator $op in corelated subquery") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 5a5ac36694d7f..e607d27178cc4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -765,17 +765,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } else { ctx.tableAlias.strictIdentifier.getText } - - val columnNamesOption = if (ctx.tableAlias.identifierList != null) { - Some(visitIdentifierList(ctx.tableAlias.identifierList)).map { names => - names.map(n => UnresolvedAttribute(n :: Nil)) - } + val subquery = SubqueryAlias(alias, plan(ctx.queryNoWith).optionalMap(ctx.sample)(withSample)) + if (ctx.tableAlias.identifierList != null) { + val columnNames = visitIdentifierList(ctx.tableAlias.identifierList) + UnresolvedSubqueryColumnAlias(columnNames, subquery) } else { - None + subquery } - - SubqueryAlias(alias, plan(ctx.queryNoWith).optionalMap(ctx.sample)(withSample), - columnNamesOption) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 98d4c85f19125..0bd3166352d35 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -677,44 +677,19 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo } /** - * Aliased subquery. We could add alias names for output columns in the subquery: - * {{{ - * // Assign alias names for output columns - * SELECT col1, col2 FROM testData AS t(col1, col2); - * }}} + * Aliased subquery. * * @param alias the alias name for this subquery. * @param child the logical plan of this subquery. - * @param outputColumnNames the column names for this subquery. 
*/ case class SubqueryAlias( alias: String, - child: LogicalPlan, - outputColumnNames: Option[Seq[NamedExpression]] = None) + child: LogicalPlan) extends UnaryNode { override lazy val canonicalized: LogicalPlan = child.canonicalized - override def validConstraints: Set[Expression] = outputColumnNames.map { exprs => - child.constraints.union(getAliasedConstraints(exprs)) - }.getOrElse { - Set.empty - } - - override def output: Seq[Attribute] = { - val attrs = outputColumnNames.map { exprs => - exprs.map(_.toAttribute) - }.getOrElse { - child.output - } - attrs.map(_.withQualifier(Some(alias))) - } - - override def simpleString: String = statePrefix + outputColumnNames.map { names => - s"SubqueryAlias $alias${Utils.truncatedString(names, "(", ", ", ")")}" - }.getOrElse { - s"SubqueryAlias $alias" - } + override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias))) } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 89434c33b186d..16fc8207a991f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -473,10 +473,11 @@ class AnalysisSuite extends AnalysisTest with ShouldMatchers { test("SPARK-20962 Support subquery column aliases in FROM clause") { def tableColumnsWithAliases(outputNames: Seq[String]): LogicalPlan = { - SubqueryAlias( - "t", - UnresolvedRelation(TableIdentifier("TaBlE3")).select(star()), - Some(outputNames.map(n => UnresolvedAttribute(n :: Nil))) + UnresolvedSubqueryColumnAlias( + outputNames, + SubqueryAlias( + "t", + UnresolvedRelation(TableIdentifier("TaBlE3"))) ).select(star()) } assertAnalysisSuccess(tableColumnsWithAliases("col1" :: "col2" :: "col3" :: "col4" :: Nil)) diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index f299d6c72ecd7..47b9cc987c97a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.parser import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedTableValuedFunction} +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedSubqueryColumnAlias, UnresolvedTableValuedFunction} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -498,10 +498,11 @@ class PlanParserSuite extends AnalysisTest { test("SPARK-20962 Support subquery column aliases in FROM clause") { assertEqual( "SELECT * FROM (SELECT a AS x, b AS y FROM t) t(col1, col2)", - SubqueryAlias( - "t", - UnresolvedRelation(TableIdentifier("t")).select('a.as("x"), 'b.as("y")), - Some(Seq("col1", "col2").map(n => UnresolvedAttribute(n :: Nil))) + UnresolvedSubqueryColumnAlias( + Seq("col1", "col2"), + SubqueryAlias( + "t", + UnresolvedRelation(TableIdentifier("t")).select('a.as("x"), 'b.as("y"))) ).select(star())) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 51b304f22f9e3..8140f883ee542 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -62,7 +62,7 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton with SQLTestUtils { spark.sql("create view vw1 as select 1 as id") val plan = spark.sql("select id from vw1").queryExecution.analyzed val aliases = plan.collect { - case x @ SubqueryAlias("vw1", _, _) => x + case x @ SubqueryAlias("vw1", _) => x } assert(aliases.size == 1) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index ae743f04f8823..a949e5e829e14 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -948,7 +948,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") { sql("CREATE TABLE explodeTest (key bigInt)") table("explodeTest").queryExecution.analyzed match { - case SubqueryAlias(_, r: CatalogRelation, _) => // OK + case SubqueryAlias(_, r: CatalogRelation) => // OK case _ => fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation") } From 23ca897825a51baa1b879c3b7968749199e8724f Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Fri, 28 Jul 2017 08:58:32 +0900 Subject: [PATCH 4/5] Apply xiao's comments --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- .../apache/spark/sql/catalyst/analysis/unresolved.scala | 7 ++++--- .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 4 ++-- .../apache/spark/sql/catalyst/analysis/AnalysisSuite.scala | 2 +- .../apache/spark/sql/catalyst/parser/PlanParserSuite.scala | 4 ++-- 5 files changed, 11 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 73dd992ead351..3a03a8f0f59f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -859,14 +859,14 @@ class Analyzer( // rule: ResolveDeserializer. case plan if containsDeserializer(plan.expressions) => plan - case u @ UnresolvedSubqueryColumnAlias(columnNames, child) if child.resolved => + case u @ UnresolvedSubqueryColumnAliases(columnNames, child) if child.resolved => // Resolves output attributes if a query has alias names in its subquery: // e.g., SELECT * FROM (SELECT 1 AS a, 1 AS b) t(col1, col2) val outputAttrs = child.output // Checks if the number of the aliases equals to the number of output columns // in the subquery. if (columnNames.size != outputAttrs.size) { - u.failAnalysis(s"Number of column aliases does not match number of columns. " + + u.failAnalysis("Number of column aliases does not match number of columns. " + s"Number of column aliases: ${columnNames.size}; " + s"number of columns: ${outputAttrs.size}.") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 61c3960d21759..b7a704dc8453a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -423,16 +423,17 @@ case class UnresolvedAlias( } /** - * Aliased column names for subquery. We could add alias names for output columns in the subquery: + * Aliased column names resolved by positions for subquery. 
We could add alias names for output + * columns in the subquery: * {{{ * // Assign alias names for output columns * SELECT col1, col2 FROM testData AS t(col1, col2); * }}} * - * @param outputColumnNames the column names for this subquery. + * @param outputColumnNames the [[LogicalPlan]] on which this subquery column aliases apply. * @param child the logical plan of this subquery. */ -case class UnresolvedSubqueryColumnAlias( +case class UnresolvedSubqueryColumnAliases( outputColumnNames: Seq[String], child: LogicalPlan) extends UnaryNode { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index e607d27178cc4..198f839d2beef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -767,8 +767,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } val subquery = SubqueryAlias(alias, plan(ctx.queryNoWith).optionalMap(ctx.sample)(withSample)) if (ctx.tableAlias.identifierList != null) { - val columnNames = visitIdentifierList(ctx.tableAlias.identifierList) - UnresolvedSubqueryColumnAlias(columnNames, subquery) + val columnAliases = visitIdentifierList(ctx.tableAlias.identifierList) + UnresolvedSubqueryColumnAliases(columnAliases, subquery) } else { subquery } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 16fc8207a991f..9bcf4773fa903 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -473,7 +473,7 @@ class AnalysisSuite extends AnalysisTest with ShouldMatchers { test("SPARK-20962 Support subquery 
column aliases in FROM clause") { def tableColumnsWithAliases(outputNames: Seq[String]): LogicalPlan = { - UnresolvedSubqueryColumnAlias( + UnresolvedSubqueryColumnAliases( outputNames, SubqueryAlias( "t", diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 47b9cc987c97a..446b9073dff10 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.parser import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedSubqueryColumnAlias, UnresolvedTableValuedFunction} +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTableValuedFunction} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -498,7 +498,7 @@ class PlanParserSuite extends AnalysisTest { test("SPARK-20962 Support subquery column aliases in FROM clause") { assertEqual( "SELECT * FROM (SELECT a AS x, b AS y FROM t) t(col1, col2)", - UnresolvedSubqueryColumnAlias( + UnresolvedSubqueryColumnAliases( Seq("col1", "col2"), SubqueryAlias( "t", From 277d6ee40f9bb111799171fae5632d0030503e19 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sat, 29 Jul 2017 10:20:55 +0900 Subject: [PATCH 5/5] Add ResolveSubqueryColumnAliases rule --- .../sql/catalyst/analysis/Analyzer.scala | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 3a03a8f0f59f8..7ea7158f29a39 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -141,6 +141,7 @@ class Analyzer( ResolveFunctions :: ResolveAliases :: ResolveSubquery :: + ResolveSubqueryColumnAliases :: ResolveWindowOrder :: ResolveWindowFrame :: ResolveNaturalAndUsingJoin :: @@ -859,22 +860,6 @@ class Analyzer( // rule: ResolveDeserializer. case plan if containsDeserializer(plan.expressions) => plan - case u @ UnresolvedSubqueryColumnAliases(columnNames, child) if child.resolved => - // Resolves output attributes if a query has alias names in its subquery: - // e.g., SELECT * FROM (SELECT 1 AS a, 1 AS b) t(col1, col2) - val outputAttrs = child.output - // Checks if the number of the aliases equals to the number of output columns - // in the subquery. - if (columnNames.size != outputAttrs.size) { - u.failAnalysis("Number of column aliases does not match number of columns. " + - s"Number of column aliases: ${columnNames.size}; " + - s"number of columns: ${outputAttrs.size}.") - } - val aliases = outputAttrs.zip(columnNames).map { case (attr, aliasName) => - Alias(attr, aliasName)() - } - Project(aliases, child) - case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString}") q.transformExpressionsUp { @@ -1339,6 +1324,30 @@ class Analyzer( } } + /** + * Replaces unresolved column aliases for a subquery with projections. 
+ */ + object ResolveSubqueryColumnAliases extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case u @ UnresolvedSubqueryColumnAliases(columnNames, child) if child.resolved => + // Resolves output attributes if a query has alias names in its subquery: + // e.g., SELECT * FROM (SELECT 1 AS a, 1 AS b) t(col1, col2) + val outputAttrs = child.output + // Checks if the number of the aliases equals the number of output columns + // in the subquery. + if (columnNames.size != outputAttrs.size) { + u.failAnalysis("Number of column aliases does not match number of columns. " + + s"Number of column aliases: ${columnNames.size}; " + + s"number of columns: ${outputAttrs.size}.") + } + val aliases = outputAttrs.zip(columnNames).map { case (attr, aliasName) => + Alias(attr, aliasName)() + } + Project(aliases, child) + } + } + /** * Turns projections that contain aggregate expressions into aggregations. */