Skip to content

Commit a2b9137

Browse files
committed
[SPARK-5952][SQL] Lock when using hive metastore client
Author: Michael Armbrust <[email protected]> Closes #4746 from marmbrus/hiveLock and squashes the following commits: 8b871cf [Michael Armbrust] [SPARK-5952][SQL] Lock when using hive metastore client
1 parent c5ba975 commit a2b9137

File tree

1 file changed

+12
-6
lines changed

1 file changed

+12
-6
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
5252
/** Connection to hive metastore. Usages should lock on `this`. */
5353
protected[hive] val client = Hive.get(hive.hiveconf)
5454

55+
/** Usages should lock on `this`. */
5556
protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
5657

5758
// TODO: Use this everywhere instead of tuples or databaseName, tableName,.
@@ -65,7 +66,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
6566
val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
6667
override def load(in: QualifiedTableName): LogicalPlan = {
6768
logDebug(s"Creating new cached data source for $in")
68-
val table = client.getTable(in.database, in.name)
69+
val table = synchronized {
70+
client.getTable(in.database, in.name)
71+
}
6972
val schemaString = table.getProperty("spark.sql.sources.schema")
7073
val userSpecifiedSchema =
7174
if (schemaString == null) {
@@ -134,15 +137,18 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
134137
}
135138
}
136139

137-
def hiveDefaultTableFilePath(tableName: String): String = {
140+
def hiveDefaultTableFilePath(tableName: String): String = synchronized {
138141
val currentDatabase = client.getDatabase(hive.sessionState.getCurrentDatabase)
142+
139143
hiveWarehouse.getTablePath(currentDatabase, tableName).toString
140144
}
141145

142-
def tableExists(tableIdentifier: Seq[String]): Boolean = {
146+
def tableExists(tableIdentifier: Seq[String]): Boolean = synchronized {
143147
val tableIdent = processTableIdentifier(tableIdentifier)
144-
val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
145-
hive.sessionState.getCurrentDatabase)
148+
val databaseName =
149+
tableIdent
150+
.lift(tableIdent.size - 2)
151+
.getOrElse(hive.sessionState.getCurrentDatabase)
146152
val tblName = tableIdent.last
147153
client.getTable(databaseName, tblName, false) != null
148154
}
@@ -219,7 +225,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
219225
}
220226
}
221227

222-
override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
228+
override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = synchronized {
223229
val dbName = if (!caseSensitive) {
224230
if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
225231
} else {

0 commit comments

Comments
 (0)