Skip to content

Commit 3652997

Browse files
committed
[SPARK-4943][SQL] Allow table name having dot to support db/catalog ...
1 parent d345ebe commit 3652997

File tree

17 files changed

+137
-116
lines changed

17 files changed

+137
-116
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,23 @@ class SqlParser extends AbstractSparkSQLParser {
178178
joinedRelation | relationFactor
179179

180180
protected lazy val relationFactor: Parser[LogicalPlan] =
181-
( ident ~ (opt(AS) ~> opt(ident)) ^^ {
182-
case tableName ~ alias => UnresolvedRelation(None, tableName, alias)
181+
(
182+
ident ~ ("." ~> ident) ~ ("." ~> ident) ~ ("." ~> ident) ~ (opt(AS) ~> opt(ident)) ^^ {
183+
case reserveName1 ~ reserveName2 ~ dbName ~ tableName ~ alias =>
184+
UnresolvedRelation(IndexedSeq(tableName, dbName, reserveName2, reserveName1), alias)
183185
}
184-
| ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
186+
| ident ~ ("." ~> ident) ~ ("." ~> ident) ~ (opt(AS) ~> opt(ident)) ^^ {
187+
case reserveName1 ~ dbName ~ tableName ~ alias =>
188+
UnresolvedRelation(IndexedSeq(tableName, dbName, reserveName1), alias)
189+
}
190+
| ident ~ ("." ~> ident) ~ (opt(AS) ~> opt(ident)) ^^ {
191+
case dbName ~ tableName ~ alias =>
192+
UnresolvedRelation(IndexedSeq(tableName, dbName), alias)
193+
}
194+
| ident ~ (opt(AS) ~> opt(ident)) ^^ {
195+
case tableName ~ alias => UnresolvedRelation(IndexedSeq(tableName), alias)
196+
}
197+
| ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
185198
)
186199

187200
protected lazy val joinedRelation: Parser[LogicalPlan] =

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,11 @@ class Analyzer(catalog: Catalog,
228228
*/
229229
object ResolveRelations extends Rule[LogicalPlan] {
230230
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
231-
case i @ InsertIntoTable(UnresolvedRelation(databaseName, name, alias), _, _, _) =>
231+
case i @ InsertIntoTable(UnresolvedRelation(tableIdentifier, alias), _, _, _) =>
232232
i.copy(
233-
table = EliminateAnalysisOperators(catalog.lookupRelation(databaseName, name, alias)))
234-
case UnresolvedRelation(databaseName, name, alias) =>
235-
catalog.lookupRelation(databaseName, name, alias)
233+
table = EliminateAnalysisOperators(catalog.lookupRelation(tableIdentifier, alias)))
234+
case UnresolvedRelation(tableIdentifier, alias) =>
235+
catalog.lookupRelation(tableIdentifier, alias)
236236
}
237237
}
238238

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala

Lines changed: 43 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -28,77 +28,63 @@ trait Catalog {
2828

2929
def caseSensitive: Boolean
3030

31-
def tableExists(db: Option[String], tableName: String): Boolean
31+
def tableExists(tableIdentifier: Seq[String]): Boolean
3232

3333
def lookupRelation(
34-
databaseName: Option[String],
35-
tableName: String,
36-
alias: Option[String] = None): LogicalPlan
34+
tableIdentifier: Seq[String],
35+
alias: Option[String] = None): LogicalPlan
3736

38-
def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
37+
def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit
3938

40-
def unregisterTable(databaseName: Option[String], tableName: String): Unit
39+
def unregisterTable(tableIdentifier: Seq[String]): Unit
4140

4241
def unregisterAllTables(): Unit
4342

44-
protected def processDatabaseAndTableName(
45-
databaseName: Option[String],
46-
tableName: String): (Option[String], String) = {
43+
protected def processTableIdentifier(tableIdentifier: Seq[String]):
44+
Seq[String] = {
4745
if (!caseSensitive) {
48-
(databaseName.map(_.toLowerCase), tableName.toLowerCase)
46+
tableIdentifier.map(_.toLowerCase)
4947
} else {
50-
(databaseName, tableName)
48+
tableIdentifier
5149
}
5250
}
5351

54-
protected def processDatabaseAndTableName(
55-
databaseName: String,
56-
tableName: String): (String, String) = {
57-
if (!caseSensitive) {
58-
(databaseName.toLowerCase, tableName.toLowerCase)
59-
} else {
60-
(databaseName, tableName)
61-
}
62-
}
6352
}
6453

6554
class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
6655
val tables = new mutable.HashMap[String, LogicalPlan]()
6756

6857
override def registerTable(
69-
databaseName: Option[String],
70-
tableName: String,
58+
tableIdentifier: Seq[String],
7159
plan: LogicalPlan): Unit = {
72-
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
73-
tables += ((tblName, plan))
60+
val tableIdent = processTableIdentifier(tableIdentifier)
61+
tables += ((tableIdent.mkString("."), plan))
7462
}
7563

76-
override def unregisterTable(
77-
databaseName: Option[String],
78-
tableName: String) = {
79-
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
80-
tables -= tblName
64+
override def unregisterTable(tableIdentifier: Seq[String]) = {
65+
val tableIdent = processTableIdentifier(tableIdentifier)
66+
tables -= tableIdent.mkString(".")
8167
}
8268

8369
override def unregisterAllTables() = {
8470
tables.clear()
8571
}
8672

87-
override def tableExists(db: Option[String], tableName: String): Boolean = {
88-
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
89-
tables.get(tblName) match {
73+
override def tableExists(tableIdentifier: Seq[String]): Boolean = {
74+
val tableIdent = processTableIdentifier(tableIdentifier)
75+
tables.get(tableIdent.mkString(".")) match {
9076
case Some(_) => true
9177
case None => false
9278
}
9379
}
9480

9581
override def lookupRelation(
96-
databaseName: Option[String],
97-
tableName: String,
82+
tableIdentifier: Seq[String],
9883
alias: Option[String] = None): LogicalPlan = {
99-
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
100-
val table = tables.getOrElse(tblName, sys.error(s"Table Not Found: $tableName"))
101-
val tableWithQualifiers = Subquery(tblName, table)
84+
val tableIdent = processTableIdentifier(tableIdentifier)
85+
val tableFullName = tableIdent.mkString(".")
86+
val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
87+
val tableWithQualifiers = Subquery(tableIdent.head, table)
10288

10389
// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
10490
// properly qualified with this alias.
@@ -115,43 +101,41 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
115101
trait OverrideCatalog extends Catalog {
116102

117103
// TODO: This doesn't work when the database changes...
118-
val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
104+
val overrides = new mutable.HashMap[String, LogicalPlan]()
119105

120-
abstract override def tableExists(db: Option[String], tableName: String): Boolean = {
121-
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
122-
overrides.get((dbName, tblName)) match {
106+
abstract override def tableExists(tableIdentifier: Seq[String]): Boolean = {
107+
val tableIdent = processTableIdentifier(tableIdentifier).mkString(".")
108+
overrides.get(tableIdent) match {
123109
case Some(_) => true
124-
case None => super.tableExists(db, tableName)
110+
case None => super.tableExists(tableIdentifier)
125111
}
126112
}
127113

128114
abstract override def lookupRelation(
129-
databaseName: Option[String],
130-
tableName: String,
115+
tableIdentifier: Seq[String],
131116
alias: Option[String] = None): LogicalPlan = {
132-
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
133-
val overriddenTable = overrides.get((dbName, tblName))
134-
val tableWithQualifers = overriddenTable.map(r => Subquery(tblName, r))
117+
val tableIdent = processTableIdentifier(tableIdentifier)
118+
val overriddenTable = overrides.get(tableIdent.mkString("."))
119+
val tableWithQualifers = overriddenTable.map(r => Subquery(tableIdent.head, r))
135120

136121
// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
137122
// properly qualified with this alias.
138123
val withAlias =
139124
tableWithQualifers.map(r => alias.map(a => Subquery(a, r)).getOrElse(r))
140125

141-
withAlias.getOrElse(super.lookupRelation(dbName, tblName, alias))
126+
withAlias.getOrElse(super.lookupRelation(tableIdentifier, alias))
142127
}
143128

144129
override def registerTable(
145-
databaseName: Option[String],
146-
tableName: String,
130+
tableIdentifier: Seq[String],
147131
plan: LogicalPlan): Unit = {
148-
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
149-
overrides.put((dbName, tblName), plan)
132+
val tableIdent = processTableIdentifier(tableIdentifier).mkString(".")
133+
overrides.put(tableIdent, plan)
150134
}
151135

152-
override def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
153-
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
154-
overrides.remove((dbName, tblName))
136+
override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
137+
val tableIdent = processTableIdentifier(tableIdentifier).mkString(".")
138+
overrides.remove(tableIdent)
155139
}
156140

157141
override def unregisterAllTables(): Unit = {
@@ -167,22 +151,21 @@ object EmptyCatalog extends Catalog {
167151

168152
val caseSensitive: Boolean = true
169153

170-
def tableExists(db: Option[String], tableName: String): Boolean = {
154+
def tableExists(tableIdentifier: Seq[String]): Boolean = {
171155
throw new UnsupportedOperationException
172156
}
173157

174158
def lookupRelation(
175-
databaseName: Option[String],
176-
tableName: String,
159+
tableIdentifier: Seq[String],
177160
alias: Option[String] = None) = {
178161
throw new UnsupportedOperationException
179162
}
180163

181-
def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = {
164+
def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
182165
throw new UnsupportedOperationException
183166
}
184167

185-
def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
168+
def unregisterTable(tableIdentifier: Seq[String]): Unit = {
186169
throw new UnsupportedOperationException
187170
}
188171

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
3434
* Holds the name of a relation that has yet to be looked up in a [[Catalog]].
3535
*/
3636
case class UnresolvedRelation(
37-
databaseName: Option[String],
38-
tableName: String,
37+
tableIdentifier: Seq[String],
3938
alias: Option[String] = None) extends LeafNode {
4039
override def output = Nil
4140
override lazy val resolved = false

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ package object dsl {
290290

291291
def insertInto(tableName: String, overwrite: Boolean = false) =
292292
InsertIntoTable(
293-
analysis.UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)
293+
analysis.UnresolvedRelation(IndexedSeq(tableName)), Map.empty, logicalPlan, overwrite)
294294

295295
def analyze = analysis.SimpleAnalyzer(logicalPlan)
296296
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
4444
AttributeReference("e", ShortType)())
4545

4646
before {
47-
caseSensitiveCatalog.registerTable(None, "TaBlE", testRelation)
48-
caseInsensitiveCatalog.registerTable(None, "TaBlE", testRelation)
47+
caseSensitiveCatalog.registerTable(IndexedSeq("TaBlE"), testRelation)
48+
caseInsensitiveCatalog.registerTable(IndexedSeq("TaBlE"), testRelation)
4949
}
5050

5151
test("union project *") {
@@ -64,45 +64,45 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
6464
assert(
6565
caseSensitiveAnalyze(
6666
Project(Seq(UnresolvedAttribute("TbL.a")),
67-
UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
67+
UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL")))) ===
6868
Project(testRelation.output, testRelation))
6969

7070
val e = intercept[TreeNodeException[_]] {
7171
caseSensitiveAnalyze(
7272
Project(Seq(UnresolvedAttribute("tBl.a")),
73-
UnresolvedRelation(None, "TaBlE", Some("TbL"))))
73+
UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL"))))
7474
}
7575
assert(e.getMessage().toLowerCase.contains("unresolved"))
7676

7777
assert(
7878
caseInsensitiveAnalyze(
7979
Project(Seq(UnresolvedAttribute("TbL.a")),
80-
UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
80+
UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL")))) ===
8181
Project(testRelation.output, testRelation))
8282

8383
assert(
8484
caseInsensitiveAnalyze(
8585
Project(Seq(UnresolvedAttribute("tBl.a")),
86-
UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
86+
UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL")))) ===
8787
Project(testRelation.output, testRelation))
8888
}
8989

9090
test("resolve relations") {
9191
val e = intercept[RuntimeException] {
92-
caseSensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None))
92+
caseSensitiveAnalyze(UnresolvedRelation(IndexedSeq("tAbLe"), None))
9393
}
9494
assert(e.getMessage == "Table Not Found: tAbLe")
9595

9696
assert(
97-
caseSensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
97+
caseSensitiveAnalyze(UnresolvedRelation(IndexedSeq("TaBlE"), None)) ===
9898
testRelation)
9999

100100
assert(
101-
caseInsensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None)) ===
101+
caseInsensitiveAnalyze(UnresolvedRelation(IndexedSeq("tAbLe"), None)) ===
102102
testRelation)
103103

104104
assert(
105-
caseInsensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
105+
caseInsensitiveAnalyze(UnresolvedRelation(IndexedSeq("TaBlE"), None)) ===
106106
testRelation)
107107
}
108108

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter {
4141
val f: Expression = UnresolvedAttribute("f")
4242

4343
before {
44-
catalog.registerTable(None, "table", relation)
44+
catalog.registerTable(IndexedSeq("table"), relation)
4545
}
4646

4747
private def checkType(expression: Expression, expectedType: DataType): Unit = {

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
276276
* @group userf
277277
*/
278278
def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
279-
catalog.registerTable(None, tableName, rdd.queryExecution.logical)
279+
catalog.registerTable(IndexedSeq(tableName), rdd.queryExecution.logical)
280280
}
281281

282282
/**
@@ -289,7 +289,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
289289
*/
290290
def dropTempTable(tableName: String): Unit = {
291291
tryUncacheQuery(table(tableName))
292-
catalog.unregisterTable(None, tableName)
292+
catalog.unregisterTable(IndexedSeq(tableName))
293293
}
294294

295295
/**
@@ -308,7 +308,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
308308

309309
/** Returns the specified table as a SchemaRDD */
310310
def table(tableName: String): SchemaRDD =
311-
new SchemaRDD(this, catalog.lookupRelation(None, tableName))
311+
new SchemaRDD(this, catalog.lookupRelation(IndexedSeq(tableName)))
312312

313313
/**
314314
* :: DeveloperApi ::

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ private[sql] trait SchemaRDDLike {
9797
*/
9898
@Experimental
9999
def insertInto(tableName: String, overwrite: Boolean): Unit =
100-
sqlContext.executePlan(
101-
InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)).toRdd
100+
sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(IndexedSeq(tableName)),
101+
Map.empty, logicalPlan, overwrite)).toRdd
102102

103103
/**
104104
* :: Experimental ::

sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
302302
upperCaseData.where('N <= 4).registerTempTable("left")
303303
upperCaseData.where('N >= 3).registerTempTable("right")
304304

305-
val left = UnresolvedRelation(None, "left", None)
306-
val right = UnresolvedRelation(None, "right", None)
305+
val left = UnresolvedRelation(IndexedSeq("left"), None)
306+
val right = UnresolvedRelation(IndexedSeq("right"), None)
307307

308308
checkAnswer(
309309
left.join(right, FullOuter, Some("left.N".attr === "right.N".attr)),

0 commit comments

Comments
 (0)