100 changes: 100 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy

import java.io.File
import java.nio.file.Files

import scala.collection.mutable.HashMap

import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration

import org.apache.spark.util.MutableURLClassLoader

private[deploy] object DependencyUtils {
Member: Do we really need this new object? It makes the order of calls confusing: for instance, SparkSubmit calls DependencyUtils, which in turn calls SparkSubmitUtils. Can we just move this to SparkSubmitUtils?

Contributor (@vanzin, Aug 9, 2017): I asked for this change, because SparkSubmitUtils is kind of a bad place to keep this code if things other than SparkSubmit start using it. The calls into SparkSubmitUtils are not optimal but those can be refactored separately.

Member: Ok, that's fine if you are thinking the calls back to SparkSubmitUtils can be moved here eventually.


def resolveMavenDependencies(
packagesExclusions: String,
packages: String,
repositories: String,
ivyRepoPath: String): String = {
val exclusions: Seq[String] =
if (!StringUtils.isBlank(packagesExclusions)) {
packagesExclusions.split(",")
} else {
Nil
}
// Create the IvySettings, either load from file or build defaults
val ivySettings = sys.props.get("spark.jars.ivySettings").map { ivySettingsFile =>
SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(repositories),
Option(ivyRepoPath))
Member: nit: I think this could be moved to the line above?

}.getOrElse {
SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
}

SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
}

def createTempDir(): File = {
Member: Can we just use o.a.s.util.Utils.createTempDir? @vanzin it's slightly different, but I think it serves the same purpose. What do you think?

Contributor: This code cannot call into things that initialize logging, and the Utils method initializes logging indirectly through ShutdownHookManager.

Member: OK, that makes sense; thanks for clarifying.

val targetDir = Files.createTempDirectory("tmp").toFile
// scalastyle:off runtimeaddshutdownhook
Contributor: minor: you need to add scalastyle:on after this block.

Contributor Author: 👍

Contributor Author: Fixed it.

Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
FileUtils.deleteQuietly(targetDir)
}
})
// scalastyle:on runtimeaddshutdownhook
targetDir
}

def resolveAndDownloadJars(jars: String, userJar: String): String = {
val targetDir = DependencyUtils.createTempDir()
val hadoopConf = new Configuration()
val sparkProperties = new HashMap[String, String]()
val securityProperties = List("spark.ssl.fs.trustStore", "spark.ssl.trustStore",
"spark.ssl.fs.trustStorePassword", "spark.ssl.trustStorePassword",
"spark.ssl.fs.protocol", "spark.ssl.protocol")

securityProperties.foreach { pName =>
sys.props.get(pName).foreach { pValue =>
sparkProperties.put(pName, pValue)
}
}

Option(jars)
.map {
SparkSubmit.resolveGlobPaths(_, hadoopConf)
.split(",")
.filterNot(_.contains(userJar.split("/").last))
.mkString(",")
}
.filterNot(_ == "")
.map(SparkSubmit.downloadFileList(_, targetDir, sparkProperties, hadoopConf))
.orNull
}

def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
if (jars != null) {
for (jar <- jars.split(",")) {
SparkSubmit.addJarToClasspath(jar, loader)
}
}
}
}
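A minimal usage sketch of the new object, not part of the diff: it mirrors how DriverWrapper further down wires the three entry points together. The wrapper object name, the main method, and the jar path are placeholders added for illustration; mergeFileLists is private[deploy], so the sketch assumes it lives in the org.apache.spark.deploy package.

// Illustrative only (not part of this PR): resolve --packages coordinates,
// download any remote jars, and add the local copies to a mutable class loader.
package org.apache.spark.deploy

import java.net.URL

import org.apache.spark.util.MutableURLClassLoader

object DependencyUtilsUsageSketch {
  def main(args: Array[String]): Unit = {
    val loader =
      new MutableURLClassLoader(Array.empty[URL], Thread.currentThread.getContextClassLoader)

    // Resolve Maven coordinates from the standard spark.jars.* properties.
    val resolvedCoordinates = DependencyUtils.resolveMavenDependencies(
      sys.props.get("spark.jars.excludes").orNull,
      sys.props.get("spark.jars.packages").orNull,
      sys.props.get("spark.jars.repositories").orNull,
      sys.props.get("spark.jars.ivy").orNull)

    // Merge the resolved jars into spark.jars, download whatever is remote,
    // and put the local copies on the classpath. The user jar path is a placeholder.
    val jars = SparkSubmit.mergeFileLists(
      sys.props.get("spark.jars").orNull, resolvedCoordinates)
    val localJars = DependencyUtils.resolveAndDownloadJars(jars, "/path/to/user.jar")
    DependencyUtils.addJarsToClassPath(localJars, loader)
  }
}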
53 changes: 16 additions & 37 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -20,7 +20,6 @@ package org.apache.spark.deploy
import java.io._
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.nio.file.Files
import java.security.{KeyStore, PrivilegedExceptionAction}
import java.security.cert.X509Certificate
import java.text.ParseException
@@ -31,7 +30,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.Properties

import com.google.common.io.ByteStreams
import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -300,28 +298,14 @@ object SparkSubmit extends CommandLineUtils {
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER

if (!isMesosCluster) {
if (!isMesosCluster && !isStandAloneCluster) {
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
val exclusions: Seq[String] =
if (!StringUtils.isBlank(args.packagesExclusions)) {
args.packagesExclusions.split(",")
} else {
Nil
}

// Create the IvySettings, either load from file or build defaults
val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
Option(args.ivyRepoPath))
}.getOrElse {
SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
}

val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
ivySettings, exclusions = exclusions)

val resolvedMavenCoordinates =
DependencyUtils.resolveMavenDependencies(args.packagesExclusions, args.packages,
args.repositories, args.ivyRepoPath)

if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
@@ -338,14 +322,7 @@ object SparkSubmit extends CommandLineUtils {
}

val hadoopConf = new HadoopConfiguration()
val targetDir = Files.createTempDirectory("tmp").toFile
// scalastyle:off runtimeaddshutdownhook
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
FileUtils.deleteQuietly(targetDir)
}
})
// scalastyle:on runtimeaddshutdownhook
val targetDir = DependencyUtils.createTempDir()

// Resolve glob path for different resources.
args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
@@ -473,11 +450,13 @@ object SparkSubmit extends CommandLineUtils {
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraLibraryPath"),

// Mesos only - propagate attributes for dependency resolution at the driver side
OptionAssigner(args.packages, MESOS, CLUSTER, sysProp = "spark.jars.packages"),
OptionAssigner(args.repositories, MESOS, CLUSTER, sysProp = "spark.jars.repositories"),
OptionAssigner(args.ivyRepoPath, MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
OptionAssigner(args.packagesExclusions, MESOS, CLUSTER, sysProp = "spark.jars.excludes"),
// Propagate attributes for dependency resolution at the driver side
OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.packages"),
OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.jars.repositories"),
OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
CLUSTER, sysProp = "spark.jars.excludes"),

// Yarn only
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
@@ -780,7 +759,7 @@ object SparkSubmit extends CommandLineUtils {
}
}

private def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
private[deploy] def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
val uri = Utils.resolveURI(localJar)
uri.getScheme match {
case "file" | "local" =>
Expand Down Expand Up @@ -845,7 +824,7 @@ object SparkSubmit extends CommandLineUtils {
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
*/
private def mergeFileLists(lists: String*): String = {
private[deploy] def mergeFileLists(lists: String*): String = {
val merged = lists.filterNot(StringUtils.isBlank)
.flatMap(_.split(","))
.mkString(",")
@@ -968,7 +947,7 @@ object SparkSubmit extends CommandLineUtils {
}
}

private def resolveGlobPaths(paths: String, hadoopConf: HadoopConfiguration): String = {
private[deploy] def resolveGlobPaths(paths: String, hadoopConf: HadoopConfiguration): String = {
require(paths != null, "paths cannot be null.")
paths.split(",").map(_.trim).filter(_.nonEmpty).flatMap { path =>
val uri = Utils.resolveURI(path)
core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -19,7 +19,10 @@ package org.apache.spark.deploy.worker

import java.io.File

import org.apache.commons.lang3.StringUtils

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.{DependencyUtils, SparkSubmit}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

@@ -51,6 +54,7 @@ object DriverWrapper {
new MutableURLClassLoader(Array(userJarUrl), currentLoader)
}
Thread.currentThread.setContextClassLoader(loader)
setupDependencies(loader, userJar)

// Delegate to supplied main class
val clazz = Utils.classForName(mainClass)
@@ -66,4 +70,23 @@ object DriverWrapper {
System.exit(-1)
}
}

private def setupDependencies(loader: MutableURLClassLoader, userJar: String): Unit = {
val Seq(packagesExclusions, packages, repositories, ivyRepoPath) =
Seq("spark.jars.excludes", "spark.jars.packages", "spark.jars.repositories", "spark.jars.ivy")
.map(sys.props.get(_).orNull)

val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
packages, repositories, ivyRepoPath)
val jars = {
val jarsProp = sys.props.get("spark.jars").orNull
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
SparkSubmit.mergeFileLists(jarsProp, resolvedMavenCoordinates)
Member: Is this same code repeated elsewhere? If so, maybe it could be put into a common function. (See the helper sketch after the diff.)

} else {
jarsProp
}
}
val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar)
DependencyUtils.addJarsToClassPath(localJars, loader)
}
}
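On the reviewer's question above about repeated merge logic: SparkSubmit performs the same "merge resolved Maven coordinates into an existing jar list" step near the top of this diff. Below is a hedged sketch of the kind of shared helper that comment suggests; the name mergeResolvedJars and its placement in DependencyUtils are hypothetical and not part of this PR.

// Hypothetical refactor sketch (not in this PR): centralize the blank check
// plus merge so SparkSubmit and DriverWrapper stop duplicating it. Assumes it
// lives inside DependencyUtils, which already imports
// org.apache.commons.lang3.StringUtils.
def mergeResolvedJars(existingJars: String, resolvedCoordinates: String): String = {
  if (!StringUtils.isBlank(resolvedCoordinates)) {
    // mergeFileLists drops blank entries and joins the rest with commas.
    SparkSubmit.mergeFileLists(existingJars, resolvedCoordinates)
  } else {
    existingJars
  }
}

DriverWrapper.setupDependencies could then reduce its jars computation to DependencyUtils.mergeResolvedJars(sys.props.get("spark.jars").orNull, resolvedMavenCoordinates), and the corresponding branch in SparkSubmit could reuse the same helper for its jars handling.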