-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19602][SQL] Support column resolution of fully qualified column name ( 3 part name) #17185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
72830ce
ee3dd8f
13ed442
6a7c08f
d8fca00
065687f
5f7e5d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -104,12 +104,12 @@ case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute with Un | |
| override def exprId: ExprId = throw new UnresolvedException(this, "exprId") | ||
| override def dataType: DataType = throw new UnresolvedException(this, "dataType") | ||
| override def nullable: Boolean = throw new UnresolvedException(this, "nullable") | ||
| override def qualifier: Option[String] = throw new UnresolvedException(this, "qualifier") | ||
| override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier") | ||
| override lazy val resolved = false | ||
|
|
||
| override def newInstance(): UnresolvedAttribute = this | ||
| override def withNullability(newNullability: Boolean): UnresolvedAttribute = this | ||
| override def withQualifier(newQualifier: Option[String]): UnresolvedAttribute = this | ||
| override def withQualifier(newQualifier: Seq[String]): UnresolvedAttribute = this | ||
| override def withName(newName: String): UnresolvedAttribute = UnresolvedAttribute.quoted(newName) | ||
| override def withMetadata(newMetadata: Metadata): Attribute = this | ||
|
|
||
|
|
@@ -240,7 +240,7 @@ abstract class Star extends LeafExpression with NamedExpression { | |
| override def exprId: ExprId = throw new UnresolvedException(this, "exprId") | ||
| override def dataType: DataType = throw new UnresolvedException(this, "dataType") | ||
| override def nullable: Boolean = throw new UnresolvedException(this, "nullable") | ||
| override def qualifier: Option[String] = throw new UnresolvedException(this, "qualifier") | ||
| override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier") | ||
| override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute") | ||
| override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance") | ||
| override lazy val resolved = false | ||
|
|
@@ -262,17 +262,46 @@ abstract class Star extends LeafExpression with NamedExpression { | |
| */ | ||
| case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable { | ||
|
|
||
| override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = { | ||
| /** | ||
| * Returns true if the nameParts match the qualifier of the attribute | ||
| * | ||
| * There are two checks: i) Check if the nameParts match the qualifier fully. | ||
| * E.g. SELECT db1.t1.* FROM db1.t1. In this case, the nameParts is Seq("db1", "t1") and | ||
| * qualifier of the attribute is Seq("db1","t1") | ||
| * ii) If (i) is not true, then check if nameParts is only a single element and it | ||
| * matches the table portion of the qualifier | ||
| * | ||
| * E.g. SELECT t1.* FROM db1.t1 In this case nameParts is Seq("t1") and | ||
| * qualifier is Seq("db1","t1") | ||
| * SELECT a.* FROM db1.t1 AS a | ||
| * In this case nameParts is Seq("a") and qualifier for | ||
| * attribute is Seq("a") | ||
| */ | ||
| private def matchedQualifier( | ||
| attribute: Attribute, | ||
|
||
| nameParts: Seq[String], | ||
| resolver: Resolver): Boolean = { | ||
| val qualifierList = attribute.qualifier | ||
|
|
||
| val matched = nameParts.corresponds(qualifierList)(resolver) || { | ||
| // check if it matches the table portion of the qualifier | ||
| if (nameParts.length == 1 && qualifierList.nonEmpty) { | ||
| resolver(nameParts.head, qualifierList.last) | ||
| } else { | ||
| false | ||
| } | ||
| } | ||
| matched | ||
| } | ||
|
|
||
| override def expand( | ||
| input: LogicalPlan, | ||
| resolver: Resolver): Seq[NamedExpression] = { | ||
| // If there is no table specified, use all input attributes. | ||
| if (target.isEmpty) return input.output | ||
|
|
||
| val expandedAttributes = | ||
| if (target.get.size == 1) { | ||
| // If there is a table, pick out attributes that are part of this table. | ||
| input.output.filter(_.qualifier.exists(resolver(_, target.get.head))) | ||
| } else { | ||
| List() | ||
| } | ||
| val expandedAttributes = input.output.filter(matchedQualifier(_, target.get, resolver)) | ||
|
|
||
| if (expandedAttributes.nonEmpty) return expandedAttributes | ||
|
|
||
| // Try to resolve it as a struct expansion. If there is a conflict and both are possible, | ||
|
|
@@ -316,8 +345,8 @@ case class UnresolvedRegex(regexPattern: String, table: Option[String], caseSens | |
| // If there is no table specified, use all input attributes that match expr | ||
| case None => input.output.filter(_.name.matches(pattern)) | ||
| // If there is a table, pick out attributes that are part of this table that match expr | ||
| case Some(t) => input.output.filter(_.qualifier.exists(resolver(_, t))) | ||
| .filter(_.name.matches(pattern)) | ||
| case Some(t) => input.output.filter(a => a.qualifier.nonEmpty && | ||
| resolver(a.qualifier.last, t)).filter(_.name.matches(pattern)) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -345,7 +374,7 @@ case class MultiAlias(child: Expression, names: Seq[String]) | |
|
|
||
| override def nullable: Boolean = throw new UnresolvedException(this, "nullable") | ||
|
|
||
| override def qualifier: Option[String] = throw new UnresolvedException(this, "qualifier") | ||
| override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier") | ||
|
|
||
| override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute") | ||
|
|
||
|
|
@@ -403,7 +432,7 @@ case class UnresolvedAlias( | |
| extends UnaryExpression with NamedExpression with Unevaluable { | ||
|
|
||
| override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute") | ||
| override def qualifier: Option[String] = throw new UnresolvedException(this, "qualifier") | ||
| override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier") | ||
| override def exprId: ExprId = throw new UnresolvedException(this, "exprId") | ||
| override def nullable: Boolean = throw new UnresolvedException(this, "nullable") | ||
| override def dataType: DataType = throw new UnresolvedException(this, "dataType") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -71,19 +71,22 @@ trait NamedExpression extends Expression { | |
| * multiple qualifiers, it is possible that there are other possible ways to refer to this | ||
| * attribute. | ||
| */ | ||
| def qualifiedName: String = (qualifier.toSeq :+ name).mkString(".") | ||
| def qualifiedName: String = (qualifier :+ name).mkString(".") | ||
|
|
||
| /** | ||
| * Optional qualifier for the expression. | ||
| * Qualifier can also contain the fully qualified information, e.g. a sequence of strings | ||
| * containing the database and the table name | ||
| * | ||
| * For now, since we do not allow using original table name to qualify a column name once the | ||
| * table is aliased, this can only be: | ||
| * | ||
| * 1. Empty Seq: when an attribute doesn't have a qualifier, | ||
| * e.g. top level attributes aliased in the SELECT clause, or column from a LocalRelation. | ||
| * 2. Single element: either the table name or the alias name of the table. | ||
| * 2. Seq with a Single element: either the table name or the alias name of the table. | ||
|
||
| * 3. Seq with 2 elements: database name and table name | ||
| */ | ||
| def qualifier: Option[String] | ||
| def qualifier: Seq[String] | ||
|
|
||
| def toAttribute: Attribute | ||
|
|
||
|
|
@@ -109,7 +112,7 @@ abstract class Attribute extends LeafExpression with NamedExpression with NullIn | |
| override def references: AttributeSet = AttributeSet(this) | ||
|
|
||
| def withNullability(newNullability: Boolean): Attribute | ||
| def withQualifier(newQualifier: Option[String]): Attribute | ||
| def withQualifier(newQualifier: Seq[String]): Attribute | ||
| def withName(newName: String): Attribute | ||
| def withMetadata(newMetadata: Metadata): Attribute | ||
|
|
||
|
|
@@ -130,14 +133,14 @@ abstract class Attribute extends LeafExpression with NamedExpression with NullIn | |
| * @param name The name to be associated with the result of computing [[child]]. | ||
| * @param exprId A globally unique id used to check if an [[AttributeReference]] refers to this | ||
| * alias. Auto-assigned if left blank. | ||
| * @param qualifier An optional string that can be used to referred to this attribute in a fully | ||
| * qualified way. Consider the examples tableName.name, subQueryAlias.name. | ||
| * tableName and subQueryAlias are possible qualifiers. | ||
| * @param qualifier An optional Seq of strings that can be used to refer to this attribute in a | ||
| * fully qualified way. Consider the examples tableName.name, subQueryAlias.name. | ||
| * tableName and subQueryAlias are possible qualifiers. | ||
| * @param explicitMetadata Explicit metadata associated with this alias that overwrites child's. | ||
| */ | ||
| case class Alias(child: Expression, name: String)( | ||
| val exprId: ExprId = NamedExpression.newExprId, | ||
| val qualifier: Option[String] = None, | ||
| val qualifier: Seq[String] = Seq.empty, | ||
| val explicitMetadata: Option[Metadata] = None) | ||
| extends UnaryExpression with NamedExpression { | ||
|
|
||
|
|
@@ -201,7 +204,7 @@ case class Alias(child: Expression, name: String)( | |
| } | ||
|
|
||
| override def sql: String = { | ||
| val qualifierPrefix = qualifier.map(_ + ".").getOrElse("") | ||
| val qualifierPrefix = if (qualifier.nonEmpty) qualifier.mkString(".") + "." else "" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This won't work for the case when we have Seq.empty: the suffix "." gets returned even for an empty sequence. Shall we leave the 'if' as is, or is there an equivalent preferred style that would work?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah my bad, I thought it would return empty string for empty seq. Let's leave it.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, sounds good. |
||
| s"${child.sql} AS $qualifierPrefix${quoteIdentifier(name)}" | ||
| } | ||
| } | ||
|
|
@@ -225,9 +228,11 @@ case class AttributeReference( | |
| nullable: Boolean = true, | ||
| override val metadata: Metadata = Metadata.empty)( | ||
| val exprId: ExprId = NamedExpression.newExprId, | ||
| val qualifier: Option[String] = None) | ||
| val qualifier: Seq[String] = Seq.empty[String]) | ||
| extends Attribute with Unevaluable { | ||
|
|
||
| // Currently a qualifier can have at most 2 parts (database name and table name). | ||
| require(qualifier.length <= 2) | ||
| /** | ||
| * Returns true iff the expression id is the same for both attributes. | ||
| */ | ||
|
|
@@ -286,7 +291,7 @@ case class AttributeReference( | |
| /** | ||
| * Returns a copy of this [[AttributeReference]] with new qualifier. | ||
| */ | ||
| override def withQualifier(newQualifier: Option[String]): AttributeReference = { | ||
| override def withQualifier(newQualifier: Seq[String]): AttributeReference = { | ||
| if (newQualifier == qualifier) { | ||
| this | ||
| } else { | ||
|
|
@@ -324,7 +329,7 @@ case class AttributeReference( | |
| override def simpleString: String = s"$name#${exprId.id}: ${dataType.simpleString}" | ||
|
|
||
| override def sql: String = { | ||
| val qualifierPrefix = qualifier.map(_ + ".").getOrElse("") | ||
| val qualifierPrefix = if (qualifier.nonEmpty) qualifier.mkString(".") + "." else "" | ||
| s"$qualifierPrefix${quoteIdentifier(name)}" | ||
| } | ||
| } | ||
|
|
@@ -350,12 +355,12 @@ case class PrettyAttribute( | |
| override def withNullability(newNullability: Boolean): Attribute = | ||
| throw new UnsupportedOperationException | ||
| override def newInstance(): Attribute = throw new UnsupportedOperationException | ||
| override def withQualifier(newQualifier: Option[String]): Attribute = | ||
| override def withQualifier(newQualifier: Seq[String]): Attribute = | ||
| throw new UnsupportedOperationException | ||
| override def withName(newName: String): Attribute = throw new UnsupportedOperationException | ||
| override def withMetadata(newMetadata: Metadata): Attribute = | ||
| throw new UnsupportedOperationException | ||
| override def qualifier: Option[String] = throw new UnsupportedOperationException | ||
| override def qualifier: Seq[String] = throw new UnsupportedOperationException | ||
| override def exprId: ExprId = throw new UnsupportedOperationException | ||
| override def nullable: Boolean = true | ||
| } | ||
|
|
@@ -371,7 +376,7 @@ case class OuterReference(e: NamedExpression) | |
| override def prettyName: String = "outer" | ||
|
|
||
| override def name: String = e.name | ||
| override def qualifier: Option[String] = e.qualifier | ||
| override def qualifier: Seq[String] = e.qualifier | ||
| override def exprId: ExprId = e.exprId | ||
| override def toAttribute: Attribute = e.toAttribute | ||
| override def newInstance(): NamedExpression = OuterReference(e.newInstance()) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -152,10 +152,22 @@ package object expressions { | |
| unique(attrs.groupBy(_.name.toLowerCase(Locale.ROOT))) | ||
| } | ||
|
|
||
| /** Map to use for qualified case insensitive attribute lookups. */ | ||
| @transient private val qualified: Map[(String, String), Seq[Attribute]] = { | ||
| val grouped = attrs.filter(_.qualifier.isDefined).groupBy { a => | ||
| (a.qualifier.get.toLowerCase(Locale.ROOT), a.name.toLowerCase(Locale.ROOT)) | ||
| /** Map to use for qualified case insensitive attribute lookups with 2 part key */ | ||
| @transient private lazy val qualified: Map[(String, String), Seq[Attribute]] = { | ||
| // key is 2 part: table/alias and name | ||
| val grouped = attrs.filter(_.qualifier.nonEmpty).groupBy { | ||
| a => (a.qualifier.last.toLowerCase(Locale.ROOT), a.name.toLowerCase(Locale.ROOT)) | ||
| } | ||
| unique(grouped) | ||
| } | ||
|
|
||
| /** Map to use for qualified case insensitive attribute lookups with 3 part key */ | ||
| @transient private val qualified3Part: Map[(String, String, String), Seq[Attribute]] = { | ||
| // key is 3 part: database name, table name and name | ||
| val grouped = attrs.filter(_.qualifier.length == 2).groupBy { a => | ||
| (a.qualifier.head.toLowerCase(Locale.ROOT), | ||
| a.qualifier.last.toLowerCase(Locale.ROOT), | ||
| a.name.toLowerCase(Locale.ROOT)) | ||
| } | ||
| unique(grouped) | ||
| } | ||
|
|
@@ -169,25 +181,48 @@ package object expressions { | |
| }) | ||
| } | ||
|
|
||
| // Find matches for the given name assuming that the 1st part is a qualifier (i.e. table name, | ||
| // alias, or subquery alias) and the 2nd part is the actual name. This returns a tuple of | ||
| // Find matches for the given name assuming that the 1st two parts are qualifier | ||
| // (i.e. database name and table name) and the 3rd part is the actual column name. | ||
| // | ||
| // For example, consider an example where "db1" is the database name, "a" is the table name | ||
| // and "b" is the column name and "c" is the struct field name. | ||
| // If the name parts is db1.a.b.c, then Attribute will match | ||
|
||
| // Attribute(b, qualifier("db1", "a")) and List("c") will be the second element | ||
| var matches: (Seq[Attribute], Seq[String]) = nameParts match { | ||
| case dbPart +: tblPart +: name +: nestedFields => | ||
| val key = (dbPart.toLowerCase(Locale.ROOT), | ||
| tblPart.toLowerCase(Locale.ROOT), name.toLowerCase(Locale.ROOT)) | ||
| val attributes = collectMatches(name, qualified3Part.get(key)).filter { | ||
| a => (resolver(dbPart, a.qualifier.head) && resolver(tblPart, a.qualifier.last)) | ||
| } | ||
| (attributes, nestedFields) | ||
| case all => | ||
| (Seq.empty, Seq.empty) | ||
| } | ||
|
|
||
| // If there are no matches, then find matches for the given name assuming that | ||
| // the 1st part is a qualifier (i.e. table name, alias, or subquery alias) and the | ||
| // 2nd part is the actual name. This returns a tuple of | ||
| // matched attributes and a list of parts that are to be resolved. | ||
| // | ||
| // For example, consider an example where "a" is the table name, "b" is the column name, | ||
| // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b", | ||
| // and the second element will be List("c"). | ||
| val matches = nameParts match { | ||
| case qualifier +: name +: nestedFields => | ||
| val key = (qualifier.toLowerCase(Locale.ROOT), name.toLowerCase(Locale.ROOT)) | ||
| val attributes = collectMatches(name, qualified.get(key)).filter { a => | ||
| resolver(qualifier, a.qualifier.get) | ||
| } | ||
| (attributes, nestedFields) | ||
| case all => | ||
| (Nil, all) | ||
| if (matches._1.isEmpty) { | ||
| matches = nameParts match { | ||
| case qualifier +: name +: nestedFields => | ||
| val key = (qualifier.toLowerCase(Locale.ROOT), name.toLowerCase(Locale.ROOT)) | ||
| val attributes = collectMatches(name, qualified.get(key)).filter { a => | ||
| resolver(qualifier, a.qualifier.last) | ||
| } | ||
| (attributes, nestedFields) | ||
| case all => | ||
| (Seq.empty[Attribute], Seq.empty[String]) | ||
| } | ||
| } | ||
|
|
||
| // If none of attributes match `table.column` pattern, we try to resolve it as a column. | ||
| // If none of attributes match database.table.column pattern or | ||
| // `table.column` pattern, we try to resolve it as a column. | ||
| val (candidates, nestedFields) = matches match { | ||
| case (Seq(), _) => | ||
| val name = nameParts.head | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about
SELECT db1.t1.* FROM t1 while the current database is db1? Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This (SELECT db1.t1.* FROM t1) would resolve correctly.
Also a similar scenario (details in Section 2 in Table A # 5 in the design doc)
select db1.t1.i1 from t1 will resolve correctly when the current database is db1. This is tested here
I will explicitly add a test case for the db1.t1.* from t1 as well.