diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 7b6d5a394bc35..a6ad365375117 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,10 +17,11 @@ package org.apache.spark.deploy
 
-import java.io.{File, PrintStream}
+import java.io.{File, IOException, PrintStream}
 import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
 import java.net.URL
 import java.security.PrivilegedExceptionAction
+import java.text.ParseException
 
 import scala.annotation.tailrec
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -291,8 +292,12 @@ object SparkSubmit {
       } else {
         Nil
       }
+
+    val ivySettings = Option(args.ivySettingsFile).map(SparkSubmitUtils.loadIvySettings).getOrElse(
+      SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath)))
+
     val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
-      Option(args.repositories), Option(args.ivyRepoPath), exclusions = exclusions)
+      ivySettings, exclusions = exclusions)
     if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
       args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
       if (args.isPython) {
@@ -445,6 +450,8 @@ object SparkSubmit {
         sysProp = "spark.submit.deployMode"),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
       OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
+      OptionAssigner(args.ivySettingsFile, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
+        sysProp = "spark.ivy.settings"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
         sysProp = "spark.driver.memory"),
       OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -975,6 +982,56 @@ private[spark] object SparkSubmitUtils {
     }
   }
 
+  /**
+   * Build Ivy Settings using options with default resolvers
+   * @param remoteRepos Comma-delimited string of remote repositories other than maven central
+   * @param ivyPath The path to the local ivy repository
+   * @return An IvySettings object
+   */
+  def buildIvySettings(
+      remoteRepos: Option[String],
+      ivyPath: Option[String]): IvySettings = {
+    val ivySettings: IvySettings = new IvySettings
+
+    // set ivy settings for location of cache
+    val alternateIvyCache = ivyPath.getOrElse("")
+    val packagesDirectory: File =
+      if (alternateIvyCache == null || alternateIvyCache.trim.isEmpty) {
+        new File(ivySettings.getDefaultIvyUserDir, "jars")
+      } else {
+        ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
+        ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
+        new File(alternateIvyCache, "jars")
+      }
+
+    // create a pattern matcher
+    ivySettings.addMatcher(new GlobPatternMatcher)
+    // create the dependency resolvers
+    val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
+    ivySettings.addResolver(repoResolver)
+    ivySettings.setDefaultResolver(repoResolver.getName)
+    ivySettings
+  }
+
+  /**
+   * Load Ivy settings from a given filename, using supplied resolvers
+   * @param settingsFile Path to Ivy settings file
+   * @return An IvySettings object
+   */
+  def loadIvySettings(settingsFile: String): IvySettings = {
+    val file = new File(settingsFile)
+    require(file.exists(), s"Ivy settings file $file does not exist")
+    require(file.isFile(), s"Ivy settings file $file is not a normal file")
+    val ivySettings: IvySettings = new IvySettings
+    try {
+      ivySettings.load(file)
+    } catch {
+      case e @ (_: IOException | _: ParseException) =>
+        throw new SparkException(s"Failed when loading Ivy settings from $settingsFile", e)
+    }
+    ivySettings
+  }
+
   /** A nice function to use in tests as well. Values are dummy strings. */
   def getModuleDescriptor: DefaultModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
     ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0"))
@@ -982,16 +1039,14 @@ private[spark] object SparkSubmitUtils {
   /**
    * Resolves any dependencies that were supplied through maven coordinates
    * @param coordinates Comma-delimited string of maven coordinates
-   * @param remoteRepos Comma-delimited string of remote repositories other than maven central
-   * @param ivyPath The path to the local ivy repository
+   * @param ivySettings An IvySettings containing resolvers to use
    * @param exclusions Exclusions to apply when resolving transitive dependencies
    * @return The comma-delimited path to the jars of the given maven artifacts including their
    *         transitive dependencies
    */
   def resolveMavenCoordinates(
       coordinates: String,
-      remoteRepos: Option[String],
-      ivyPath: Option[String],
+      ivySettings: IvySettings,
       exclusions: Seq[String] = Nil,
       isTest: Boolean = false): String = {
     if (coordinates == null || coordinates.trim.isEmpty) {
@@ -1002,32 +1057,14 @@ private[spark] object SparkSubmitUtils {
       // To prevent ivy from logging to system out
       System.setOut(printStream)
       val artifacts = extractMavenCoordinates(coordinates)
-      // Default configuration name for ivy
-      val ivyConfName = "default"
-      // set ivy settings for location of cache
-      val ivySettings: IvySettings = new IvySettings
       // Directories for caching downloads through ivy and storing the jars when maven coordinates
      // are supplied to spark-submit
-      val alternateIvyCache = ivyPath.getOrElse("")
-      val packagesDirectory: File =
-        if (alternateIvyCache == null || alternateIvyCache.trim.isEmpty) {
-          new File(ivySettings.getDefaultIvyUserDir, "jars")
-        } else {
-          ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
-          ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
-          new File(alternateIvyCache, "jars")
-        }
+      val packagesDirectory: File = new File(ivySettings.getDefaultIvyUserDir, "jars")
       // scalastyle:off println
       printStream.println(
         s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
       printStream.println(s"The jars for the packages stored in: $packagesDirectory")
       // scalastyle:on println
-      // create a pattern matcher
-      ivySettings.addMatcher(new GlobPatternMatcher)
-      // create the dependency resolvers
-      val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
-      ivySettings.addResolver(repoResolver)
-      ivySettings.setDefaultResolver(repoResolver.getName)
 
       val ivy = Ivy.newInstance(ivySettings)
       // Set resolve options to download transitive dependencies as well
@@ -1043,6 +1080,9 @@ private[spark] object SparkSubmitUtils {
         resolveOptions.setDownload(true)
       }
 
+      // Default configuration name for ivy
+      val ivyConfName = "default"
+
       // A Module descriptor must be specified. Entries are dummy strings
       val md = getModuleDescriptor
       // clear ivy resolution from previous launches. The resolution file is usually at
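For context, here is a minimal sketch of how the reworked `SparkSubmitUtils` API above is meant to be composed. The settings path, repository URL, and coordinate are illustrative assumptions, not values from this patch, and `SparkSubmitUtils` is `private[spark]`, so real callers live inside Spark itself (as in `prepareSubmitEnvironment` above).

```scala
// Illustrative sketch only; assumes the helpers added in this patch.
import org.apache.spark.deploy.SparkSubmitUtils

// Either load resolvers from an Ivy settings file (new in this patch)...
val fromFile = SparkSubmitUtils.loadIvySettings("/path/to/ivysettings.xml")  // assumed path

// ...or build the default resolver chain, optionally with extra remote
// repositories and an alternate ivy cache directory.
val fromDefaults = SparkSubmitUtils.buildIvySettings(
  remoteRepos = Some("https://repo.example.com/maven"),  // assumed repository
  ivyPath = None)

// Resolution now takes an IvySettings instead of the old repo/path options.
val jarList: String = SparkSubmitUtils.resolveMavenCoordinates(
  "org.apache.commons:commons-lang3:3.4",  // example coordinate
  fromDefaults,
  exclusions = Nil)
```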
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index f1761e7c1ec92..370672e1dd2c4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -60,6 +60,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var packages: String = null
   var repositories: String = null
   var ivyRepoPath: String = null
+  var ivySettingsFile: String = null
   var packagesExclusions: String = null
   var verbose: Boolean = false
   var isPython: Boolean = false
@@ -175,6 +176,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
     files = Option(files).orElse(sparkProperties.get("spark.files")).orNull
     ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
+    ivySettingsFile = sparkProperties.get("spark.ivy.settings").orNull
     packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
     packagesExclusions = Option(packagesExclusions)
       .orElse(sparkProperties.get("spark.jars.excludes")).orNull
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 4877710c1237d..b6d822c8ce18a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -126,8 +126,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1")
     IvyTestUtils.withRepository(main, None, None) { repo =>
       // end to end
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, Option(repo),
-        Option(tempIvyPath), isTest = true)
+      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+        main.toString,
+        SparkSubmitUtils.buildIvySettings(Option(repo), Option(tempIvyPath)),
+        isTest = true)
       assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path")
     }
   }
@@ -137,7 +139,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val dep = "my.great.dep:mydep:0.5"
     // Local M2 repository
     IvyTestUtils.withRepository(main, Some(dep), Some(SparkSubmitUtils.m2Path)) { repo =>
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
+      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+        main.toString,
+        SparkSubmitUtils.buildIvySettings(None, None),
         isTest = true)
       assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
       assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
@@ -146,7 +150,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val settings = new IvySettings
     val ivyLocal = new File(settings.getDefaultIvyUserDir, "local" + File.separator)
     IvyTestUtils.withRepository(main, Some(dep), Some(ivyLocal), useIvyLayout = true) { repo =>
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
+      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+        main.toString,
+        SparkSubmitUtils.buildIvySettings(None, None),
         isTest = true)
       assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
       assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
@@ -156,8 +162,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     settings.setDefaultIvyUserDir(new File(tempIvyPath))
     IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), useIvyLayout = true,
       ivySettings = settings) { repo =>
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None,
-        Some(tempIvyPath), isTest = true)
+      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+        main.toString,
+        SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
+        isTest = true)
       assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
       assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
       assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
@@ -166,7 +174,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
 
   test("dependency not found throws RuntimeException") {
     intercept[RuntimeException] {
-      SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, isTest = true)
+      SparkSubmitUtils.resolveMavenCoordinates(
+        "a:b:c",
+        SparkSubmitUtils.buildIvySettings(None, None),
+        isTest = true)
     }
   }
 
@@ -178,12 +189,17 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
       components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
       ",org.apache.spark:spark-core_fake:1.2.0"
 
-    val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, isTest = true)
+    val path = SparkSubmitUtils.resolveMavenCoordinates(
+      coordinates,
+      SparkSubmitUtils.buildIvySettings(None, None),
+      isTest = true)
     assert(path === "", "should return empty path")
     val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
     IvyTestUtils.withRepository(main, None, None) { repo =>
-      val files = SparkSubmitUtils.resolveMavenCoordinates(coordinates + "," + main.toString,
-        Some(repo), None, isTest = true)
+      val files = SparkSubmitUtils.resolveMavenCoordinates(
+        coordinates + "," + main.toString,
+        SparkSubmitUtils.buildIvySettings(Some(repo), None),
+        isTest = true)
       assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
     }
   }
@@ -192,8 +208,11 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
     val dep = "my.great.dep:mydep:0.5"
     IvyTestUtils.withRepository(main, Some(dep), None) { repo =>
-      val files = SparkSubmitUtils.resolveMavenCoordinates(main.toString,
-        Some(repo), None, Seq("my.great.dep:mydep"), isTest = true)
+      val files = SparkSubmitUtils.resolveMavenCoordinates(
+        main.toString,
+        SparkSubmitUtils.buildIvySettings(Some(repo), None),
+        Seq("my.great.dep:mydep"),
+        isTest = true)
       assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
       assert(files.indexOf("my.great.dep") < 0, "Returned excluded artifact")
     }
diff --git a/docs/configuration.md b/docs/configuration.md
index 8aea74505e28b..c550441786ca8 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -406,9 +406,10 @@ Apart from these, the following properties are also available, and may be useful
   <td></td>
   <td>
     Comma-separated list of maven coordinates of jars to include on the driver and executor
-    classpaths. Will search the local maven repo, then maven central and any additional remote
-    repositories given by the command-line option <code>--repositories</code> or
-    <code>spark.jars.ivy</code>. The format for the coordinates should be
-    groupId:artifactId:version.
+    classpaths. If <code>spark.ivy.settings</code> is given, artifacts will be resolved according
+    to the configuration in the file; otherwise artifacts will be searched for in the local maven
+    repo, then maven central, and finally any additional remote repositories given by
+    <code>spark.jars.ivy</code>. The format for the coordinates should be
+    groupId:artifactId:version.
   </td>
 </tr>
@@ ... @@
   </td>
 </tr>
+<tr>
+  <td><code>spark.ivy.settings</code></td>
+  <td></td>
+  <td>
+    Path to an Ivy settings file to customize resolution of jars specified using
+    <code>spark.jars.packages</code>. It will override <code>spark.jars.ivy</code>, so the two
+    should not be used together. Useful for allowing Spark to resolve artifacts from behind a
+    firewall, e.g. via an in-house artifact server like Artifactory. Details on the settings file
+    format can be found at http://ant.apache.org/ivy/history/latest-milestone/settings.html
+  </td>
+</tr>
 <tr>
   <td><code>spark.pyspark.driver.python</code></td>
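As a rough, unofficial illustration of the settings-file path described above, the sketch below loads a minimal ivysettings.xml through the new helper. The XML content, the temp-file handling, and the in-house mirror comment are assumptions based on the Ivy documentation linked in the table, not part of this patch.

```scala
// Hedged sketch: exercising loadIvySettings with a minimal Ivy settings file.
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.deploy.SparkSubmitUtils

val settingsXml =
  """<ivysettings>
    |  <settings defaultResolver="central"/>
    |  <resolvers>
    |    <!-- point this at an in-house mirror to resolve packages from behind a firewall -->
    |    <ibiblio name="central" m2compatible="true"/>
    |  </resolvers>
    |</ivysettings>""".stripMargin

val file = File.createTempFile("ivysettings", ".xml")
Files.write(file.toPath, settingsXml.getBytes(StandardCharsets.UTF_8))

// spark.ivy.settings would point at a file like this; here we call the helper directly.
val ivySettings = SparkSubmitUtils.loadIvySettings(file.getAbsolutePath)
assert(ivySettings.getDefaultResolver.getName == "central")
```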