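Summary of the diff below: the helper `HiveUtils.formatTimeVarsForHiveClient` is removed, the extra-configuration parameters of `HiveClientImpl.newHiveConf` and `HiveUtils.newClientForMetadata` gain a `Map.empty` default, and the call sites (`SparkSQLCLIDriver`, `CliSuite`, `HiveVersionSuite`) plus the now-obsolete `HiveUtilsSuite` helper are updated to match.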
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -46,7 +46,6 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.util.SQLKeywordUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.security.HiveDelegationTokenProvider
 import org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.closeHiveSessionStateIfStarted
@@ -94,9 +93,8 @@ private[hive] object SparkSQLCLIDriver extends Logging {
 
     val sparkConf = new SparkConf(loadDefaults = true)
     val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
-    val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)
 
-    val cliConf = HiveClientImpl.newHiveConf(sparkConf, hadoopConf, extraConfigs)
+    val cliConf = HiveClientImpl.newHiveConf(sparkConf, hadoopConf)
 
     val sessionState = new CliSessionState(cliConf)
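This call-site simplification works because `extraConfig` on `HiveClientImpl.newHiveConf` now defaults to `Map.empty` (see the `HiveClientImpl.scala` hunk further down). A minimal, self-contained sketch of the pattern with hypothetical names, not the actual Spark code:

```scala
// Hypothetical stand-in for the refactoring: a required Map parameter
// gains an empty default, so callers that no longer have anything to
// pass can simply drop the argument.
object DefaultArgSketch extends App {
  def newConf(
      base: Map[String, String],
      extra: Map[String, String] = Map.empty): Map[String, String] =
    base ++ extra

  // Before: the caller built and passed the extra map explicitly.
  println(newConf(Map("k" -> "v"), Map("hive.txn.timeout" -> "300")))
  // After: the Map.empty default makes the second argument optional.
  println(newConf(Map("k" -> "v")))
}
```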
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -34,7 +34,6 @@ import org.apache.spark.{ErrorMessageFormat, SparkConf, SparkContext, SparkFunSu
 import org.apache.spark.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
-import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.HiveUtils._
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.test.HiveTestJars
@@ -651,8 +650,7 @@ class CliSuite extends SparkFunSuite {
     val sparkContext = new SparkContext(sparkConf)
     SparkSQLEnv.sparkContext = sparkContext
     val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
-    val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)
-    val cliConf = HiveClientImpl.newHiveConf(sparkConf, hadoopConf, extraConfigs)
+    val cliConf = HiveClientImpl.newHiveConf(sparkConf, hadoopConf)
     val sessionState = new CliSessionState(cliConf)
     SessionState.setCurrentSessionState(sessionState)
     val cli = new SparkSQLCLIDriver
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive
 import java.io.File
 import java.net.URL
 import java.util.Locale
-import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable.HashMap
 import scala.jdk.CollectionConverters._
@@ -259,60 +258,6 @@ private[spark] object HiveUtils extends Logging {
     conf.getConf(HIVE_METASTORE_BARRIER_PREFIXES).filterNot(_ == "")
   }
 
-  /**
-   * Change time configurations needed to create a [[HiveClient]] into unified [[Long]] format.
-   */
-  private[hive] def formatTimeVarsForHiveClient(hadoopConf: Configuration): Map[String, String] = {
-    // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
-    // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards-
-    // compatibility when users are trying to connecting to a Hive metastore of lower version,
-    // because these options are expected to be integral values in lower versions of Hive.
-    //
-    // Here we enumerate all time `ConfVar`s and convert their values to numeric strings according
-    // to their output time units.
-    Seq(
-      ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY -> TimeUnit.SECONDS,
-      ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME -> TimeUnit.SECONDS,
-      ConfVars.HMSHANDLERINTERVAL -> TimeUnit.MILLISECONDS,
-      ConfVars.METASTORE_EVENT_DB_LISTENER_TTL -> TimeUnit.SECONDS,
-      ConfVars.METASTORE_EVENT_CLEAN_FREQ -> TimeUnit.SECONDS,
-      ConfVars.METASTORE_EVENT_EXPIRY_DURATION -> TimeUnit.SECONDS,
-      ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL -> TimeUnit.SECONDS,
-      ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_WRITER_WAIT -> TimeUnit.MILLISECONDS,
-      ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_READER_WAIT -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES -> TimeUnit.SECONDS,
-      ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_TXN_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL -> TimeUnit.SECONDS,
-      ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
-      ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE -> TimeUnit.SECONDS,
-      ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
-      ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME -> TimeUnit.SECONDS,
-      ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT -> TimeUnit.MILLISECONDS,
-      ConfVars.SERVER_READ_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL -> TimeUnit.MILLISECONDS,
-      ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.SPARK_JOB_MONITOR_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS,
-      ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS
-    ).map { case (confVar, unit) =>
-      confVar.varname -> HiveConf.getTimeVar(hadoopConf, confVar, unit).toString
-    }.toMap
-  }
-
   /**
    * Check current Thread's SessionState type
    * @return true when SessionState.get returns an instance of CliSessionState,
@@ -359,17 +304,10 @@
    * The version of the Hive client that is used here must match the metastore that is configured
    * in the hive-site.xml file.
    */
-  protected[hive] def newClientForMetadata(
-      conf: SparkConf,
-      hadoopConf: Configuration): HiveClient = {
-    val configurations = formatTimeVarsForHiveClient(hadoopConf)
-    newClientForMetadata(conf, hadoopConf, configurations)
-  }
-
   protected[hive] def newClientForMetadata(
       conf: SparkConf,
       hadoopConf: Configuration,
-      configurations: Map[String, String]): HiveClient = {
+      configurations: Map[String, String] = Map.empty): HiveClient = {
     val sqlConf = new SQLConf
     sqlConf.setConf(SQLContext.getSQLProperties(conf))
     val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf)
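The deleted helper enumerated every time-typed `ConfVar` and rewrote its value as a bare numeric string via `HiveConf.getTimeVar`, because pre-0.14 Hive metastore clients could not parse suffixed values such as `600s`. A condensed sketch of that logic, keeping two of the roughly three dozen ConfVars and assuming Hive's `HiveConf`/`ConfVars` are on the classpath:

```scala
import java.util.concurrent.TimeUnit

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars

object FormatTimeVarsSketch {
  // Condensed from the removed formatTimeVarsForHiveClient.
  def formatTimeVars(hadoopConf: Configuration): Map[String, String] =
    Seq(
      // "600s" reads back as 600 when the output unit is SECONDS
      ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
      // "60000ms" reads back as 60000 when the output unit is MILLISECONDS
      ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS
    ).map { case (confVar, unit) =>
      confVar.varname -> HiveConf.getTimeVar(hadoopConf, confVar, unit).toString
    }.toMap
}
```

With the helper gone, suffixed values reach the Hive client untouched, which is presumably acceptable because the Hive client versions Spark still builds against parse the suffixes themselves.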
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -1285,7 +1285,7 @@ private[hive] object HiveClientImpl extends Logging {
   def newHiveConf(
       sparkConf: SparkConf,
       hadoopConf: JIterable[JMap.Entry[String, String]],
-      extraConfig: Map[String, String],
+      extraConfig: Map[String, String] = Map.empty,
       classLoader: Option[ClassLoader] = None): HiveConf = {
     val hiveConf = new HiveConf(classOf[SessionState])
     // HiveConf is a Hadoop Configuration, which has a field of classLoader and
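Note that `extraConfig` now sits between the required parameters and the already-defaulted `classLoader`, so a caller that wants to supply only a class loader must use a named argument. A small illustration of the Scala rule with stand-in types, not the real signature:

```scala
object NamedArgSketch extends App {
  def newConf(
      required: String,
      extraConfig: Map[String, String] = Map.empty,
      classLoader: Option[ClassLoader] = None): String =
    s"$required / extra=${extraConfig.size} / cl=${classLoader.isDefined}"

  // Skipping a defaulted middle parameter requires naming the later one:
  println(newConf("hive-conf", classLoader = Some(getClass.getClassLoader)))
}
```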
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive
 import java.io.File
 import java.net.URI
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
 import org.apache.spark.{SparkConf, TestUtils}
@@ -33,12 +32,6 @@ import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader}
 
 class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
-  private def testFormatTimeVarsForHiveClient(key: String, value: String, expected: Long): Unit = {
-    val conf = new Configuration
-    conf.set(key, value)
-    assert(HiveUtils.formatTimeVarsForHiveClient(conf)(key) === expected.toString)
-  }
-
   test("newTemporaryConfiguration overwrites listener configurations") {
     Seq(true, false).foreach { useInMemoryDerby =>
       val conf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby)
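The deleted `testFormatTimeVarsForHiveClient` asserted that a suffixed time value round-trips through the helper to its bare numeric form. What it exercised can still be demonstrated directly against `HiveConf.getTimeVar`; the sketch below assumes a Hive dependency on the classpath and picks one ConfVar purely for illustration:

```scala
import java.util.concurrent.TimeUnit

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars

object TimeVarRoundTrip extends App {
  val conf = new Configuration
  // Set a suffixed value, then read it back in the ConfVar's output unit.
  conf.set(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.varname, "600s")
  val seconds =
    HiveConf.getTimeVar(conf, ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.SECONDS)
  assert(seconds == 600L)
}
```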
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
@@ -22,7 +22,6 @@ import org.scalactic.source.Position
 import org.scalatest.Tag
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.hive.HiveUtils
 
 private[client] abstract class HiveVersionSuite(version: String) extends SparkFunSuite {
   override protected val enableAutoThreadAudit = false
@@ -43,10 +42,7 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
       hadoopConf.set("hive.in.test", "true")
       hadoopConf.set("hive.query.reexecution.enabled", "false")
     }
-    HiveClientBuilder.buildClient(
-      version,
-      hadoopConf,
-      HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
+    HiveClientBuilder.buildClient(version, hadoopConf)
   }
 
   override def suiteName: String = s"${super.suiteName}($version)"