
Commit 186a3d5

Merge remote-tracking branch 'upstream/master' into SPARK-33884

2 parents: ae3b284 + b33fa53

116 files changed, +2600 -561 lines

core/src/main/resources/org/apache/spark/ui/static/stagepage.js

Lines changed: 2 additions & 1 deletion
@@ -946,7 +946,8 @@ $(document).ready(function () {
         },
         {
           data : function (row, type) {
-            if (row.taskMetrics && row.taskMetrics.shuffleReadMetrics && row.taskMetrics.shuffleReadMetrics.localBytesRead > 0) {
+            if (row.taskMetrics && row.taskMetrics.shuffleReadMetrics &&
+              (row.taskMetrics.shuffleReadMetrics.localBytesRead > 0 || row.taskMetrics.shuffleReadMetrics.remoteBytesRead > 0)) {
               var totalBytesRead = parseInt(row.taskMetrics.shuffleReadMetrics.localBytesRead) + parseInt(row.taskMetrics.shuffleReadMetrics.remoteBytesRead);
               if (type === 'display') {
                 return formatBytes(totalBytesRead, type) + " / " + row.taskMetrics.shuffleReadMetrics.recordsRead;

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 30 additions & 19 deletions
@@ -1631,7 +1631,7 @@ class SparkContext(config: SparkConf) extends Logging {
       // Fetch the file locally so that closures which are run on the driver can still use the
       // SparkFiles API to access files.
       Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
-        env.securityManager, hadoopConfiguration, timestamp, useCache = false)
+        hadoopConfiguration, timestamp, useCache = false)
       postEnvironmentUpdate()
     } else if (
       isArchive &&
@@ -1643,7 +1643,7 @@ class SparkContext(config: SparkConf) extends Logging {
       val uriToUse = if (!isLocal && scheme == "file") uri else new URI(key)
       val uriToDownload = UriBuilder.fromUri(uriToUse).fragment(null).build()
       val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
-        env.securityManager, hadoopConfiguration, timestamp, useCache = false, shouldUntar = false)
+        hadoopConfiguration, timestamp, useCache = false, shouldUntar = false)
       val dest = new File(
         SparkFiles.getRootDirectory(),
         if (uri.getFragment != null) uri.getFragment else source.getName)
@@ -1929,7 +1929,7 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
-    def addLocalJarFile(file: File): String = {
+    def addLocalJarFile(file: File): Seq[String] = {
       try {
         if (!file.exists()) {
           throw new FileNotFoundException(s"Jar ${file.getAbsolutePath} not found")
@@ -1938,15 +1938,15 @@ class SparkContext(config: SparkConf) extends Logging {
           throw new IllegalArgumentException(
             s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
         }
-        env.rpcEnv.fileServer.addJar(file)
+        Seq(env.rpcEnv.fileServer.addJar(file))
       } catch {
         case NonFatal(e) =>
           logError(s"Failed to add $path to Spark environment", e)
-          null
+          Nil
       }
     }
 
-    def checkRemoteJarFile(path: String): String = {
+    def checkRemoteJarFile(path: String): Seq[String] = {
       val hadoopPath = new Path(path)
       val scheme = hadoopPath.toUri.getScheme
       if (!Array("http", "https", "ftp").contains(scheme)) {
@@ -1959,47 +1959,58 @@ class SparkContext(config: SparkConf) extends Logging {
           throw new IllegalArgumentException(
             s"Directory ${path} is not allowed for addJar")
         }
-        path
+        Seq(path)
       } catch {
         case NonFatal(e) =>
           logError(s"Failed to add $path to Spark environment", e)
-          null
+          Nil
       }
     } else {
-      path
+      Seq(path)
    }
  }
 
    if (path == null || path.isEmpty) {
      logWarning("null or empty path specified as parameter to addJar")
    } else {
-      val key = if (path.contains("\\") && Utils.isWindows) {
+      val (keys, scheme) = if (path.contains("\\") && Utils.isWindows) {
        // For local paths with backslashes on Windows, URI throws an exception
-        addLocalJarFile(new File(path))
+        (addLocalJarFile(new File(path)), "local")
      } else {
        val uri = new Path(path).toUri
        // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
        Utils.validateURL(uri)
-        uri.getScheme match {
+        val uriScheme = uri.getScheme
+        val jarPaths = uriScheme match {
          // A JAR file which exists only on the driver node
          case null =>
            // SPARK-22585 path without schema is not url encoded
            addLocalJarFile(new File(uri.getPath))
          // A JAR file which exists only on the driver node
          case "file" => addLocalJarFile(new File(uri.getPath))
          // A JAR file which exists locally on every worker node
-          case "local" => "file:" + uri.getPath
+          case "local" => Seq("file:" + uri.getPath)
+          case "ivy" =>
+            // Since `new Path(path).toUri` will lose query information,
+            // so here we use `URI.create(path)`
+            DependencyUtils.resolveMavenDependencies(URI.create(path))
+              .flatMap(jar => addLocalJarFile(new File(jar)))
          case _ => checkRemoteJarFile(path)
        }
+        (jarPaths, uriScheme)
      }
-      if (key != null) {
+      if (keys.nonEmpty) {
        val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
-        if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
-          logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+        val (added, existed) = keys.partition(addedJars.putIfAbsent(_, timestamp).isEmpty)
+        if (added.nonEmpty) {
+          val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI"
+          logInfo(s"Added $jarMessage $path at ${added.mkString(",")} with timestamp $timestamp")
          postEnvironmentUpdate()
-        } else {
-          logWarning(s"The jar $path has been added already. Overwriting of added jars " +
-            "is not supported in the current version.")
+        }
+        if (existed.nonEmpty) {
+          val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI"
+          logInfo(s"The $jarMessage $path at ${existed.mkString(",")} has been added already." +
+            " Overwriting of added jar is not supported in the current version.")
        }
      }
    }
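
The new "ivy" case above means SparkContext.addJar now accepts an Ivy URI: the Maven coordinate is resolved through DependencyUtils.resolveMavenDependencies (URI.create keeps any query parameters that new Path(path).toUri would drop), and every resolved jar is registered individually. A minimal sketch of how a caller might exercise this against a build containing the change; the coordinate and the local master are illustrative, not part of the diff:

    import org.apache.spark.{SparkConf, SparkContext}

    object AddIvyJarSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("add-ivy-jar"))
        // addJar resolves the coordinate (and, by default, its transitive dependencies)
        // and adds each resolved jar, logging the ones that were already present.
        sc.addJar("ivy://org.apache.logging.log4j:log4j-api:2.17.1")
        sc.stop()
      }
    }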

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 22 additions & 19 deletions
@@ -304,28 +304,29 @@ private[spark] class SparkSubmit extends Logging {
     // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
     // too for packages that include Python code
     val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
-      args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
-      args.ivySettingsPath)
+      packagesTransitive = true, args.packagesExclusions, args.packages,
+      args.repositories, args.ivyRepoPath, args.ivySettingsPath)
 
-    if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
+    if (resolvedMavenCoordinates.nonEmpty) {
       // In K8s client mode, when in the driver, add resolved jars early as we might need
       // them at the submit time for artifact downloading.
       // For example we might use the dependencies for downloading
       // files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
       // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
       if (isKubernetesClusterModeDriver) {
         val loader = getSubmitClassLoader(sparkConf)
-        for (jar <- resolvedMavenCoordinates.split(",")) {
+        for (jar <- resolvedMavenCoordinates) {
           addJarToClasspath(jar, loader)
         }
       } else if (isKubernetesCluster) {
         // We need this in K8s cluster mode so that we can upload local deps
         // via the k8s application, like in cluster mode driver
-        childClasspath ++= resolvedMavenCoordinates.split(",")
+        childClasspath ++= resolvedMavenCoordinates
       } else {
-        args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
+        args.jars = mergeFileLists(args.jars, mergeFileLists(resolvedMavenCoordinates: _*))
         if (args.isPython || isInternal(args.primaryResource)) {
-          args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+          args.pyFiles = mergeFileLists(args.pyFiles,
+            mergeFileLists(resolvedMavenCoordinates: _*))
         }
       }
     }
@@ -373,13 +374,13 @@ private[spark] class SparkSubmit extends Logging {
     var localPyFiles: String = null
     if (deployMode == CLIENT) {
       localPrimaryResource = Option(args.primaryResource).map {
-        downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
+        downloadFile(_, targetDir, sparkConf, hadoopConf)
       }.orNull
       localJars = Option(args.jars).map {
-        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+        downloadFileList(_, targetDir, sparkConf, hadoopConf)
       }.orNull
       localPyFiles = Option(args.pyFiles).map {
-        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+        downloadFileList(_, targetDir, sparkConf, hadoopConf)
       }.orNull
 
       if (isKubernetesClusterModeDriver) {
@@ -388,14 +389,14 @@ private[spark] class SparkSubmit extends Logging {
         // Explicitly download the related files here
         args.jars = localJars
         val filesLocalFiles = Option(args.files).map {
-          downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+          downloadFileList(_, targetDir, sparkConf, hadoopConf)
         }.orNull
         val archiveLocalFiles = Option(args.archives).map { uris =>
           val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
           val localArchives = downloadFileList(
             resolvedUris.map(
               UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","),
-            targetDir, sparkConf, hadoopConf, secMgr)
+            targetDir, sparkConf, hadoopConf)
 
           // SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
           // in cluster mode, the archives should be available in the driver's current working
@@ -446,7 +447,7 @@ private[spark] class SparkSubmit extends Logging {
           if (file.exists()) {
             file.toURI.toString
           } else {
-            downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
+            downloadFile(resource, targetDir, sparkConf, hadoopConf)
           }
         case _ => uri.toString
       }
@@ -1201,7 +1202,7 @@ private[spark] object SparkSubmitUtils {
    */
   def resolveDependencyPaths(
       artifacts: Array[AnyRef],
-      cacheDirectory: File): String = {
+      cacheDirectory: File): Seq[String] = {
     artifacts.map { artifactInfo =>
       val artifact = artifactInfo.asInstanceOf[Artifact].getModuleRevisionId
       val extraAttrs = artifactInfo.asInstanceOf[Artifact].getExtraAttributes
@@ -1212,7 +1213,7 @@ private[spark] object SparkSubmitUtils {
       }
       cacheDirectory.getAbsolutePath + File.separator +
         s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}$classifier.jar"
-    }.mkString(",")
+    }
   }
 
   /** Adds the given maven coordinates to Ivy's module descriptor. */
@@ -1360,17 +1361,19 @@ private[spark] object SparkSubmitUtils {
    * Resolves any dependencies that were supplied through maven coordinates
    * @param coordinates Comma-delimited string of maven coordinates
    * @param ivySettings An IvySettings containing resolvers to use
+   * @param transitive Whether resolving transitive dependencies, default is true
    * @param exclusions Exclusions to apply when resolving transitive dependencies
-   * @return The comma-delimited path to the jars of the given maven artifacts including their
+   * @return Seq of path to the jars of the given maven artifacts including their
    *         transitive dependencies
    */
   def resolveMavenCoordinates(
       coordinates: String,
       ivySettings: IvySettings,
+      transitive: Boolean,
       exclusions: Seq[String] = Nil,
-      isTest: Boolean = false): String = {
+      isTest: Boolean = false): Seq[String] = {
     if (coordinates == null || coordinates.trim.isEmpty) {
-      ""
+      Nil
     } else {
       val sysOut = System.out
       // Default configuration name for ivy
@@ -1396,7 +1399,7 @@ private[spark] object SparkSubmitUtils {
       val ivy = Ivy.newInstance(ivySettings)
       // Set resolve options to download transitive dependencies as well
       val resolveOptions = new ResolveOptions
-      resolveOptions.setTransitive(true)
+      resolveOptions.setTransitive(transitive)
       val retrieveOptions = new RetrieveOptions
       // Turn downloading and logging off for testing
       if (isTest) {
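
With resolveMavenCoordinates returning Seq[String] and taking an explicit transitive flag, callers no longer join and re-split a comma-separated string. A rough sketch of the new call shape; SparkSubmitUtils is private[spark], so a real caller lives inside Spark itself (for example a test suite), and buildIvySettings(None, None) plus the coordinate are assumptions for illustration:

    import org.apache.spark.deploy.SparkSubmitUtils

    object ResolvePackagesSketch {
      def main(args: Array[String]): Unit = {
        // Default repositories and default ivy path (an assumption here).
        val ivySettings = SparkSubmitUtils.buildIvySettings(None, None)
        val jars: Seq[String] = SparkSubmitUtils.resolveMavenCoordinates(
          "org.scalacheck:scalacheck_2.12:1.15.2", // illustrative coordinate
          ivySettings,
          transitive = true) // threaded down to resolveOptions.setTransitive above
        jars.foreach(println) // one local jar path per artifact; no mkString(",") / split(",") round trip
      }
    }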

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 0 additions & 1 deletion
@@ -160,7 +160,6 @@ private[deploy] class DriverRunner(
       driverDesc.jarUrl,
       driverDir,
       conf,
-      securityManager,
       SparkHadoopUtil.get.newConfiguration(conf),
       System.currentTimeMillis(),
       useCache = false)

core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala

Lines changed: 9 additions & 17 deletions
@@ -19,10 +19,8 @@ package org.apache.spark.deploy.worker
 
 import java.io.File
 
-import org.apache.commons.lang3.StringUtils
-
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util._
@@ -79,27 +77,21 @@ object DriverWrapper extends Logging {
     val secMgr = new SecurityManager(sparkConf)
     val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
 
-    val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) =
-      Seq(
-        "spark.jars.excludes",
-        "spark.jars.packages",
-        "spark.jars.repositories",
-        "spark.jars.ivy",
-        "spark.jars.ivySettings"
-      ).map(sys.props.get(_).orNull)
+    val ivyProperties = DependencyUtils.getIvyProperties()
 
-    val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
-      packages, repositories, ivyRepoPath, Option(ivySettingsPath))
+    val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(true,
+      ivyProperties.packagesExclusions, ivyProperties.packages, ivyProperties.repositories,
+      ivyProperties.ivyRepoPath, Option(ivyProperties.ivySettingsPath))
     val jars = {
       val jarsProp = sys.props.get(config.JARS.key).orNull
-      if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
-        DependencyUtils.mergeFileLists(jarsProp, resolvedMavenCoordinates)
+      if (resolvedMavenCoordinates.nonEmpty) {
+        DependencyUtils.mergeFileLists(jarsProp,
+          DependencyUtils.mergeFileLists(resolvedMavenCoordinates: _*))
       } else {
         jarsProp
       }
     }
-    val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf,
-      secMgr)
+    val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf)
     DependencyUtils.addJarsToClassPath(localJars, loader)
   }
 }
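
The hand-rolled sys.props lookups are replaced by DependencyUtils.getIvyProperties(), which bundles the five spark.jars.* values. A fragment restating the new call with the old property names as comments (the field names come straight from the diff; the mapping is taken from the deleted lines):

    val ivyProperties = DependencyUtils.getIvyProperties()
    val resolvedMavenCoordinates: Seq[String] = DependencyUtils.resolveMavenDependencies(
      true,                                   // resolve transitive dependencies
      ivyProperties.packagesExclusions,       // spark.jars.excludes
      ivyProperties.packages,                 // spark.jars.packages
      ivyProperties.repositories,             // spark.jars.repositories
      ivyProperties.ivyRepoPath,              // spark.jars.ivy
      Option(ivyProperties.ivySettingsPath))  // spark.jars.ivySettings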

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 3 deletions
@@ -924,15 +924,15 @@ private[spark] class Executor(
         logInfo(s"Fetching $name with timestamp $timestamp")
         // Fetch file with useCache mode, close cache for local mode.
         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
-          env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
+          hadoopConf, timestamp, useCache = !isLocal)
         currentFiles(name) = timestamp
       }
       for ((name, timestamp) <- newArchives if currentArchives.getOrElse(name, -1L) < timestamp) {
         logInfo(s"Fetching $name with timestamp $timestamp")
         val sourceURI = new URI(name)
         val uriToDownload = UriBuilder.fromUri(sourceURI).fragment(null).build()
         val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
-          env.securityManager, hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false)
+          hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false)
         val dest = new File(
           SparkFiles.getRootDirectory(),
           if (sourceURI.getFragment != null) sourceURI.getFragment else source.getName)
@@ -951,7 +951,7 @@ private[spark] class Executor(
         logInfo(s"Fetching $name with timestamp $timestamp")
         // Fetch file with useCache mode, close cache for local mode.
         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
-          env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
+          hadoopConf, timestamp, useCache = !isLocal)
         currentJars(name) = timestamp
         // Add it to our class loader
         val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL
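
These call sites all reflect the same signature change: Utils.fetchFile no longer takes a SecurityManager. A sketch of the trimmed call, mirroring the call sites above (Utils is private[spark]; the URL and target directory are made up for illustration):

    val fetched: java.io.File = Utils.fetchFile(
      "https://repo.example.org/libs/some-dep.jar", // hypothetical URL
      Utils.createTempDir(),                        // target directory
      conf,                                         // SparkConf
      hadoopConf,                                   // Hadoop Configuration; the SecurityManager argument is gone
      System.currentTimeMillis(),                   // timestamp
      useCache = false)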

core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala

Lines changed: 1 addition & 1 deletion
@@ -158,7 +158,7 @@ object FallbackStorage extends Logging {
     val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
     val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$name")
     val f = fallbackFileSystem.open(dataFile)
-    val size = nextOffset - 1 - offset
+    val size = nextOffset - offset
     logDebug(s"To byte array $size")
     val array = new Array[Byte](size.toInt)
     val startTimeNs = System.nanoTime()
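
The fix drops the stray "- 1": a shuffle block occupies the half-open range [offset, nextOffset), so its length is nextOffset - offset. A quick check with made-up offsets:

    val offset = 100L                // start of this block in the data file
    val nextOffset = 164L            // start of the next block
    val size = nextOffset - offset   // 64 bytes; the old `nextOffset - 1 - offset` gave 63 and dropped the last byte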
