diff --git a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
new file mode 100644
index 000000000000..e66ce902356a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.File
+import java.nio.file.Files
+
+import scala.collection.mutable.HashMap
+
+import org.apache.commons.io.FileUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.util.MutableURLClassLoader
+
+private[deploy] object DependencyUtils {
+
+  def resolveMavenDependencies(
+      packagesExclusions: String,
+      packages: String,
+      repositories: String,
+      ivyRepoPath: String): String = {
+    val exclusions: Seq[String] =
+      if (!StringUtils.isBlank(packagesExclusions)) {
+        packagesExclusions.split(",")
+      } else {
+        Nil
+      }
+    // Create the IvySettings, either load from file or build defaults
+    val ivySettings = sys.props.get("spark.jars.ivySettings").map { ivySettingsFile =>
+      SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(repositories),
+        Option(ivyRepoPath))
+    }.getOrElse {
+      SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
+    }
+
+    SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
+  }
+
+  def createTempDir(): File = {
+    val targetDir = Files.createTempDirectory("tmp").toFile
+    // scalastyle:off runtimeaddshutdownhook
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run(): Unit = {
+        FileUtils.deleteQuietly(targetDir)
+      }
+    })
+    // scalastyle:on runtimeaddshutdownhook
+    targetDir
+  }
+
+  def resolveAndDownloadJars(jars: String, userJar: String): String = {
+    val targetDir = DependencyUtils.createTempDir()
+    val hadoopConf = new Configuration()
+    val sparkProperties = new HashMap[String, String]()
+    val securityProperties = List("spark.ssl.fs.trustStore", "spark.ssl.trustStore",
+      "spark.ssl.fs.trustStorePassword", "spark.ssl.trustStorePassword",
+      "spark.ssl.fs.protocol", "spark.ssl.protocol")
+
+    securityProperties.foreach { pName =>
+      sys.props.get(pName).foreach { pValue =>
+        sparkProperties.put(pName, pValue)
+      }
+    }
+
+    Option(jars)
+      .map {
+        SparkSubmit.resolveGlobPaths(_, hadoopConf)
+          .split(",")
+          .filterNot(_.contains(userJar.split("/").last))
+          .mkString(",")
+      }
+      .filterNot(_ == "")
+      .map(SparkSubmit.downloadFileList(_, targetDir, sparkProperties, hadoopConf))
+      .orNull
+  }
+
+  def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
+    if (jars != null) {
+      for (jar <- jars.split(",")) {
+        SparkSubmit.addJarToClasspath(jar, loader)
+      }
+    }
+  }
+}
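
Note for reviewers, not part of the patch: a minimal sketch of how the new helpers compose. The coordinate, repository URL, and jar name below are illustrative only, and the snippet sits in org.apache.spark.deploy because DependencyUtils is private[deploy].

    package org.apache.spark.deploy

    import java.net.URL

    import org.apache.spark.util.MutableURLClassLoader

    object DependencyUtilsSketch {
      def main(args: Array[String]): Unit = {
        // Resolve Maven coordinates through Ivy; returns a comma-separated list
        // of jar paths in the local Ivy cache ("" when nothing was requested).
        val resolved = DependencyUtils.resolveMavenDependencies(
          null,                       // packagesExclusions: none
          "com.example:mylib:1.0.0",  // packages: illustrative coordinate
          null,                       // repositories: defaults (Maven Central)
          null)                       // ivyRepoPath: default ~/.ivy2

        // Download any remote jars into a temp dir (removed by a shutdown hook),
        // skip the user jar itself, and add the result to a mutable class loader.
        val loader = new MutableURLClassLoader(Array.empty[URL],
          Thread.currentThread.getContextClassLoader)
        val localJars = DependencyUtils.resolveAndDownloadJars(resolved, "file:/tmp/my-app.jar")
        DependencyUtils.addJarsToClassPath(localJars, loader)
      }
    }

This is the same call sequence DriverWrapper.setupDependencies uses further down.
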
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 0ea14361b2f7..f0d978fc3c90 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -20,7 +20,6 @@ package org.apache.spark.deploy
 import java.io._
 import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
 import java.net.URL
-import java.nio.file.Files
 import java.security.{KeyStore, PrivilegedExceptionAction}
 import java.security.cert.X509Certificate
 import java.text.ParseException
@@ -31,7 +30,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.Properties
 
 import com.google.common.io.ByteStreams
-import org.apache.commons.io.FileUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -300,28 +298,14 @@ object SparkSubmit extends CommandLineUtils {
     }
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
+    val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
 
-    if (!isMesosCluster) {
+    if (!isMesosCluster && !isStandAloneCluster) {
       // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
       // too for packages that include Python code
-      val exclusions: Seq[String] =
-        if (!StringUtils.isBlank(args.packagesExclusions)) {
-          args.packagesExclusions.split(",")
-        } else {
-          Nil
-        }
-
-      // Create the IvySettings, either load from file or build defaults
-      val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
-        SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
-          Option(args.ivyRepoPath))
-      }.getOrElse {
-        SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
-      }
-
-      val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
-        ivySettings, exclusions = exclusions)
-
+      val resolvedMavenCoordinates =
+        DependencyUtils.resolveMavenDependencies(args.packagesExclusions, args.packages,
+          args.repositories, args.ivyRepoPath)
       if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
         args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
         if (args.isPython) {
@@ -338,14 +322,7 @@ object SparkSubmit extends CommandLineUtils {
     }
 
     val hadoopConf = new HadoopConfiguration()
-    val targetDir = Files.createTempDirectory("tmp").toFile
-    // scalastyle:off runtimeaddshutdownhook
-    Runtime.getRuntime.addShutdownHook(new Thread() {
-      override def run(): Unit = {
-        FileUtils.deleteQuietly(targetDir)
-      }
-    })
-    // scalastyle:on runtimeaddshutdownhook
+    val targetDir = DependencyUtils.createTempDir()
 
     // Resolve glob path for different resources.
     args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
@@ -473,11 +450,13 @@ object SparkSubmit extends CommandLineUtils {
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.driver.extraLibraryPath"),
 
-      // Mesos only - propagate attributes for dependency resolution at the driver side
-      OptionAssigner(args.packages, MESOS, CLUSTER, sysProp = "spark.jars.packages"),
-      OptionAssigner(args.repositories, MESOS, CLUSTER, sysProp = "spark.jars.repositories"),
-      OptionAssigner(args.ivyRepoPath, MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
-      OptionAssigner(args.packagesExclusions, MESOS, CLUSTER, sysProp = "spark.jars.excludes"),
+      // Propagate attributes for dependency resolution at the driver side
+      OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.packages"),
+      OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
+        sysProp = "spark.jars.repositories"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
+      OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
+        CLUSTER, sysProp = "spark.jars.excludes"),
 
       // Yarn only
       OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
@@ -780,7 +759,7 @@ object SparkSubmit extends CommandLineUtils {
     }
   }
 
-  private def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
+  private[deploy] def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
     val uri = Utils.resolveURI(localJar)
     uri.getScheme match {
       case "file" | "local" =>
@@ -845,7 +824,7 @@ object SparkSubmit extends CommandLineUtils {
    * Merge a sequence of comma-separated file lists, some of which may be null to indicate
    * no files, into a single comma-separated string.
    */
-  private def mergeFileLists(lists: String*): String = {
+  private[deploy] def mergeFileLists(lists: String*): String = {
     val merged = lists.filterNot(StringUtils.isBlank)
       .flatMap(_.split(","))
       .mkString(",")
@@ -968,7 +947,7 @@ object SparkSubmit extends CommandLineUtils {
     }
   }
 
-  private def resolveGlobPaths(paths: String, hadoopConf: HadoopConfiguration): String = {
+  private[deploy] def resolveGlobPaths(paths: String, hadoopConf: HadoopConfiguration): String = {
     require(paths != null, "paths cannot be null.")
     paths.split(",").map(_.trim).filter(_.nonEmpty).flatMap { path =>
       val uri = Utils.resolveURI(path)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 6799f78ec0c1..cd3e361530c1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -19,7 +19,10 @@ package org.apache.spark.deploy.worker
 
 import java.io.File
 
+import org.apache.commons.lang3.StringUtils
+
 import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.deploy.{DependencyUtils, SparkSubmit}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -51,6 +54,7 @@ object DriverWrapper {
         new MutableURLClassLoader(Array(userJarUrl), currentLoader)
       }
       Thread.currentThread.setContextClassLoader(loader)
+      setupDependencies(loader, userJar)
 
       // Delegate to supplied main class
       val clazz = Utils.classForName(mainClass)
@@ -66,4 +70,23 @@ object DriverWrapper {
       System.exit(-1)
     }
   }
+
+  private def setupDependencies(loader: MutableURLClassLoader, userJar: String): Unit = {
+    val Seq(packagesExclusions, packages, repositories, ivyRepoPath) =
+      Seq("spark.jars.excludes", "spark.jars.packages", "spark.jars.repositories", "spark.jars.ivy")
+        .map(sys.props.get(_).orNull)
+
+    val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
+      packages, repositories, ivyRepoPath)
+    val jars = {
+      val jarsProp = sys.props.get("spark.jars").orNull
+      if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
+        SparkSubmit.mergeFileLists(jarsProp, resolvedMavenCoordinates)
+      } else {
+        jarsProp
+      }
+    }
+    val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar)
+    DependencyUtils.addJarsToClassPath(localJars, loader)
+  }
 }
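
Note, not part of the patch: the net effect is that the launcher now propagates spark.jars.packages, spark.jars.repositories, spark.jars.ivy, and spark.jars.excludes to drivers in standalone cluster mode (e.g. spark-submit --master spark://host:7077 --deploy-mode cluster --packages groupId:artifactId:version app.jar), and DriverWrapper resolves them before invoking the user's main class. A self-contained sketch of that system-property hand-off, with illustrative values throughout:

    object PropertyHandOffSketch {
      def main(args: Array[String]): Unit = {
        // Launcher side: the new OptionAssigner rows turn --packages and friends into
        // system properties on the driver JVM for STANDALONE | MESOS cluster mode.
        sys.props("spark.jars.packages") = "com.example:mylib:1.0.0"      // illustrative
        sys.props("spark.jars.repositories") = "https://repo.example.com" // illustrative

        // Driver side: DriverWrapper.setupDependencies reads the same keys back;
        // unset properties surface as null and are tolerated downstream.
        val Seq(excludes, packages, repositories, ivyRepoPath) =
          Seq("spark.jars.excludes", "spark.jars.packages",
            "spark.jars.repositories", "spark.jars.ivy")
            .map(sys.props.get(_).orNull)

        assert(packages == "com.example:mylib:1.0.0")
        assert(repositories == "https://repo.example.com")
        assert(excludes == null && ivyRepoPath == null) // assuming they were never set
      }
    }
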