Changes from 2 commits
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1517,9 +1517,12 @@ class SparkContext(config: SparkConf) extends Logging {
    * only supported for Hadoop-supported filesystems.
    */
   def addFile(path: String, recursive: Boolean): Unit = {
-    val uri = new Path(path).toUri
+    var uri = new Path(path).toUri
     val schemeCorrectedPath = uri.getScheme match {
-      case null | "local" => new File(path).getCanonicalFile.toURI.toString
+      case null | "local" =>
+        // SPARK-24195: Local is not a valid scheme for FileSystem, we should only keep path here.
+        uri = new Path(uri.getPath).toUri
Member
Why is this needed? Can't we just do new File(uri.getPath).getCanonicalFile.toURI.toString without this line?

Contributor
Yes, same question. The line above doesn't seem useful.

Member (@felixcheung, Jun 13, 2018)
It changes uri, which is referenced again below.

Member Author
Yes, just as @felixcheung said. This is because we use uri again in https://github.com/apache/spark/pull/21533/files/f922fd8c995164cada4a8b72e92c369a827def16#diff-364713d7776956cb8b0a771e9b62f82dR1557; if the uri still has the local scheme, we'll get an exception, because local is not a valid scheme for FileSystem.
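For illustration, a minimal sketch of the failure mode being described, assuming only hadoop-common on the classpath (the path is hypothetical): Hadoop has no FileSystem implementation registered for the local scheme, so resolving such a URI throws before any I/O happens.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// "file" is a scheme Hadoop knows; "local" is Spark-specific, so the
// registry lookup below fails immediately.
FileSystem.get(new URI("local:///tmp/data.txt"), new Configuration())
// => java.io.IOException: No FileSystem for scheme: local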

Member (@HyukjinKwon, Jun 13, 2018)
I mean, getPath doesn't include the scheme:
scala> new Path("local:///a/b/c")
res0: org.apache.hadoop.fs.Path = local:/a/b/c

scala> new Path("local:///a/b/c").toUri
res1: java.net.URI = local:///a/b/c

scala> new Path("local:///a/b/c").toUri.getScheme
res2: String = local

scala> new Path("local:///a/b/c").toUri.getPath
res3: String = /a/b/c

Why should we do this again?

scala> new Path(new Path("local:///a/b/c").toUri.getPath).toUri.getPath
res4: String = /a/b/c

Contributor
Yeah, we can simplify this.

Member Author
@HyukjinKwon @jiangxb1987
Thanks for the explanation; I see now what you mean about getPath not including the scheme. The purpose of uri = new Path(uri.getPath).toUri is to reassign the var declared at line 1520, since we don't want uri to keep the local scheme.

Can't we just do new File(uri.getPath).getCanonicalFile.toURI.toString without this line?

We can't, because as I explained above, if we don't do uri = new Path(uri.getPath).toUri, we get an exception like the one below:

No FileSystem for scheme: local
java.io.IOException: No FileSystem for scheme: local
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
	at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1830)
	at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:690)
	at org.apache.spark.util.Utils$.fetchFile(Utils.scala:486)
	at org.apache.spark.SparkContext.addFile(SparkContext.scala:1557)
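A minimal sketch of the scheme-stripping trick under discussion (the path is hypothetical; only hadoop-common is needed): round-tripping the URI through its path component is what drops the local scheme, after which the URI is safe to hand to the FileSystem lookup.

import org.apache.hadoop.fs.Path

val raw = new Path("local:///tmp/data.txt").toUri   // scheme = "local"
val stripped = new Path(raw.getPath).toUri          // scheme = null, path = "/tmp/data.txt"
// stripped can now be resolved against Hadoop's FileSystem registry.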

Member
I mean, at least we can do:

val a = new File(uri.getPath).getCanonicalFile.toURI.toString
uri = new Path(uri.getPath).toUri
a

Using new Path(uri.getPath).toUri just to trim the scheme doesn't look quite clean, though it's okay with me.
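Applied to the match arm above, that suggestion might read as follows; this is a sketch, not necessarily what the next commit does, and the schemeCorrected name is invented here.

case null | "local" =>
  // SPARK-24195: compute the canonical form first, then reassign uri so the
  // "local" scheme never reaches the FileSystem lookup further down.
  val schemeCorrected = new File(uri.getPath).getCanonicalFile.toURI.toString
  uri = new Path(uri.getPath).toUri
  schemeCorrected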

Member Author
Ah, I see, thanks. I'll do this in the next commit. Thanks for your patient explanation.

+        new File(uri.getPath).getCanonicalFile.toURI.toString
       case _ => path
     }
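For context, a sketch of the user-visible effect once this lands (the path is hypothetical): adding a file by a local: URI no longer trips the FileSystem scheme lookup.

// Before the fix this threw "No FileSystem for scheme: local"; with the
// reassignment above the scheme is stripped before Utils.fetchFile resolves it.
sc.addFile("local:///tmp/some-dir/../some-dir/data.txt")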
51 changes: 27 additions & 24 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -116,51 +116,54 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually
test("basic case for addFile and listFiles") {
val dir = Utils.createTempDir()

// file and absolute path for normal path
val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
val absolutePath1 = file1.getAbsolutePath

// file and absolute path for relative path
val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
val absolutePath2 = file2.getAbsolutePath

// file and absolute path for path with local scheme
val file3 = File.createTempFile("someprefix3", "somesuffix3", dir)
val localPath = s"local://${file3.getParent}/../${file3.getParentFile.getName}" +
s"/${file3.getName}"
val absolutePath3 = file3.getAbsolutePath

try {
Files.write("somewords1", file1, StandardCharsets.UTF_8)
Files.write("somewords2", file2, StandardCharsets.UTF_8)
val length1 = file1.length()
val length2 = file2.length()
Files.write("somewords3", file3, StandardCharsets.UTF_8)

sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
sc.addFile(file1.getAbsolutePath)
sc.addFile(relativePath)
sc.parallelize(Array(1), 1).map(x => {
val gotten1 = new File(SparkFiles.get(file1.getName))
val gotten2 = new File(SparkFiles.get(file2.getName))
if (!gotten1.exists()) {
def checkGottenFile(file: File, absolutePath: String): Unit = {
val length = file.length()
val gotten = new File(SparkFiles.get(file.getName))
if (!gotten.exists()) {
throw new SparkException("file doesn't exist : " + absolutePath1)
}
if (!gotten2.exists()) {
throw new SparkException("file doesn't exist : " + absolutePath2)
}

if (length1 != gotten1.length()) {
if (file.length() != gotten.length()) {
throw new SparkException(
s"file has different length $length1 than added file ${gotten1.length()} : " +
s"file has different length $length than added file ${gotten.length()} : " +
absolutePath1)
}
if (length2 != gotten2.length()) {
throw new SparkException(
s"file has different length $length2 than added file ${gotten2.length()} : " +
absolutePath2)
}

if (absolutePath1 == gotten1.getAbsolutePath) {
if (absolutePath == gotten.getAbsolutePath) {
throw new SparkException("file should have been copied :" + absolutePath1)
}
if (absolutePath2 == gotten2.getAbsolutePath) {
throw new SparkException("file should have been copied : " + absolutePath2)
}
Member
Can we not change the existing test?

Member Author
Actually I kept all the existing tests and just cleaned up the duplicated code by adding a checkGottenFile helper in https://github.com/apache/spark/pull/21533/files/f922fd8c995164cada4a8b72e92c369a827def16#diff-8d5858d578a2dda1a2edb0d8cefa4f24R139. If you think it's unnecessary, I'll just change it back.

+      }
+
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      sc.addFile(file1.getAbsolutePath)
+      sc.addFile(relativePath)
+      sc.addFile(localPath)
+      sc.parallelize(Array(1), 1).map { x =>
+        checkGottenFile(file1, absolutePath1)
+        checkGottenFile(file2, absolutePath2)
+        checkGottenFile(file3, absolutePath3)
         x
-      }).count()
+      }.count()
       assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
     } finally {
       sc.stop()
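To exercise the modified test locally, something like the following should work from the repo root (sbt's testOnly filter, assuming the usual Spark dev setup):

build/sbt "core/testOnly *SparkContextSuite"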