Skip to content

Commit e2c7e09

Browse files
committed
[SPARK-24646][CORE] Minor change to spark.yarn.dist.forceDownloadSchemes to support wildcard '*'
## What changes were proposed in this pull request?

When obtaining tokens via a customized `ServiceCredentialProvider`, the `ServiceCredentialProvider` must be available on the classpath of the local spark-submit process. In that case, all configured remote sources should be forced to download to the local machine. To make this configuration easier to use, this change adds wildcard '*' support to `spark.yarn.dist.forceDownloadSchemes` and clarifies the usage of the configuration.

## How was this patch tested?

A new unit test was added.

Author: jerryshao <[email protected]>

Closes #21633 from jerryshao/SPARK-21917-followup.
1 parent 79c6689 commit e2c7e09

File tree

4 files changed

+28
-16
lines changed

4 files changed

+28
-16
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ private[spark] class SparkSubmit extends Logging {
385385
val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
386386

387387
def shouldDownload(scheme: String): Boolean = {
388-
forceDownloadSchemes.contains(scheme) ||
388+
forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme) ||
389389
Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure
390390
}
391391

@@ -578,7 +578,8 @@ private[spark] class SparkSubmit extends Logging {
578578
}
579579
// Add the main application jar and any added jars to classpath in case YARN client
580580
// requires these jars.
581-
// This assumes both primaryResource and user jars are local jars, otherwise it will not be
581+
// This assumes both primaryResource and user jars are local jars, or already downloaded
582+
// to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will not be
582583
// added to the classpath of YARN client.
583584
if (isYarnCluster) {
584585
if (isUserJar(args.primaryResource)) {

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -486,10 +486,11 @@ package object config {
486486

487487
private[spark] val FORCE_DOWNLOAD_SCHEMES =
488488
ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
489-
.doc("Comma-separated list of schemes for which files will be downloaded to the " +
489+
.doc("Comma-separated list of schemes for which resources will be downloaded to the " +
490490
"local disk prior to being added to YARN's distributed cache. For use in cases " +
491491
"where the YARN service does not support schemes that are supported by Spark, like http, " +
492-
"https and ftp.")
492+
"https and ftp, or jars required to be in the local YARN client's classpath. Wildcard " +
493+
"'*' can be used to download resources for all schemes.")
493494
.stringConf
494495
.toSequence
495496
.createWithDefault(Nil)

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -995,20 +995,24 @@ class SparkSubmitSuite
995995
}
996996

997997
test("download remote resource if it is not supported by yarn service") {
998-
testRemoteResources(enableHttpFs = false, blacklistHttpFs = false)
998+
testRemoteResources(enableHttpFs = false)
999999
}
10001000

10011001
test("avoid downloading remote resource if it is supported by yarn service") {
1002-
testRemoteResources(enableHttpFs = true, blacklistHttpFs = false)
1002+
testRemoteResources(enableHttpFs = true)
10031003
}
10041004

10051005
test("force download from blacklisted schemes") {
1006-
testRemoteResources(enableHttpFs = true, blacklistHttpFs = true)
1006+
testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("http"))
1007+
}
1008+
1009+
test("force download for all the schemes") {
1010+
testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("*"))
10071011
}
10081012

10091013
private def testRemoteResources(
10101014
enableHttpFs: Boolean,
1011-
blacklistHttpFs: Boolean): Unit = {
1015+
blacklistSchemes: Seq[String] = Nil): Unit = {
10121016
val hadoopConf = new Configuration()
10131017
updateConfWithFakeS3Fs(hadoopConf)
10141018
if (enableHttpFs) {
@@ -1025,8 +1029,8 @@ class SparkSubmitSuite
10251029
val tmpHttpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
10261030
val tmpHttpJarPath = s"http://${new File(tmpHttpJar.toURI).getAbsolutePath}"
10271031

1028-
val forceDownloadArgs = if (blacklistHttpFs) {
1029-
Seq("--conf", "spark.yarn.dist.forceDownloadSchemes=http")
1032+
val forceDownloadArgs = if (blacklistSchemes.nonEmpty) {
1033+
Seq("--conf", s"spark.yarn.dist.forceDownloadSchemes=${blacklistSchemes.mkString(",")}")
10301034
} else {
10311035
Nil
10321036
}
@@ -1044,14 +1048,19 @@ class SparkSubmitSuite
10441048

10451049
val jars = conf.get("spark.yarn.dist.jars").split(",").toSet
10461050

1047-
// The URI of remote S3 resource should still be remote.
1048-
assert(jars.contains(tmpS3JarPath))
1051+
def isSchemeBlacklisted(scheme: String) = {
1052+
blacklistSchemes.contains("*") || blacklistSchemes.contains(scheme)
1053+
}
1054+
1055+
if (!isSchemeBlacklisted("s3")) {
1056+
assert(jars.contains(tmpS3JarPath))
1057+
}
10491058

1050-
if (enableHttpFs && !blacklistHttpFs) {
1059+
if (enableHttpFs && blacklistSchemes.isEmpty) {
10511060
// If Http FS is supported by yarn service, the URI of remote http resource should
10521061
// still be remote.
10531062
assert(jars.contains(tmpHttpJarPath))
1054-
} else {
1063+
} else if (!enableHttpFs || isSchemeBlacklisted("http")) {
10551064
// If Http FS is not supported by yarn service, or http scheme is configured to be force
10561065
// downloading, the URI of remote http resource should be changed to a local one.
10571066
val jarName = new File(tmpHttpJar.toURI).getName

docs/running-on-yarn.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,10 @@ To use a custom metrics.properties for the application master and executors, upd
218218
<td><code>spark.yarn.dist.forceDownloadSchemes</code></td>
219219
<td><code>(none)</code></td>
220220
<td>
221-
Comma-separated list of schemes for which files will be downloaded to the local disk prior to
221+
Comma-separated list of schemes for which resources will be downloaded to the local disk prior to
222222
being added to YARN's distributed cache. For use in cases where the YARN service does not
223-
support schemes that are supported by Spark, like http, https and ftp.
223+
support schemes that are supported by Spark, like http, https and ftp, or jars required to be in the
224+
local YARN client's classpath. Wildcard '*' can be used to download resources for all schemes.
224225
</td>
225226
</tr>
226227
<tr>

0 commit comments

Comments
 (0)