core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (11 additions & 6 deletions)
@@ -443,7 +443,6 @@ object SparkSubmit extends CommandLineUtils {
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.submit.deployMode"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -480,7 +479,12 @@ object SparkSubmit extends CommandLineUtils {
sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.supervise"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.jars.ivy"),
OptionAssigner(args.repositories, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.jars.repositories"),
Contributor: this is a new option?

Contributor Author: yes, it is used to store the user's repositories, so we can then use them when downloading the Hive jars.

Member: We need to document it in http://spark.apache.org/docs/latest/configuration.html, like what we did for spark.jars.ivy.

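Illustrative only, not part of this diff: once documented, the new property could be set the same way as the existing spark.jars.ivy option, for example through a SparkConf. The repository URL and the Ivy cache path below are placeholders.

import org.apache.spark.SparkConf

// Hypothetical settings: spark.jars.repositories takes a comma-separated list of extra
// remote repositories, mirroring the --repositories command-line option.
val conf = new SparkConf()
  .set("spark.jars.repositories", "http://repo.example.com/maven2") // placeholder URL
  .set("spark.jars.ivy", "/tmp/.ivy2") // existing option, placeholder path
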
OptionAssigner(args.sparkProperties.get("spark.jars.ivySettings").orNull,
ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars.ivySettings")
)

// In client mode, launch the application main class directly
@@ -1027,10 +1031,8 @@ private[spark] object SparkSubmitUtils {
val cr = new ChainResolver
cr.setName("user-list")

// add current default resolver, if any
Option(ivySettings.getDefaultResolver).foreach(cr.add)

// add additional repositories, last resolution in chain takes precedence
// before default resolvers, add additional repositories,
// last resolution in chain takes precedence
repositoryList.zipWithIndex.foreach { case (repo, i) =>
val brr: IBiblioResolver = new IBiblioResolver
brr.setM2compatible(true)
@@ -1043,6 +1045,9 @@
// scalastyle:on println
}

// add current default resolver, if any
Option(ivySettings.getDefaultResolver).foreach(cr.add)

ivySettings.addResolver(cr)
ivySettings.setDefaultResolver(cr.getName)
}
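
A minimal sketch of the reordering, assuming access to the private[spark] SparkSubmitUtils.buildIvySettings helper shown in this file and using placeholder repository URLs; after this change the chain lists the user repositories first and the original default resolver last, which the new test below exercises by checking that the resolved jar actually comes from the user-defined repository.

import scala.collection.JavaConverters._
import org.apache.ivy.plugins.resolver.ChainResolver

// Placeholder URLs; "repo-1" and "repo-2" follow the repo-${i + 1} naming used above.
val settings = SparkSubmitUtils.buildIvySettings(
  Some("http://repo1.example.com/maven2,http://repo2.example.com/maven2"), None)
val chain = settings.getDefaultResolver.asInstanceOf[ChainResolver]
// Expected order after this change: repo-1, repo-2, then the previous default resolver.
chain.getResolvers.asScala.foreach(println)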
core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

import java.io.{File, OutputStream, PrintStream}
import java.io.{File, FileInputStream, OutputStream, PrintStream}
import java.nio.charset.StandardCharsets

import scala.collection.mutable.ArrayBuffer
@@ -30,6 +30,7 @@ import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
@@ -85,8 +86,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val expected = repos.split(",").map(r => s"$r/")
resolver.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) =>
if (1 < i && i < 3) {
assert(resolver.getName === s"repo-$i")
assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1))
assert(resolver.getName === s"repo-${i + 1}")
assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i))
}
}
}
@@ -258,4 +259,51 @@
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
}
}

test("search for artifact taking order from user defined repositories to default repositories") {
val main = new MavenCoordinate("a", "b", "0.1")

// Compare the two files by their MD5 digests.
def isSameFile(left: String, right: String): Boolean = {
val leftInput: FileInputStream = new FileInputStream(left)
val rightInput: FileInputStream = new FileInputStream(right)
try {
val leftMd5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(leftInput)
val rightMd5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(rightInput)
leftMd5 == rightMd5
} finally {
leftInput.close()
rightInput.close()
}
}

val userDefinedRepo = Utils.createTempDir(namePrefix = "my_m2")
try {
IvyTestUtils.withRepository(main, None, Some(userDefinedRepo)) { repo =>
IvyTestUtils.withRepository(main, None, Some(SparkSubmitUtils.m2Path)) {
defaultRepo =>
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
main.toString,
SparkSubmitUtils.buildIvySettings(Option(repo), None),
isTest = false)
assert(isSameFile(Seq(userDefinedRepo, main.groupId, main.artifactId, main.version,
"b-0.1.jar").mkString(File.separatorChar.toString), jarPath))
assert(jarPath.indexOf("b") >= 0, "should find artifact")

}
}

IvyTestUtils.withRepository(main, None, Some(SparkSubmitUtils.m2Path)) { defaultRepo =>
IvyTestUtils.withRepository(main, None, Some(userDefinedRepo)) {
repo =>
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
main.toString,
SparkSubmitUtils.buildIvySettings(Option(repo), None),
isTest = false)
assert(isSameFile(Seq(SparkSubmitUtils.m2Path.getCanonicalPath, main.groupId,
main.artifactId, main.version, "b-0.1.jar").mkString(File.separatorChar.toString),
jarPath))
assert(jarPath.indexOf("b") >= 0, "should find artifact")
}
}
} finally {
Utils.deleteRecursively(userDefinedRepo)
}
}
}
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -320,7 +320,6 @@ private[spark] object HiveUtils extends Logging {
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
} else if (hiveMetastoreJars == "maven") {
// TODO: Support for loading the jars from an already downloaded location.
logInfo(
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
IsolatedClientLoader.forVersion(
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -59,7 +59,7 @@ private[hive] object IsolatedClientLoader extends Logging {
} else {
val (downloadedFiles, actualHadoopVersion) =
try {
(downloadVersion(resolvedVersion, hadoopVersion, ivyPath), hadoopVersion)
(downloadVersion(resolvedVersion, hadoopVersion, sparkConf, ivyPath), hadoopVersion)
} catch {
case e: RuntimeException if e.getMessage.contains("hadoop") =>
// If the error message contains hadoop, it is probably because the hadoop
@@ -73,7 +73,7 @@
"It is recommended to set jars used by Hive metastore client through " +
"spark.sql.hive.metastore.jars in the production environment.")
sharesHadoopClasses = false
(downloadVersion(resolvedVersion, "2.4.0", ivyPath), "2.4.0")
(downloadVersion(resolvedVersion, "2.4.0", sparkConf, ivyPath), "2.4.0")
}
resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles)
resolvedVersions((resolvedVersion, actualHadoopVersion))
@@ -102,28 +102,37 @@
private def downloadVersion(
version: HiveVersion,
hadoopVersion: String,
sparkConf: SparkConf,
ivyPath: Option[String]): Seq[URL] = {
val hiveArtifacts = version.extraDeps ++
Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde")
.map(a => s"org.apache.hive:$a:${version.fullVersion}") ++
Seq("com.google.guava:guava:14.0.1",
s"org.apache.hadoop:hadoop-client:$hadoopVersion")

// If the repositories contain a local repo that already has the jars, they will not be downloaded from a remote repo
val repos: Option[String] = Option(sparkConf.get("spark.jars.repositories", ""))
.filterNot(_.isEmpty).map { repo =>
Seq(repo, "http://www.datanucleus.org/downloads/maven2").mkString(",")
}.orElse(Some("http://www.datanucleus.org/downloads/maven2"))

val ivyRepoPath = Option(sparkConf.get("spark.jars.ivy", "")).filterNot(_.isEmpty)
val ivySettings = Option(sparkConf.get("spark.jars.ivySettings", ""))
.filterNot(_.isEmpty).map { ivySettingsFile =>
SparkSubmitUtils.loadIvySettings(ivySettingsFile, repos, ivyRepoPath)
}.getOrElse {
SparkSubmitUtils.buildIvySettings(repos, ivyRepoPath)
}

val classpath = quietly {
SparkSubmitUtils.resolveMavenCoordinates(
hiveArtifacts.mkString(","),
SparkSubmitUtils.buildIvySettings(
Some("http://www.datanucleus.org/downloads/maven2"),
ivyPath),
ivySettings,
exclusions = version.exclusions)
}
val allFiles = classpath.split(",").map(new File(_)).toSet

// TODO: Remove copy logic.
val tempDir = Utils.createTempDir(namePrefix = s"hive-${version}")
allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir))
logInfo(s"Downloaded metastore jars to ${tempDir.getCanonicalPath}")
tempDir.listFiles().map(_.toURI.toURL)
logInfo(s"Downloaded metastore jars location: $classpath")
classpath.split(",").map(new File(_).toURI.toURL)
}
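
A rough usage sketch, not taken from this diff (repository URL, app name, and metastore version are placeholders): with spark.sql.hive.metastore.jars set to "maven", the downloadVersion path above would build its IvySettings from spark.jars.repositories, spark.jars.ivy, and spark.jars.ivySettings instead of only the hard-coded datanucleus repository.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-metastore-jars-via-maven") // placeholder app name
  .config("spark.sql.hive.metastore.version", "1.2.1") // placeholder version
  .config("spark.sql.hive.metastore.jars", "maven")
  // The user's repositories are consulted when downloading the metastore jars.
  .config("spark.jars.repositories", "http://repo.example.com/maven2") // placeholder URL
  .enableHiveSupport()
  .getOrCreate()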

// A map from a given pair of HiveVersion and Hadoop version to jar files.