From 834c2a77a9a36e39f67eafd07def9b7e9f47639e Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sun, 27 Sep 2020 12:04:52 +0800 Subject: [PATCH 01/24] SPARK-32852 spark.sql.hive.metastore.jars support HDFS location --- .../org/apache/spark/sql/hive/HiveUtils.scala | 117 +++++++++++++++--- .../hive/client/HadoopVersionInfoSuite.scala | 5 +- 2 files changed, 104 insertions(+), 18 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 62ff2db2ecb3c..1aea2148db35f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -25,16 +25,18 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import scala.language.implicitConversions +import scala.util.control.NonFatal import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.util.VersionInfo import org.apache.hive.common.util.HiveVersionInfo -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFiles} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql._ @@ -88,12 +90,20 @@ private[spark] object HiveUtils extends Logging { | ${builtinHiveVersion} or not defined. | 2. "maven" | Use Hive jars of specified version downloaded from Maven repositories. - | 3. A classpath in the standard format for both Hive and Hadoop. + | 3. "path" + | A classpath configured by `spark.sql.hive.metastore.jars.path` in the standard format + | for both Hive and Hadoop. """.stripMargin) .version("1.4.0") .stringConf .createWithDefault("builtin") + val HIVE_METASTORE_JARS_PATH = buildStaticConf("spark.sql.hive.metastore.jars.path") + .doc(s"When ${HIVE_METASTORE_JARS} is set as `path`, use Hive jars configured by this") + .stringConf + .toSequence + .createWithDefault(Nil) + val CONVERT_METASTORE_PARQUET = buildConf("spark.sql.hive.convertMetastoreParquet") .doc("When set to true, the built-in Parquet reader and writer are used to process " + "parquet tables created by using the HiveQL syntax, instead of Hive serde.") @@ -396,23 +406,95 @@ private[spark] object HiveUtils extends Logging { config = configurations, barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) - } else { - // Convert to files and expand any directories. 
- val jars = - hiveMetastoreJars - .split(File.pathSeparator) - .flatMap { - case path if new File(path).getName == "*" => - val files = new File(path).getParentFile.listFiles() - if (files == null) { - logWarning(s"Hive jar path '$path' does not exist.") + } else if (hiveMetastoreJars == "path") { + + val hiveMetastoreJarsPath: Seq[String] = conf.get(HiveUtils.HIVE_METASTORE_JARS_PATH) + + def addLocalHiveJars(file: File): Seq[File] = { + if (file.getName == "*") { + val files = file.getParentFile.listFiles() + if (files == null) { + logWarning(s"Hive jar path '${file.getPath}' does not exist.") + Nil + } else { + files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).toSeq + } + } else { + file :: Nil + } + } + + def checkRemoteHiveJars(path: String): Seq[File] = { + try { + val hadoopPath = new Path(path) + val fs = hadoopPath.getFileSystem(hadoopConf) + if (hadoopPath.getName == "*") { + val parent = hadoopPath.getParent + if (!fs.exists(parent)) { + logWarning(s"Hive Jar ${path} does not exist.") + Nil + } else if (!fs.getFileStatus(parent).isDirectory) { + logWarning(s"Hive Jar ${parent} is not a directory.") + Nil + } else { + fs.listStatus(parent).map(file => + Utils.fetchFile(file.getPath.toUri.toString, + new File(SparkFiles.getRootDirectory()), conf, + SparkEnv.get.securityManager, hadoopConf, + System.currentTimeMillis(), useCache = false) + ) + } + } else { + if (!fs.exists(hadoopPath)) { + logWarning(s"Hive Jar ${path} does not exist.") + Nil + } else if (fs.getFileStatus(hadoopPath).isDirectory) { + logWarning(s"Hive Jar ${path} not allow directory without `*`") Nil } else { - files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).toSeq + // Since tar/tar.gz file we can't know it's final path yet, not support it + Utils.fetchFile(hadoopPath.toUri.toString, + new File(SparkFiles.getRootDirectory()), conf, + SparkEnv.get.securityManager, hadoopConf, + System.currentTimeMillis(), useCache = false) :: Nil } - case path => - new File(path) :: Nil + } + } catch { + case NonFatal(e) => + logError(s"Failed to find $path to Hive Jars", e) + Nil } + } + + // Convert to files and expand any directories. + val jars = + hiveMetastoreJarsPath + .flatMap { + case path if path.contains("\\") => + addLocalHiveJars(new File(path)) + case path => + val uri = new Path(path).toUri + uri.getScheme match { + case null | "file" => + addLocalHiveJars(new File(uri.getPath)) + case "local" => + new File("file:" + uri.getPath) :: Nil + case "http" | "https" | "ftp" => + try { + // validate and fetch URI file + Utils.fetchFile(uri.toURL.toString, + new File(SparkFiles.getRootDirectory()), conf, + SparkEnv.get.securityManager, hadoopConf, + System.currentTimeMillis(), useCache = false) :: Nil + } catch { + case _: Throwable => + logWarning(s"Hive Jars URI (${uri.toString}) is not a valid URL.") + Nil + } + case _ => + checkRemoteHiveJars(path) + } + } .map(_.toURI.toURL) logInfo( @@ -427,6 +509,9 @@ private[spark] object HiveUtils extends Logging { isolationOn = true, barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) + } else { + throw new IllegalArgumentException(s"Please set ${HIVE_METASTORE_JARS.key} correctlly using" + + s" ${Seq("buildin", "maven", "path").mkString(", ")}.") } isolatedLoader.createClient() } @@ -468,7 +553,7 @@ private[spark] object HiveUtils extends Logging { // Because execution Hive should always connects to an embedded derby metastore. // We have to remove the value of hive.metastore.uris. 
So, the execution Hive client connects // to the actual embedded derby metastore instead of the remote metastore. - // You can search HiveConf.ConfVars.METASTOREURIS in the code of HiveConf (in Hive's repo). + // You can seLogFactoryarch HiveConf.ConfVars.METASTOREURIS in the code of HiveConf (in Hive's repo). // Then, you will find that the local metastore mode is only set to true when // hive.metastore.uris is not set. propMap.put(ConfVars.METASTOREURIS.varname, "") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala index 65492abf38cc0..63a5284c6c16d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala @@ -58,9 +58,10 @@ class HadoopVersionInfoSuite extends SparkFunSuite { val sparkConf = new SparkConf() sparkConf.set(HiveUtils.HIVE_METASTORE_VERSION, "2.0") + sparkConf.set(HiveUtils.HIVE_METASTORE_JARS, "path") sparkConf.set( - HiveUtils.HIVE_METASTORE_JARS, - jars.map(_.getCanonicalPath).mkString(File.pathSeparator)) + HiveUtils.HIVE_METASTORE_JARS_PATH.key, + jars.map(_.getCanonicalPath).mkString(",")) HiveClientBuilder.buildConf(Map.empty).foreach { case (k, v) => hadoopConf.set(k, v) } From bf3c4b62f63f2f24945df38e487455004bd2d892 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sun, 27 Sep 2020 12:36:50 +0800 Subject: [PATCH 02/24] Update HiveUtils.scala --- .../src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 1aea2148db35f..7e216b2c3cc2b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -553,7 +553,7 @@ private[spark] object HiveUtils extends Logging { // Because execution Hive should always connects to an embedded derby metastore. // We have to remove the value of hive.metastore.uris. So, the execution Hive client connects // to the actual embedded derby metastore instead of the remote metastore. - // You can seLogFactoryarch HiveConf.ConfVars.METASTOREURIS in the code of HiveConf (in Hive's repo). + // You can search HiveConf.ConfVars.METASTOREURIS in the code of HiveConf (in Hive's repo). // Then, you will find that the local metastore mode is only set to true when // hive.metastore.uris is not set. propMap.put(ConfVars.METASTOREURIS.varname, "") From 2cfd3e06037d73583b8828764b3ae72bbf12906b Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 29 Sep 2020 16:03:16 +0800 Subject: [PATCH 03/24] update --- .../org/apache/spark/sql/hive/HiveUtils.scala | 51 +++++-------------- .../hive/client/HadoopVersionInfoSuite.scala | 5 +- 2 files changed, 16 insertions(+), 40 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 7e216b2c3cc2b..bc56fd0e63b15 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -90,20 +90,12 @@ private[spark] object HiveUtils extends Logging { | ${builtinHiveVersion} or not defined. | 2. "maven" | Use Hive jars of specified version downloaded from Maven repositories. - | 3. 
"path" - | A classpath configured by `spark.sql.hive.metastore.jars.path` in the standard format - | for both Hive and Hadoop. + | 3. A classpath in the standard format for both Hive and Hadoop. """.stripMargin) .version("1.4.0") .stringConf .createWithDefault("builtin") - val HIVE_METASTORE_JARS_PATH = buildStaticConf("spark.sql.hive.metastore.jars.path") - .doc(s"When ${HIVE_METASTORE_JARS} is set as `path`, use Hive jars configured by this") - .stringConf - .toSequence - .createWithDefault(Nil) - val CONVERT_METASTORE_PARQUET = buildConf("spark.sql.hive.convertMetastoreParquet") .doc("When set to true, the built-in Parquet reader and writer are used to process " + "parquet tables created by using the HiveQL syntax, instead of Hive serde.") @@ -406,25 +398,23 @@ private[spark] object HiveUtils extends Logging { config = configurations, barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) - } else if (hiveMetastoreJars == "path") { - - val hiveMetastoreJarsPath: Seq[String] = conf.get(HiveUtils.HIVE_METASTORE_JARS_PATH) + } else { - def addLocalHiveJars(file: File): Seq[File] = { + def addLocalHiveJars(file: File): Seq[URL] = { if (file.getName == "*") { val files = file.getParentFile.listFiles() if (files == null) { logWarning(s"Hive jar path '${file.getPath}' does not exist.") Nil } else { - files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).toSeq + files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).map(_.toURL).toSeq } } else { - file :: Nil + file.toURL :: Nil } } - def checkRemoteHiveJars(path: String): Seq[File] = { + def checkRemoteHiveJars(path: String): Seq[URL] = { try { val hadoopPath = new Path(path) val fs = hadoopPath.getFileSystem(hadoopConf) @@ -437,12 +427,7 @@ private[spark] object HiveUtils extends Logging { logWarning(s"Hive Jar ${parent} is not a directory.") Nil } else { - fs.listStatus(parent).map(file => - Utils.fetchFile(file.getPath.toUri.toString, - new File(SparkFiles.getRootDirectory()), conf, - SparkEnv.get.securityManager, hadoopConf, - System.currentTimeMillis(), useCache = false) - ) + fs.listStatus(parent).map(_.getPath.toUri.toURL) } } else { if (!fs.exists(hadoopPath)) { @@ -453,10 +438,7 @@ private[spark] object HiveUtils extends Logging { Nil } else { // Since tar/tar.gz file we can't know it's final path yet, not support it - Utils.fetchFile(hadoopPath.toUri.toString, - new File(SparkFiles.getRootDirectory()), conf, - SparkEnv.get.securityManager, hadoopConf, - System.currentTimeMillis(), useCache = false) :: Nil + hadoopPath.toUri.toURL :: Nil } } } catch { @@ -468,7 +450,8 @@ private[spark] object HiveUtils extends Logging { // Convert to files and expand any directories. 
val jars = - hiveMetastoreJarsPath + hiveMetastoreJars + .split(";") .flatMap { case path if path.contains("\\") => addLocalHiveJars(new File(path)) @@ -478,14 +461,11 @@ private[spark] object HiveUtils extends Logging { case null | "file" => addLocalHiveJars(new File(uri.getPath)) case "local" => - new File("file:" + uri.getPath) :: Nil + new File("file:" + uri.getPath).toURL :: Nil case "http" | "https" | "ftp" => try { // validate and fetch URI file - Utils.fetchFile(uri.toURL.toString, - new File(SparkFiles.getRootDirectory()), conf, - SparkEnv.get.securityManager, hadoopConf, - System.currentTimeMillis(), useCache = false) :: Nil + uri.toURL :: Nil } catch { case _: Throwable => logWarning(s"Hive Jars URI (${uri.toString}) is not a valid URL.") @@ -495,11 +475,10 @@ private[spark] object HiveUtils extends Logging { checkRemoteHiveJars(path) } } - .map(_.toURI.toURL) logInfo( s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " + - s"using ${jars.mkString(":")}") + s"using ${jars.mkString(";")}") new IsolatedClientLoader( version = metaVersion, sparkConf = conf, @@ -509,10 +488,8 @@ private[spark] object HiveUtils extends Logging { isolationOn = true, barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) - } else { - throw new IllegalArgumentException(s"Please set ${HIVE_METASTORE_JARS.key} correctlly using" + - s" ${Seq("buildin", "maven", "path").mkString(", ")}.") } + isolatedLoader.createClient() } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala index 63a5284c6c16d..d0c68fde2f1f2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala @@ -58,10 +58,9 @@ class HadoopVersionInfoSuite extends SparkFunSuite { val sparkConf = new SparkConf() sparkConf.set(HiveUtils.HIVE_METASTORE_VERSION, "2.0") - sparkConf.set(HiveUtils.HIVE_METASTORE_JARS, "path") sparkConf.set( - HiveUtils.HIVE_METASTORE_JARS_PATH.key, - jars.map(_.getCanonicalPath).mkString(",")) + HiveUtils.HIVE_METASTORE_JARS.key, + jars.map(_.getCanonicalPath).mkString(File.pathSeparator)) HiveClientBuilder.buildConf(Map.empty).foreach { case (k, v) => hadoopConf.set(k, v) } From 1063a080a4a7e96071841467b112ffa7e9f0365d Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 29 Sep 2020 16:04:52 +0800 Subject: [PATCH 04/24] . 
--- .../src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 3 +-- .../apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index bc56fd0e63b15..f1afcffcae475 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -36,7 +36,7 @@ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.util.VersionInfo import org.apache.hive.common.util.HiveVersionInfo -import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFiles} +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql._ @@ -489,7 +489,6 @@ private[spark] object HiveUtils extends Logging { barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) } - isolatedLoader.createClient() } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala index d0c68fde2f1f2..65492abf38cc0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala @@ -59,7 +59,7 @@ class HadoopVersionInfoSuite extends SparkFunSuite { val sparkConf = new SparkConf() sparkConf.set(HiveUtils.HIVE_METASTORE_VERSION, "2.0") sparkConf.set( - HiveUtils.HIVE_METASTORE_JARS.key, + HiveUtils.HIVE_METASTORE_JARS, jars.map(_.getCanonicalPath).mkString(File.pathSeparator)) HiveClientBuilder.buildConf(Map.empty).foreach { case (k, v) => hadoopConf.set(k, v) From 6214ebf03855b76d84990df3f965941287b1d3d3 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 29 Sep 2020 17:41:53 +0800 Subject: [PATCH 05/24] Update HadoopVersionInfoSuite.scala --- .../apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala index 65492abf38cc0..3e02a5389fbd3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala @@ -60,7 +60,7 @@ class HadoopVersionInfoSuite extends SparkFunSuite { sparkConf.set(HiveUtils.HIVE_METASTORE_VERSION, "2.0") sparkConf.set( HiveUtils.HIVE_METASTORE_JARS, - jars.map(_.getCanonicalPath).mkString(File.pathSeparator)) + jars.map(_.getCanonicalPath).mkString(";")) HiveClientBuilder.buildConf(Map.empty).foreach { case (k, v) => hadoopConf.set(k, v) } From e82ed521373ed68b57da88c3f9f061c401e43190 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 29 Sep 2020 20:15:32 +0800 Subject: [PATCH 06/24] follow comment --- .../main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index f1afcffcae475..be49ed4f80dc9 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -90,7 +90,8 @@ private[spark] object HiveUtils extends Logging { | ${builtinHiveVersion} or not defined. | 2. "maven" | Use Hive jars of specified version downloaded from Maven repositories. - | 3. A classpath in the standard format for both Hive and Hadoop. + | 3. A classpath in the standard format for both Hive and Hadoop, we should always + | be fully qualified URL to indicate other file systems. """.stripMargin) .version("1.4.0") .stringConf @@ -453,15 +454,13 @@ private[spark] object HiveUtils extends Logging { hiveMetastoreJars .split(";") .flatMap { - case path if path.contains("\\") => + case path if Utils.isWindows => addLocalHiveJars(new File(path)) case path => val uri = new Path(path).toUri uri.getScheme match { - case null | "file" => + case "file" => addLocalHiveJars(new File(uri.getPath)) - case "local" => - new File("file:" + uri.getPath).toURL :: Nil case "http" | "https" | "ftp" => try { // validate and fetch URI file From f2869e9824d035a1279142294fca7cdded5179cf Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 29 Sep 2020 20:17:37 +0800 Subject: [PATCH 07/24] Update HiveUtils.scala --- .../src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index be49ed4f80dc9..76c2c10a45ad5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -459,7 +459,7 @@ private[spark] object HiveUtils extends Logging { case path => val uri = new Path(path).toUri uri.getScheme match { - case "file" => + case "file" | "local" => addLocalHiveJars(new File(uri.getPath)) case "http" | "https" | "ftp" => try { From c200d9b8503360cd0a268d1cb67e8c471df9705e Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 7 Oct 2020 19:12:48 +0800 Subject: [PATCH 08/24] Update HiveUtils.scala --- .../org/apache/spark/sql/hive/HiveUtils.scala | 46 +++++++++++++++++-- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 76c2c10a45ad5..908434cd8d15a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -90,13 +90,22 @@ private[spark] object HiveUtils extends Logging { | ${builtinHiveVersion} or not defined. | 2. "maven" | Use Hive jars of specified version downloaded from Maven repositories. - | 3. A classpath in the standard format for both Hive and Hadoop, we should always + | 3. "path" + | A classpath configured by `spark.sql.hive.metastore.jars.path` in the standard format + | for both Hive and Hadoop. + | 4. A classpath in the standard format for both Hive and Hadoop, we should always | be fully qualified URL to indicate other file systems. 
""".stripMargin) .version("1.4.0") .stringConf .createWithDefault("builtin") + val HIVE_METASTORE_JARS_PATH = buildStaticConf("spark.sql.hive.metastore.jars.path") + .doc(s"When ${HIVE_METASTORE_JARS} is set as `path`, use Hive jars configured by this") + .stringConf + .toSequence + .createWithDefault(Nil) + val CONVERT_METASTORE_PARQUET = buildConf("spark.sql.hive.convertMetastoreParquet") .doc("When set to true, the built-in Parquet reader and writer are used to process " + "parquet tables created by using the HiveQL syntax, instead of Hive serde.") @@ -399,7 +408,7 @@ private[spark] object HiveUtils extends Logging { config = configurations, barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) - } else { + } else if (hiveMetastoreJars =="path") { def addLocalHiveJars(file: File): Seq[URL] = { if (file.getName == "*") { @@ -477,7 +486,38 @@ private[spark] object HiveUtils extends Logging { logInfo( s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " + - s"using ${jars.mkString(";")}") + s"using path: ${jars.mkString(";")}") + new IsolatedClientLoader( + version = metaVersion, + sparkConf = conf, + hadoopConf = hadoopConf, + execJars = jars.toSeq, + config = configurations, + isolationOn = true, + barrierPrefixes = hiveMetastoreBarrierPrefixes, + sharedPrefixes = hiveMetastoreSharedPrefixes) + } else { + // Convert to files and expand any directories. + val jars = + hiveMetastoreJars + .split(File.pathSeparator) + .flatMap { + case path if new File(path).getName == "*" => + val files = new File(path).getParentFile.listFiles() + if (files == null) { + logWarning(s"Hive jar path '$path' does not exist.") + Nil + } else { + files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).toSeq + } + case path => + new File(path) :: Nil + } + .map(_.toURI.toURL) + + logInfo( + s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " + + s"using ${jars.mkString(":")}") new IsolatedClientLoader( version = metaVersion, sparkConf = conf, From e3395f5e29ec0291b350dfa451479307fdc92c2c Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 7 Oct 2020 19:23:23 +0800 Subject: [PATCH 09/24] Update HadoopVersionInfoSuite.scala --- .../apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala index 3e02a5389fbd3..65492abf38cc0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala @@ -60,7 +60,7 @@ class HadoopVersionInfoSuite extends SparkFunSuite { sparkConf.set(HiveUtils.HIVE_METASTORE_VERSION, "2.0") sparkConf.set( HiveUtils.HIVE_METASTORE_JARS, - jars.map(_.getCanonicalPath).mkString(";")) + jars.map(_.getCanonicalPath).mkString(File.pathSeparator)) HiveClientBuilder.buildConf(Map.empty).foreach { case (k, v) => hadoopConf.set(k, v) } From 02542975d8fc6786b223bd403bc179180e5134e7 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 7 Oct 2020 19:32:49 +0800 Subject: [PATCH 10/24] Update HiveUtils.scala --- .../org/apache/spark/sql/hive/HiveUtils.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 908434cd8d15a..ab3b5bcccdff2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -502,17 +502,17 @@ private[spark] object HiveUtils extends Logging { hiveMetastoreJars .split(File.pathSeparator) .flatMap { - case path if new File(path).getName == "*" => - val files = new File(path).getParentFile.listFiles() - if (files == null) { - logWarning(s"Hive jar path '$path' does not exist.") - Nil - } else { - files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).toSeq - } - case path => - new File(path) :: Nil - } + case path if new File(path).getName == "*" => + val files = new File(path).getParentFile.listFiles() + if (files == null) { + logWarning(s"Hive jar path '$path' does not exist.") + Nil + } else { + files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).toSeq + } + case path => + new File(path) :: Nil + } .map(_.toURI.toURL) logInfo( From c6475ee20e85f981322e75a410cbf803a8c6d5c6 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 7 Oct 2020 19:43:32 +0800 Subject: [PATCH 11/24] Update HiveUtils.scala --- .../main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index ab3b5bcccdff2..25f946d351e77 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -91,8 +91,8 @@ private[spark] object HiveUtils extends Logging { | 2. "maven" | Use Hive jars of specified version downloaded from Maven repositories. | 3. "path" - | A classpath configured by `spark.sql.hive.metastore.jars.path` in the standard format - | for both Hive and Hadoop. + | Use Hive jars configured by `spark.sql.hive.metastore.jars.path` in comma separated format + | support both local or remote paths. | 4. A classpath in the standard format for both Hive and Hadoop, we should always | be fully qualified URL to indicate other file systems. """.stripMargin) @@ -101,7 +101,8 @@ private[spark] object HiveUtils extends Logging { .createWithDefault("builtin") val HIVE_METASTORE_JARS_PATH = buildStaticConf("spark.sql.hive.metastore.jars.path") - .doc(s"When ${HIVE_METASTORE_JARS} is set as `path`, use Hive jars configured by this") + .doc(s"Comma separated path of Hive jars, both support local and remote paths." + + s"When ${HIVE_METASTORE_JARS} is set as `path`, we will use Hive jars configured by this") .stringConf .toSequence .createWithDefault(Nil) From e50050bb77d1a4374ec06c31530e62de755551fe Mon Sep 17 00:00:00 2001 From: angerszhu Date: Thu, 8 Oct 2020 00:02:36 +0800 Subject: [PATCH 12/24] Update HiveUtils.scala --- .../scala/org/apache/spark/sql/hive/HiveUtils.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 25f946d351e77..b36e422c9b8d4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -191,6 +191,7 @@ private[spark] object HiveUtils extends Logging { * The location of the jars that should be used to instantiate the HiveMetastoreClient. 
This * property can be one of three options: * - a classpath in the standard format for both hive and hadoop. + * - path - attempt to discover the jars with paths configured bt `HIVE_METASTORE_JARS_PATH` * - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This * option is only valid when using the execution version of Hive. * - maven - download the correct version of hive on demand from maven. @@ -199,6 +200,13 @@ private[spark] object HiveUtils extends Logging { conf.getConf(HIVE_METASTORE_JARS) } + /** + * Hive jars paths, only work when `HIVE_METASTORE_JARS` is `path`. + */ + private def hiveMetastoreJarsPath(conf: SQLConf): Seq[String] = { + conf.getConf(HIVE_METASTORE_JARS_PATH) + } + /** * A comma separated list of class prefixes that should be loaded using the classloader that * is shared between Spark SQL and a specific version of Hive. An example of classes that should @@ -461,8 +469,7 @@ private[spark] object HiveUtils extends Logging { // Convert to files and expand any directories. val jars = - hiveMetastoreJars - .split(";") + HiveUtils.hiveMetastoreJarsPath(sqlConf) .flatMap { case path if Utils.isWindows => addLocalHiveJars(new File(path)) From 927840422002fb76352fcb6beff906a7381b1abb Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 9 Oct 2020 10:28:41 +0800 Subject: [PATCH 13/24] Update HiveUtils.scala --- .../src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index b36e422c9b8d4..581b2287ecee1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -417,7 +417,7 @@ private[spark] object HiveUtils extends Logging { config = configurations, barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) - } else if (hiveMetastoreJars =="path") { + } else if (hiveMetastoreJars == "path") { def addLocalHiveJars(file: File): Seq[URL] = { if (file.getName == "*") { From 9ee1a861e650f716870befcefd9820dc8425b9ae Mon Sep 17 00:00:00 2001 From: angerszhu Date: Mon, 12 Oct 2020 12:10:37 +0800 Subject: [PATCH 14/24] Update HiveUtils.scala --- .../main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 581b2287ecee1..991394c9bfd4e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -92,16 +92,16 @@ private[spark] object HiveUtils extends Logging { | Use Hive jars of specified version downloaded from Maven repositories. | 3. "path" | Use Hive jars configured by `spark.sql.hive.metastore.jars.path` in comma separated format - | support both local or remote paths. - | 4. A classpath in the standard format for both Hive and Hadoop, we should always - | be fully qualified URL to indicate other file systems. + | support both local or remote paths, it should always be fully qualified URL to indicate + | other file systems. + | 4. A classpath in the standard format for both Hive and Hadoop. 
""".stripMargin) .version("1.4.0") .stringConf .createWithDefault("builtin") val HIVE_METASTORE_JARS_PATH = buildStaticConf("spark.sql.hive.metastore.jars.path") - .doc(s"Comma separated path of Hive jars, both support local and remote paths." + + .doc(s"Comma separated path of Hive jars, support both local and remote paths." + s"When ${HIVE_METASTORE_JARS} is set as `path`, we will use Hive jars configured by this") .stringConf .toSequence From 4016327a018cde039433d0b74b4dab2daacad45e Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 13 Oct 2020 11:22:17 +0800 Subject: [PATCH 15/24] Update HiveUtils.scala --- .../main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 991394c9bfd4e..aea2667f35f2a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -101,8 +101,10 @@ private[spark] object HiveUtils extends Logging { .createWithDefault("builtin") val HIVE_METASTORE_JARS_PATH = buildStaticConf("spark.sql.hive.metastore.jars.path") - .doc(s"Comma separated path of Hive jars, support both local and remote paths." + - s"When ${HIVE_METASTORE_JARS} is set as `path`, we will use Hive jars configured by this") + .doc(s"Comma separated path of Hive jars, support both local and remote paths," + + s"we support path wildcards such as `hdfs://path/to/jars/*`, but not support" + + s"nested path wildcards such as `hdfs://path/to/jars/*/*`. When ${HIVE_METASTORE_JARS}" + + s"is set to `path`, we will use Hive jars configured by this") .stringConf .toSequence .createWithDefault(Nil) From cf0f846022572986e388ab2fc3f1d8405a9eebb0 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 14 Oct 2020 22:14:48 +0800 Subject: [PATCH 16/24] Update HiveUtils.scala --- .../org/apache/spark/sql/hive/HiveUtils.scala | 114 ++++++------------ 1 file changed, 38 insertions(+), 76 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index aea2667f35f2a..0452f4f81571e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -42,6 +42,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf._ @@ -82,7 +83,7 @@ private[spark] object HiveUtils extends Logging { val HIVE_METASTORE_JARS = buildStaticConf("spark.sql.hive.metastore.jars") .doc(s""" | Location of the jars that should be used to instantiate the HiveMetastoreClient. - | This property can be one of three options: " + | This property can be one of four options: " | 1. "builtin" | Use Hive ${builtinHiveVersion}, which is bundled with the Spark assembly when | -Phive is enabled. 
When this option is chosen, @@ -102,9 +103,16 @@ private[spark] object HiveUtils extends Logging { val HIVE_METASTORE_JARS_PATH = buildStaticConf("spark.sql.hive.metastore.jars.path") .doc(s"Comma separated path of Hive jars, support both local and remote paths," + - s"we support path wildcards such as `hdfs://path/to/jars/*`, but not support" + - s"nested path wildcards such as `hdfs://path/to/jars/*/*`. When ${HIVE_METASTORE_JARS}" + - s"is set to `path`, we will use Hive jars configured by this") + s"Such as:" + + s" 1. /path/to/jar/xxx.jar" + + s" 2. file:///path/to/jar/xxx.jar" + + s" 3. local:///path/to/jar/xxx.jar" + + s" 4. hdfs://path/to/jar/xxx.jar" + + s" 5. [http/https/ftp]://path/to/jar/xxx.jar" + + s"For local path and URI path, we only support path wildcard for the last layer," + + s"but for remote HDFS path, we support nested path wildcard." + + s"When ${HIVE_METASTORE_JARS.key} is set to `path`, we will use Hive jars configured by this") + .version("3.1.0") .stringConf .toSequence .createWithDefault(Nil) @@ -359,6 +367,20 @@ private[spark] object HiveUtils extends Logging { val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf) val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) + def addLocalHiveJars(file: File): Seq[URL] = { + if (file.getName == "*") { + val files = file.getParentFile.listFiles() + if (files == null) { + logWarning(s"Hive jar path '${file.getPath}' does not exist.") + Nil + } else { + files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).map(_.toURL).toSeq + } + } else { + file.toURL :: Nil + } + } + val isolatedLoader = if (hiveMetastoreJars == "builtin") { if (builtinHiveVersion != hiveMetastoreVersion) { throw new IllegalArgumentException( @@ -420,55 +442,6 @@ private[spark] object HiveUtils extends Logging { barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) } else if (hiveMetastoreJars == "path") { - - def addLocalHiveJars(file: File): Seq[URL] = { - if (file.getName == "*") { - val files = file.getParentFile.listFiles() - if (files == null) { - logWarning(s"Hive jar path '${file.getPath}' does not exist.") - Nil - } else { - files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).map(_.toURL).toSeq - } - } else { - file.toURL :: Nil - } - } - - def checkRemoteHiveJars(path: String): Seq[URL] = { - try { - val hadoopPath = new Path(path) - val fs = hadoopPath.getFileSystem(hadoopConf) - if (hadoopPath.getName == "*") { - val parent = hadoopPath.getParent - if (!fs.exists(parent)) { - logWarning(s"Hive Jar ${path} does not exist.") - Nil - } else if (!fs.getFileStatus(parent).isDirectory) { - logWarning(s"Hive Jar ${parent} is not a directory.") - Nil - } else { - fs.listStatus(parent).map(_.getPath.toUri.toURL) - } - } else { - if (!fs.exists(hadoopPath)) { - logWarning(s"Hive Jar ${path} does not exist.") - Nil - } else if (fs.getFileStatus(hadoopPath).isDirectory) { - logWarning(s"Hive Jar ${path} not allow directory without `*`") - Nil - } else { - // Since tar/tar.gz file we can't know it's final path yet, not support it - hadoopPath.toUri.toURL :: Nil - } - } - } catch { - case NonFatal(e) => - logError(s"Failed to find $path to Hive Jars", e) - Nil - } - } - // Convert to files and expand any directories. 
val jars = HiveUtils.hiveMetastoreJarsPath(sqlConf) @@ -478,19 +451,17 @@ private[spark] object HiveUtils extends Logging { case path => val uri = new Path(path).toUri uri.getScheme match { - case "file" | "local" => + // For path `/path/to/jar/xx.jar` schema is `null` + case "file" | "local" | null => addLocalHiveJars(new File(uri.getPath)) - case "http" | "https" | "ftp" => - try { - // validate and fetch URI file - uri.toURL :: Nil - } catch { - case _: Throwable => - logWarning(s"Hive Jars URI (${uri.toString}) is not a valid URL.") - Nil - } case _ => - checkRemoteHiveJars(path) + DataSource.checkAndGlobPathIfNecessary( + pathStrings = Seq(path), + hadoopConf = hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = false, + enableGlobbing = true + ).map(_.toUri.toURL) } } @@ -511,18 +482,9 @@ private[spark] object HiveUtils extends Logging { val jars = hiveMetastoreJars .split(File.pathSeparator) - .flatMap { - case path if new File(path).getName == "*" => - val files = new File(path).getParentFile.listFiles() - if (files == null) { - logWarning(s"Hive jar path '$path' does not exist.") - Nil - } else { - files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).toSeq - } - case path => - new File(path) :: Nil - } + .flatMap { path => + addLocalHiveJars(new File(path)) + } .map(_.toURI.toURL) logInfo( From b5241c810377a19d75cafb15c014a5cbe4fbccef Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 14 Oct 2020 22:22:22 +0800 Subject: [PATCH 17/24] Update HiveUtils.scala --- .../src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 0452f4f81571e..11266a0aaa5a6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import scala.language.implicitConversions -import scala.util.control.NonFatal import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.apache.hadoop.conf.Configuration @@ -201,7 +200,7 @@ private[spark] object HiveUtils extends Logging { * The location of the jars that should be used to instantiate the HiveMetastoreClient. This * property can be one of three options: * - a classpath in the standard format for both hive and hadoop. - * - path - attempt to discover the jars with paths configured bt `HIVE_METASTORE_JARS_PATH` + * - path - attempt to discover the jars with paths configured by `HIVE_METASTORE_JARS_PATH`. * - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This * option is only valid when using the execution version of Hive. * - maven - download the correct version of hive on demand from maven. 
From 6fbe082416a778d1b71bf3c5560f8b9725938c6a Mon Sep 17 00:00:00 2001 From: angerszhu Date: Thu, 15 Oct 2020 11:10:53 +0800 Subject: [PATCH 18/24] Update HiveUtils.scala --- .../org/apache/spark/sql/hive/HiveUtils.scala | 43 ++++++++----------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 11266a0aaa5a6..c9bad6b89b62a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -28,7 +28,6 @@ import scala.language.implicitConversions import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.session.SessionState @@ -101,15 +100,16 @@ private[spark] object HiveUtils extends Logging { .createWithDefault("builtin") val HIVE_METASTORE_JARS_PATH = buildStaticConf("spark.sql.hive.metastore.jars.path") - .doc(s"Comma separated path of Hive jars, support both local and remote paths," + - s"Such as:" + - s" 1. /path/to/jar/xxx.jar" + - s" 2. file:///path/to/jar/xxx.jar" + - s" 3. local:///path/to/jar/xxx.jar" + - s" 4. hdfs://path/to/jar/xxx.jar" + - s" 5. [http/https/ftp]://path/to/jar/xxx.jar" + - s"For local path and URI path, we only support path wildcard for the last layer," + - s"but for remote HDFS path, we support nested path wildcard." + + .doc(s"Comma separated fully qualified URL of Hive jars, support both local and remote paths," + + s"Such as: " + + s" 1. file://path/to/jar/xxx.jar" + + s" 2. hdfs://nameservice/path/to/jar/xxx.jar" + + s" 3. /path/to/jar (path without schema will be treated as HDFS path)" + + s" 3. [http/https/ftp]://path/to/jar/xxx.jar" + + s"For URI, we can't support path wildcard, but for other URL support nested path wildcard," + + s"Such as: " + + s" 1. file://path/to/jar/*, file://path/to/jar/*/*" + + s" 2. 
hdfs://nameservice/path/to/jar/*, hdfs://nameservice/path/to/jar/*/*" + s"When ${HIVE_METASTORE_JARS.key} is set to `path`, we will use Hive jars configured by this") .version("3.1.0") .stringConf @@ -445,23 +445,16 @@ private[spark] object HiveUtils extends Logging { val jars = HiveUtils.hiveMetastoreJarsPath(sqlConf) .flatMap { - case path if Utils.isWindows => + case path if path.contains("\\") && Utils.isWindows => addLocalHiveJars(new File(path)) case path => - val uri = new Path(path).toUri - uri.getScheme match { - // For path `/path/to/jar/xx.jar` schema is `null` - case "file" | "local" | null => - addLocalHiveJars(new File(uri.getPath)) - case _ => - DataSource.checkAndGlobPathIfNecessary( - pathStrings = Seq(path), - hadoopConf = hadoopConf, - checkEmptyGlobPath = true, - checkFilesExist = false, - enableGlobbing = true - ).map(_.toUri.toURL) - } + DataSource.checkAndGlobPathIfNecessary( + pathStrings = Seq(path), + hadoopConf = hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = false, + enableGlobbing = true + ).map(_.toUri.toURL) } logInfo( From b79ab0d93a6e86fbf0706588d21c0540b3a4c0ab Mon Sep 17 00:00:00 2001 From: angerszhu Date: Thu, 15 Oct 2020 18:21:38 +0800 Subject: [PATCH 19/24] Update HiveUtils.scala --- .../src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index c9bad6b89b62a..4645724a09c06 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -104,8 +104,8 @@ private[spark] object HiveUtils extends Logging { s"Such as: " + s" 1. file://path/to/jar/xxx.jar" + s" 2. hdfs://nameservice/path/to/jar/xxx.jar" + - s" 3. /path/to/jar (path without schema will be treated as HDFS path)" + - s" 3. [http/https/ftp]://path/to/jar/xxx.jar" + + s" 3. /path/to/jar/ (path without schema will follow hadoop conf `fs.defaultFS`'s schema)" + + s" 4. [http/https/ftp]://path/to/jar/xxx.jar" + s"For URI, we can't support path wildcard, but for other URL support nested path wildcard," + s"Such as: " + s" 1. file://path/to/jar/*, file://path/to/jar/*/*" + From f1a408575758bd0a03deb0437e619bec9d4846af Mon Sep 17 00:00:00 2001 From: angerszhu Date: Thu, 15 Oct 2020 18:49:05 +0800 Subject: [PATCH 20/24] Update HiveUtils.scala --- .../src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 4645724a09c06..0700cbf6b1479 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -104,7 +104,7 @@ private[spark] object HiveUtils extends Logging { s"Such as: " + s" 1. file://path/to/jar/xxx.jar" + s" 2. hdfs://nameservice/path/to/jar/xxx.jar" + - s" 3. /path/to/jar/ (path without schema will follow hadoop conf `fs.defaultFS`'s schema)" + + s" 3. /path/to/jar/ (path without URI scheme follow conf `fs.defaultFS`'s URI schema)" + s" 4. 
[http/https/ftp]://path/to/jar/xxx.jar" + s"For URI, we can't support path wildcard, but for other URL support nested path wildcard," + s"Such as: " + From d631a758e491fb7f5e7ffcb66f74cdef19028be8 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sat, 17 Oct 2020 11:24:22 +0800 Subject: [PATCH 21/24] Update HiveUtils.scala --- .../main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 0700cbf6b1479..df059be037adc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -90,8 +90,8 @@ private[spark] object HiveUtils extends Logging { | 2. "maven" | Use Hive jars of specified version downloaded from Maven repositories. | 3. "path" - | Use Hive jars configured by `spark.sql.hive.metastore.jars.path` in comma separated format - | support both local or remote paths, it should always be fully qualified URL to indicate + | Use Hive jars configured by `spark.sql.hive.metastore.jars.path` in comma separated format. + | Support both local or remote paths, it should always be fully qualified URL to indicate | other file systems. | 4. A classpath in the standard format for both Hive and Hadoop. """.stripMargin) @@ -106,8 +106,8 @@ private[spark] object HiveUtils extends Logging { s" 2. hdfs://nameservice/path/to/jar/xxx.jar" + s" 3. /path/to/jar/ (path without URI scheme follow conf `fs.defaultFS`'s URI schema)" + s" 4. [http/https/ftp]://path/to/jar/xxx.jar" + - s"For URI, we can't support path wildcard, but for other URL support nested path wildcard," + - s"Such as: " + + s"Notice: `http/https/ftp` doesn't support wildcard, but for other URLs support" + + s" nested path wildcard, Such as: " + s" 1. file://path/to/jar/*, file://path/to/jar/*/*" + s" 2. hdfs://nameservice/path/to/jar/*, hdfs://nameservice/path/to/jar/*/*" + s"When ${HIVE_METASTORE_JARS.key} is set to `path`, we will use Hive jars configured by this") @@ -477,7 +477,6 @@ private[spark] object HiveUtils extends Logging { .flatMap { path => addLocalHiveJars(new File(path)) } - .map(_.toURI.toURL) logInfo( s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " + From efc5ae0718836f06afffe860c84277e80ef2fa0a Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sat, 17 Oct 2020 12:19:25 +0800 Subject: [PATCH 22/24] Update HiveUtils.scala --- .../src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index df059be037adc..961ad9ede571c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -90,7 +90,7 @@ private[spark] object HiveUtils extends Logging { | 2. "maven" | Use Hive jars of specified version downloaded from Maven repositories. | 3. "path" - | Use Hive jars configured by `spark.sql.hive.metastore.jars.path` in comma separated format. + | Use Hive jars set by `spark.sql.hive.metastore.jars.path` in comma separated format. | Support both local or remote paths, it should always be fully qualified URL to indicate | other file systems. | 4. A classpath in the standard format for both Hive and Hadoop. 
From 6e67c7b9fcf9f5c66cb899f5f9000a5598492831 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sat, 17 Oct 2020 12:20:22 +0800 Subject: [PATCH 23/24] Update HiveUtils.scala --- .../main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 961ad9ede571c..8389e64cbc95e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -90,9 +90,9 @@ private[spark] object HiveUtils extends Logging { | 2. "maven" | Use Hive jars of specified version downloaded from Maven repositories. | 3. "path" - | Use Hive jars set by `spark.sql.hive.metastore.jars.path` in comma separated format. - | Support both local or remote paths, it should always be fully qualified URL to indicate - | other file systems. + | Use Hive jars configured by `spark.sql.hive.metastore.jars.path` + | in comma separated format. Support both local or remote paths, it should always + | be fully qualified URL to indicate other file systems. | 4. A classpath in the standard format for both Hive and Hadoop. """.stripMargin) .version("1.4.0") From ea9ef2bbfd8c2314605ae8e4daa12d2b77b31e62 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Thu, 22 Oct 2020 15:29:06 +0800 Subject: [PATCH 24/24] Update HiveUtils.scala --- .../org/apache/spark/sql/hive/HiveUtils.scala | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 8389e64cbc95e..67467637805f9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -91,8 +91,7 @@ private[spark] object HiveUtils extends Logging { | Use Hive jars of specified version downloaded from Maven repositories. | 3. "path" | Use Hive jars configured by `spark.sql.hive.metastore.jars.path` - | in comma separated format. Support both local or remote paths, it should always - | be fully qualified URL to indicate other file systems. + | in comma separated format. Support both local or remote paths. | 4. A classpath in the standard format for both Hive and Hadoop. """.stripMargin) .version("1.4.0") @@ -100,16 +99,16 @@ private[spark] object HiveUtils extends Logging { .createWithDefault("builtin") val HIVE_METASTORE_JARS_PATH = buildStaticConf("spark.sql.hive.metastore.jars.path") - .doc(s"Comma separated fully qualified URL of Hive jars, support both local and remote paths," + + .doc(s"Comma separated URL of Hive jars, support both local and remote paths," + s"Such as: " + - s" 1. file://path/to/jar/xxx.jar" + - s" 2. hdfs://nameservice/path/to/jar/xxx.jar" + - s" 3. /path/to/jar/ (path without URI scheme follow conf `fs.defaultFS`'s URI schema)" + - s" 4. [http/https/ftp]://path/to/jar/xxx.jar" + - s"Notice: `http/https/ftp` doesn't support wildcard, but for other URLs support" + - s" nested path wildcard, Such as: " + - s" 1. file://path/to/jar/*, file://path/to/jar/*/*" + - s" 2. hdfs://nameservice/path/to/jar/*, hdfs://nameservice/path/to/jar/*/*" + + s" 1. file://path/to/jar/xxx.jar\n" + + s" 2. hdfs://nameservice/path/to/jar/xxx.jar\n" + + s" 3. /path/to/jar/ (path without URI scheme follow conf `fs.defaultFS`'s URI schema)\n" + + s" 4. 
[http/https/ftp]://path/to/jar/xxx.jar\n" +
      s"Notice: `http/https/ftp` doesn't support wildcard, but other URLs support " +
      s"nested path wildcards, such as: " +
      s"  1. file://path/to/jar/*, file://path/to/jar/*/*\n" +
      s"  2. hdfs://nameservice/path/to/jar/*, hdfs://nameservice/path/to/jar/*/*\n" +
      s"When ${HIVE_METASTORE_JARS.key} is set to `path`, we will use Hive jars configured by this.")
    .version("3.1.0")
    .stringConf
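
A minimal usage sketch of the two configs this series adds, assuming the series is
applied. The metastore version and the jar locations below are hypothetical
placeholders. Both keys are static confs, so they must be set before the first
SparkSession is created.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .enableHiveSupport()
      // Hive metastore client version to instantiate (placeholder value).
      .config("spark.sql.hive.metastore.version", "2.3.7")
      // New mode added by this series: load metastore jars from configured paths.
      .config("spark.sql.hive.metastore.jars", "path")
      // Comma-separated URLs; wildcards such as `.../jars/*` are expanded through
      // DataSource.checkAndGlobPathIfNecessary (see PATCH 16). The paths here are
      // hypothetical examples.
      .config("spark.sql.hive.metastore.jars.path",
        "hdfs://nameservice/hive/jars/*,file:///opt/hive/aux-jars/*")
      .getOrCreate()

    spark.sql("SHOW DATABASES").show()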