Skip to content

Commit 0d49fd6

Browse files
author
Andrew Or
committed
Merge branch 'master' of github.com:apache/spark into dag-viz-sql
2 parents ffd237a + f496bf3 commit 0d49fd6

File tree

2 files changed

+61
-39
lines changed

2 files changed

+61
-39
lines changed

repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1028,7 +1028,7 @@ class SparkILoop(
10281028
logInfo("Created sql context (with Hive support)..")
10291029
}
10301030
catch {
1031-
case cnf: java.lang.ClassNotFoundException =>
1031+
case _: java.lang.ClassNotFoundException | _: java.lang.NoClassDefFoundError =>
10321032
sqlContext = new SQLContext(sparkContext)
10331033
logInfo("Created sql context..")
10341034
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 60 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ class Analyzer(
5555
val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil
5656

5757
lazy val batches: Seq[Batch] = Seq(
58+
Batch("Substitution", fixedPoint,
59+
CTESubstitution ::
60+
WindowsSubstitution ::
61+
Nil : _*),
5862
Batch("Resolution", fixedPoint,
5963
ResolveRelations ::
6064
ResolveReferences ::
@@ -71,6 +75,55 @@ class Analyzer(
7175
extendedResolutionRules : _*)
7276
)
7377

78+
/**
79+
* Substitutes references to CTE names in the child plan with their CTE definitions.
80+
*/
81+
object CTESubstitution extends Rule[LogicalPlan] {
82+
// TODO allow subquery to define CTE
83+
def apply(plan: LogicalPlan): LogicalPlan = plan match {
84+
case With(child, relations) => substituteCTE(child, relations)
85+
case other => other
86+
}
87+
88+
def substituteCTE(plan: LogicalPlan, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
89+
plan transform {
90+
// In Hive, if the same table name exists in both the database and a CTE definition,
91+
// Hive will use the table in the database, not the CTE one.
92+
// Considering both reasonableness and implementation complexity,
93+
// we prefer the CTE definition here, matching on the table name only and ignoring the database name;
94+
// see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info
95+
case u : UnresolvedRelation =>
96+
val substituted = cteRelations.get(u.tableIdentifier.last).map { relation =>
97+
val withAlias = u.alias.map(Subquery(_, relation))
98+
withAlias.getOrElse(relation)
99+
}
100+
substituted.getOrElse(u)
101+
}
102+
}
103+
}
104+
105+
/**
106+
* Substitutes window specification references in the child plan with their WindowSpecDefinitions.
107+
*/
108+
object WindowsSubstitution extends Rule[LogicalPlan] {
109+
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
110+
// Lookup WindowSpecDefinitions. This rule works with unresolved children.
111+
case WithWindowDefinition(windowDefinitions, child) =>
112+
child.transform {
113+
case plan => plan.transformExpressions {
114+
case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
115+
val errorMessage =
116+
s"Window specification $windowName is not defined in the WINDOW clause."
117+
val windowSpecDefinition =
118+
windowDefinitions
119+
.get(windowName)
120+
.getOrElse(failAnalysis(errorMessage))
121+
WindowExpression(c, windowSpecDefinition)
122+
}
123+
}
124+
}
125+
}
126+
74127
/**
75128
* Removes no-op Alias expressions from the plan.
76129
*/
@@ -172,36 +225,20 @@ class Analyzer(
172225
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
173226
*/
174227
object ResolveRelations extends Rule[LogicalPlan] {
175-
def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
228+
def getTable(u: UnresolvedRelation): LogicalPlan = {
176229
try {
177-
// In hive, if there is same table name in database and CTE definition,
178-
// hive will use the table in database, not the CTE one.
179-
// Taking into account the reasonableness and the implementation complexity,
180-
// here use the CTE definition first, check table name only and ignore database name
181-
cteRelations.get(u.tableIdentifier.last)
182-
.map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
183-
.getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
230+
catalog.lookupRelation(u.tableIdentifier, u.alias)
184231
} catch {
185232
case _: NoSuchTableException =>
186233
u.failAnalysis(s"no such table ${u.tableName}")
187234
}
188235
}
189236

190-
def apply(plan: LogicalPlan): LogicalPlan = {
191-
val (realPlan, cteRelations) = plan match {
192-
// TODO allow subquery to define CTE
193-
// Add cte table to a temp relation map,drop `with` plan and keep its child
194-
case With(child, relations) => (child, relations)
195-
case other => (other, Map.empty[String, LogicalPlan])
196-
}
197-
198-
realPlan transform {
199-
case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
200-
i.copy(
201-
table = EliminateSubQueries(getTable(u, cteRelations)))
202-
case u: UnresolvedRelation =>
203-
getTable(u, cteRelations)
204-
}
237+
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
238+
case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
239+
i.copy(table = EliminateSubQueries(getTable(u)))
240+
case u: UnresolvedRelation =>
241+
getTable(u)
205242
}
206243
}
207244

@@ -664,21 +701,6 @@ class Analyzer(
664701
// We have to use transformDown here to make sure the rule of
665702
// "Aggregate with Having clause" will be triggered.
666703
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
667-
// Lookup WindowSpecDefinitions. This rule works with unresolved children.
668-
case WithWindowDefinition(windowDefinitions, child) =>
669-
child.transform {
670-
case plan => plan.transformExpressions {
671-
case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
672-
val errorMessage =
673-
s"Window specification $windowName is not defined in the WINDOW clause."
674-
val windowSpecDefinition =
675-
windowDefinitions
676-
.get(windowName)
677-
.getOrElse(failAnalysis(errorMessage))
678-
WindowExpression(c, windowSpecDefinition)
679-
}
680-
}
681-
682704
// Aggregate with Having clause. This rule works with an unresolved Aggregate because
683705
// a resolved Aggregate will not have Window Functions.
684706
case f @ Filter(condition, a @ Aggregate(groupingExprs, aggregateExprs, child))

0 commit comments

Comments
 (0)