Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
/** Connection to hive metastore. Usages should lock on `this`. */
protected[hive] val client = Hive.get(hive.hiveconf)

/** Usages should lock on `this`. */
protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)

// TODO: Use this everywhere instead of tuples or databaseName, tableName,.
Expand All @@ -65,7 +66,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
override def load(in: QualifiedTableName): LogicalPlan = {
logDebug(s"Creating new cached data source for $in")
val table = client.getTable(in.database, in.name)
val table = synchronized {
client.getTable(in.database, in.name)
}
val schemaString = table.getProperty("spark.sql.sources.schema")
val userSpecifiedSchema =
if (schemaString == null) {
Expand Down Expand Up @@ -134,15 +137,18 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
}

def hiveDefaultTableFilePath(tableName: String): String = {
def hiveDefaultTableFilePath(tableName: String): String = synchronized {
val currentDatabase = client.getDatabase(hive.sessionState.getCurrentDatabase)

hiveWarehouse.getTablePath(currentDatabase, tableName).toString
}

def tableExists(tableIdentifier: Seq[String]): Boolean = {
def tableExists(tableIdentifier: Seq[String]): Boolean = synchronized {
val tableIdent = processTableIdentifier(tableIdentifier)
val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
hive.sessionState.getCurrentDatabase)
val databaseName =
tableIdent
.lift(tableIdent.size - 2)
.getOrElse(hive.sessionState.getCurrentDatabase)
val tblName = tableIdent.last
client.getTable(databaseName, tblName, false) != null
}
Expand Down Expand Up @@ -219,7 +225,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
}

override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = synchronized {
val dbName = if (!caseSensitive) {
if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
} else {
Expand Down