From da63c17d7ae7fbf04cc474d946d61a098b3e1ade Mon Sep 17 00:00:00 2001
From: Fangshi Li
Date: Tue, 27 Mar 2018 21:25:54 -0700
Subject: [PATCH 1/4] Spark writer dynamic partition overwrite mode may fail
 to write output on multi level partition

---
 .../spark/internal/io/HadoopMapReduceCommitProtocol.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 6d20ef1f98a3..ac56adf28a86 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
         logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
         for (part <- partitionPaths) {
           val finalPartPath = new Path(path, part)
-          fs.delete(finalPartPath, true)
+          if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
+            fs.mkdirs(finalPartPath.getParent)
+          }
           fs.rename(new Path(stagingDir, part), finalPartPath)
         }
       }

From 686a4043b54ff44a29f9d01b615c9de50678217c Mon Sep 17 00:00:00 2001
From: Fangshi Li
Date: Sat, 7 Apr 2018 02:16:22 -0700
Subject: [PATCH 2/4] [SPARK-23815]follow up. add comment

---
 .../spark/internal/io/HadoopMapReduceCommitProtocol.scala | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index ac56adf28a86..c6add10542aa 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -187,6 +187,13 @@ class HadoopMapReduceCommitProtocol(
         for (part <- partitionPaths) {
           val finalPartPath = new Path(path, part)
           if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
+            // According to the official hadoop FileSystem API spec, delete op should assume
+            // the destination is no longer present regardless of return value, and in our case,
+            // it should return false only when finalPartPath does not exist.
+            // When finalPartPath does not exist, we need to take action only when the parent of
+            // finalPartPath does not exist(e.g. the scenario described on SPARK-23815), because
+            // FileSystem API spec on rename op says the rename destination must have a parent
+            // that exists, otherwise we may get unexpected result on the rename.
             fs.mkdirs(finalPartPath.getParent)
           }
           fs.rename(new Path(stagingDir, part), finalPartPath)

From a9342ccaff7f16365f4740fded749ac1fdac9cb7 Mon Sep 17 00:00:00 2001
From: Fangshi Li
Date: Sat, 7 Apr 2018 02:27:13 -0700
Subject: [PATCH 3/4] [SPARK-23815]follow up. improve comment

---
 .../internal/io/HadoopMapReduceCommitProtocol.scala | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index c6add10542aa..cd0635a58322 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -188,12 +188,13 @@ class HadoopMapReduceCommitProtocol(
           val finalPartPath = new Path(path, part)
           if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
             // According to the official hadoop FileSystem API spec, delete op should assume
-            // the destination is no longer present regardless of return value, and in our case,
-            // it should return false only when finalPartPath does not exist.
-            // When finalPartPath does not exist, we need to take action only when the parent of
-            // finalPartPath does not exist(e.g. the scenario described on SPARK-23815), because
-            // FileSystem API spec on rename op says the rename destination must have a parent
-            // that exists, otherwise we may get unexpected result on the rename.
+            // the destination is no longer present regardless of return value, thus we do not
+            // need to double check if finalPartPath exists before rename.
+            // Also, in our case, delete should return false only when finalPartPath does not
+            // exist. When this happens, we need to take action only if parent of finalPartPath
+            // also does not exist(e.g. the scenario described on SPARK-23815), because
+            // FileSystem API spec on rename op says the rename dest(finalPartPath) must have
+            // a parent that exists, otherwise we may get unexpected result on the rename.
             fs.mkdirs(finalPartPath.getParent)
           }
           fs.rename(new Path(stagingDir, part), finalPartPath)

From 08b9601ee9a0850f8fea0af7c9027bdef2016857 Mon Sep 17 00:00:00 2001
From: Fangshi Li
Date: Sat, 7 Apr 2018 02:34:03 -0700
Subject: [PATCH 4/4] [SPARK-23815]follow up. further improve comment

---
 .../spark/internal/io/HadoopMapReduceCommitProtocol.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index cd0635a58322..3e60c50ada59 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -190,8 +190,8 @@ class HadoopMapReduceCommitProtocol(
             // According to the official hadoop FileSystem API spec, delete op should assume
             // the destination is no longer present regardless of return value, thus we do not
             // need to double check if finalPartPath exists before rename.
-            // Also, in our case, delete should return false only when finalPartPath does not
-            // exist. When this happens, we need to take action only if parent of finalPartPath
+            // Also in our case, based on the spec, delete returns false only when finalPartPath
+            // does not exist. When this happens, we need to take action if parent of finalPartPath
             // also does not exist(e.g. the scenario described on SPARK-23815), because
             // FileSystem API spec on rename op says the rename dest(finalPartPath) must have
             // a parent that exists, otherwise we may get unexpected result on the rename.
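
Editor's note: for readers following the series without the surrounding file open, below is a minimal standalone sketch of the commit-time move that the final comment documents. It is illustrative only: the object and method names and the parameters (DynamicPartitionMoveSketch, moveDynamicPartitions, outputPath, etc.) are invented for the example and do not match the real HadoopMapReduceCommitProtocol.commitJob signature; only the delete/mkdirs/rename sequence mirrors the patched code.

import org.apache.hadoop.fs.{FileSystem, Path}

object DynamicPartitionMoveSketch {
  // Move each dynamically overwritten partition from the staging directory to its
  // final location. Per the Hadoop FileSystem spec, delete() returning false here
  // means finalPartPath never existed, and its parent may be missing as well (the
  // SPARK-23815 scenario); rename() requires an existing destination parent, so
  // create it before renaming.
  def moveDynamicPartitions(
      fs: FileSystem,
      outputPath: Path,
      stagingDir: Path,
      partitionPaths: Set[String]): Unit = {
    for (part <- partitionPaths) {
      val finalPartPath = new Path(outputPath, part)
      if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
        fs.mkdirs(finalPartPath.getParent)
      }
      fs.rename(new Path(stagingDir, part), finalPartPath)
    }
  }
}

For example, with part = "p1=a/p2=b" and an output directory that does not yet contain "p1=a", delete() returns false and finalPartPath.getParent does not exist, so mkdirs() runs first; without it, the rename destination has no parent and, per the FileSystem rename spec, the rename may give an unexpected result.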