diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 62ff2db2ecb3c..67467637805f9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -40,6 +40,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf._ @@ -80,7 +81,7 @@ private[spark] object HiveUtils extends Logging { val HIVE_METASTORE_JARS = buildStaticConf("spark.sql.hive.metastore.jars") .doc(s""" | Location of the jars that should be used to instantiate the HiveMetastoreClient. - | This property can be one of three options: " + | This property can be one of four options: | 1. "builtin" | Use Hive ${builtinHiveVersion}, which is bundled with the Spark assembly when | -Phive is enabled. When this option is chosen, @@ -88,12 +89,32 @@ private[spark] object HiveUtils extends Logging { | ${builtinHiveVersion} or not defined. | 2. "maven" | Use Hive jars of specified version downloaded from Maven repositories. - | 3. A classpath in the standard format for both Hive and Hadoop. + | 3. "path" + | Use Hive jars configured by `spark.sql.hive.metastore.jars.path` + | in comma separated format. Supports both local and remote paths. + | 4. A classpath in the standard format for both Hive and Hadoop. """.stripMargin) .version("1.4.0") .stringConf .createWithDefault("builtin") + val HIVE_METASTORE_JARS_PATH = buildStaticConf("spark.sql.hive.metastore.jars.path") + .doc(s"Comma separated URL of Hive jars, support both local and remote paths, " + + s"such as: " + + s" 1. 
file://path/to/jar/xxx.jar\n" + + s" 2. hdfs://nameservice/path/to/jar/xxx.jar\n" + + s" 3. /path/to/jar/ (path without URI scheme follows conf `fs.defaultFS`'s URI scheme)\n" + + s" 4. [http/https/ftp]://path/to/jar/xxx.jar\n" + + s"Notice: `http/https/ftp` doesn't support wildcard, but other URLs support " + + s"nested path wildcard, such as: " + + s" 1. file://path/to/jar/*, file://path/to/jar/*/*\n" + + s" 2. hdfs://nameservice/path/to/jar/*, hdfs://nameservice/path/to/jar/*/*\n" + + s"When ${HIVE_METASTORE_JARS.key} is set to `path`, we will use Hive jars configured by this") + .version("3.1.0") + .stringConf + .toSequence + .createWithDefault(Nil) + val CONVERT_METASTORE_PARQUET = buildConf("spark.sql.hive.convertMetastoreParquet") .doc("When set to true, the built-in Parquet reader and writer are used to process " + "parquet tables created by using the HiveQL syntax, instead of Hive serde.") @@ -178,6 +199,7 @@ private[spark] object HiveUtils extends Logging { * The location of the jars that should be used to instantiate the HiveMetastoreClient. This * property can be one of three options: + * - path - attempt to discover the jars with paths configured by `HIVE_METASTORE_JARS_PATH`. * - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This * option is only valid when using the execution version of Hive. * - maven - download the correct version of hive on demand from maven. @@ -186,6 +208,13 @@ private[spark] object HiveUtils extends Logging { conf.getConf(HIVE_METASTORE_JARS) } + /** + * Hive jar paths, only used when `HIVE_METASTORE_JARS` is `path`. + */ + private def hiveMetastoreJarsPath(conf: SQLConf): Seq[String] = { + conf.getConf(HIVE_METASTORE_JARS_PATH) + } + /** * A comma separated list of class prefixes that should be loaded using the classloader that * is shared between Spark SQL and a specific version of Hive. 
An example of classes that should @@ -336,6 +365,20 @@ private[spark] object HiveUtils extends Logging { val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf) val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) + def addLocalHiveJars(file: File): Seq[URL] = { + if (file.getName == "*") { + val files = file.getParentFile.listFiles() + if (files == null) { + logWarning(s"Hive jar path '${file.getPath}' does not exist.") + Nil + } else { + files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).map(_.toURI.toURL).toSeq + } + } else { + file.toURI.toURL :: Nil + } + } + val isolatedLoader = if (hiveMetastoreJars == "builtin") { if (builtinHiveVersion != hiveMetastoreVersion) { throw new IllegalArgumentException( @@ -396,24 +439,43 @@ private[spark] object HiveUtils extends Logging { config = configurations, barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) + } else if (hiveMetastoreJars == "path") { + // Convert to files and expand any directories. + val jars = + HiveUtils.hiveMetastoreJarsPath(sqlConf) + .flatMap { + case path if path.contains("\\") && Utils.isWindows => + addLocalHiveJars(new File(path)) + case path => + DataSource.checkAndGlobPathIfNecessary( + pathStrings = Seq(path), + hadoopConf = hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = false, + enableGlobbing = true + ).map(_.toUri.toURL) + } + + logInfo( + s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " + + s"using path: ${jars.mkString(";")}") + new IsolatedClientLoader( + version = metaVersion, + sparkConf = conf, + hadoopConf = hadoopConf, + execJars = jars.toSeq, + config = configurations, + isolationOn = true, + barrierPrefixes = hiveMetastoreBarrierPrefixes, + sharedPrefixes = hiveMetastoreSharedPrefixes) } else { // Convert to files and expand any directories. 
val jars = hiveMetastoreJars .split(File.pathSeparator) - .flatMap { - case path if new File(path).getName == "*" => - val files = new File(path).getParentFile.listFiles() - if (files == null) { - logWarning(s"Hive jar path '$path' does not exist.") - Nil - } else { - files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).toSeq - } - case path => - new File(path) :: Nil - } - .map(_.toURI.toURL) + .flatMap { path => + addLocalHiveJars(new File(path)) + } logInfo( s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +