@@ -35,7 +35,7 @@ import org.apache.hive.service.cli.session.HiveSession
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row => SparkRow}
 import org.apache.spark.sql.execution.command.SetCommand
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, HiveUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{Utils => SparkUtils}
@@ -98,7 +98,7 @@ private[hive] class SparkExecuteStatementOperation(
       case TimestampType =>
         to += from.getAs[Timestamp](ordinal)
       case BinaryType | _: ArrayType | _: StructType | _: MapType =>
-        val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
+        val hiveString = HiveUtils.toHiveString((from.get(ordinal), dataTypes(ordinal)))
         to += hiveString
     }
   }
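The hunk above is representative of the mechanical part of this rename: `toHiveString` moves with the object, and complex-typed column values (binary, arrays, structs, maps) are still rendered through it before being handed to the Thrift client. For readers unfamiliar with the helper, here is a minimal, self-contained sketch of what such a `(value, dataType)` formatter does; it is illustrative only, not Spark's actual implementation:

import org.apache.spark.sql.types._

// Cut-down model of the (value, dataType) => String conversion that
// HiveUtils.toHiveString performs for Thrift server output.
object ToHiveStringSketch {
  def toHiveString(a: (Any, DataType)): String = a match {
    case (null, _) => "NULL"  // Hive renders SQL NULL as the literal string "NULL"
    case (seq: Seq[_], ArrayType(elemType, _)) =>
      seq.map(v => toHiveString((v, elemType))).mkString("[", ",", "]")
    case (m: Map[_, _], MapType(kt, vt, _)) =>
      m.map { case (k, v) => toHiveString((k, kt)) + ":" + toHiveString((v, vt)) }
        .mkString("{", ",", "}")
    case (other, _) => other.toString  // primitives fall back to toString
  }

  def main(args: Array[String]): Unit = {
    println(toHiveString((Seq(1, 2, 3), ArrayType(IntegerType))))  // prints [1,2,3]
  }
}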
@@ -39,7 +39,7 @@ import org.apache.thrift.transport.TSocket
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.util.ShutdownHookManager
 
 /**
@@ -82,7 +82,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
 
     val cliConf = new HiveConf(classOf[SessionState])
     // Override the location of the metastore since this is only used for local execution.
-    HiveContext.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
+    HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
       case (key, value) => cliConf.set(key, value)
     }
     val sessionState = new CliSessionState(cliConf)
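`newTemporaryConfiguration` returns a plain `Map[String, String]` of Hive settings, which the CLI driver folds into its `HiveConf` before building the session state. A sketch of that fold, with a hypothetical Derby connection URL standing in for the map's real contents:

import org.apache.hadoop.hive.conf.HiveConf

object TempMetastoreConfSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for HiveUtils.newTemporaryConfiguration(...): the
    // real map points the metastore at a freshly created local Derby database.
    val tempMetastoreConf: Map[String, String] = Map(
      "javax.jdo.option.ConnectionURL" ->
        "jdbc:derby:;databaseName=/tmp/spark-metastore;create=true")

    // Same fold-into-HiveConf pattern as the hunk above.
    val cliConf = new HiveConf()
    tempMetastoreConf.foreach { case (key, value) => cliConf.set(key, value) }
    println(cliConf.get("javax.jdo.option.ConnectionURL"))
  }
}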
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.StatsReportListener
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.{HiveContext, HiveUtils}
 import org.apache.spark.util.Utils
 
 /** A singleton object for the master program. The slaves should not access this. */
@@ -62,7 +62,7 @@ private[hive] object SparkSQLEnv extends Logging {
       hiveContext.sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
       hiveContext.sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
 
-      hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
+      hiveContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
 
       if (log.isDebugEnabled) {
         hiveContext.sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted
@@ -27,7 +27,7 @@ import org.apache.hive.service.cli.session.SessionManager
 import org.apache.hive.service.cli.thrift.TProtocolVersion
 import org.apache.hive.service.server.HiveServer2
 
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.{HiveContext, HiveUtils}
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
 
@@ -76,7 +76,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
     } else {
       hiveContext.newSession()
     }
-    ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
+    ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
     sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
     sessionHandle
   }
@@ -42,7 +42,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.{ThreadUtils, Utils}
 
@@ -115,7 +115,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       val resultSet = statement.executeQuery("SET spark.sql.hive.version")
       resultSet.next()
       assert(resultSet.getString(1) === "spark.sql.hive.version")
-      assert(resultSet.getString(2) === HiveContext.hiveExecutionVersion)
+      assert(resultSet.getString(2) === HiveUtils.hiveExecutionVersion)
     }
   }
 
@@ -624,7 +624,7 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
       val resultSet = statement.executeQuery("SET spark.sql.hive.version")
       resultSet.next()
       assert(resultSet.getString(1) === "spark.sql.hive.version")
-      assert(resultSet.getString(2) === HiveContext.hiveExecutionVersion)
+      assert(resultSet.getString(2) === HiveUtils.hiveExecutionVersion)
     }
   }
 }
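Both suites assert the same behavior through plain JDBC: a `SET spark.sql.hive.version` round trip against the Thrift server should report the value of `HiveUtils.hiveExecutionVersion`, with the key in column 1 and the value in column 2. A hedged sketch of the client side, assuming the Hive JDBC driver is on the classpath and the server listens on the default port (both assumptions, not part of the patch):

import java.sql.DriverManager

object HiveVersionProbe {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")  // assumed available
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    try {
      val rs = conn.createStatement().executeQuery("SET spark.sql.hive.version")
      while (rs.next()) {
        // Column 1 is the key, column 2 the value -- exactly what the suites assert.
        println(s"${rs.getString(1)} = ${rs.getString(2)}")
      }
    } finally {
      conn.close()
    }
  }
}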
@@ -23,7 +23,7 @@ import java.util.{Locale, TimeZone}
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.internal.SQLConf
 
@@ -60,7 +60,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     TestHive.sessionState.functionRegistry.unregisterFunction("hash")
     // Ensures that the plans generation use metastore relation and not OrcRelation
    // Was done because SqlBuilder does not work with plans having logical relation
-    TestHive.setConf(HiveContext.CONVERT_METASTORE_ORC, false)
+    TestHive.setConf(HiveUtils.CONVERT_METASTORE_ORC, false)
     RuleExecutor.resetTime()
   }
 
@@ -71,7 +71,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     Locale.setDefault(originalLocale)
     TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
     TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
-    TestHive.setConf(HiveContext.CONVERT_METASTORE_ORC, originalConvertMetastoreOrc)
+    TestHive.setConf(HiveUtils.CONVERT_METASTORE_ORC, originalConvertMetastoreOrc)
     TestHive.sessionState.functionRegistry.restore()
 
     // For debugging dump some statistics about how much time was spent in various optimizer rules.
@@ -53,7 +53,7 @@ protected[hive] class HiveQueryExecution(ctx: SQLContext, logicalPlan: LogicalPl
     // We need the types so we can output struct field names
     val types = analyzed.output.map(_.dataType)
     // Reformat to match hive tab delimited output.
-    result.map(_.zip(types).map(HiveContext.toHiveString)).map(_.mkString("\t")).toSeq
+    result.map(_.zip(types).map(HiveUtils.toHiveString)).map(_.mkString("\t")).toSeq
   }
 
   override def simpleString: String =
@@ -208,7 +208,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
    * SerDe.
    */
   def convertMetastoreParquet: Boolean = {
-    conf.getConf(HiveContext.CONVERT_METASTORE_PARQUET)
+    conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
   }
 
   /**
@@ -218,7 +218,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
    * This configuration is only effective when "spark.sql.hive.convertMetastoreParquet" is true.
    */
   def convertMetastoreParquetWithSchemaMerging: Boolean = {
-    conf.getConf(HiveContext.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
+    conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
   }
 
   /**
@@ -227,7 +227,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
    * SerDe.
    */
   def convertMetastoreOrc: Boolean = {
-    conf.getConf(HiveContext.CONVERT_METASTORE_ORC)
+    conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
   /**
@@ -243,14 +243,14 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
    * and no SerDe is specified (no ROW FORMAT SERDE clause).
    */
   def convertCTAS: Boolean = {
-    conf.getConf(HiveContext.CONVERT_CTAS)
+    conf.getConf(HiveUtils.CONVERT_CTAS)
   }
 
   /**
   * When true, Hive Thrift server will execute SQL queries asynchronously using a thread pool."
   */
   def hiveThriftServerAsync: Boolean = {
-    conf.getConf(HiveContext.HIVE_THRIFT_SERVER_ASYNC)
+    conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
   def hiveThriftServerSingleSession: Boolean = {
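Every accessor in this file follows the same shape: a `ConfigEntry` constant, now owned by `HiveUtils`, read through `conf.getConf`. A simplified, self-contained model of that pattern follows; the class and member names below are illustrative stand-ins for Spark's internal `ConfigEntry`/`SQLConf` machinery, not its exact API:

object ConfAccessorSketch {
  // Toy ConfigEntry: a string key plus a typed default.
  final case class ConfigEntry[T](key: String, default: T)

  // Toy conf store: typed lookup with fallback to the entry's default.
  class Conf(settings: Map[String, String]) {
    def getConf(entry: ConfigEntry[Boolean]): Boolean =
      settings.get(entry.key).map(_.toBoolean).getOrElse(entry.default)
  }

  // After this patch the real constant lives on HiveUtils rather than HiveContext.
  val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean] =
    ConfigEntry("spark.sql.hive.convertMetastoreParquet", default = true)

  def main(args: Array[String]): Unit = {
    val conf = new Conf(Map("spark.sql.hive.convertMetastoreParquet" -> "false"))
    assert(!conf.getConf(CONVERT_METASTORE_PARQUET))
  }
}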
@@ -34,15 +34,15 @@ private[hive] class HiveSharedState(override val sparkContext: SparkContext)
    * A Hive client used for execution.
    */
   val executionHive: HiveClientImpl = {
-    HiveContext.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
+    HiveUtils.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
   }
 
   /**
    * A Hive client used to interact with the metastore.
    */
   // This needs to be a lazy val at here because TestHiveSharedState is overriding it.
   lazy val metadataHive: HiveClient = {
-    HiveContext.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration)
+    HiveUtils.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration)
   }
 
   /**
@@ -59,7 +59,7 @@ class HiveContext private[hive](
   self =>
 
   def this(sc: SparkContext) = {
-    this(new SparkSession(HiveContext.withHiveExternalCatalog(sc)), true)
+    this(new SparkSession(HiveUtils.withHiveExternalCatalog(sc)), true)
   }
 
   def this(sc: JavaSparkContext) = this(sc.sc)
@@ -84,7 +84,7 @@
 }
 
 
-private[hive] object HiveContext extends Logging {
+private[spark] object HiveUtils extends Logging {
 
   def withHiveExternalCatalog(sc: SparkContext): SparkContext = {
     sc.conf.set(CATALOG_IMPLEMENTATION.key, "hive")
@@ -315,10 +315,10 @@ private[hive] object HiveContext extends Logging {
       configurations: Map[String, String]): HiveClient = {
     val sqlConf = new SQLConf
     sqlConf.setConf(SQLContext.getSQLProperties(conf))
-    val hiveMetastoreVersion = HiveContext.hiveMetastoreVersion(sqlConf)
-    val hiveMetastoreJars = HiveContext.hiveMetastoreJars(sqlConf)
-    val hiveMetastoreSharedPrefixes = HiveContext.hiveMetastoreSharedPrefixes(sqlConf)
-    val hiveMetastoreBarrierPrefixes = HiveContext.hiveMetastoreBarrierPrefixes(sqlConf)
+    val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf)
+    val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf)
+    val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf)
+    val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf)
     val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
 
     val defaultWarehouseLocation = hiveConf.get("hive.metastore.warehouse.dir")
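This hunk is the heart of the patch: the companion-style `object HiveContext` becomes `object HiveUtils`, and its visibility widens from `private[hive]` to `private[spark]` so code outside the `hive` package can reach the helpers. As a toy model of what that qualifier change means (the package names here are made up for illustration):

// A package-qualified private member is visible to everything under the
// named enclosing package, and nothing outside it.
package org.example.spark.sql.hive {
  private[hive] object OnlyHive { def x: Int = 1 }     // visible within ...sql.hive only
}

package org.example.spark {
  private[spark] object WholeSpark { def x: Int = 2 }  // visible anywhere under org.example.spark
}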
@@ -23,8 +23,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
 import org.apache.hadoop.hive.ql.exec.Utilities
-import org.apache.hadoop.hive.ql.metadata.{HiveUtils, Partition => HivePartition,
-  Table => HiveTable}
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.serde2.Deserializer
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters,
@@ -300,7 +299,8 @@ private[hive] object HiveTableUtil {
   def configureJobPropertiesForStorageHandler(
       tableDesc: TableDesc, jobConf: JobConf, input: Boolean) {
     val property = tableDesc.getProperties.getProperty(META_TABLE_STORAGE)
-    val storageHandler = HiveUtils.getStorageHandler(jobConf, property)
+    val storageHandler =
+      org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(jobConf, property)
     if (storageHandler != null) {
       val jobProperties = new util.LinkedHashMap[String, String]
       if (input) {
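This file is the one place the rename needs care: Hive ships its own `org.apache.hadoop.hive.ql.metadata.HiveUtils`, so importing it unqualified would now collide with Spark's `HiveUtils`. The patch drops it from the import and fully qualifies the single call site. An import rename would resolve the clash equally well; the alias name below is hypothetical:

// Alternative to the fully qualified call used in the patch: alias Hive's
// class at import time so both HiveUtils names can coexist in one file.
import org.apache.hadoop.hive.ql.metadata.{HiveUtils => HiveQlUtils}

// The call site would then read:
//   val storageHandler = HiveQlUtils.getStorageHandler(jobConf, property)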
@@ -32,7 +32,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
 /** Factory for `IsolatedClientLoader` with specific versions of hive. */
@@ -263,7 +263,7 @@ private[hive] class IsolatedClientLoader(
           throw new ClassNotFoundException(
             s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" +
             "Please make sure that jars for your version of hive and hadoop are included in the " +
-            s"paths passed to ${HiveContext.HIVE_METASTORE_JARS}.", e)
+            s"paths passed to ${HiveUtils.HIVE_METASTORE_JARS}.", e)
         } else {
           throw e
         }
@@ -75,7 +75,7 @@ class TestHiveContext(@transient val sparkSession: TestHiveSparkSession, isRootC
   extends HiveContext(sparkSession, isRootContext) {
 
   def this(sc: SparkContext) {
-    this(new TestHiveSparkSession(HiveContext.withHiveExternalCatalog(sc)), true)
+    this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc)), true)
   }
 
   override def newSession(): TestHiveContext = {
@@ -118,7 +118,7 @@ private[hive] class TestHiveSparkSession(
       sc,
       Utils.createTempDir(namePrefix = "warehouse"),
       TestHiveContext.makeScratchDir(),
-      HiveContext.newTemporaryConfiguration(useInMemoryDerby = false),
+      HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false),
       None)
   }
 
@@ -577,7 +577,7 @@ private[hive] object TestHiveContext {
       scratchDirPath: File,
       metastoreTemporaryConf: Map[String, String]): HiveClient = {
     val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
-    HiveContext.newClientForMetadata(
+    HiveUtils.newClientForMetadata(
       conf,
       hiveConf,
       hadoopConf,
@@ -592,7 +592,7 @@
       warehousePath: File,
       scratchDirPath: File,
       metastoreTemporaryConf: Map[String, String]): Map[String, String] = {
-    HiveContext.hiveClientConfigurations(hiveconf) ++ metastoreTemporaryConf ++ Map(
+    HiveUtils.hiveClientConfigurations(hiveconf) ++ metastoreTemporaryConf ++ Map(
       ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString,
       ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
       ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,
@@ -31,7 +31,7 @@ class HiveExternalCatalogSuite extends CatalogTestCases {
   private val client: HiveClient = {
     // We create a metastore at a temp location to avoid any potential
     // conflict of having multiple connections to a single derby instance.
-    HiveContext.newClientForExecution(new SparkConf, new Configuration)
+    HiveUtils.newClientForExecution(new SparkConf, new Configuration)
   }
 
   protected override val utils: CatalogTestUtils = new CatalogTestUtils {
@@ -550,7 +550,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   }
 
   test("scan a parquet table created through a CTAS statement") {
-    withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "true") {
+    withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true") {
       withTempTable("jt") {
         (1 to 10).map(i => i -> s"str$i").toDF("a", "b").registerTempTable("jt")
 
@@ -53,7 +53,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
     val rows = row :: Row(Seq.fill(row.length)(null): _*) :: Nil
 
     // Don't convert Hive metastore Parquet tables to let Hive write those Parquet files.
-    withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
+    withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
       withTempTable("data") {
         val fields = hiveTypes.zipWithIndex.map { case (typ, index) => s" col_$index $typ" }
 
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.tags.ExtendedHiveTest
 import org.apache.spark.util.Utils
@@ -62,7 +62,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
 
   test("success sanity check") {
     val badClient = IsolatedClientLoader.forVersion(
-      hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+      hiveMetastoreVersion = HiveUtils.hiveExecutionVersion,
       hadoopVersion = VersionInfo.getVersion,
       sparkConf = sparkConf,
       hadoopConf = new Configuration(),
@@ -76,7 +76,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
     val hadoopConf = new Configuration();
     hadoopConf.set("test", "success")
     val client = IsolatedClientLoader.forVersion(
-      hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+      hiveMetastoreVersion = HiveUtils.hiveExecutionVersion,
       hadoopVersion = VersionInfo.getVersion,
      sparkConf = sparkConf,
      hadoopConf = hadoopConf,