Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 100 additions & 15 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,18 @@ import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.language.implicitConversions
import scala.util.control.NonFatal

import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.util.VersionInfo
import org.apache.hive.common.util.HiveVersionInfo

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFiles}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
Expand Down Expand Up @@ -88,12 +90,20 @@ private[spark] object HiveUtils extends Logging {
| <code>${builtinHiveVersion}</code> or not defined.
| 2. "maven"
| Use Hive jars of specified version downloaded from Maven repositories.
| 3. A classpath in the standard format for both Hive and Hadoop.
| 3. "path"
| A classpath configured by `spark.sql.hive.metastore.jars.path` in the standard format
| for both Hive and Hadoop.
""".stripMargin)
.version("1.4.0")
.stringConf
.createWithDefault("builtin")

val HIVE_METASTORE_JARS_PATH = buildStaticConf("spark.sql.hive.metastore.jars.path")
.doc(s"When ${HIVE_METASTORE_JARS} is set as `path`, use Hive jars configured by this")
.stringConf
.toSequence
.createWithDefault(Nil)

val CONVERT_METASTORE_PARQUET = buildConf("spark.sql.hive.convertMetastoreParquet")
.doc("When set to true, the built-in Parquet reader and writer are used to process " +
"parquet tables created by using the HiveQL syntax, instead of Hive serde.")
Expand Down Expand Up @@ -396,23 +406,95 @@ private[spark] object HiveUtils extends Logging {
config = configurations,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
} else {
// Convert to files and expand any directories.
val jars =
hiveMetastoreJars
.split(File.pathSeparator)
.flatMap {
case path if new File(path).getName == "*" =>
val files = new File(path).getParentFile.listFiles()
if (files == null) {
logWarning(s"Hive jar path '$path' does not exist.")
} else if (hiveMetastoreJars == "path") {

val hiveMetastoreJarsPath: Seq[String] = conf.get(HiveUtils.HIVE_METASTORE_JARS_PATH)

def addLocalHiveJars(file: File): Seq[File] = {
if (file.getName == "*") {
val files = file.getParentFile.listFiles()
if (files == null) {
logWarning(s"Hive jar path '${file.getPath}' does not exist.")
Nil
} else {
files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).toSeq
}
} else {
file :: Nil
}
}

def checkRemoteHiveJars(path: String): Seq[File] = {
try {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(hadoopConf)
if (hadoopPath.getName == "*") {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use DataSource.checkAndGlobPathIfNecessary?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use DataSource.checkAndGlobPathIfNecessary?

Yea, I will test this.

@AngersZhuuuu AngersZhuuuu Oct 14, 2020

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use DataSource.checkAndGlobPathIfNecessary?

Yea, I will test this.

@sunchao Follow this suggestion, we can support nested hdfs path wildcard now such as hdfs://xx/xx//

val parent = hadoopPath.getParent
if (!fs.exists(parent)) {
logWarning(s"Hive Jar ${path} does not exist.")
Nil
} else if (!fs.getFileStatus(parent).isDirectory) {
logWarning(s"Hive Jar ${parent} is not a directory.")
Nil
} else {
fs.listStatus(parent).map(file =>
Utils.fetchFile(file.getPath.toUri.toString,
new File(SparkFiles.getRootDirectory()), conf,
SparkEnv.get.securityManager, hadoopConf,
System.currentTimeMillis(), useCache = false)
)
}
} else {
if (!fs.exists(hadoopPath)) {
logWarning(s"Hive Jar ${path} does not exist.")
Nil
} else if (fs.getFileStatus(hadoopPath).isDirectory) {
logWarning(s"Hive Jar ${path} not allow directory without `*`")
Nil
} else {
files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).toSeq
// Since tar/tar.gz file we can't know it's final path yet, not support it
Utils.fetchFile(hadoopPath.toUri.toString,
new File(SparkFiles.getRootDirectory()), conf,
SparkEnv.get.securityManager, hadoopConf,
System.currentTimeMillis(), useCache = false) :: Nil
}
case path =>
new File(path) :: Nil
}
} catch {
case NonFatal(e) =>
logError(s"Failed to find $path to Hive Jars", e)
Nil
}
}

// Convert to files and expand any directories.
val jars =
hiveMetastoreJarsPath
.flatMap {
case path if path.contains("\\") =>

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could not follow this. Do you want to check if this is a Windows path? If that's the case, you can use Utils.isWindows.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

\\ can be in file names in other OSs. I don't think it makes sense to assume that this is a local path.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

\\ can be in file names in other OSs. I don't think it makes sense to assume that this is a local path.

All right, I will raise a pr to change this in SparkContext's addJar/addFile

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could not follow this. Do you want to check if this is a Windows path? If that's the case, you can use Utils.isWindows.

Here need use condition Utils.isWindows && path.contains("\\") since path maybe remote path

addLocalHiveJars(new File(path))
case path =>
val uri = new Path(path).toUri
uri.getScheme match {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we need to check the scheme and do things differently. spark.read.parquet(...) can support any schema and internally just use the Hadoop DFS API. Can't we do that as well here?

Can you point to other places in Spark that do similar things to support this PR?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we need to check the scheme and do things differently. spark.read.parquet(...) can support any schema and internally just use the Hadoop DFS API. Can't we do that as well here?

Can you point to other places in Spark that do similar things to support this PR?

In datasource file index, path is fully qualified URL to indicate other file systems.
So it support file:// | local:// , but without schema, it will be handled as hdfs path.
One way to avoid check schema, as #29881 (comment) mentioned, we host user use fully qualified URL.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about current change? Make the logic more simpler.

case null | "file" =>
addLocalHiveJars(new File(uri.getPath))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming null as a local path is kind of ambiguous (local paths vs fs.default.name). Maybe you'll have to document that it should always be fully qualified URL to indicate other file systems.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming null as a local path is kind of ambiguous (local paths vs fs.default.name). Maybe you'll have to document that it should always be fully qualified URL to indicate other file systems.

Add comment in doc of conf spark.sql.hive.metastore.jars

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this comment on the option "path" instead of "classpath"?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this comment on the option "path" instead of "classpath"?

Can't got your point, can you make it more clear?

case "local" =>
new File("file:" + uri.getPath) :: Nil
case "http" | "https" | "ftp" =>
try {
// validate and fetch URI file
Utils.fetchFile(uri.toURL.toString,
new File(SparkFiles.getRootDirectory()), conf,
SparkEnv.get.securityManager, hadoopConf,
System.currentTimeMillis(), useCache = false) :: Nil
} catch {
case _: Throwable =>
logWarning(s"Hive Jars URI (${uri.toString}) is not a valid URL.")
Nil
}
case _ =>
checkRemoteHiveJars(path)
}
}
.map(_.toURI.toURL)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line can be removed, as addLocalHiveJars already returns URL

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line can be removed, as addLocalHiveJars already returns URL

Done


logInfo(
Expand All @@ -427,6 +509,9 @@ private[spark] object HiveUtils extends Logging {
isolationOn = true,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
} else {
throw new IllegalArgumentException(s"Please set ${HIVE_METASTORE_JARS.key} correctlly using" +
s" ${Seq("buildin", "maven", "path").mkString(", ")}.")
}
isolatedLoader.createClient()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ class HadoopVersionInfoSuite extends SparkFunSuite {

val sparkConf = new SparkConf()
sparkConf.set(HiveUtils.HIVE_METASTORE_VERSION, "2.0")
sparkConf.set(HiveUtils.HIVE_METASTORE_JARS, "path")
sparkConf.set(
HiveUtils.HIVE_METASTORE_JARS,
jars.map(_.getCanonicalPath).mkString(File.pathSeparator))
HiveUtils.HIVE_METASTORE_JARS_PATH.key,
jars.map(_.getCanonicalPath).mkString(","))
HiveClientBuilder.buildConf(Map.empty).foreach { case (k, v) =>
hadoopConf.set(k, v)
}
Expand Down