[SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate result and support concurrent file source write operations write to different partitions in the same table. #25863
Conversation
Force-pushed from e5deb55 to fa60fc9.
How about just cleaning up the staging dir before the write job starts?
ok to test
Test build #111047 has finished for PR 25863 at commit
@turboFei it seems your description does not reflect the changes in this PR.
The staging dir may be used by another job, so we cannot clean it up directly.
Test build #111061 has finished for PR 25863 at commit
Force-pushed from c3791f7 to eedad23.
Test build #111073 has finished for PR 25863 at commit
Test build #111075 has finished for PR 25863 at commit
Test build #111109 has finished for PR 25863 at commit
Force-pushed from a4f33d9 to 98aa212.
In this PR, for the partition overwrite operation, I set a unique output path. I have run the previously failing UTs locally, and they all passed.
retest this please.
Test build #111116 has finished for PR 25863 at commit
In this PR, I set a unique output dir for the partition overwrite operation, covering both dynamic and static partition overwrite.
With this PR, the InsertOverwrite operation for a partitioned table will not reuse the InsertInto operation for a partitioned table, and InsertOverwrite/InsertInto for a non-partitioned table will still use
In the new commit.
Test build #111125 has finished for PR 25863 at commit
Force-pushed from f87aeb3 to 8098a8f.
Test build #111516 has finished for PR 25863 at commit
* @param isInsertIntoHadoopFsRelation whether is a InsertIntoHadoopFsRelation operation
* @param escapedStaticPartitionKVs static partition key and value pairs, which have been escaped
*/
case class FileSourceWriteDesc(
Since this might be used/subclassed by users, we may just use a normal class.
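A minimal sketch of that suggestion, using the two fields from the quoted doc comment; the field types and the `Serializable` mixin are assumptions of this illustration, not what the PR ships:

```scala
// Hypothetical sketch: a plain (non-case) class, so user code can subclass it.
// Field names follow the quoted doc comment; types are assumptions.
class FileSourceWriteDesc(
    val isInsertIntoHadoopFsRelation: Boolean,
    val escapedStaticPartitionKVs: Seq[(String, String)])
  extends Serializable {
  // Subclasses can carry extra write metadata without breaking the base contract.
}
```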
| " dynamicPartitionOverwrite)") | ||
| instantiate(className, jobId, outputPath, dynamicPartitionOverwrite) | ||
| } | ||
| } |
Kind of, but I think we can also add dynamicPartitionOverwrite to fileSourceWriteDesc.
For a user-defined FileCommitProtocol class, we can extract it from fileSourceWriteDesc and pass it to the old instantiate method.
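A hedged sketch of that fallback: only the existing four-argument FileCommitProtocol.instantiate is real Spark API here, while FileSourceWriteDesc and its dynamicPartitionOverwrite field are the additions this comment proposes:

```scala
import org.apache.spark.internal.io.FileCommitProtocol

// Hypothetical helper: for user-defined FileCommitProtocol classes that only
// know the old 2-/3-argument constructors, extract the flag from the descriptor
// and delegate to the existing instantiate entry point.
def instantiateWithDesc(
    className: String,
    jobId: String,
    outputPath: String,
    desc: FileSourceWriteDesc): FileCommitProtocol = {
  FileCommitProtocol.instantiate(
    className, jobId, outputPath, desc.dynamicPartitionOverwrite)
}
```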
context.getConfiguration.set(FileOutputFormat.OUTDIR, stagingOutputPath.toString)
logWarning("Set file output committer algorithm version to 2 implicitly," +
  " for that the task output would be committed to staging output path firstly," +
  " which is equivalent to algorithm 1.")
15843 is a lot; however, it would not be that much within one Spark application.
One way to solve this is to use an object-level counter so that only the first warning (or the first few warnings) is logged.
But I am not sure that's worth it. Also, the head of the logs may get rotated and discarded...
Using logDebug is fine too, but normally users won't set the log level to DEBUG.
I am not sure which one is better. It's up to you then.
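One way the "log only once" idea could look, as a rough sketch; the object name and the wiring through a passed-in logging function are illustrative only, not what the PR does:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical guard: logs the implicit-algorithm warning only once per JVM,
// instead of once per committer instance.
object StagingOutputWarning {
  private val warned = new AtomicBoolean(false)

  def warnOnce(logWarning: String => Unit): Unit = {
    if (warned.compareAndSet(false, true)) {
      logWarning("Set file output committer algorithm version to 2 implicitly, " +
        "for that the task output would be committed to staging output path firstly, " +
        "which is equivalent to algorithm 1.")
    }
  }
}
```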
checkAnswer(spark.sql("select a, b from t where b = 1"), df1)
checkAnswer(spark.sql("select a, b from t where b = 2"), df2)
}
}
Ah, that's a limitation of DataFrameWriter; we may need to extend DataFrameWriter to support that.
But currently, I think we can simply use the SQL syntax, since we can use spark.sql and get the same behaviour.
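For illustration, the SQL route could look like this; the table and column names are only examples, and `spark` is an in-scope SparkSession as in the quoted test:

```scala
// Static partition overwrite expressed through SQL instead of DataFrameWriter.
spark.sql("CREATE TABLE t (a INT, b INT) USING parquet PARTITIONED BY (b)")
spark.sql("INSERT OVERWRITE TABLE t PARTITION (b = 1) SELECT a FROM source_data")

// Dynamic partition overwrite through SQL; for file sources this also needs
// spark.sql.sources.partitionOverwriteMode=dynamic.
spark.sql("INSERT OVERWRITE TABLE t PARTITION (b) SELECT a, b FROM source_data")
```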
advancedxy left a comment
Sorry for the late reply.
Also, the comments should be updated.
spark/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
Lines 34 to 37 in 8098a8f
* 2. Implementations should have a constructor with 2 or 3 arguments:
*    (jobId: String, path: String) or
*    (jobId: String, path: String, dynamicPartitionOverwrite: Boolean)
* 3. A committer should not be reused across multiple Spark jobs.
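For reference, a user committer honouring the documented 3-argument constructor could look roughly like this; the class name and the empty body are illustrative:

```scala
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Illustrative subclass exposing the documented
// (jobId: String, path: String, dynamicPartitionOverwrite: Boolean) constructor,
// so FileCommitProtocol.instantiate can create it reflectively.
class MyCommitProtocol(
    jobId: String,
    path: String,
    dynamicPartitionOverwrite: Boolean)
  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
```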
refactor the code.

retest this please.
Test build #111569 has finished for PR 25863 at commit
This scares me.

You are proposing using reflection to get at private superclass methods that are not part of the public API, where we aren't even going to be aware of the fact that they are being used this way and have no guarantees of stability whatsoever. And of course, the same applies for anything which implements the new superclass. Overall then, you are doing things during commit operations for which it is going to be very hard to prove correctness, and which are vulnerable to people doing things in the MR codebase: we could fundamentally break things, either at link time, or worse, in the semantics of the operations you are trying to perform.

I don't see an easy solution here, but as someone with write access to the MR codebase, I'm happy to discuss how we could make things easier. For example, someone could actually pull out the

In an ideal world, how we commit should be opaque to the caller. You commit a task and its work is promoted into the set of committed tasks' work, or you don't get a response from the task's process, in which case you can safely commit another task, confident that irrespective of what has gone wrong with the task which failed/partitioned/hung, only one task's output will ever become visible, and after the job has been committed or aborted the output of a partitioned task will never appear. V2 doesn't offer those guarantees, which is why I consider it broken.

One thing I have thought about adding to
I've looked at the code a bit more. As I noted earlier, this scares me. In the FileOutputCommitter, the FS itself is the synchronization point, with assumptions about atomicity and performance implicitly implemented in the code. The applications which use the committer have their own assumptions about atomicity and performance, which are derived transitively from those of the file system and then extended by assumptions about the correctness of the algorithms. Things are bad enough as they are. I am not convinced that relying on the internals of FileOutputCommitter versions is the safe way to do this.

I think you are better off specifying a commit protocol which is explicitly designed for writing files into the destination, and then implementing it. For S3A, knowing the destination path lets us initiate but not complete the upload to the final path; we would then propagate the information needed to manifest that file to the job committer. The "Magic" committer does exactly this by recognising that when someone writes to

That is very much black magic in the FS connector. Having a commit protocol where you could ask the committer for an output path to use when writing to a specific destination would let you eliminate that trick completely and possibly help with this problem.

The other thing to consider is that Spark permits committed tasks to pass serialized data back to the driver, as an alternative to relying on the file system. Scale issues notwithstanding, task committers should be able to provide the information needed to include the task in the job's final output. For example, rather than have task commit rename files, it could just return a path to where it has stored its list of files to commit. Job commit then becomes a matter of loading those files and moving the output into the final destination. Again, this is essentially what we do with the S3A committer: we just save, propagate and reload those files within the committer.

This all gets complicated fast, even without worrying about concurrent jobs. But trying to use
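To make the "pass serialized data back to the driver" point concrete: in Spark's FileCommitProtocol, commitTask already returns a TaskCommitMessage that the driver hands to commitJob. A rough sketch under that API, with the manifest payload and the promotion step purely illustrative:

```scala
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// Illustrative only: tasks report the files they wrote instead of renaming them,
// and all promotion into the final destination happens at job commit.
abstract class ManifestCommitProtocol(jobId: String, path: String)
  extends FileCommitProtocol {

  // In a real committer this list would be collected while the task writes files.
  protected def filesWrittenByThisTask: Seq[String]

  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
    new TaskCommitMessage(filesWrittenByThisTask)
  }

  override def commitJob(
      jobContext: JobContext,
      taskCommits: Seq[TaskCommitMessage]): Unit = {
    // The driver sees every committed task's manifest in one place and can
    // move or publish those files into the final destination.
    val allFiles = taskCommits.flatMap(_.obj.asInstanceOf[Seq[String]])
    // ... rename/publish allFiles under `path` ...
  }
}
```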
Thanks @steveloughran
Test build #112119 has finished for PR 25863 at commit
Force-pushed from fac7b7b to fa66a5b (add concurrent ut, extract doInsertion, set logDebug and modify comment).
Test build #114890 has finished for PR 25863 at commit
Test build #114891 has finished for PR 25863 at commit
Test build #114901 has finished for PR 25863 at commit
Test build #115045 has finished for PR 25863 at commit
What changes were proposed in this pull request?
For InsertIntoHadoopFsRelation operations:
Case A:
Application appA runs insert overwrite on table table_a with static partition overwrite.
It is killed while committing tasks, because one task hangs,
and parts of its committed task output are left under /path/table_a/_temporary/0/.
Then we rerun appA. It reuses the staging dir /path/table_a/_temporary/0/ and executes successfully,
but it also commits the data left behind by the killed application to the destination dir.
Case B:
Application appA runs insert overwrite on table table_a.
Application appB runs insert overwrite on table table_a, too.
They execute concurrently, and they may both use /path/table_a/_temporary/0/ as the work path,
so their results may be corrupted.
In this PR, we set a staging output path for the InsertIntoHadoopFsRelation operation.
The output path is a multi-level path composed of the specified partition key-value pairs, formatted as
.spark-staging-${depth}/p1=v1/p2=v2/.../pn=vn. We can detect a conflict by checking the existence of the corresponding staging output path.
For example:
table_a(c1 string, a string, b string, ...) partitioned by (a, b, ...)
When writing to partition a=1, we need to check the existence of...
When the corresponding staging output path already exists, an exception is thrown.
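A sketch of what the conflict check described above might look like; the helper name, the exception type, and rooting the staging dir under the table location are assumptions of this illustration, not necessarily the PR's exact code:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper: build .spark-staging-${depth}/p1=v1/.../pn=vn from the
// escaped static partition key-value pairs and fail fast if it already exists.
def checkAndGetStagingDir(
    tableLocation: Path,
    staticPartitionKVs: Seq[(String, String)],
    hadoopConf: Configuration): Path = {
  val depth = staticPartitionKVs.length
  val stagingRoot = new Path(tableLocation, s".spark-staging-$depth")
  val stagingDir = staticPartitionKVs.foldLeft(stagingRoot) {
    case (parent, (k, v)) => new Path(parent, s"$k=$v")
  }

  val fs = stagingDir.getFileSystem(hadoopConf)
  if (fs.exists(stagingDir)) {
    // Another concurrent (or killed and not cleaned up) write to the same
    // partition is detected by the existence of its staging output path.
    throw new IllegalStateException(s"Staging output path $stagingDir already exists")
  }
  stagingDir
}
```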
Thanks @advancedxy for providing the prototype of solution and valuable suggestions.
Why are the changes needed?
Data may be corrupted without this PR.
Does this PR introduce any user-facing change?
Users can see the staging output paths and may need to clean them up manually when a path belongs to a killed application and has not been cleaned up gracefully.
How was this patch tested?
Added unit test.