16 changes: 13 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1517,9 +1517,19 @@ class SparkContext(config: SparkConf) extends Logging {
* only supported for Hadoop-supported filesystems.
*/
def addFile(path: String, recursive: Boolean): Unit = {
val uri = new Path(path).toUri
var uri = new Path(path).toUri
// Mark whether the original path's scheme is "local"; if it is, there is no
// need to add the file to the file server.
var localFile = false
val schemeCorrectedPath = uri.getScheme match {
case null | "local" => new File(path).getCanonicalFile.toURI.toString
case null =>
new File(path).getCanonicalFile.toURI.toString
case "local" =>
localFile = true
val tmpPath = new File(uri.getPath).getCanonicalFile.toURI.toString
// SPARK-24195: "local" is not a valid scheme for FileSystem, so we should only keep the path here.
uri = new Path(uri.getPath).toUri
Member:
Why is this needed? Can't we just do new File(uri.getPath).getCanonicalFile.toURI.toString without this line?

Contributor:
Yes, same question. The line above doesn't seem useful.

Member (@felixcheung, Jun 13, 2018):
It changes uri, which is referenced again below.

Member Author:
Yes, just as @felixcheung said: we use uri again in https://github.com/apache/spark/pull/21533/files/f922fd8c995164cada4a8b72e92c369a827def16#diff-364713d7776956cb8b0a771e9b62f82dR1557, and if the uri still carries the local scheme we'll get an exception, because local is not a valid scheme for FileSystem.

Member (@HyukjinKwon, Jun 13, 2018):
I mean that getPath doesn't include the scheme:

scala> new Path("local:///a/b/c")
res0: org.apache.hadoop.fs.Path = local:/a/b/c

scala> new Path("local:///a/b/c").toUri
res1: java.net.URI = local:///a/b/c

scala> new Path("local:///a/b/c").toUri.getScheme
res2: String = local

scala> new Path("local:///a/b/c").toUri.getPath
res3: String = /a/b/c

Why should we do this again?

scala> new Path(new Path("local:///a/b/c").toUri.getPath).toUri.getPath
res4: String = /a/b/c

Contributor:
Yeah, we can simplify this.

Member Author:
@HyukjinKwon @jiangxb1987
Thanks for the explanation; I see what you mean about getPath not including the scheme. The purpose of uri = new Path(uri.getPath).toUri is to reassign the var declared at +1520, because we don't want the uri to keep the local scheme.

Can't we just do new File(uri.getPath).getCanonicalFile.toURI.toString without this line?

We can't: as I explained above, if we don't do uri = new Path(uri.getPath).toUri, we'll get an exception like the one below:

No FileSystem for scheme: local
java.io.IOException: No FileSystem for scheme: local
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
	at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1830)
	at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:690)
	at org.apache.spark.util.Utils$.fetchFile(Utils.scala:486)
	at org.apache.spark.SparkContext.addFile(SparkContext.scala:1557)
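
A minimal standalone sketch that reproduces this outside Spark, assuming only a stock Hadoop client on the classpath (Hadoop registers no FileSystem implementation for the "local" scheme):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Fails before any file can be fetched: no FileSystem is registered
// under the "local" scheme.
FileSystem.get(new URI("local:///a/b/c"), new Configuration())
// => java.io.IOException: No FileSystem for scheme: local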

Member:
I mean, at least we can do:

val a = new File(uri.getPath).getCanonicalFile.toURI.toString
uri = new Path(uri.getPath).toUri
a

Using new Path(uri.getPath).toUri just to trim the scheme doesn't look very clean, though. It's okay with me, at least.
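
Applied in context, the "local" branch of the match could then read as follows (a sketch based on the diff above with the suggested reordering; the intermediate variable name is illustrative):

case "local" =>
  localFile = true
  // Build the scheme-corrected path first, then strip the "local" scheme
  // from uri so the FileSystem lookup further down never sees it.
  val schemeCorrected = new File(uri.getPath).getCanonicalFile.toURI.toString
  uri = new Path(uri.getPath).toUri
  schemeCorrected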

Member Author:
Ah, I see, thanks. I'll do this in the next commit. Thanks for your patient explanation.

tmpPath
Contributor:
I think the change here makes adding a file with the "local" scheme a no-op. That makes me wonder whether supporting the "local" scheme in addFile is meaningful at all: a file with the "local" scheme already exists on every node, and the user should already be aware of that, so adding it doesn't seem meaningful.

Looking at the similar method addJar, a "local" jar is handled properly there: it is not added to the fileServer, and it is converted to the right scheme for use by the classloader.
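
For reference, the scheme handling in addJar at the time looked roughly like this (paraphrased; exact details vary by Spark version):

// A "local" jar is never pushed to the file server; its URI is rewritten to
// a plain "file:" path that executor classloaders can resolve directly.
uri.getScheme match {
  case null | "file" => addJarFile(new File(uri.getPath))
  case "local" => "file:" + uri.getPath
  case _ => path
}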

Member Author (@xuanyuanking, Jun 23, 2018):
This makes me wonder whether supporting the "local" scheme in addFile is meaningful at all: a file with the "local" scheme already exists on every node, so adding it doesn't seem meaningful.

Yeah, agreed. The last change tries to handle a "local" file without adding it to the fileServer, correcting its scheme to "file:", but maybe adding a local file is a no-op in itself? So should we just forbid users from passing a file with the "local" scheme to addFile?
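
If forbidding it is the direction taken, one possible shape for such a guard (purely illustrative; not something this PR implements):

case "local" =>
  // Hypothetical: reject "local" paths outright, since the file already
  // exists on every node and adding it would be a no-op.
  throw new IllegalArgumentException(
    s"addFile does not support the 'local' scheme: $path")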

case _ => path
}

@@ -1541,7 +1551,7 @@ class SparkContext(config: SparkConf) extends Logging {
Utils.validateURL(uri)
}

val key = if (!isLocal && scheme == "file") {
val key = if (!isLocal && scheme == "file" && !localFile) {
env.rpcEnv.fileServer.addFile(new File(uri.getPath))
} else {
schemeCorrectedPath
51 changes: 27 additions & 24 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -116,51 +116,54 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
test("basic case for addFile and listFiles") {
val dir = Utils.createTempDir()

// file and absolute path for normal path
val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
val absolutePath1 = file1.getAbsolutePath

// file and absolute path for relative path
val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
val absolutePath2 = file2.getAbsolutePath

// file and absolute path for path with local scheme
val file3 = File.createTempFile("someprefix3", "somesuffix3", dir)
val localPath = s"local://${file3.getParent}/../${file3.getParentFile.getName}" +
s"/${file3.getName}"
val absolutePath3 = file3.getAbsolutePath

try {
Files.write("somewords1", file1, StandardCharsets.UTF_8)
Files.write("somewords2", file2, StandardCharsets.UTF_8)
val length1 = file1.length()
val length2 = file2.length()
Files.write("somewords3", file3, StandardCharsets.UTF_8)

sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
sc.addFile(file1.getAbsolutePath)
sc.addFile(relativePath)
sc.parallelize(Array(1), 1).map(x => {
val gotten1 = new File(SparkFiles.get(file1.getName))
val gotten2 = new File(SparkFiles.get(file2.getName))
if (!gotten1.exists()) {
def checkGottenFile(file: File, absolutePath: String): Unit = {
val length = file.length()
val gotten = new File(SparkFiles.get(file.getName))
if (!gotten.exists()) {
throw new SparkException("file doesn't exist : " + absolutePath1)
}
if (!gotten2.exists()) {
throw new SparkException("file doesn't exist : " + absolutePath2)
}

if (length1 != gotten1.length()) {
if (file.length() != gotten.length()) {
throw new SparkException(
s"file has different length $length1 than added file ${gotten1.length()} : " +
s"file has different length $length than added file ${gotten.length()} : " +
absolutePath1)
}
if (length2 != gotten2.length()) {
throw new SparkException(
s"file has different length $length2 than added file ${gotten2.length()} : " +
absolutePath2)
}

if (absolutePath1 == gotten1.getAbsolutePath) {
if (absolutePath == gotten.getAbsolutePath) {
throw new SparkException("file should have been copied :" + absolutePath1)
}
if (absolutePath2 == gotten2.getAbsolutePath) {
throw new SparkException("file should have been copied : " + absolutePath2)
}
Member:
Can we not change the existing test?

Member Author:
Actually, I kept all the existing tests and just did some cleanup to reduce duplicated code by adding the function checkGottenFile in https://github.com/apache/spark/pull/21533/files/f922fd8c995164cada4a8b72e92c369a827def16#diff-8d5858d578a2dda1a2edb0d8cefa4f24R139. If you think it's unnecessary, I'll just change it back.

}

sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
sc.addFile(file1.getAbsolutePath)
sc.addFile(relativePath)
sc.addFile(localPath)
sc.parallelize(Array(1), 1).map { x =>
checkGottenFile(file1, absolutePath1)
checkGottenFile(file2, absolutePath2)
checkGottenFile(file3, absolutePath3)
x
}).count()
}.count()
assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
} finally {
sc.stop()