diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 5ffdedd1658a..96cf8d19353f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -443,7 +443,6 @@ object SparkSubmit extends CommandLineUtils {
       OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.submit.deployMode"),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
-      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
         sysProp = "spark.driver.memory"),
       OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -480,7 +479,12 @@ object SparkSubmit extends CommandLineUtils {
         sysProp = "spark.driver.cores"),
       OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
         sysProp = "spark.driver.supervise"),
-      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
+      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
+        sysProp = "spark.jars.ivy"),
+      OptionAssigner(args.repositories, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
+        sysProp = "spark.jars.repositories"),
+      OptionAssigner(args.sparkProperties.get("spark.jars.ivySettings").orNull,
+        ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars.ivySettings")
     )

     // In client mode, launch the application main class directly
@@ -1027,10 +1031,8 @@ private[spark] object SparkSubmitUtils {
     val cr = new ChainResolver
     cr.setName("user-list")

-    // add current default resolver, if any
-    Option(ivySettings.getDefaultResolver).foreach(cr.add)
-
-    // add additional repositories, last resolution in chain takes precedence
+    // add additional repositories ahead of the default resolver, so that
+    // user-defined repositories take precedence during resolution
     repositoryList.zipWithIndex.foreach { case (repo, i) =>
       val brr: IBiblioResolver = new IBiblioResolver
       brr.setM2compatible(true)
@@ -1043,6 +1045,9 @@ private[spark] object SparkSubmitUtils {
       // scalastyle:on println
     }

+    // add current default resolver, if any, at the end of the chain
+    Option(ivySettings.getDefaultResolver).foreach(cr.add)
+
     ivySettings.addResolver(cr)
     ivySettings.setDefaultResolver(cr.getName)
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 266c9d33b5a9..493c9ca610b9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.deploy

-import java.io.{File, OutputStream, PrintStream}
+import java.io.{File, FileInputStream, OutputStream, PrintStream}
 import java.nio.charset.StandardCharsets

 import scala.collection.mutable.ArrayBuffer
@@ -30,6 +30,7 @@ import org.scalatest.BeforeAndAfterAll

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils

 class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
@@ -85,8 +86,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val expected = repos.split(",").map(r => s"$r/")
     resolver.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) =>
       if (1 < i && i < 3) {
-        assert(resolver.getName === s"repo-$i")
-        assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1))
+        assert(resolver.getName === s"repo-${i + 1}")
+        assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i))
       }
     }
   }
@@ -258,4 +259,48 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
       assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
     }
   }
+
+  test("search for artifact: user-defined repositories are searched before defaults") {
+    val main = new MavenCoordinate("a", "b", "0.1")
+
+    def isSameFile(left: String, right: String): Boolean = {
+      val leftInput: FileInputStream = new FileInputStream(left)
+      val leftMd5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(leftInput)
+
+      val rightInput: FileInputStream = new FileInputStream(right)
+      val rightMd5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(rightInput)
+
+      leftMd5 == rightMd5
+    }
+
+    val userDefinedRepo = Utils.createTempDir(namePrefix = "my_m2")
+    try {
+      IvyTestUtils.withRepository(main, None, Some(userDefinedRepo)) { repo =>
+        IvyTestUtils.withRepository(main, None, Some(SparkSubmitUtils.m2Path)) { defaultRepo =>
+          val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+            main.toString,
+            SparkSubmitUtils.buildIvySettings(Option(repo), None),
+            isTest = false)
+          assert(isSameFile(Seq(userDefinedRepo, main.groupId, main.artifactId, main.version,
+            "b-0.1.jar").mkString(File.separatorChar.toString), jarPath))
+          assert(jarPath.indexOf("b") >= 0, "should find artifact")
+        }
+      }
+
+      IvyTestUtils.withRepository(main, None, Some(SparkSubmitUtils.m2Path)) { defaultRepo =>
+        IvyTestUtils.withRepository(main, None, Some(userDefinedRepo)) { repo =>
+          val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+            main.toString,
+            SparkSubmitUtils.buildIvySettings(Option(repo), None),
+            isTest = false)
+          assert(isSameFile(Seq(SparkSubmitUtils.m2Path.getCanonicalPath, main.groupId,
+            main.artifactId, main.version, "b-0.1.jar").mkString(File.separatorChar.toString),
+            jarPath))
+          assert(jarPath.indexOf("b") >= 0, "should find artifact")
+        }
+      }
+    } finally {
+      Utils.deleteRecursively(userDefinedRepo)
+    }
+  }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 7c040330db63..fc1eb4168e5d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -438,9 +438,10 @@ Apart from these, the following properties are also available, and may be useful
    Comma-separated list of Maven coordinates of jars to include on the driver and executor
    classpaths. The coordinates should be groupId:artifactId:version. If spark.jars.ivySettings
    is given artifacts will be resolved according to the configuration in the file, otherwise artifacts
