[SPARK-23815][Core] Spark writer dynamic partition overwrite mode may fail to write output on multi level partition #20931
Conversation
cc @cloud-fan
Hi @fangshil, can you try to add test cases to verify the changes introduced in this patch?
@sujithjay The test cases added in SPARK-20236 already cover the scenario in this patch (multi-level partitioning). The added unit tests work fine with the local file system, but the bug we report here only happens when writing to HDFS. As a result, I did not add more tests in this patch.
ok to test
  val finalPartPath = new Path(path, part)
- fs.delete(finalPartPath, true)
+ if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
+   fs.mkdirs(finalPartPath.getParent)
Do you have an official HDFS document to support this change?
@cloud-fan Yes, the official HDFS FileSystem specification (https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) gives the rename operation the precondition "dest must be root, or have a parent that exists".
  for (part <- partitionPaths) {
    val finalPartPath = new Path(path, part)
-   fs.delete(finalPartPath, true)
+   if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
Why do we only create the parent dir if we fail to delete finalPartPath?
@cloud-fan This follows the HDFS rename spec, which requires the destination's parent to be present. If we created finalPartPath itself, rename would hit another odd behavior when the destination path already exists. From the HDFS spec I shared above: "If the destination exists and is a directory, the final destination of the rename becomes the destination + the filename of the source path". We have confirmed this on our production cluster, so the current patch creates only the parent dir, which follows the HDFS spec exactly.
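To make the destination-exists pitfall concrete, here is a minimal sketch against the Hadoop FileSystem API (the paths and the standalone setup are illustrative, not the actual variables in HadoopMapReduceCommitProtocol):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
// Illustrative paths mirroring the staging and final partition dirs.
val stagingPart = new Path("/out/.spark-staging.xxx/col1=1/col2=2")
val finalPartPath = new Path("/out/col1=1/col2=2")

// If we created finalPartPath itself, the destination would already exist as a
// directory, and per the rename spec the source would be moved *into* it,
// landing at /out/col1=1/col2=2/col2=2 instead of /out/col1=1/col2=2.
// Creating only the parent keeps the destination absent while satisfying the
// "parent must exist" precondition.
fs.mkdirs(finalPartPath.getParent) // creates /out/col1=1
fs.rename(stagingPart, finalPartPath)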
I think the problem here is that we didn't check whether finalPartPath exists; we should actually check that before the rename.
I feel the code here is not safe. The delete may fail because finalPartPath doesn't exist, or because of some real failure. We should make sure finalPartPath doesn't exist before renaming.
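A sketch of the more defensive variant being suggested here, for illustration only (this is not the code that was merged; the helper name and the exception choice are assumptions, and the variables mirror the diff above):

import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

// Explicitly confirm the destination is gone before renaming, then make sure
// its parent exists so the rename precondition holds.
def prepareFinalPartPath(fs: FileSystem, finalPartPath: Path): Unit = {
  fs.delete(finalPartPath, true)
  if (fs.exists(finalPartPath)) {
    throw new IOException(s"Failed to delete existing path $finalPartPath")
  }
  if (!fs.exists(finalPartPath.getParent)) {
    fs.mkdirs(finalPartPath.getParent)
  }
}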
BTW, we should add comments around here to explain all of this.
+1 on adding comments.
The FileSystem API spec for delete says "Code SHOULD just call delete(path, recursive) and assume the destination is no longer present". Going by the detailed spec, the only case in which delete returns false is when finalPartPath does not exist; other failures should surface as exceptions. When finalPartPath does not exist, which is an expected case, we only need to act if the parent of finalPartPath is also missing, because otherwise the rename will fail according to the rename spec. Please advise if you think we should still double-check finalPartPath before the rename; I will add a comment after the discussion.
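For readers following along, a sketch of what the patched loop with such a comment could look like (the commitJob surroundings, including the stagingDir variable, are reconstructed from this discussion rather than copied from the merged code):

for (part <- partitionPaths) {
  val finalPartPath = new Path(path, part)
  // Per the FileSystem API spec, delete(path, recursive) returns false only when
  // the path does not exist; real failures surface as exceptions, so afterwards
  // the destination is guaranteed to be absent.
  // If finalPartPath did not exist, its parent may be missing too, and rename
  // requires the destination's parent to exist, so create only the parent here.
  if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
    fs.mkdirs(finalPartPath.getParent)
  }
  fs.rename(new Path(stagingDir, part), finalPartPath)
}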
Ah, makes sense. Let's add a comment to summarize this discussion.
added
retest this please
LGTM
Test build #89187 has finished for PR 20931 at commit
thanks, merging to master/2.3!
Author: Fangshi Li <[email protected]>
Closes apache#20931 from fangshil/master.
(cherry picked from commit 4b07036)
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit dfdf1bb)
What changes were proposed in this pull request?
Spark introduced a new writer mode to overwrite only the related partitions in SPARK-20236. While using this feature in our production cluster, we found a bug when writing multi-level partitions to HDFS.
A simple test case to reproduce this issue:
val df = Seq(("1","2","3")).toDF("col1", "col2","col3")
df.write.partitionBy("col1","col2").mode("overwrite").save("/my/hdfs/location")
If HDFS location "/my/hdfs/location" does not exist, there will be no output.
This seems to be caused by the job commit change in SPARK-20236 in HadoopMapReduceCommitProtocol.
In the commit job process, the output has been written into the staging dir /my/hdfs/location/.spark-staging.xxx/col1=1/col2=2, and then the code calls fs.rename to move /my/hdfs/location/.spark-staging.xxx/col1=1/col2=2 to /my/hdfs/location/col1=1/col2=2. In our case this operation fails on HDFS because /my/hdfs/location/col1=1 does not exist: HDFS rename cannot create missing parent directories for the destination.
This does not happen in the new unit tests added in SPARK-20236, which use the local file system.
We are proposing a fix: when cleaning the current partition dir /my/hdfs/location/col1=1/col2=2 before the rename, if the delete fails (because /my/hdfs/location/col1=1/col2=2 may not exist), we call mkdirs to create the parent dir /my/hdfs/location/col1=1 (if it does not already exist) so the subsequent rename can succeed.
Reference: the official HDFS FileSystem specification (https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) gives the rename operation the precondition "dest must be root, or have a parent that exists".
How was this patch tested?
We have tested this patch on our production cluster and it fixed the problem.
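For reference, a self-contained version of the reproduction above (the session setup and the explicit partitionOverwriteMode setting are assumptions added for completeness; the original snippet does not show them):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark-23815-repro")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .getOrCreate()
import spark.implicits._

val df = Seq(("1", "2", "3")).toDF("col1", "col2", "col3")
// Before this patch: when /my/hdfs/location does not exist on HDFS, the final
// rename in commitJob fails on the missing parent and nothing is written.
// After this patch: /my/hdfs/location/col1=1/col2=2 contains the output files.
df.write.partitionBy("col1", "col2").mode("overwrite").save("/my/hdfs/location")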