Skip to content

Commit b85b620

Browse files
fix the regression of temp table not found in CTAS
1 parent 7e758d7 commit b85b620

File tree

4 files changed

+44
-20
lines changed

4 files changed

+44
-20
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,22 +94,20 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
9494
* @param schema Schema of the new table, if not specified, will use the schema
9595
* specified in crtTbl
9696
* @param allowExisting if true, ignore AlreadyExistsException
97-
* @param desc CreateTableDesc object which contains the SerDe info. Currently
97+
* @param crtTbl CreateTableDesc object which contains the SerDe info. Currently
9898
* we support most of the features except the bucket.
9999
*/
100100
def createTable(
101101
databaseName: String,
102102
tableName: String,
103103
schema: Seq[Attribute],
104104
allowExisting: Boolean = false,
105-
desc: Option[CreateTableDesc] = None) {
105+
crtTbl: CreateTableDesc = null) {
106106
val hconf = hive.hiveconf
107107

108108
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
109109
val tbl = new Table(dbName, tblName)
110110

111-
val crtTbl: CreateTableDesc = desc.getOrElse(null)
112-
113111
// We should respect the passed in schema, unless it's not set
114112
val hiveSchema: JList[FieldSchema] = if (schema == null || schema.isEmpty) {
115113
crtTbl.getCols
@@ -254,15 +252,37 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
254252
* For example, because of a CREATE TABLE X AS statement.
255253
*/
256254
object CreateTables extends Rule[LogicalPlan] {
255+
import org.apache.hadoop.hive.ql.Context
256+
import org.apache.hadoop.hive.ql.parse.{QB, ASTNode, SemanticAnalyzer}
257+
257258
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
258259
// Wait until children are resolved.
259260
case p: LogicalPlan if !p.childrenResolved => p
260261

261-
case CreateTableAsSelect(db, tableName, child, allowExisting, extra) =>
262+
case p @ CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) =>
262263
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
263264
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
264265

265-
CreateTableAsSelect(Some(databaseName), tableName, child, allowExisting, extra)
266+
// Get the CreateTableDesc from Hive SemanticAnalyzer
267+
val desc: Option[CreateTableDesc] = if (tableExists(Some(databaseName), tblName)) {
268+
None
269+
} else {
270+
val sa = new SemanticAnalyzer(hive.hiveconf) {
271+
override def analyzeInternal(ast: ASTNode) {
272+
// A hack to intercept the SemanticAnalyzer.analyzeInternal, ignore the query clause
273+
// for CTAS
274+
val method = classOf[SemanticAnalyzer].getDeclaredMethod(
275+
"analyzeCreateTable", classOf[ASTNode], classOf[QB])
276+
method.setAccessible(true)
277+
method.invoke(this, ast, this.getQB)
278+
}
279+
}
280+
281+
sa.analyze(extra, new Context(hive.hiveconf))
282+
Some(sa.getQB().getTableDesc)
283+
}
284+
285+
CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc)
266286
}
267287
}
268288

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.hive
1919

2020
import org.apache.hadoop.hive.ql.parse.ASTNode
21+
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
2122

2223
import org.apache.spark.annotation.Experimental
2324
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -181,13 +182,13 @@ private[hive] trait HiveStrategies {
181182
execution.InsertIntoHiveTable(
182183
table, partition, planLater(child), overwrite)(hiveContext) :: Nil
183184
case logical.CreateTableAsSelect(
184-
Some(database), tableName, child, allowExisting, Some(extra: ASTNode)) =>
185-
CreateTableAsSelect(
185+
Some(database), tableName, child, allowExisting, desc: Option[CreateTableDesc]) =>
186+
execution.CreateTableAsSelect(
186187
database,
187188
tableName,
188189
child,
189190
allowExisting,
190-
extra) :: Nil
191+
desc) :: Nil
191192
case _ => Nil
192193
}
193194
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
package org.apache.spark.sql.hive.execution
1919

20-
import org.apache.hadoop.hive.ql.Context
21-
import org.apache.hadoop.hive.ql.parse.{SemanticAnalyzer, ASTNode}
20+
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
21+
2222
import org.apache.spark.annotation.Experimental
2323
import org.apache.spark.rdd.RDD
2424
import org.apache.spark.sql.catalyst.expressions.Row
@@ -35,8 +35,7 @@ import org.apache.spark.sql.hive.MetastoreRelation
3535
* @param query the query whose result will be inserted into the new relation
3636
* @param allowExisting continue working if the table already exists, otherwise
3737
* raise exception
38-
* @param extra the extra information for this Operator, it should be the
39-
* ASTNode object for extracting the CreateTableDesc.
38+
* @param desc the CreateTableDesc, which may contain serde, storage handler, etc.
4039
4140
*/
4241
@Experimental
@@ -45,21 +44,16 @@ case class CreateTableAsSelect(
4544
tableName: String,
4645
query: LogicalPlan,
4746
allowExisting: Boolean,
48-
extra: ASTNode) extends LeafNode with Command {
47+
desc: Option[CreateTableDesc]) extends LeafNode with Command {
4948

5049
def output = Seq.empty
5150

5251
private[this] def sc = sqlContext.asInstanceOf[HiveContext]
5352

5453
// A lazy computing of the metastoreRelation
5554
private[this] lazy val metastoreRelation: MetastoreRelation = {
56-
// Get the CreateTableDesc from Hive SemanticAnalyzer
57-
val sa = new SemanticAnalyzer(sc.hiveconf)
58-
59-
sa.analyze(extra, new Context(sc.hiveconf))
60-
val desc = sa.getQB().getTableDesc
6155
// Create Hive Table
62-
sc.catalog.createTable(database, tableName, query.output, allowExisting, Some(desc))
56+
sc.catalog.createTable(database, tableName, query.output, allowExisting, desc.getOrElse(null))
6357

6458
// Get the Metastore Relation
6559
sc.catalog.lookupRelation(Some(database), tableName, None) match {

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ class SQLQuerySuite extends QueryTest {
119119
checkAnswer(
120120
sql("SELECT f1.f2.f3 FROM nested"),
121121
1)
122+
checkAnswer(sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested"),
123+
Seq.empty[Row])
124+
checkAnswer(
125+
sql("SELECT * FROM test_ctas_1234"),
126+
sql("SELECT * FROM nested").collect().toSeq)
127+
128+
intercept[org.apache.hadoop.hive.ql.metadata.InvalidTableException] {
129+
sql("CREATE TABLE test_ctas_1234 AS SELECT * from notexists").collect()
130+
}
122131
}
123132

124133
test("test CTAS") {

0 commit comments

Comments
 (0)