
Commit fc9fafe

hvanhovell authored and markhamstra committed
[SPARK-16406][SQL] Improve performance of LogicalPlan.resolve
## What changes were proposed in this pull request?

`LogicalPlan.resolve(...)` uses linear searches to find an attribute matching a name. This is fine in normal cases, but gets problematic when you try to resolve a large number of columns on a plan with a large number of attributes. This PR adds an indexing structure to `resolve(...)` in order to find potential matches quicker.

This PR improves the reference resolution time for the following code by 4x (11.8s -> 2.4s):

```scala
val n = 4000
val values = (1 to n).map(_.toString).mkString(", ")
val columns = (1 to n).map("column" + _).mkString(", ")
val query =
  s"""
     |SELECT $columns
     |FROM VALUES ($values) T($columns)
     |WHERE 1=2 AND 1 IN ($columns)
     |GROUP BY $columns
     |ORDER BY $columns
     |""".stripMargin
spark.time(sql(query))
```

## How was this patch tested?

Existing tests.

Author: Herman van Hovell <[email protected]>

Closes apache#14083 from hvanhovell/SPARK-16406.
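The core of the change: instead of scanning every attribute for every name being resolved, the attributes are grouped up front into hash maps keyed by lower-cased name (and by a (qualifier, name) pair for qualified references), so each lookup only touches attributes that can possibly match. A minimal, self-contained sketch of that idea follows; the toy `Attr` type and helper names are illustrative only, not the Spark classes in the diff below:

```scala
// Illustrative sketch of the indexing idea (toy types, not Spark code).
case class Attr(name: String, qualifier: Option[String])

val attrs: Seq[Attr] = (1 to 4000).map(i => Attr(s"column$i", Some("T")))

// Before: a linear scan per lookup, O(n) per column and O(n^2) to resolve all n columns.
def resolveLinear(name: String): Seq[Attr] =
  attrs.filter(_.name.equalsIgnoreCase(name))

// After: group the attributes by lower-cased name once; each lookup is then a hash-map hit.
val direct: Map[String, Seq[Attr]] = attrs.groupBy(_.name.toLowerCase)

def resolveIndexed(name: String): Seq[Attr] =
  direct.getOrElse(name.toLowerCase, Nil)
```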
1 parent 9d03457 commit fc9fafe

File tree: 2 files changed, 93 additions and 101 deletions


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala

Lines changed: 86 additions & 0 deletions
```diff
@@ -17,8 +17,12 @@
 
 package org.apache.spark.sql.catalyst
 
+import java.util.Locale
+
 import com.google.common.collect.Maps
 
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -138,6 +142,88 @@ package object expressions {
     def indexOf(exprId: ExprId): Int = {
       Option(exprIdToOrdinal.get(exprId)).getOrElse(-1)
     }
+
+    private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = {
+      m.mapValues(_.distinct).map(identity)
+    }
+
+    /** Map to use for direct case insensitive attribute lookups. */
+    @transient private lazy val direct: Map[String, Seq[Attribute]] = {
+      unique(attrs.groupBy(_.name.toLowerCase(Locale.ROOT)))
+    }
+
+    /** Map to use for qualified case insensitive attribute lookups. */
+    @transient private val qualified: Map[(String, String), Seq[Attribute]] = {
+      val grouped = attrs.filter(_.qualifier.isDefined).groupBy { a =>
+        (a.qualifier.get.toLowerCase(Locale.ROOT), a.name.toLowerCase(Locale.ROOT))
+      }
+      unique(grouped)
+    }
+
+    /** Perform attribute resolution given a name and a resolver. */
+    def resolve(nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = {
+      // Collect matching attributes given a name and a lookup.
+      def collectMatches(name: String, candidates: Option[Seq[Attribute]]): Seq[Attribute] = {
+        candidates.toSeq.flatMap(_.collect {
+          case a if resolver(a.name, name) => a.withName(name)
+        })
+      }
+
+      // Find matches for the given name assuming that the 1st part is a qualifier (i.e. table name,
+      // alias, or subquery alias) and the 2nd part is the actual name. This returns a tuple of
+      // matched attributes and a list of parts that are to be resolved.
+      //
+      // For example, consider an example where "a" is the table name, "b" is the column name,
+      // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b",
+      // and the second element will be List("c").
+      val matches = nameParts match {
+        case qualifier +: name +: nestedFields =>
+          val key = (qualifier.toLowerCase(Locale.ROOT), name.toLowerCase(Locale.ROOT))
+          val attributes = collectMatches(name, qualified.get(key)).filter { a =>
+            resolver(qualifier, a.qualifier.get)
+          }
+          (attributes, nestedFields)
+        case all =>
+          (Nil, all)
+      }
+
+      // If none of attributes match `table.column` pattern, we try to resolve it as a column.
+      val (candidates, nestedFields) = matches match {
+        case (Seq(), _) =>
+          val name = nameParts.head
+          val attributes = collectMatches(name, direct.get(name.toLowerCase(Locale.ROOT)))
+          (attributes, nameParts.tail)
+        case _ => matches
+      }
+
+      def name = UnresolvedAttribute(nameParts).name
+      candidates match {
+        case Seq(a) if nestedFields.nonEmpty =>
+          // One match, but we also need to extract the requested nested field.
+          // The foldLeft adds ExtractValues for every remaining parts of the identifier,
+          // and aliased it with the last part of the name.
+          // For example, consider "a.b.c", where "a" is resolved to an existing attribute.
+          // Then this will add ExtractValue("c", ExtractValue("b", a)), and alias the final
+          // expression as "c".
+          val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, name) =>
+            ExtractValue(e, Literal(name), resolver)
+          }
+          Some(Alias(fieldExprs, nestedFields.last)())
+
+        case Seq(a) =>
+          // One match, no nested fields, use it.
+          Some(a)
+
+        case Seq() =>
+          // No matches.
+          None
+
+        case ambiguousReferences =>
+          // More than one match.
+          val referenceNames = ambiguousReferences.map(_.qualifiedName).mkString(", ")
+          throw new AnalysisException(s"Reference '$name' is ambiguous, could be: $referenceNames.")
+      }
+    }
   }
 
   /**
```
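For context, the new `resolve` lives on `AttributeSeq` (the wrapper that `LogicalPlan` constructs in the next file), and it first consults the `qualified` map for a `table.column` match before falling back to the `direct` map and treating any remaining name parts as nested struct fields. Below is a hedged usage sketch only; the `AttributeReference` constructor shape and the `caseInsensitiveResolution` resolver are assumptions based on the Spark 2.x sources of this era, not part of this commit:

```scala
import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// A struct-typed column "b" qualified by table alias "a", i.e. referable to as "a.b".
// The constructor arguments here are an assumption about the contemporary Spark API.
val b = AttributeReference("b", StructType(Seq(StructField("c", IntegerType))))(
  qualifier = Some("a"))

// Resolving "a.b.c": the qualified map finds "a"."b", and the trailing "c" becomes an
// ExtractValue aliased as "c", roughly Some(Alias(ExtractValue(b, Literal("c")), "c")).
val resolved = AttributeSeq(Seq(b)).resolve(Seq("a", "b", "c"), caseInsensitiveResolution)
```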

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 7 additions & 101 deletions
```diff
@@ -86,6 +86,10 @@ abstract class LogicalPlan
     }
   }
 
+  private[this] lazy val childAttributes = AttributeSeq(children.flatMap(_.output))
+
+  private[this] lazy val outputAttributes = AttributeSeq(output)
+
   /**
    * Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
    * nodes of this LogicalPlan. The attribute is expressed as
@@ -94,7 +98,7 @@ abstract class LogicalPlan
   def resolveChildren(
       nameParts: Seq[String],
       resolver: Resolver): Option[NamedExpression] =
-    resolve(nameParts, children.flatMap(_.output), resolver)
+    childAttributes.resolve(nameParts, resolver)
 
   /**
    * Optionally resolves the given strings to a [[NamedExpression]] based on the output of this
@@ -104,7 +108,7 @@ abstract class LogicalPlan
   def resolve(
       nameParts: Seq[String],
       resolver: Resolver): Option[NamedExpression] =
-    resolve(nameParts, output, resolver)
+    outputAttributes.resolve(nameParts, resolver)
 
   /**
    * Given an attribute name, split it to name parts by dot, but
@@ -114,105 +118,7 @@ abstract class LogicalPlan
   def resolveQuoted(
       name: String,
       resolver: Resolver): Option[NamedExpression] = {
-    resolve(UnresolvedAttribute.parseAttributeName(name), output, resolver)
-  }
-
-  /**
-   * Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
-   *
-   * This assumes `name` has multiple parts, where the 1st part is a qualifier
-   * (i.e. table name, alias, or subquery alias).
-   * See the comment above `candidates` variable in resolve() for semantics the returned data.
-   */
-  private def resolveAsTableColumn(
-      nameParts: Seq[String],
-      resolver: Resolver,
-      attribute: Attribute): Option[(Attribute, List[String])] = {
-    assert(nameParts.length > 1)
-    if (attribute.qualifier.exists(resolver(_, nameParts.head))) {
-      // At least one qualifier matches. See if remaining parts match.
-      val remainingParts = nameParts.tail
-      resolveAsColumn(remainingParts, resolver, attribute)
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
-   *
-   * Different from resolveAsTableColumn, this assumes `name` does NOT start with a qualifier.
-   * See the comment above `candidates` variable in resolve() for semantics the returned data.
-   */
-  private def resolveAsColumn(
-      nameParts: Seq[String],
-      resolver: Resolver,
-      attribute: Attribute): Option[(Attribute, List[String])] = {
-    if (resolver(attribute.name, nameParts.head)) {
-      Option((attribute.withName(nameParts.head), nameParts.tail.toList))
-    } else {
-      None
-    }
-  }
-
-  /** Performs attribute resolution given a name and a sequence of possible attributes. */
-  protected def resolve(
-      nameParts: Seq[String],
-      input: Seq[Attribute],
-      resolver: Resolver): Option[NamedExpression] = {
-
-    // A sequence of possible candidate matches.
-    // Each candidate is a tuple. The first element is a resolved attribute, followed by a list
-    // of parts that are to be resolved.
-    // For example, consider an example where "a" is the table name, "b" is the column name,
-    // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b",
-    // and the second element will be List("c").
-    var candidates: Seq[(Attribute, List[String])] = {
-      // If the name has 2 or more parts, try to resolve it as `table.column` first.
-      if (nameParts.length > 1) {
-        input.flatMap { option =>
-          resolveAsTableColumn(nameParts, resolver, option)
-        }
-      } else {
-        Seq.empty
-      }
-    }
-
-    // If none of attributes match `table.column` pattern, we try to resolve it as a column.
-    if (candidates.isEmpty) {
-      candidates = input.flatMap { candidate =>
-        resolveAsColumn(nameParts, resolver, candidate)
-      }
-    }
-
-    def name = UnresolvedAttribute(nameParts).name
-
-    candidates.distinct match {
-      // One match, no nested fields, use it.
-      case Seq((a, Nil)) => Some(a)
-
-      // One match, but we also need to extract the requested nested field.
-      case Seq((a, nestedFields)) =>
-        // The foldLeft adds ExtractValues for every remaining parts of the identifier,
-        // and aliased it with the last part of the name.
-        // For example, consider "a.b.c", where "a" is resolved to an existing attribute.
-        // Then this will add ExtractValue("c", ExtractValue("b", a)), and alias the final
-        // expression as "c".
-        val fieldExprs = nestedFields.foldLeft(a: Expression)((expr, fieldName) =>
-          ExtractValue(expr, Literal(fieldName), resolver))
-        Some(Alias(fieldExprs, nestedFields.last)())
-
-      // No matches.
-      case Seq() =>
-        logTrace(s"Could not find $name in ${input.mkString(", ")}")
-        None
-
-      // More than one match.
-      case ambiguousReferences =>
-        val referenceNames = ambiguousReferences.map(_._1.qualifiedName).mkString(", ")
-        throw new AnalysisException(
-          s"Reference '$name' is ambiguous, could be: $referenceNames.")
-    }
+    outputAttributes.resolve(UnresolvedAttribute.parseAttributeName(name), resolver)
   }
 
   /**
```
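One design note on the `LogicalPlan` side: the `AttributeSeq` wrappers are held in `private[this] lazy val`s, so the hash maps behind them are built at most once per plan node and then reused across every subsequent `resolveChildren` / `resolve` / `resolveQuoted` call. A minimal sketch of that caching pattern, with toy names that are not Spark code:

```scala
// Illustration only: build the lookup index lazily, once per node, and reuse it.
class Node(output: Seq[String]) {
  // Built on first use, never rebuilt for this node instance.
  private lazy val index: Map[String, Seq[String]] = output.groupBy(_.toLowerCase)

  def resolve(name: String): Seq[String] = index.getOrElse(name.toLowerCase, Nil)
}

val node = new Node((1 to 4000).map("column" + _))
node.resolve("COLUMN42") // first call builds the index
node.resolve("column7")  // subsequent calls hit the cached map
```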
