Skip to content

Commit b4c8cdb

Browse files
committed
Add AttributeResolver
1 parent 30b182b commit b4c8cdb

File tree

1 file changed

+64
-72
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical

1 file changed

+64
-72
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 64 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,10 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
137137
}
138138
}
139139

140+
private[this] lazy val childAttributeResolver = new AttributeResolver(children.flatMap(_.output))
141+
142+
private[this] lazy val outputAttributeResolver = new AttributeResolver(output)
143+
140144
/**
141145
* Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
142146
* nodes of this LogicalPlan. The attribute is expressed as
@@ -145,7 +149,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
145149
def resolveChildren(
146150
nameParts: Seq[String],
147151
resolver: Resolver): Option[NamedExpression] =
148-
resolve(nameParts, children.flatMap(_.output), resolver)
152+
childAttributeResolver.resolve(nameParts, resolver)
149153

150154
/**
151155
* Optionally resolves the given strings to a [[NamedExpression]] based on the output of this
@@ -155,7 +159,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
155159
def resolve(
156160
nameParts: Seq[String],
157161
resolver: Resolver): Option[NamedExpression] =
158-
resolve(nameParts, output, resolver)
162+
outputAttributeResolver.resolve(nameParts, resolver)
159163

160164
/**
161165
* Given an attribute name, split it to name parts by dot, but
@@ -165,104 +169,92 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
165169
def resolveQuoted(
166170
name: String,
167171
resolver: Resolver): Option[NamedExpression] = {
168-
resolve(UnresolvedAttribute.parseAttributeName(name), output, resolver)
172+
outputAttributeResolver.resolve(UnresolvedAttribute.parseAttributeName(name), resolver)
169173
}
174+
}
170175

171-
/**
172-
* Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
173-
*
174-
* This assumes `name` has multiple parts, where the 1st part is a qualifier
175-
* (i.e. table name, alias, or subquery alias).
176-
* See the comment above `candidates` variable in resolve() for semantics the returned data.
177-
*/
178-
private def resolveAsTableColumn(
179-
nameParts: Seq[String],
180-
resolver: Resolver,
181-
attribute: Attribute): Option[(Attribute, List[String])] = {
182-
assert(nameParts.length > 1)
183-
if (attribute.qualifier.exists(resolver(_, nameParts.head))) {
184-
// At least one qualifier matches. See if remaining parts match.
185-
val remainingParts = nameParts.tail
186-
resolveAsColumn(remainingParts, resolver, attribute)
187-
} else {
188-
None
189-
}
176+
/**
177+
* Helper class for (LogicalPlan) attribute resolution. This class indexes attributes by their
178+
* case-in-sensitive name, and checks potential candidates using the given Resolver. Both qualified
179+
* and direct resolution are supported.
180+
*/
181+
private[catalyst] class AttributeResolver(attributes: Seq[Attribute]) extends Logging {
182+
private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = {
183+
m.mapValues(_.distinct).map(identity)
190184
}
191185

192-
/**
193-
* Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
194-
*
195-
* Different from resolveAsTableColumn, this assumes `name` does NOT start with a qualifier.
196-
* See the comment above `candidates` variable in resolve() for semantics the returned data.
197-
*/
198-
private def resolveAsColumn(
199-
nameParts: Seq[String],
200-
resolver: Resolver,
201-
attribute: Attribute): Option[(Attribute, List[String])] = {
202-
if (!attribute.isGenerated && resolver(attribute.name, nameParts.head)) {
203-
Option((attribute.withName(nameParts.head), nameParts.tail.toList))
204-
} else {
205-
None
186+
/** Map to use for direct case insensitive attribute lookups. */
187+
private val direct: Map[String, Seq[Attribute]] = {
188+
unique(attributes.groupBy(_.name.toLowerCase))
189+
}
190+
191+
/** Map to use for qualified case insensitive attribute lookups. */
192+
private val qualified: Map[(String, String), Seq[Attribute]] = {
193+
val grouped = attributes.filter(_.qualifier.isDefined).groupBy { a =>
194+
(a.qualifier.get.toLowerCase, a.name.toLowerCase)
206195
}
196+
unique(grouped)
207197
}
208198

209-
/** Performs attribute resolution given a name and a sequence of possible attributes. */
210-
protected def resolve(
211-
nameParts: Seq[String],
212-
input: Seq[Attribute],
213-
resolver: Resolver): Option[NamedExpression] = {
199+
/** Perform attribute resolution given a name and a resolver. */
200+
def resolve(nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = {
201+
// Check if the attribute is a match for the given name.
202+
def isMatch(name: String, a: Attribute): Boolean = !a.isGenerated && resolver(a.name, name)
214203

215-
// A sequence of possible candidate matches.
216-
// Each candidate is a tuple. The first element is a resolved attribute, followed by a list
217-
// of parts that are to be resolved.
204+
// Find matches for the given name assuming that the 1st part is a qualifier (i.e. table name,
205+
// alias, or subquery alias) and the 2nd part is the actual name. This returns a tuple of
206+
// matched attributes and a list of parts that are to be resolved.
207+
//
218208
// For example, consider an example where "a" is the table name, "b" is the column name,
219209
// and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b",
220210
// and the second element will be List("c").
221-
var candidates: Seq[(Attribute, List[String])] = {
222-
// If the name has 2 or more parts, try to resolve it as `table.column` first.
223-
if (nameParts.length > 1) {
224-
input.flatMap { option =>
225-
resolveAsTableColumn(nameParts, resolver, option)
226-
}
227-
} else {
228-
Seq.empty
229-
}
211+
val matches = nameParts match {
212+
case qualifier :: name :: nestedFields =>
213+
val key = (qualifier.toLowerCase, name.toLowerCase)
214+
val attributes = qualified.get(key).toSeq.flatMap(_.filter { a =>
215+
resolver(qualifier, a.qualifier.get) && isMatch(name, a)
216+
})
217+
(attributes, nestedFields)
218+
case all =>
219+
(Nil, all)
230220
}
231221

232222
// If none of attributes match `table.column` pattern, we try to resolve it as a column.
233-
if (candidates.isEmpty) {
234-
candidates = input.flatMap { candidate =>
235-
resolveAsColumn(nameParts, resolver, candidate)
236-
}
223+
val (candidates, nestedFields) = matches match {
224+
case (Nil, _) =>
225+
val name = nameParts.head
226+
val attributes = direct.get(name.toLowerCase).toSeq.flatMap(_.filter(isMatch(name, _)))
227+
(attributes, nameParts.tail)
228+
case _ => matches
237229
}
238230

239231
def name = UnresolvedAttribute(nameParts).name
240-
241-
candidates.distinct match {
242-
// One match, no nested fields, use it.
243-
case Seq((a, Nil)) => Some(a)
244-
245-
// One match, but we also need to extract the requested nested field.
246-
case Seq((a, nestedFields)) =>
232+
candidates match {
233+
case Seq(a) if nestedFields.nonEmpty =>
234+
// One match, but we also need to extract the requested nested field.
247235
// The foldLeft adds ExtractValues for every remaining parts of the identifier,
248236
// and aliased it with the last part of the name.
249237
// For example, consider "a.b.c", where "a" is resolved to an existing attribute.
250238
// Then this will add ExtractValue("c", ExtractValue("b", a)), and alias the final
251239
// expression as "c".
252-
val fieldExprs = nestedFields.foldLeft(a: Expression)((expr, fieldName) =>
253-
ExtractValue(expr, Literal(fieldName), resolver))
240+
val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, name) =>
241+
ExtractValue(e, Literal(name), resolver)
242+
}
254243
Some(Alias(fieldExprs, nestedFields.last)())
255244

256-
// No matches.
245+
case Seq(a) =>
246+
// One match, no nested fields, use it.
247+
Some(a)
248+
257249
case Seq() =>
258-
logTrace(s"Could not find $name in ${input.mkString(", ")}")
250+
// No matches.
251+
logTrace(s"Could not find $name in ${attributes.mkString(", ")}")
259252
None
260253

261-
// More than one match.
262254
case ambiguousReferences =>
263-
val referenceNames = ambiguousReferences.map(_._1).mkString(", ")
264-
throw new AnalysisException(
265-
s"Reference '$name' is ambiguous, could be: $referenceNames.")
255+
// More than one match.
256+
val referenceNames = ambiguousReferences.mkString(", ")
257+
throw new AnalysisException(s"Reference '$name' is ambiguous, could be: $referenceNames.")
266258
}
267259
}
268260
}

0 commit comments

Comments
 (0)