From f922fd8c995164cada4a8b72e92c369a827def16 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Tue, 12 Jun 2018 09:51:44 +0800
Subject: [PATCH 1/5] Bug fix for local:/ path in sc.addFile

---
 .../scala/org/apache/spark/SparkContext.scala |  7 ++-
 .../org/apache/spark/SparkContextSuite.scala  | 55 +++++++++++----------
 2 files changed, 34 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5e8595603cc9..2f15e39a01cb 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1517,9 +1517,12 @@ class SparkContext(config: SparkConf) extends Logging {
    * only supported for Hadoop-supported filesystems.
    */
   def addFile(path: String, recursive: Boolean): Unit = {
-    val uri = new Path(path).toUri
+    var uri = new Path(path).toUri
     val schemeCorrectedPath = uri.getScheme match {
-      case null | "local" => new File(path).getCanonicalFile.toURI.toString
+      case null | "local" =>
+        // SPARK-24195: 'local' is not a valid scheme for FileSystem; keep only the path here.
+        uri = new Path(uri.getPath).toUri
+        new File(uri.getPath).getCanonicalFile.toURI.toString
       case _ => path
     }
 
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index ce9f2be1c02d..0d995a0e8e05 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -116,49 +116,52 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
   test("basic case for addFile and listFiles") {
     val dir = Utils.createTempDir()
 
+    // file and absolute path for a normal path
     val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
     val absolutePath1 = file1.getAbsolutePath
 
+    // file and absolute path for a relative path
     val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
     val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
     val absolutePath2 = file2.getAbsolutePath
 
+    // file and absolute path for a path with the local scheme
+    val file3 = File.createTempFile("someprefix3", "somesuffix3", dir)
+    val localPath = "local://" + file3.getParent + "/../" + file3.getParentFile.getName +
+      "/" + file3.getName
+    val absolutePath3 = file3.getAbsolutePath
+
     try {
       Files.write("somewords1", file1, StandardCharsets.UTF_8)
       Files.write("somewords2", file2, StandardCharsets.UTF_8)
-      val length1 = file1.length()
-      val length2 = file2.length()
+      Files.write("somewords3", file3, StandardCharsets.UTF_8)
 
-      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
-      sc.addFile(file1.getAbsolutePath)
-      sc.addFile(relativePath)
-      sc.parallelize(Array(1), 1).map(x => {
-        val gotten1 = new File(SparkFiles.get(file1.getName))
-        val gotten2 = new File(SparkFiles.get(file2.getName))
-        if (!gotten1.exists()) {
-          throw new SparkException("file doesn't exist : " + absolutePath1)
+      def checkGottenFile(file: File, absolutePath: String): Unit = {
+        val length = file.length()
+        val gotten = new File(SparkFiles.get(file.getName))
+        if (!gotten.exists()) {
+          throw new SparkException("file doesn't exist : " + absolutePath)
         }
-        if (!gotten2.exists()) {
-          throw new SparkException("file doesn't exist : " + absolutePath2)
-        }
 
-        if (length1 != gotten1.length()) {
+        if (length != gotten.length()) {
           throw new SparkException(
-            s"file has different length $length1 than added file ${gotten1.length()} : " +
-              absolutePath1)
+            s"file has different length $length than added file ${gotten.length()} : " +
+              absolutePath)
         }
-        if (length2 != gotten2.length()) {
-          throw new SparkException(
-            s"file has different length $length2 than added file ${gotten2.length()} : " +
-              absolutePath2)
-        }
 
-        if (absolutePath1 == gotten1.getAbsolutePath) {
-          throw new SparkException("file should have been copied :" + absolutePath1)
+        if (absolutePath == gotten.getAbsolutePath) {
+          throw new SparkException("file should have been copied : " + absolutePath)
         }
-        if (absolutePath2 == gotten2.getAbsolutePath) {
-          throw new SparkException("file should have been copied : " + absolutePath2)
-        }
+      }
+
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      sc.addFile(file1.getAbsolutePath)
+      sc.addFile(relativePath)
+      sc.addFile(localPath)
+      sc.parallelize(Array(1), 1).map(x => {
+        checkGottenFile(file1, absolutePath1)
+        checkGottenFile(file2, absolutePath2)
+        checkGottenFile(file3, absolutePath3)
         x
       }).count()
      assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)

From 797cefe599cfd4500b2bf5296420dcc402cf4eff Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Wed, 13 Jun 2018 23:41:01 +0800
Subject: [PATCH 2/5] Address review comments

---
 .../test/scala/org/apache/spark/SparkContextSuite.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 0d995a0e8e05..2a4659296688 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -127,8 +127,8 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
 
     // file and absolute path for a path with the local scheme
     val file3 = File.createTempFile("someprefix3", "somesuffix3", dir)
-    val localPath = "local://" + file3.getParent + "/../" + file3.getParentFile.getName +
-      "/" + file3.getName
+    val localPath = s"local://${file3.getParent}/../${file3.getParentFile.getName}" +
+      s"/${file3.getName}"
     val absolutePath3 = file3.getAbsolutePath
 
     try {
@@ -158,12 +158,12 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       sc.addFile(file1.getAbsolutePath)
       sc.addFile(relativePath)
       sc.addFile(localPath)
-      sc.parallelize(Array(1), 1).map(x => {
+      sc.parallelize(Array(1), 1).map { x =>
         checkGottenFile(file1, absolutePath1)
         checkGottenFile(file2, absolutePath2)
         checkGottenFile(file3, absolutePath3)
         x
-      }).count()
+      }.count()
       assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
     } finally {
       sc.stop()

From 5daf8042a5f3b6613a462e34bb1d7b1d18ffc4fc Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Sun, 17 Jun 2018 22:33:32 +0800
Subject: [PATCH 3/5] Local file should not be added to the file server

---
 .../main/scala/org/apache/spark/SparkContext.scala | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2f15e39a01cb..6b4bc2e1cc72 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1518,11 +1518,18 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def addFile(path: String, recursive: Boolean): Unit = {
     var uri = new Path(path).toUri
+    // Mark whether the original path's scheme is 'local'; a local file does not
+    // need to be added to the file server.
+    var localFile = false
     val schemeCorrectedPath = uri.getScheme match {
-      case null | "local" =>
+      case null =>
+        new File(path).getCanonicalFile.toURI.toString
+      case "local" =>
+        localFile = true
+        val tmpPath = new File(uri.getPath).getCanonicalFile.toURI.toString
         // SPARK-24195: 'local' is not a valid scheme for FileSystem; keep only the path here.
         uri = new Path(uri.getPath).toUri
-        new File(uri.getPath).getCanonicalFile.toURI.toString
+        tmpPath
       case _ => path
     }
 
@@ -1544,7 +1551,7 @@
       Utils.validateURL(uri)
     }
 
-    val key = if (!isLocal && scheme == "file") {
+    val key = if (!isLocal && scheme == "file" && !localFile) {
       env.rpcEnv.fileServer.addFile(new File(uri.getPath))
     } else {
       schemeCorrectedPath

From ac12568f90e27a1748c73de57f3d63190c823278 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Wed, 27 Jun 2018 17:58:28 +0800
Subject: [PATCH 4/5] addFile does not support adding a file with the local
 scheme

---
 .../scala/org/apache/spark/SparkContext.scala | 19 +++----
 .../org/apache/spark/SparkContextSuite.scala  | 57 +++++++++----------
 2 files changed, 34 insertions(+), 42 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6b4bc2e1cc72..4dbcc8105090 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1517,19 +1517,14 @@ class SparkContext(config: SparkConf) extends Logging {
    * only supported for Hadoop-supported filesystems.
    */
   def addFile(path: String, recursive: Boolean): Unit = {
-    var uri = new Path(path).toUri
-    // Mark whether the original path's scheme is 'local'; a local file does not
-    // need to be added to the file server.
-    var localFile = false
+    val uri = new Path(path).toUri
     val schemeCorrectedPath = uri.getScheme match {
-      case null =>
-        new File(path).getCanonicalFile.toURI.toString
+      case null => new File(path).getCanonicalFile.toURI.toString
       case "local" =>
-        localFile = true
-        val tmpPath = new File(uri.getPath).getCanonicalFile.toURI.toString
-        // SPARK-24195: 'local' is not a valid scheme for FileSystem; keep only the path here.
-        uri = new Path(uri.getPath).toUri
-        tmpPath
+        logWarning("We do not support adding a local file here, because a file with the " +
+          "'local' scheme already exists on every node; there is no need to call addFile " +
+          "to add it again. (See more discussion about this in SPARK-24195.)")
+        return
       case _ => path
     }
 
@@ -1551,7 +1546,7 @@
       Utils.validateURL(uri)
     }
 
-    val key = if (!isLocal && scheme == "file" && !localFile) {
+    val key = if (!isLocal && scheme == "file") {
       env.rpcEnv.fileServer.addFile(new File(uri.getPath))
     } else {
       schemeCorrectedPath

diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 2a4659296688..ce9f2be1c02d 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -116,54 +116,51 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
   test("basic case for addFile and listFiles") {
     val dir = Utils.createTempDir()
 
-    // file and absolute path for a normal path
     val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
     val absolutePath1 = file1.getAbsolutePath
 
-    // file and absolute path for a relative path
     val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
     val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
     val absolutePath2 = file2.getAbsolutePath
 
-    // file and absolute path for a path with the local scheme
-    val file3 = File.createTempFile("someprefix3", "somesuffix3", dir)
-    val localPath = s"local://${file3.getParent}/../${file3.getParentFile.getName}" +
-      s"/${file3.getName}"
-    val absolutePath3 = file3.getAbsolutePath
-
     try {
       Files.write("somewords1", file1, StandardCharsets.UTF_8)
       Files.write("somewords2", file2, StandardCharsets.UTF_8)
-      Files.write("somewords3", file3, StandardCharsets.UTF_8)
+      val length1 = file1.length()
+      val length2 = file2.length()
 
-      def checkGottenFile(file: File, absolutePath: String): Unit = {
-        val length = file.length()
-        val gotten = new File(SparkFiles.get(file.getName))
-        if (!gotten.exists()) {
-          throw new SparkException("file doesn't exist : " + absolutePath)
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      sc.addFile(file1.getAbsolutePath)
+      sc.addFile(relativePath)
+      sc.parallelize(Array(1), 1).map(x => {
+        val gotten1 = new File(SparkFiles.get(file1.getName))
+        val gotten2 = new File(SparkFiles.get(file2.getName))
+        if (!gotten1.exists()) {
+          throw new SparkException("file doesn't exist : " + absolutePath1)
        }
+        if (!gotten2.exists()) {
+          throw new SparkException("file doesn't exist : " + absolutePath2)
+        }
 
-        if (length != gotten.length()) {
+        if (length1 != gotten1.length()) {
           throw new SparkException(
-            s"file has different length $length than added file ${gotten.length()} : " +
-              absolutePath)
+            s"file has different length $length1 than added file ${gotten1.length()} : " +
+              absolutePath1)
         }
+        if (length2 != gotten2.length()) {
+          throw new SparkException(
+            s"file has different length $length2 than added file ${gotten2.length()} : " +
+              absolutePath2)
+        }
 
-        if (absolutePath == gotten.getAbsolutePath) {
-          throw new SparkException("file should have been copied : " + absolutePath)
+        if (absolutePath1 == gotten1.getAbsolutePath) {
+          throw new SparkException("file should have been copied :" + absolutePath1)
         }
-      }
-
-      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
-      sc.addFile(file1.getAbsolutePath)
-      sc.addFile(relativePath)
-      sc.addFile(localPath)
-      sc.parallelize(Array(1), 1).map { x =>
-        checkGottenFile(file1, absolutePath1)
-        checkGottenFile(file2, absolutePath2)
-        checkGottenFile(file3, absolutePath3)
+        if (absolutePath2 == gotten2.getAbsolutePath) {
+          throw new SparkException("file should have been copied : " + absolutePath2)
+        }
         x
-      }.count()
+      }).count()
       assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
     } finally {
       sc.stop()

From eb46ccfec084c2439a26eee38015381f091fe164 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Thu, 28 Jun 2018 12:38:11 +0800
Subject: [PATCH 5/5] Rephrase log message

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4dbcc8105090..e3c58a5cf7a0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1521,9 +1521,8 @@ class SparkContext(config: SparkConf) extends Logging {
     val schemeCorrectedPath = uri.getScheme match {
       case null => new File(path).getCanonicalFile.toURI.toString
       case "local" =>
-        logWarning("We do not support adding a local file here, because a file with the " +
-          "'local' scheme already exists on every node; there is no need to call addFile " +
-          "to add it again. (See more discussion about this in SPARK-24195.)")
+        logWarning("Adding a file with the 'local' scheme to the file server is not " +
+          "supported, since it is already available on every node.")
         return
       case _ => path
     }
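
Note: a minimal usage sketch of the behavior after this series, assuming a file
/opt/shared/lookup.txt that already exists at the same path on the driver and on
every executor; the path and app name are illustrative only and not part of the
patches:

    import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

    val sc = new SparkContext(new SparkConf().setAppName("addFile-demo").setMaster("local"))

    // A plain path (no scheme) is still registered with the RPC file server;
    // each executor downloads its own copy and resolves it through SparkFiles.
    sc.addFile("/opt/shared/lookup.txt")
    sc.parallelize(1 to 2).foreach { _ =>
      println(SparkFiles.get("lookup.txt")) // path of the fetched copy
    }

    // A "local" scheme path is not distributed: after PATCH 4-5/5, addFile only
    // logs a warning and returns, since the file is already on every node.
    sc.addFile("local:///opt/shared/lookup.txt")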