Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.log4j.{Level, Logger}
import org.apache.thrift.transport.TSocket

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.HiveUtils
Expand Down Expand Up @@ -81,11 +83,17 @@ private[hive] object SparkSQLCLIDriver extends Logging {
System.exit(1)
}

val sparkConf = new SparkConf(loadDefaults = true)
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)

val cliConf = new HiveConf(classOf[SessionState])
// Override the location of the metastore since this is only used for local execution.
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
case (key, value) => cliConf.set(key, value)
(hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
++ sparkConf.getAll.toMap ++ extraConfigs).foreach {
case (k, v) =>
cliConf.set(k, v)
}

val sessionState = new CliSessionState(cliConf)

sessionState.in = System.in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ private[spark] object HiveUtils extends Logging {
}

/**
* Configurations needed to create a [[HiveClient]].
* Convert the time configurations needed to create a [[HiveClient]] into a unified [[Long]] format.
*/
private[hive] def hiveClientConfigurations(hadoopConf: Configuration): Map[String, String] = {
private[hive] def formatTimeVarsForHiveClient(hadoopConf: Configuration): Map[String, String] = {
// Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
// of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards-
// compatibility when users are trying to connect to a Hive metastore of a lower version,
Expand Down Expand Up @@ -280,7 +280,7 @@ private[spark] object HiveUtils extends Logging {
protected[hive] def newClientForMetadata(
conf: SparkConf,
hadoopConf: Configuration): HiveClient = {
val configurations = hiveClientConfigurations(hadoopConf)
val configurations = formatTimeVarsForHiveClient(hadoopConf)
newClientForMetadata(conf, hadoopConf, configurations)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
Expand Down Expand Up @@ -132,14 +133,24 @@ private[hive] class HiveClientImpl(
// in hive jars, which will turn off isolation, if SessionState.detachSession is
// called to remove the current state after that, hive client created later will initialize
// its own state by newState()
Option(SessionState.get).getOrElse(newState())
val ret = SessionState.get
if (ret != null) {
// hive.metastore.warehouse.dir is determined in SharedState after the CliSessionState
// instance is constructed, so we need to follow that change here.
Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)).foreach { dir =>
ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir)
}
ret
} else {
newState()
}
}
}

// Log the default warehouse location.
logInfo(
s"Warehouse location for Hive client " +
s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
s"(version ${version.fullVersion}) is ${conf.getVar(ConfVars.METASTOREWAREHOUSE)}")

private def newState(): SessionState = {
val hiveConf = new HiveConf(classOf[SessionState])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
hadoopConf.set("hive.metastore.schema.verification", "false")
}
HiveClientBuilder
.buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
.buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
}

override def suiteName: String = s"${super.suiteName}($version)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
if (versionSpark != null) versionSpark.reset()
versionSpark = TestHiveVersion(client)
assert(versionSpark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
Expand Down