diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3fd5039a4f11..56cc2a274bb7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -799,6 +799,7 @@ class Analyzer(
 def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
 case u: UnresolvedRelation =>
 lookupV2Relation(u.multipartIdentifier)
+ .map(SubqueryAlias(u.multipartIdentifier, _))
 .getOrElse(u)
 case u @ UnresolvedTable(NonSessionCatalogAndIdentifier(catalog, ident)) =>
@@ -923,7 +924,9 @@ class Analyzer(
 case v1Table: V1Table =>
 v1SessionCatalog.getRelation(v1Table.v1Table)
 case table =>
- DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+ SubqueryAlias(
+ identifier,
+ DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
 }
 val key = catalog.name +: ident.namespace :+ ident.name
 Option(AnalysisContext.get.relationCache.getOrElseUpdate(key, loaded.orNull))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 3362353e2662..02e90f8458c3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -236,8 +236,6 @@ case class AttributeReference(
 val qualifier: Seq[String] = Seq.empty[String])
 extends Attribute with Unevaluable {
- // currently can only handle qualifier of length 2
- require(qualifier.length <= 2)
 /**
 * Returns true iff the expression id is the same for both attributes.
 */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index 7164b6b82adb..9f42e643e4cb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -23,7 +23,6 @@ import com.google.common.collect.Maps
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{StructField, StructType}
 /**
@@ -153,13 +152,19 @@ package object expressions {
 unique(grouped)
 }
- /** Perform attribute resolution given a name and a resolver. */
- def resolve(nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = {
+ /** Returns true if all qualifiers in `attrs` have two or fewer parts. */
+ @transient private val hasTwoOrLessQualifierParts: Boolean =
+ attrs.forall(_.qualifier.length <= 2)
+
+ /** Match attributes for the case where all qualifiers in `attrs` have two or fewer parts. */
+ private def matchWithTwoOrLessQualifierParts(
+ nameParts: Seq[String],
+ resolver: Resolver): (Seq[Attribute], Seq[String]) = {
 // Collect matching attributes given a name and a lookup.
 def collectMatches(name: String, candidates: Option[Seq[Attribute]]): Seq[Attribute] = {
- candidates.toSeq.flatMap(_.collect {
+ candidates.getOrElse(Nil).collect {
 case a if resolver(a.name, name) => a.withName(name)
- })
+ }
 }
 // Find matches for the given name assuming that the first two parts are the qualifier
@@ -204,13 +209,79 @@ package object expressions {
 // If none of the attributes match the database.table.column pattern or
 // the `table.column` pattern, we try to resolve it as a column.
- val (candidates, nestedFields) = matches match {
+ matches match {
 case (Seq(), _) =>
 val name = nameParts.head
 val attributes = collectMatches(name, direct.get(name.toLowerCase(Locale.ROOT)))
 (attributes, nameParts.tail)
 case _ => matches
 }
+ }
+
+ /**
+ * Match attributes for the case where at least one qualifier in `attrs` has more than two parts.
+ */
+ private def matchWithThreeOrMoreQualifierParts(
+ nameParts: Seq[String],
+ resolver: Resolver): (Seq[Attribute], Seq[String]) = {
+ // Returns true if the `short` qualifier is a suffix of the `long` qualifier.
+ // For example, Seq("a", "b") is a suffix of Seq("a", "a", "b"),
+ // but not a suffix of Seq("a", "b", "b").
+ def matchQualifier(short: Seq[String], long: Seq[String]): Boolean = {
+ (long.length >= short.length) &&
+ long.takeRight(short.length)
+ .zip(short)
+ .forall(x => resolver(x._1, x._2))
+ }
+
+ // Collect attributes that match the given name and qualifier.
+ // A match occurs if
+ // 1) the given name matches the attribute's name according to the resolver.
+ // 2) the given qualifier is a suffix of the attribute's qualifier.
+ def collectMatches(
+ name: String,
+ qualifier: Seq[String],
+ candidates: Option[Seq[Attribute]]): Seq[Attribute] = {
+ candidates.getOrElse(Nil).collect {
+ case a if resolver(name, a.name) && matchQualifier(qualifier, a.qualifier) =>
+ a.withName(name)
+ }
+ }
+
+ // Iterate over each string in `nameParts` in reverse order and try to match the attributes,
+ // treating the current string as the attribute name. For example, if `nameParts` is
+ // Seq("a", "b", "c"), the match will be performed in the following order:
+ // 1) name = "c", qualifier = Seq("a", "b")
+ // 2) name = "b", qualifier = Seq("a")
+ // 3) name = "a", qualifier = Seq()
+ // Note that the match is performed in reverse order so that the longest possible
+ // qualifier is matched. If a match is found, the remaining portion of `nameParts`
+ // is also returned as nested fields.
+ var candidates: Seq[Attribute] = Nil
+ var nestedFields: Seq[String] = Nil
+ var i = nameParts.length - 1
+ while (i >= 0 && candidates.isEmpty) {
+ val name = nameParts(i)
+ candidates = collectMatches(
+ name,
+ nameParts.take(i),
+ direct.get(name.toLowerCase(Locale.ROOT)))
+ if (candidates.nonEmpty) {
+ nestedFields = nameParts.takeRight(nameParts.length - i - 1)
+ }
+ i -= 1
+ }
+
+ (candidates, nestedFields)
+ }
+
+ /** Perform attribute resolution given a name and a resolver. */
+ def resolve(nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = {
+ val (candidates, nestedFields) = if (hasTwoOrLessQualifierParts) {
+ matchWithTwoOrLessQualifierParts(nameParts, resolver)
+ } else {
+ matchWithThreeOrMoreQualifierParts(nameParts, resolver)
+ }
 def name = UnresolvedAttribute(nameParts).name
 candidates match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index deceec73dda3..c574a20da0b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -49,19 +49,21 @@ sealed trait IdentifierWithDatabase {
 /**
 * Encapsulates an identifier that is either an alias name or an identifier that has table
- * name and optionally a database name.
+ * name and a qualifier.
 * The SubqueryAlias node keeps track of the qualifier using the information in this structure.
- * @param identifier - Is an alias name or a table name
- * @param database - Is a database name and is optional
+ * @param name - Is an alias name or a table name
+ * @param qualifier - Is the qualifier, a possibly empty sequence of name parts
 */
-case class AliasIdentifier(identifier: String, database: Option[String])
- extends IdentifierWithDatabase {
+case class AliasIdentifier(name: String, qualifier: Seq[String]) {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+ def this(identifier: String) = this(identifier, Seq())
- def this(identifier: String) = this(identifier, None)
+ override def toString: String = (qualifier :+ name).quoted
 }
 object AliasIdentifier {
- def apply(identifier: String): AliasIdentifier = new AliasIdentifier(identifier)
+ def apply(name: String): AliasIdentifier = new AliasIdentifier(name)
 }
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 40db8b6f49dc..54e5ff7aeb75 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.types._
 import org.apache.spark.util.random.RandomSampler
@@ -849,18 +850,18 @@ case class Tail(limitExpr: Expression, child: LogicalPlan) extends OrderPreservi
 /**
 * Aliased subquery.
 *
- * @param name the alias identifier for this subquery.
+ * @param identifier the alias identifier for this subquery.
 * @param child the logical plan of this subquery.
*/ case class SubqueryAlias( - name: AliasIdentifier, + identifier: AliasIdentifier, child: LogicalPlan) extends OrderPreservingUnaryNode { - def alias: String = name.identifier + def alias: String = identifier.name override def output: Seq[Attribute] = { - val qualifierList = name.database.map(Seq(_, alias)).getOrElse(Seq(alias)) + val qualifierList = identifier.qualifier :+ alias child.output.map(_.withQualifier(qualifierList)) } override def doCanonicalize(): LogicalPlan = child.canonicalized @@ -877,7 +878,13 @@ object SubqueryAlias { identifier: String, database: String, child: LogicalPlan): SubqueryAlias = { - SubqueryAlias(AliasIdentifier(identifier, Some(database)), child) + SubqueryAlias(AliasIdentifier(identifier, Seq(database)), child) + } + + def apply( + multipartIdentifier: Seq[String], + child: LogicalPlan): SubqueryAlias = { + SubqueryAlias(AliasIdentifier(multipartIdentifier.last, multipartIdentifier.init), child) } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index ba1eeb38e247..56a198763b4e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -27,7 +27,7 @@ import org.json4s.JsonAST._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ -import org.apache.spark.sql.catalyst.IdentifierWithDatabase +import org.apache.spark.sql.catalyst.{AliasIdentifier, IdentifierWithDatabase} import org.apache.spark.sql.catalyst.ScalaReflection._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource} import org.apache.spark.sql.catalyst.errors._ @@ -780,6 +780,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { case exprId: ExprId => true case field: StructField => true case id: IdentifierWithDatabase => true + case alias: AliasIdentifier => true case join: JoinType => true case spec: BucketSpec => true case catalog: CatalogTable => true diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeResolutionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeResolutionSuite.scala new file mode 100644 index 000000000000..813a68f68451 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeResolutionSuite.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution} +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +class AttributeResolutionSuite extends SparkFunSuite { + val resolver = caseInsensitiveResolution + + test("basic attribute resolution with namespaces") { + val attrs = Seq( + AttributeReference("a", IntegerType)(qualifier = Seq("ns1", "ns2", "t1")), + AttributeReference("a", IntegerType)(qualifier = Seq("ns1", "ns2", "ns3", "t2"))) + + // Try to match attribute reference with name "a" with qualifier "ns1.ns2.t1". + Seq(Seq("t1", "a"), Seq("ns2", "t1", "a"), Seq("ns1", "ns2", "t1", "a")).foreach { nameParts => + attrs.resolve(nameParts, resolver) match { + case Some(attr) => assert(attr.semanticEquals(attrs(0))) + case _ => fail() + } + } + + // Non-matching cases + Seq(Seq("ns1", "ns2", "t1"), Seq("ns2", "a")).foreach { nameParts => + assert(attrs.resolve(nameParts, resolver).isEmpty) + } + } + + test("attribute resolution where table and attribute names are the same") { + val attrs = Seq(AttributeReference("t", IntegerType)(qualifier = Seq("ns1", "ns2", "t"))) + // Matching cases + Seq( + Seq("t"), Seq("t", "t"), Seq("ns2", "t", "t"), Seq("ns1", "ns2", "t", "t") + ).foreach { nameParts => + attrs.resolve(nameParts, resolver) match { + case Some(attr) => assert(attr.semanticEquals(attrs(0))) + case _ => fail() + } + } + + // Non-matching case + assert(attrs.resolve(Seq("ns1", "ns2", "t"), resolver).isEmpty) + } + + test("attribute resolution ambiguity at the attribute name level") { + val attrs = Seq( + AttributeReference("a", IntegerType)(qualifier = Seq("ns1", "t1")), + AttributeReference("a", IntegerType)(qualifier = Seq("ns1", "ns2", "t2"))) + + val ex = intercept[AnalysisException] { + attrs.resolve(Seq("a"), resolver) + } + assert(ex.getMessage.contains( + "Reference 'a' is ambiguous, could be: ns1.t1.a, ns1.ns2.t2.a.")) + } + + test("attribute resolution ambiguity at the qualifier level") { + val attrs = Seq( + AttributeReference("a", IntegerType)(qualifier = Seq("ns1", "t")), + AttributeReference("a", IntegerType)(qualifier = Seq("ns2", "ns1", "t"))) + + val ex = intercept[AnalysisException] { + attrs.resolve(Seq("ns1", "t", "a"), resolver) + } + assert(ex.getMessage.contains( + "Reference 'ns1.t.a' is ambiguous, could be: ns1.t.a, ns2.ns1.t.a.")) + } + + test("attribute resolution with nested fields") { + val attrType = StructType(Seq(StructField("aa", IntegerType), StructField("bb", IntegerType))) + val attrs = Seq(AttributeReference("a", attrType)(qualifier = Seq("ns1", "t"))) + + val resolved = attrs.resolve(Seq("ns1", "t", "a", "aa"), resolver) + resolved match { + case Some(Alias(_, name)) => assert(name == "aa") + case _ => fail() + } + + val ex = intercept[AnalysisException] { + attrs.resolve(Seq("ns1", "t", "a", "cc"), resolver) + } + assert(ex.getMessage.contains("No such struct field cc in aa, bb")) + } + + test("attribute resolution with case insensitive resolver") { + val attrs = Seq(AttributeReference("a", IntegerType)(qualifier = Seq("ns1", "t"))) + attrs.resolve(Seq("Ns1", "T", "A"), caseInsensitiveResolution) match { + case Some(attr) => assert(attr.semanticEquals(attrs(0)) && attr.name == "A") + case _ => fail() + } + } + + test("attribute resolution with case sensitive resolver") { + val attrs = Seq(AttributeReference("a", IntegerType)(qualifier 
= Seq("ns1", "t"))) + assert(attrs.resolve(Seq("Ns1", "T", "A"), caseSensitiveResolution).isEmpty) + assert(attrs.resolve(Seq("ns1", "t", "A"), caseSensitiveResolution).isEmpty) + attrs.resolve(Seq("ns1", "t", "a"), caseSensitiveResolution) match { + case Some(attr) => assert(attr.semanticEquals(attrs(0))) + case _ => fail() + } + } + + test("attribute resolution should try to match the longest qualifier") { + // We have two attributes: + // 1) "a.b" where "a" is the name and "b" is the nested field. + // 2) "a.b.a" where "b" is the name, left-side "a" is the qualifier and the right-side "a" + // is the nested field. + // When "a.b" is resolved, "b" is tried first as the name, so it is resolved to #2 attribute. + val a1Type = StructType(Seq(StructField("b", IntegerType))) + val a2Type = StructType(Seq(StructField("a", IntegerType))) + val attrs = Seq( + AttributeReference("a", a1Type)(), + AttributeReference("b", a2Type)(qualifier = Seq("a"))) + attrs.resolve(Seq("a", "b"), resolver) match { + case Some(attr) => assert(attr.semanticEquals(attrs(1))) + case _ => fail() + } + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala index 0e094bc06b05..e72b2e9b1b21 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala @@ -433,10 +433,11 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper { // Converts AliasIdentifier to JSON assertJSON( - AliasIdentifier("alias"), + AliasIdentifier("alias", Seq("ns1", "ns2")), JObject( "product-class" -> JString(classOf[AliasIdentifier].getName), - "identifier" -> "alias")) + "name" -> "alias", + "qualifier" -> "[ns1, ns2]")) // Converts SubqueryAlias to JSON assertJSON( @@ -445,8 +446,9 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper { JObject( "class" -> classOf[SubqueryAlias].getName, "num-children" -> 1, - "name" -> JObject("product-class" -> JString(classOf[AliasIdentifier].getName), - "identifier" -> "t1"), + "identifier" -> JObject("product-class" -> JString(classOf[AliasIdentifier].getName), + "name" -> "t1", + "qualifier" -> JArray(Nil)), "child" -> 0), JObject( "class" -> classOf[JsonTestTreeNode].getName, diff --git a/sql/core/src/test/resources/sql-tests/results/group-by-filter.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by-filter.sql.out index a032678e90fe..a4c7c2cf90cd 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by-filter.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by-filter.sql.out @@ -369,13 +369,13 @@ org.apache.spark.sql.AnalysisException IN/EXISTS predicate sub-queries can only be used in Filter/Join and a few commands: Aggregate [dept_id#x], [dept_id#x, avg(salary#x) AS avg(salary)#x, avg(salary#x) FILTER (WHERE exists#x [dept_id#x]) AS avg(salary) FILTER (WHERE exists(dept_id))#x] : +- Project [state#x] : +- Filter (dept_id#x = outer(dept_id#x)) -: +- SubqueryAlias `dept` +: +- SubqueryAlias dept : +- Project [dept_id#x, dept_name#x, state#x] -: +- SubqueryAlias `DEPT` +: +- SubqueryAlias DEPT : +- LocalRelation [dept_id#x, dept_name#x, state#x] -+- SubqueryAlias `emp` ++- SubqueryAlias emp +- Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x] - +- SubqueryAlias `EMP` + +- SubqueryAlias EMP +- LocalRelation [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x] ; @@ -395,13 +395,13 @@ 
org.apache.spark.sql.AnalysisException IN/EXISTS predicate sub-queries can only be used in Filter/Join and a few commands: Aggregate [dept_id#x], [dept_id#x, sum(salary#x) AS sum(salary)#x, sum(salary#x) FILTER (WHERE NOT exists#x [dept_id#x]) AS sum(salary) FILTER (WHERE (NOT exists(dept_id)))#x] : +- Project [state#x] : +- Filter (dept_id#x = outer(dept_id#x)) -: +- SubqueryAlias `dept` +: +- SubqueryAlias dept : +- Project [dept_id#x, dept_name#x, state#x] -: +- SubqueryAlias `DEPT` +: +- SubqueryAlias DEPT : +- LocalRelation [dept_id#x, dept_name#x, state#x] -+- SubqueryAlias `emp` ++- SubqueryAlias emp +- Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x] - +- SubqueryAlias `EMP` + +- SubqueryAlias EMP +- LocalRelation [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x] ; @@ -420,13 +420,13 @@ org.apache.spark.sql.AnalysisException IN/EXISTS predicate sub-queries can only be used in Filter/Join and a few commands: Aggregate [dept_id#x], [dept_id#x, avg(salary#x) AS avg(salary)#x, avg(salary#x) FILTER (WHERE dept_id#x IN (list#x [])) AS avg(salary) FILTER (WHERE (dept_id IN (listquery())))#x] : +- Distinct : +- Project [dept_id#x] -: +- SubqueryAlias `dept` +: +- SubqueryAlias dept : +- Project [dept_id#x, dept_name#x, state#x] -: +- SubqueryAlias `DEPT` +: +- SubqueryAlias DEPT : +- LocalRelation [dept_id#x, dept_name#x, state#x] -+- SubqueryAlias `emp` ++- SubqueryAlias emp +- Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x] - +- SubqueryAlias `EMP` + +- SubqueryAlias EMP +- LocalRelation [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x] ; @@ -445,13 +445,13 @@ org.apache.spark.sql.AnalysisException IN/EXISTS predicate sub-queries can only be used in Filter/Join and a few commands: Aggregate [dept_id#x], [dept_id#x, sum(salary#x) AS sum(salary)#x, sum(salary#x) FILTER (WHERE NOT dept_id#x IN (list#x [])) AS sum(salary) FILTER (WHERE (NOT (dept_id IN (listquery()))))#x] : +- Distinct : +- Project [dept_id#x] -: +- SubqueryAlias `dept` +: +- SubqueryAlias dept : +- Project [dept_id#x, dept_name#x, state#x] -: +- SubqueryAlias `DEPT` +: +- SubqueryAlias DEPT : +- LocalRelation [dept_id#x, dept_name#x, state#x] -+- SubqueryAlias `emp` ++- SubqueryAlias emp +- Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x] - +- SubqueryAlias `EMP` + +- SubqueryAlias EMP +- LocalRelation [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x] ; diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out index 1599634ff9ef..ec7ecf28754e 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out @@ -110,8 +110,8 @@ struct<> org.apache.spark.sql.AnalysisException Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses: Aggregate [min(outer(t2a#x)) AS min(outer())#x] -+- SubqueryAlias `t3` ++- SubqueryAlias t3 +- Project [t3a#x, t3b#x, t3c#x] - +- SubqueryAlias `t3` + +- SubqueryAlias t3 +- LocalRelation [t3a#x, t3b#x, t3c#x] ; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 2c8349a0e6a7..eabcb81c5064 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -679,6 +679,44 @@ class DataSourceV2SQLSuite
 }
 }
+ test("qualified column names for v2 tables") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id bigint, point struct<x: bigint, y: bigint>) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, (10, 20))")
+
+ checkAnswer(
+ sql(s"SELECT testcat.ns1.ns2.tbl.id, testcat.ns1.ns2.tbl.point.x FROM $t"),
+ Row(1, 10))
+ checkAnswer(sql(s"SELECT ns1.ns2.tbl.id, ns1.ns2.tbl.point.x FROM $t"), Row(1, 10))
+ checkAnswer(sql(s"SELECT ns2.tbl.id, ns2.tbl.point.x FROM $t"), Row(1, 10))
+ checkAnswer(sql(s"SELECT tbl.id, tbl.point.x FROM $t"), Row(1, 10))
+
+ val ex = intercept[AnalysisException] {
+ sql(s"SELECT ns1.ns2.ns3.tbl.id from $t")
+ }
+ assert(ex.getMessage.contains("cannot resolve '`ns1.ns2.ns3.tbl.id`"))
+ }
+ }
+
+ test("qualified column names for v1 tables") {
+ // unset this config to use the default v2 session catalog.
+ spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+
+ withTable("t") {
+ sql("CREATE TABLE t USING json AS SELECT 1 AS i")
+ checkAnswer(sql("select default.t.i from spark_catalog.t"), Row(1))
+ checkAnswer(sql("select t.i from spark_catalog.default.t"), Row(1))
+ checkAnswer(sql("select default.t.i from spark_catalog.default.t"), Row(1))
+
+ // catalog name cannot be used for v1 tables.
+ val ex = intercept[AnalysisException] {
+ sql(s"select spark_catalog.default.t.i from spark_catalog.default.t")
+ }
+ assert(ex.getMessage.contains("cannot resolve '`spark_catalog.default.t.i`"))
+ }
+ }
+
 test("InsertInto: append - across catalog") {
 val t1 = "testcat.ns1.ns2.tbl"
 val t2 = "testcat2.db.tbl"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index c93d27f02c68..ad3d79760adf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -81,7 +81,7 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark {
 val queryRelations = scala.collection.mutable.HashSet[String]()
 spark.sql(queryString).queryExecution.analyzed.foreach {
 case SubqueryAlias(alias, _: LogicalRelation) =>
- queryRelations.add(alias.identifier)
+ queryRelations.add(alias.name)
 case LogicalRelation(_, _, Some(catalogTable), _) =>
 queryRelations.add(catalogTable.identifier.table)
 case HiveTableRelation(tableMeta, _, _, _, _) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index c0c3cd70fcc9..88f30353cce9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -885,33 +885,34 @@ class PlanResolutionSuite extends AnalysisTest {
 val parsed4 = parseAndResolve(sql4)
 parsed1 match {
- case DeleteFromTable(_: DataSourceV2Relation, None) =>
- case _ => fail("Expect DeleteFromTable, bug got:\n" + parsed1.treeString)
+ case DeleteFromTable(AsDataSourceV2Relation(_), None) =>
+ case _ => fail("Expect DeleteFromTable, but got:\n" + parsed1.treeString)
 }
 parsed2 match {
 case DeleteFromTable(
- _: DataSourceV2Relation,
+ AsDataSourceV2Relation(_),
 Some(EqualTo(name: UnresolvedAttribute,
StringLiteral("Robert")))) => assert(name.name == "name") - case _ => fail("Expect DeleteFromTable, bug got:\n" + parsed2.treeString) + case _ => fail("Expect DeleteFromTable, but got:\n" + parsed2.treeString) } parsed3 match { case DeleteFromTable( - SubqueryAlias(AliasIdentifier("t", None), _: DataSourceV2Relation), + SubqueryAlias(AliasIdentifier("t", Seq()), AsDataSourceV2Relation(_)), Some(EqualTo(name: UnresolvedAttribute, StringLiteral("Robert")))) => assert(name.name == "t.name") - case _ => fail("Expect DeleteFromTable, bug got:\n" + parsed3.treeString) + case _ => fail("Expect DeleteFromTable, but got:\n" + parsed3.treeString) } parsed4 match { - case DeleteFromTable(SubqueryAlias(AliasIdentifier("t", None), _: DataSourceV2Relation), + case DeleteFromTable( + SubqueryAlias(AliasIdentifier("t", Seq()), AsDataSourceV2Relation(_)), Some(InSubquery(values, query))) => assert(values.size == 1 && values.head.isInstanceOf[UnresolvedAttribute]) assert(values.head.asInstanceOf[UnresolvedAttribute].name == "t.name") query match { - case ListQuery(Project(projects, SubqueryAlias(AliasIdentifier("s", None), + case ListQuery(Project(projects, SubqueryAlias(AliasIdentifier("s", Seq()), UnresolvedSubqueryColumnAliases(outputColumnNames, Project(_, _: OneRowRelation)))), _, _, _) => assert(projects.size == 1 && projects.head.name == "s.name") @@ -944,7 +945,7 @@ class PlanResolutionSuite extends AnalysisTest { parsed1 match { case UpdateTable( - _: DataSourceV2Relation, + AsDataSourceV2Relation(_), Seq(Assignment(name: UnresolvedAttribute, StringLiteral("Robert")), Assignment(age: UnresolvedAttribute, IntegerLiteral(32))), None) => @@ -956,7 +957,7 @@ class PlanResolutionSuite extends AnalysisTest { parsed2 match { case UpdateTable( - SubqueryAlias(AliasIdentifier("t", None), _: DataSourceV2Relation), + SubqueryAlias(AliasIdentifier("t", Seq()), AsDataSourceV2Relation(_)), Seq(Assignment(name: UnresolvedAttribute, StringLiteral("Robert")), Assignment(age: UnresolvedAttribute, IntegerLiteral(32))), None) => @@ -968,7 +969,7 @@ class PlanResolutionSuite extends AnalysisTest { parsed3 match { case UpdateTable( - SubqueryAlias(AliasIdentifier("t", None), _: DataSourceV2Relation), + SubqueryAlias(AliasIdentifier("t", Seq()), AsDataSourceV2Relation(_)), Seq(Assignment(name: UnresolvedAttribute, StringLiteral("Robert")), Assignment(age: UnresolvedAttribute, IntegerLiteral(32))), Some(EqualTo(p: UnresolvedAttribute, IntegerLiteral(1)))) => @@ -980,14 +981,14 @@ class PlanResolutionSuite extends AnalysisTest { } parsed4 match { - case UpdateTable(SubqueryAlias(AliasIdentifier("t", None), _: DataSourceV2Relation), + case UpdateTable(SubqueryAlias(AliasIdentifier("t", Seq()), AsDataSourceV2Relation(_)), Seq(Assignment(key: UnresolvedAttribute, IntegerLiteral(32))), Some(InSubquery(values, query))) => assert(key.name == "t.age") assert(values.size == 1 && values.head.isInstanceOf[UnresolvedAttribute]) assert(values.head.asInstanceOf[UnresolvedAttribute].name == "t.name") query match { - case ListQuery(Project(projects, SubqueryAlias(AliasIdentifier("s", None), + case ListQuery(Project(projects, SubqueryAlias(AliasIdentifier("s", Seq()), UnresolvedSubqueryColumnAliases(outputColumnNames, Project(_, _: OneRowRelation)))), _, _, _) => assert(projects.size == 1 && projects.head.name == "s.name") @@ -1129,7 +1130,7 @@ class PlanResolutionSuite extends AnalysisTest { case AlterTable(_, _, r: DataSourceV2Relation, _) => assert(r.catalog.exists(_ == catlogIdent)) assert(r.identifier.exists(_.name() == tableIdent)) 
- case Project(_, r: DataSourceV2Relation) => + case Project(_, AsDataSourceV2Relation(r)) => assert(r.catalog.exists(_ == catlogIdent)) assert(r.identifier.exists(_.name() == tableIdent)) case InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _) => @@ -1206,8 +1207,8 @@ class PlanResolutionSuite extends AnalysisTest { """.stripMargin parseAndResolve(sql1) match { case MergeIntoTable( - SubqueryAlias(AliasIdentifier("target", None), target: DataSourceV2Relation), - SubqueryAlias(AliasIdentifier("source", None), source: DataSourceV2Relation), + SubqueryAlias(AliasIdentifier("target", Seq()), AsDataSourceV2Relation(target)), + SubqueryAlias(AliasIdentifier("source", Seq()), AsDataSourceV2Relation(source)), mergeCondition, Seq(DeleteAction(Some(EqualTo(dl: AttributeReference, StringLiteral("delete")))), UpdateAction(Some(EqualTo(ul: AttributeReference, StringLiteral("update"))), @@ -1232,8 +1233,8 @@ class PlanResolutionSuite extends AnalysisTest { """.stripMargin parseAndResolve(sql2) match { case MergeIntoTable( - SubqueryAlias(AliasIdentifier("target", None), target: DataSourceV2Relation), - SubqueryAlias(AliasIdentifier("source", None), source: DataSourceV2Relation), + SubqueryAlias(AliasIdentifier("target", Seq()), AsDataSourceV2Relation(target)), + SubqueryAlias(AliasIdentifier("source", Seq()), AsDataSourceV2Relation(source)), mergeCondition, Seq(DeleteAction(Some(EqualTo(dl: AttributeReference, StringLiteral("delete")))), UpdateAction(Some(EqualTo(ul: AttributeReference, @@ -1258,8 +1259,8 @@ class PlanResolutionSuite extends AnalysisTest { """.stripMargin parseAndResolve(sql3) match { case MergeIntoTable( - SubqueryAlias(AliasIdentifier("target", None), target: DataSourceV2Relation), - SubqueryAlias(AliasIdentifier("source", None), source: DataSourceV2Relation), + SubqueryAlias(AliasIdentifier("target", Seq()), AsDataSourceV2Relation(target)), + SubqueryAlias(AliasIdentifier("source", Seq()), AsDataSourceV2Relation(source)), mergeCondition, Seq(DeleteAction(None), UpdateAction(None, updateAssigns)), Seq(InsertAction(None, insertAssigns))) => @@ -1282,8 +1283,8 @@ class PlanResolutionSuite extends AnalysisTest { """.stripMargin parseAndResolve(sql4) match { case MergeIntoTable( - SubqueryAlias(AliasIdentifier("target", None), target: DataSourceV2Relation), - SubqueryAlias(AliasIdentifier("source", None), source: Project), + SubqueryAlias(AliasIdentifier("target", Seq()), AsDataSourceV2Relation(target)), + SubqueryAlias(AliasIdentifier("source", Seq()), source: Project), mergeCondition, Seq(DeleteAction(Some(EqualTo(dl: AttributeReference, StringLiteral("delete")))), UpdateAction(Some(EqualTo(ul: AttributeReference, StringLiteral("update"))), @@ -1311,8 +1312,8 @@ class PlanResolutionSuite extends AnalysisTest { """.stripMargin parseAndResolve(sql5) match { case MergeIntoTable( - SubqueryAlias(AliasIdentifier("target", None), target: DataSourceV2Relation), - SubqueryAlias(AliasIdentifier("source", None), source: Project), + SubqueryAlias(AliasIdentifier("target", Seq()), AsDataSourceV2Relation(target)), + SubqueryAlias(AliasIdentifier("source", Seq()), source: Project), mergeCondition, Seq(DeleteAction(Some(EqualTo(dl: AttributeReference, StringLiteral("delete")))), UpdateAction(Some(EqualTo(ul: AttributeReference, StringLiteral("update"))), @@ -1346,8 +1347,8 @@ class PlanResolutionSuite extends AnalysisTest { parseAndResolve(sql1) match { case MergeIntoTable( - target: DataSourceV2Relation, - source: DataSourceV2Relation, + AsDataSourceV2Relation(target), + 
AsDataSourceV2Relation(source), _, Seq(DeleteAction(None), UpdateAction(None, updateAssigns)), Seq(InsertAction( @@ -1453,8 +1454,8 @@ class PlanResolutionSuite extends AnalysisTest { parseAndResolve(sql) match { case MergeIntoTable( - SubqueryAlias(AliasIdentifier("target", None), _: DataSourceV2Relation), - SubqueryAlias(AliasIdentifier("source", None), _: DataSourceV2Relation), + SubqueryAlias(AliasIdentifier("target", Seq()), AsDataSourceV2Relation(_)), + SubqueryAlias(AliasIdentifier("source", Seq()), AsDataSourceV2Relation(_)), EqualTo(l: UnresolvedAttribute, r: UnresolvedAttribute), Seq( DeleteAction(Some(EqualTo(dl: UnresolvedAttribute, StringLiteral("delete")))), @@ -1481,3 +1482,11 @@ class PlanResolutionSuite extends AnalysisTest { } // TODO: add tests for more commands. } + +object AsDataSourceV2Relation { + def unapply(plan: LogicalPlan): Option[DataSourceV2Relation] = plan match { + case SubqueryAlias(_, r: DataSourceV2Relation) => Some(r) + case _ => None + } +} + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 20bafd832d0d..b8ef44b096ee 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -62,7 +62,7 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton with SQLTestUtils { spark.sql("create view vw1 as select 1 as id") val plan = spark.sql("select id from vw1").queryExecution.analyzed val aliases = plan.collect { - case x @ SubqueryAlias(AliasIdentifier("vw1", Some("default")), _) => x + case x @ SubqueryAlias(AliasIdentifier("vw1", Seq("default")), _) => x } assert(aliases.size == 1) }
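
Illustration (not part of the patch): the standalone Scala sketch below approximates the suffix-based qualifier matching that `matchWithThreeOrMoreQualifierParts` introduces above. `QualifierMatchSketch`, `Attr`, and `caseInsensitive` are simplified stand-ins invented for this sketch, not Spark's real `AttributeSeq`, `Attribute`, or `analysis.Resolver`; the real code additionally resolves nested struct fields, reports ambiguity, and keeps a fast path for two-part qualifiers.

object QualifierMatchSketch {
  // Simplified stand-ins (assumed shapes) for Spark's Resolver and Attribute.
  type Resolver = (String, String) => Boolean
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)
  final case class Attr(name: String, qualifier: Seq[String])

  // Mirrors the patch's matchQualifier: `short` must be a suffix of `long`,
  // e.g. Seq("a", "b") matches Seq("a", "a", "b") but not Seq("a", "b", "b").
  def matchQualifier(short: Seq[String], long: Seq[String], resolver: Resolver): Boolean =
    long.length >= short.length &&
      long.takeRight(short.length).zip(short).forall { case (l, s) => resolver(l, s) }

  // Walks nameParts from the right, like the reverse loop in the patch, so the
  // longest possible qualifier wins. Returns the matching attributes plus any
  // trailing parts, which the analyzer would treat as nested fields.
  def resolve(
      nameParts: Seq[String],
      attrs: Seq[Attr],
      resolver: Resolver): (Seq[Attr], Seq[String]) = {
    var candidates: Seq[Attr] = Nil
    var nestedFields: Seq[String] = Nil
    var i = nameParts.length - 1
    while (i >= 0 && candidates.isEmpty) {
      val name = nameParts(i)
      candidates = attrs.filter { a =>
        resolver(name, a.name) && matchQualifier(nameParts.take(i), a.qualifier, resolver)
      }
      if (candidates.nonEmpty) nestedFields = nameParts.drop(i + 1)
      i -= 1
    }
    (candidates, nestedFields)
  }

  def main(args: Array[String]): Unit = {
    val attrs = Seq(Attr("a", Seq("ns1", "ns2", "t1")))
    // A qualifier suffix of any length resolves the attribute:
    println(resolve(Seq("ns2", "t1", "a"), attrs, caseInsensitive))
    // -> (List(Attr(a,List(ns1, ns2, t1))),List())
    // Parts after the matched name come back as nested fields:
    println(resolve(Seq("t1", "a", "x"), attrs, caseInsensitive))
    // -> (List(Attr(a,List(ns1, ns2, t1))),List(x))
  }
}

The two println calls trace the same behavior the new AttributeResolutionSuite exercises: "ns2.t1.a" resolves through a partial qualifier suffix, and "t1.a.x" resolves "a" while handing "x" back as a nested field, which is why "tbl.point.x" works in the v2 test above.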