-    will be searched for in the local maven repo, then maven central and finally any additional remote
-    repositories given by the command-line option --repositories. For more details, see
-    Advanced Dependency Management.
+    will be searched for in any additional remote repositories given by the command-line option
+    --repositories, then the local maven repo (${user.home}/.m2/repository), and finally
+    maven central. For more details, see
+    Advanced Dependency Management.
  </td>
 </tr>
 <tr>
@@ -451,6 +452,14 @@ Apart from these, the following properties are also available, and may be useful
    provided in spark.jars.packages to avoid dependency conflicts.
  </td>
 </tr>
+<tr>
+  <td><code>spark.jars.repositories</code></td>
+  <td></td>
+  <td>
+    Comma-separated list of additional remote repositories to search for the maven coordinates.
It is also + given by the command-line option --repositories + + spark.jars.ivy diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 312ec6776b1e..0a93f038204c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -327,7 +327,6 @@ private[spark] object HiveUtils extends Logging { barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) } else if (hiveMetastoreJars == "maven") { - // TODO: Support for loading the jars from an already downloaded location. logInfo( s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.") IsolatedClientLoader.forVersion( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index d2487a2c034c..1cf5074538e8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -59,7 +59,7 @@ private[hive] object IsolatedClientLoader extends Logging { } else { val (downloadedFiles, actualHadoopVersion) = try { - (downloadVersion(resolvedVersion, hadoopVersion, ivyPath), hadoopVersion) + (downloadVersion(resolvedVersion, hadoopVersion, sparkConf, ivyPath), hadoopVersion) } catch { case e: RuntimeException if e.getMessage.contains("hadoop") => // If the error message contains hadoop, it is probably because the hadoop @@ -70,7 +70,7 @@ private[hive] object IsolatedClientLoader extends Logging { "It is recommended to set jars used by Hive metastore client through " + "spark.sql.hive.metastore.jars in the production environment.") sharesHadoopClasses = false - (downloadVersion(resolvedVersion, "2.6.5", ivyPath), "2.6.5") + (downloadVersion(resolvedVersion, "2.6.5", sparkConf, ivyPath), "2.6.5") } resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles) resolvedVersions((resolvedVersion, actualHadoopVersion)) @@ -99,6 +99,7 @@ private[hive] object IsolatedClientLoader extends Logging { private def downloadVersion( version: HiveVersion, hadoopVersion: String, + sparkConf: SparkConf, ivyPath: Option[String]): Seq[URL] = { val hiveArtifacts = version.extraDeps ++ Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde") @@ -106,21 +107,29 @@ private[hive] object IsolatedClientLoader extends Logging { Seq("com.google.guava:guava:14.0.1", s"org.apache.hadoop:hadoop-client:$hadoopVersion") + // if repositories contain a local repo, it will not download jars from remote repo + val repos: Option[String] = Option(sparkConf.get("spark.jars.repositories", "")) + .filterNot(_.isEmpty).map { repo => + Seq(repo, "http://www.datanucleus.org/downloads/maven2").mkString(",") + }.orElse(Some("http://www.datanucleus.org/downloads/maven2")) + + val ivyRepoPath = Option(sparkConf.get("spark.jars.ivy", "")).filterNot(_.isEmpty) + val ivySettings = Option(sparkConf.get("spark.jars.ivySettings", "")) + .filterNot(_.isEmpty).map { ivySettingsFile => + SparkSubmitUtils.loadIvySettings(ivySettingsFile, repos, ivyRepoPath) + }.getOrElse { + SparkSubmitUtils.buildIvySettings(repos, ivyRepoPath) + } + val classpath = quietly { SparkSubmitUtils.resolveMavenCoordinates( hiveArtifacts.mkString(","), - SparkSubmitUtils.buildIvySettings( - 
Some("http://www.datanucleus.org/downloads/maven2"), - ivyPath), + ivySettings, exclusions = version.exclusions) } - val allFiles = classpath.split(",").map(new File(_)).toSet - // TODO: Remove copy logic. - val tempDir = Utils.createTempDir(namePrefix = s"hive-${version}") - allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir)) - logInfo(s"Downloaded metastore jars to ${tempDir.getCanonicalPath}") - tempDir.listFiles().map(_.toURI.toURL) + logInfo(s"Downloaded metastore jars location: $classpath") + classpath.split(",").map(new File(_).toURI.toURL) } // A map from a given pair of HiveVersion and Hadoop version to jar files.