@@ -207,6 +207,8 @@ private[hive] class HiveClientImpl(
/** Returns the configuration for the current session. */
def conf: HiveConf = state.getConf

+ private val userName = state.getAuthenticator.getUserName

Contributor commented:

is this userName shared by all SparkSessions?

@gatorsmile (Member, Author) replied on Mar 24, 2017:

All the SparkSessions share the same internal Hive SessionState. After we create it, the user name is set, if my understanding is not wrong, no matter which HiveAuthenticationProvider users choose.

Did we have the same issue previously, before PR #17311 was merged?

cc @vanzin @dongjoon-hyun @yhuai
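
To make the thread's point concrete, here is a minimal, self-contained sketch (stub types standing in for Hive's SessionState, authenticator, and Table; this is not the actual Spark source) of the pattern the diff introduces: the user name is read once from the session-wide authenticator when the client is constructed, so every table created through that client gets the same owner, regardless of which SparkSession issued the call.

    // Hypothetical stubs; Hive's real SessionState and Table are not modeled here.
    trait Authenticator { def getUserName: String }
    final case class SessionStateStub(authenticator: Authenticator)
    final class HiveTableStub(val name: String) { var owner: Option[String] = None }

    class HiveClientSketch(state: SessionStateStub) {
      // Captured once at construction, like the new `userName` val in the diff;
      // later calls reuse it no matter which session triggered them.
      private val userName: String = state.authenticator.getUserName

      def toHiveTable(name: String, userName: Option[String] = None): HiveTableStub = {
        val t = new HiveTableStub(name)
        userName.foreach(u => t.owner = Some(u)) // mirrors userName.foreach(hiveTable.setOwner)
        t
      }

      def createTable(name: String): HiveTableStub = toHiveTable(name, Some(userName))
    }
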


override def getConf(key: String, defaultValue: String): String = {
conf.get(key, defaultValue)
}
@@ -413,7 +415,7 @@ private[hive] class HiveClientImpl(
createTime = h.getTTable.getCreateTime.toLong * 1000,
lastAccessTime = h.getLastAccessTime.toLong * 1000,
storage = CatalogStorageFormat(
- locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI(_)),
+ locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI),
// To avoid ClassNotFound exception, we try our best to not get the format class, but get
// the class name directly. However, for non-native tables, there is no interface to get
// the format class name, so we may still throw ClassNotFound in this case.
@@ -441,7 +443,7 @@ private[hive] class HiveClientImpl(
}

override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
- client.createTable(toHiveTable(table, Some(conf)), ignoreIfExists)
+ client.createTable(toHiveTable(table, Some(userName)), ignoreIfExists)
}

override def dropTable(
@@ -453,7 +455,7 @@ private[hive] class HiveClientImpl(
}

override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState {
- val hiveTable = toHiveTable(table, Some(conf))
+ val hiveTable = toHiveTable(table, Some(userName))
// Do not use `table.qualifiedName` here because this may be a rename
val qualifiedTableName = s"${table.database}.$tableName"
shim.alterTable(client, qualifiedTableName, hiveTable)
@@ -522,7 +524,7 @@ private[hive] class HiveClientImpl(
newSpecs: Seq[TablePartitionSpec]): Unit = withHiveState {
require(specs.size == newSpecs.size, "number of old and new partition specs differ")
val catalogTable = getTable(db, table)
- val hiveTable = toHiveTable(catalogTable, Some(conf))
+ val hiveTable = toHiveTable(catalogTable, Some(userName))
specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
val hivePart = getPartitionOption(catalogTable, oldSpec)
.map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) }
@@ -535,7 +537,7 @@ private[hive] class HiveClientImpl(
db: String,
table: String,
newParts: Seq[CatalogTablePartition]): Unit = withHiveState {
- val hiveTable = toHiveTable(getTable(db, table), Some(conf))
+ val hiveTable = toHiveTable(getTable(db, table), Some(userName))
shim.alterPartitions(client, table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava)
}

@@ -563,7 +565,7 @@ private[hive] class HiveClientImpl(
override def getPartitionOption(
table: CatalogTable,
spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
- val hiveTable = toHiveTable(table, Some(conf))
+ val hiveTable = toHiveTable(table, Some(userName))
val hivePartition = client.getPartition(hiveTable, spec.asJava, false)
Option(hivePartition).map(fromHivePartition)
}
@@ -575,7 +577,7 @@ private[hive] class HiveClientImpl(
override def getPartitions(
table: CatalogTable,
spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
- val hiveTable = toHiveTable(table, Some(conf))
+ val hiveTable = toHiveTable(table, Some(userName))
val parts = spec match {
case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
case Some(s) =>
@@ -589,7 +591,7 @@ private[hive] class HiveClientImpl(
override def getPartitionsByFilter(
table: CatalogTable,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
- val hiveTable = toHiveTable(table, Some(conf))
+ val hiveTable = toHiveTable(table, Some(userName))
val parts = shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition)
HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
parts
@@ -817,9 +819,7 @@ private[hive] object HiveClientImpl {
/**
* Converts the native table metadata representation format CatalogTable to Hive's Table.
*/
- def toHiveTable(
-     table: CatalogTable,
-     conf: Option[HiveConf] = None): HiveTable = {
+ def toHiveTable(table: CatalogTable, userName: Option[String] = None): HiveTable = {
val hiveTable = new HiveTable(table.database, table.identifier.table)
// For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties.
// Otherwise, Hive metastore will change the table to a MANAGED_TABLE.
@@ -851,10 +851,10 @@ private[hive] object HiveClientImpl {
hiveTable.setFields(schema.asJava)
}
hiveTable.setPartCols(partCols.asJava)
- conf.foreach { _ => hiveTable.setOwner(SessionState.get().getAuthenticator().getUserName()) }
+ userName.foreach(hiveTable.setOwner)
hiveTable.setCreateTime((table.createTime / 1000).toInt)
hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
- table.storage.locationUri.map(CatalogUtils.URIToString(_)).foreach { loc =>
+ table.storage.locationUri.map(CatalogUtils.URIToString).foreach { loc =>
hiveTable.getTTable.getSd.setLocation(loc)}
table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass)
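
Besides threading the owner through as a plain String, the diff also replaces `CatalogUtils.stringToURI(_)` and `CatalogUtils.URIToString(_)` with bare method references, relying on Scala's eta-expansion. A self-contained illustration of why the two spellings are equivalent (hypothetical helper, not Spark code):

    object EtaExpansionDemo {
      def stringToURI(s: String): java.net.URI = new java.net.URI(s)

      def main(args: Array[String]): Unit = {
        val locs = Seq("/tmp/a", "/tmp/b")
        val explicit = locs.map(stringToURI(_)) // explicit underscore placeholder
        val eta      = locs.map(stringToURI)    // bare reference, eta-expanded by the compiler
        assert(explicit == eta)                 // both yield the same URIs
      }
    }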