Skip to content
Closed
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 77 additions & 15 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf._
Expand Down Expand Up @@ -80,20 +81,40 @@ private[spark] object HiveUtils extends Logging {
// Where to find the jars used to instantiate the HiveMetastoreClient.
// Now documents four options: the newly added "path" mode delegates to
// `spark.sql.hive.metastore.jars.path`.
val HIVE_METASTORE_JARS = buildStaticConf("spark.sql.hive.metastore.jars")
  .doc(s"""
    | Location of the jars that should be used to instantiate the HiveMetastoreClient.
    | This property can be one of four options: "
    | 1. "builtin"
    |   Use Hive ${builtinHiveVersion}, which is bundled with the Spark assembly when
    |   <code>-Phive</code> is enabled. When this option is chosen,
    |   <code>spark.sql.hive.metastore.version</code> must be either
    |   <code>${builtinHiveVersion}</code> or not defined.
    | 2. "maven"
    |   Use Hive jars of specified version downloaded from Maven repositories.
    | 3. "path"
    |   Use Hive jars configured by `spark.sql.hive.metastore.jars.path`
    |   in comma separated format. Support both local or remote paths.
    | 4. A classpath in the standard format for both Hive and Hadoop.
    """.stripMargin)
  .version("1.4.0")
  .stringConf
  .createWithDefault("builtin")

// Jar locations used only when `spark.sql.hive.metastore.jars` is set to `path`.
// Fixes in the user-visible doc string: the fragments previously concatenated
// without separating spaces ("paths,Such as", "supportnested"), plus grammar.
val HIVE_METASTORE_JARS_PATH = buildStaticConf("spark.sql.hive.metastore.jars.path")
  .doc(s"Comma separated URL of Hive jars, supporting both local and remote paths, " +
    s"such as: " +
    s" 1. file://path/to/jar/xxx.jar\n" +
    s" 2. hdfs://nameservice/path/to/jar/xxx.jar\n" +
    s" 3. /path/to/jar/ (path without URI scheme follows conf `fs.defaultFS`'s URI schema)\n" +
    s" 4. [http/https/ftp]://path/to/jar/xxx.jar\n" +
    s"Notice: `http/https/ftp` doesn't support wildcard, but other URLs support " +
    s"nested path wildcard, such as: " +
    s" 1. file://path/to/jar/*, file://path/to/jar/*/*\n" +
    s" 2. hdfs://nameservice/path/to/jar/*, hdfs://nameservice/path/to/jar/*/*\n" +
    s"When ${HIVE_METASTORE_JARS.key} is set to `path`, we will use Hive jars configured by this.")
  .version("3.1.0")
  .stringConf
  .toSequence
  .createWithDefault(Nil)

val CONVERT_METASTORE_PARQUET = buildConf("spark.sql.hive.convertMetastoreParquet")
.doc("When set to true, the built-in Parquet reader and writer are used to process " +
"parquet tables created by using the HiveQL syntax, instead of Hive serde.")
Expand Down Expand Up @@ -178,6 +199,7 @@ private[spark] object HiveUtils extends Logging {
* The location of the jars that should be used to instantiate the HiveMetastoreClient. This
* property can be one of four options:
* - a classpath in the standard format for both hive and hadoop.
* - path - attempt to discover the jars with paths configured by `HIVE_METASTORE_JARS_PATH`.
* - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
* option is only valid when using the execution version of Hive.
* - maven - download the correct version of hive on demand from maven.
Expand All @@ -186,6 +208,13 @@ private[spark] object HiveUtils extends Logging {
conf.getConf(HIVE_METASTORE_JARS)
}

/**
 * The configured Hive jar paths (from `spark.sql.hive.metastore.jars.path`).
 * Only consulted when `HIVE_METASTORE_JARS` is set to `path`; defaults to empty.
 */
private def hiveMetastoreJarsPath(conf: SQLConf): Seq[String] = {
  conf.getConf(HIVE_METASTORE_JARS_PATH)
}

/**
* A comma separated list of class prefixes that should be loaded using the classloader that
* is shared between Spark SQL and a specific version of Hive. An example of classes that should
Expand Down Expand Up @@ -336,6 +365,20 @@ private[spark] object HiveUtils extends Logging {
val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf)
val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)

// Resolves a local path into concrete jar URLs. A trailing "*" component
// expands to every "*.jar" file in the parent directory (non-recursive);
// any other path is returned as-is as a single URL.
// Uses `toURI.toURL` rather than the deprecated `File.toURL`, which does not
// escape illegal URL characters (and matches the classpath branch below).
def addLocalHiveJars(file: File): Seq[URL] = {
  if (file.getName == "*") {
    // A bare "*" has no parent; treat it the same as a missing directory.
    val parent = file.getParentFile
    val files = if (parent == null) null else parent.listFiles()
    if (files == null) {
      logWarning(s"Hive jar path '${file.getPath}' does not exist.")
      Nil
    } else {
      files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).map(_.toURI.toURL).toSeq
    }
  } else {
    file.toURI.toURL :: Nil
  }
}

val isolatedLoader = if (hiveMetastoreJars == "builtin") {
if (builtinHiveVersion != hiveMetastoreVersion) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -396,24 +439,43 @@ private[spark] object HiveUtils extends Logging {
config = configurations,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
} else if (hiveMetastoreJars == "path") {
// Convert to files and expand any directories.
val jars =
HiveUtils.hiveMetastoreJarsPath(sqlConf)
.flatMap {
case path if path.contains("\\") && Utils.isWindows =>
addLocalHiveJars(new File(path))
case path =>
DataSource.checkAndGlobPathIfNecessary(
pathStrings = Seq(path),
hadoopConf = hadoopConf,
checkEmptyGlobPath = true,
checkFilesExist = false,
enableGlobbing = true
).map(_.toUri.toURL)
}

logInfo(
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
s"using path: ${jars.mkString(";")}")
new IsolatedClientLoader(
version = metaVersion,
sparkConf = conf,
hadoopConf = hadoopConf,
execJars = jars.toSeq,
config = configurations,
isolationOn = true,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
} else {
// Convert to files and expand any directories.
val jars =
hiveMetastoreJars
.split(File.pathSeparator)
.flatMap {
case path if new File(path).getName == "*" =>
val files = new File(path).getParentFile.listFiles()
if (files == null) {
logWarning(s"Hive jar path '$path' does not exist.")
Nil
} else {
files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).toSeq
}
case path =>
new File(path) :: Nil
}
.map(_.toURI.toURL)
.flatMap { path =>
addLocalHiveJars(new File(path))
}

logInfo(
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
Expand Down