[SPARK-27194][SPARK-29302][SQL] Fix commit collision in dynamic partition overwrite mode #29000
Conversation
gentle ping @cloud-fan @dongjoon-hyun @xuanyuanking @turboFei @LuciferYang @advancedxy

ok to test

Test build #124977 has finished for PR 29000 at commit

Force-pushed from a4d99d0 to 6921f22

Test build #124990 has finished for PR 29000 at commit
val stagingDir = new File(d, ".spark-staging-jobId")
stagingDir.mkdirs()
val conflictTaskFile = new File(stagingDir, "part-00000-jobId-c000.snappy.parquet")
conflictTaskFile.createNewFile()
Sorry, the UT is wrong; I have fixed it:
val stagingDir = new File(d, ".spark-staging-jobId")
stagingDir.mkdirs()
val stagingPartDir = new File(stagingDir, "p1=2")
stagingPartDir.mkdirs()
val conflictTaskFile = new File(stagingPartDir, "part-00000-jobId.c000.snappy.parquet")
conflictTaskFile.createNewFile()
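For reference, a minimal sketch of how a test along these lines could drive the scenario end to end. This is not the exact UT in the PR; it assumes the test-only "test.jobId" option discussed later in this thread is honored by the commit protocol, and the temp path and values are illustrative.

import java.io.File
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
import spark.implicits._
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

val d = new File("/tmp/spark-27194-demo")                          // illustrative output dir
// Simulate a leftover file from a partly-committed (e.g. preempted) task attempt.
val stagingPartDir = new File(new File(d, ".spark-staging-jobId"), "p1=2")
stagingPartDir.mkdirs()
new File(stagingPartDir, "part-00000-jobId.c000.snappy.parquet").createNewFile()

// Dynamic partition overwrite into the same output path; with the fix this should
// succeed instead of failing with FileAlreadyExistsException.
Seq((1, 2)).toDF("id", "p1").write
  .option("test.jobId", "jobId")                                   // test-only hook from this PR
  .partitionBy("p1")
  .mode("overwrite")
  .parquet(d.getCanonicalPath)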
I also recreated another PR, #28989. In that PR, I define a Spark staging output committer to leverage the OutputCommitCoordinator.
Thank you for the reminder; this UT is based on your PR #26339, I'll correct it.
@advancedxy thanks
Force-pushed from 6921f22 to 9b4f8f3

Test build #125006 has finished for PR 29000 at commit
Just left some comments. This PR does resolve the issue, but it also involves some costs. I prefer #28989; in that PR, I define a Spark staging output committer based on the current implementation of HadoopMapReduceCommitProtocol.

@turboFei Thanks for your comments. Actually I think there is no partition-explosion cost. The commit task output dir is generated by

Thanks for your reply @WinkerDu, I was wrong about that. In #28989 I thought taskAttemptContext.getTaskAttemptId.getId was the same as Spark's taskAttemptId, so it would create at most several (the largest task attempt number) staging partition dirs.
Thanks for your reply @turboFei

ok to test

Test build #125501 has finished for PR 29000 at commit

retest this please @cloud-fan @dongjoon-hyun

gentle ping @cloud-fan @dongjoon-hyun @SparkQA to retest this pr

You can push a new commit (e.g. two commits, with the second one reverting the first) to trigger the Jenkins job.

Thanks for your advice, will try it :)

Test build #126035 has finished for PR 29000 at commit

gentle ping @dongjoon-hyun @cloud-fan @xuanyuanking to review this PR :)
xuanyuanking left a comment:
The root cause of this issue is that the speculative task and the normal task share the final output dir in dynamic overwrite mode. Please emphasize this in the PR description.
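For readers following the thread, these are the settings that put a job in the affected configuration (both keys are standard Spark configs, and the table names are placeholders; an actual collision still requires a speculative attempt racing the original attempt of the same task):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dynamic-overwrite-speculation")
  .config("spark.speculation", "true")                             // allow duplicate attempts of slow tasks
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")   // dynamic partition overwrite
  .getOrCreate()

// Both the original and the speculative attempt of a task compute the same destination
// file name (it contains the task id, not the attempt id), so pre-fix they can race on
// the same path under outputPath/.spark-staging-{jobId}.
spark.sql("INSERT OVERWRITE TABLE target PARTITION (p1) SELECT id, p1 FROM source")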
this.stagingDir
// For FileOutputCommitter it has its own staging path called "work path".
case f: FileOutputCommitter =>
  handleDynamicPartitionOverwrite(dir)
Since we changed the behavior, please also update the comment: https://github.com/apache/spark/pull/29000/files#diff-d97cfb5711116287a7655f32cd5675cbR43
OK
new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
case _ => new Path(path)
case _ =>
  handleDynamicPartitionOverwrite(dir)
If both case branches need to call handleDynamicPartitionOverwrite, can we call it outside the case match?
Since this PR leverages FileOutputCommitter to deal with the commit collision, we only handle dynamic partition overwrite in the first branch.
BTW, it seems all committers practically used in Spark are derived from FileOutputCommitter.
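Putting the diff fragments above together, the selection logic being discussed has roughly this shape (a sketch only, written as a free-standing function; handleDynamicPartitionOverwrite stands in for the helper added in this PR and is passed in here just to keep the snippet self-contained):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.OutputCommitter
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

def taskStagingPath(
    committer: OutputCommitter,
    path: String,
    dynamicPartitionOverwrite: Boolean,
    handleDynamicPartitionOverwrite: () => Path): Path = committer match {
  // In dynamic partition overwrite mode, route FileOutputCommitter through the
  // PR's helper instead of its own "work path".
  case _: FileOutputCommitter if dynamicPartitionOverwrite =>
    handleDynamicPartitionOverwrite()
  // Otherwise FileOutputCommitter keeps using its work path as before.
  case f: FileOutputCommitter =>
    new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
  // Other committers write straight to the given path.
  case _ =>
    new Path(path)
}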
// For SPARK-27194 unit test, we try to set constant jobId carried by options
val jobId = options.getOrElse("test.jobId", java.util.UUID.randomUUID().toString)
Can we try to reproduce the file collision without adding this extra option?
Good catch, I'll try to pre-create a partition file with a customized file commit protocol to keep this code clean.
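One way such a test-only commit protocol might look. This is entirely hypothetical (the class name and the hard-coded partition/file name are illustrative); it only sketches the idea of pre-creating a conflicting file from inside the protocol instead of threading a test.jobId option through the production code:

import java.io.File
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical test-only protocol: before the task writes, drop a conflicting file
// into the job's staging partition dir to simulate leftovers from a failed attempt.
class ConflictingFileCommitProtocol(jobId: String, path: String, dynamicPartitionOverwrite: Boolean)
  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {

  override def setupTask(taskContext: TaskAttemptContext): Unit = {
    super.setupTask(taskContext)
    val stagingPartDir = new File(new File(path, s".spark-staging-$jobId"), "p1=2")
    stagingPartDir.mkdirs()
    new File(stagingPartDir, s"part-00000-$jobId.c000.snappy.parquet").createNewFile()
  }
}

A test could then point spark.sql.sources.commitProtocolClass at this class for the duration of the test session.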
Test build #126618 has finished for PR 29000 at commit

Test build #126621 has finished for PR 29000 at commit

Test build #126629 has finished for PR 29000 at commit

Kubernetes integration test status failure

Test build #131350 has finished for PR 29000 at commit

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #131363 has finished for PR 29000 at commit

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #131368 has finished for PR 29000 at commit

retest this please

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #131552 has finished for PR 29000 at commit

Kubernetes integration test starting

Kubernetes integration test status success

Test build #131560 has finished for PR 29000 at commit

Test build #131607 has finished for PR 29000 at commit

Test build #131634 has finished for PR 29000 at commit

Force-pushed from 45f8ea5 to 85aa12a

Test build #131682 has finished for PR 29000 at commit

GA passed, merging to master!
@WinkerDu do you have a JIRA account?

@cloud-fan yes, I have a JIRA account named 'duripeng'

Thanks all for the patch review!

Congrats on your first contribution, Ripeng! :)

Congrats on your first contribution, Ripeng! :) +1
* {appAttemptId}/{taskId}/a=1/b=1,
* then move them to
* /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1.
* 2. When [[FileOutputCommitter]] algorithm version set to 2,
So this isn't the normal behavior of algorithm version 2, right? Normally it writes the task files directly to the final output location. The whole point of algorithm 2 is to prevent all of the extra moves on the driver at the end of the job; for large jobs that time can be huge. I'm not sure of the benefit of algorithm 2 here, because all of that is now happening distributed on each task?
v2 isn't safe in the presence of failures during task commit; at least here, if the entire job fails then, provided job ids are unique, the output doesn't become visible. It is essentially a second attempt at the v1 rename algorithm with (hopefully) smaller output datasets.
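For reference, the committer algorithm version being debated is controlled by a standard Hadoop setting that can be passed through Spark config (real keys, not introduced by this PR; which value is appropriate is exactly the trade-off above):

import org.apache.spark.sql.SparkSession

// v1: tasks commit into a job temporary dir and the driver moves everything once at job commit.
// v2: task commit moves files directly to the final output dir (faster job commit, but not
//     atomic across the job, which is the safety caveat raised above).
val spark = SparkSession.builder()
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
  .getOrCreate()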
What changes were proposed in this pull request?
When using dynamic partition overwrite, each task has its working dir under a staging dir like stagingDir/.spark-staging-{jobId}, and each task commits to outputPath/.spark-staging-{jobId}/{partitionId}/part-{taskId}-{jobId}{ext}. When speculation is enabled, multiple task attempts may be set up for one task; they have the same task id and would commit to the same file concurrently. If the partly-committed files aren't cleaned up (e.g. due to a host going down or node preemption), a FileAlreadyExistsException is raised, resulting in job failure.
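To make the collision concrete, here is the destination path both attempts of one task resolve to (a small illustration; the values are placeholders, and the name format follows the description above):

// Hypothetical values for one task that has an original and a speculative attempt.
val outputPath = "/warehouse/db.db/t"
val jobId      = "6e1a3f0c"                     // in practice a per-job UUID
val partition  = "p1=2"
val taskId     = 0                              // identical for both attempts
val ext        = ".c000.snappy.parquet"

// Pre-fix, both attempts target the same file, because the name depends only on
// taskId and jobId, not on the attempt number.
val dest = f"$outputPath/.spark-staging-$jobId/$partition/part-$taskId%05d-$jobId$ext"
// => /warehouse/db.db/t/.spark-staging-6e1a3f0c/p1=2/part-00000-6e1a3f0c.c000.snappy.parquet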
I don't try to change the task commit process for dynamic partition overwrite (e.g. adding the attempt id to each attempt's task working dir and committing to the final output dir via a new outputCommitCoordinator), for these reasons:
- FileOutputCommitter already has a commit coordinator for task attempts; we can leverage it rather than build a new one.
- The FileAlreadyExistsException risk would still exist.

In this PR, I leverage FileOutputCommitter to solve the problem:

- Use outputPath/.spark-staging-{jobId} as the output dir.
- Task attempts write to outputPath/.spark-staging-{jobId}/_temporary/${appAttemptId}/_temporary/${taskAttemptId}/{partitionId}/part-{taskId}-{jobId}{ext}.
- With the FileOutputCommitter coordinator, the write job first commits output to outputPath/.spark-staging-{jobId}/{partitionId}.
- Then move outputPath/.spark-staging-{jobId}/{partitionId} to outputPath/{partitionId} (sketched below).
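A compact sketch of that final move step (illustrative only; the real logic lives in the commit protocol's job-commit path, and the paths follow the layout listed above):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// After FileOutputCommitter has committed task output into
// outputPath/.spark-staging-{jobId}/{partitionId}, move each committed partition
// dir to its final location and drop the staging dir.
def movePartitionsToFinal(outputPath: String, jobId: String, conf: Configuration): Unit = {
  val fs = FileSystem.get(conf)
  val stagingDir = new Path(outputPath, s".spark-staging-$jobId")
  fs.listStatus(stagingDir).filter(_.isDirectory).foreach { p =>
    val finalDir = new Path(outputPath, p.getPath.getName)   // e.g. outputPath/p1=2
    if (fs.exists(finalDir)) fs.delete(finalDir, true)        // overwrite the old partition
    fs.rename(p.getPath, finalDir)
  }
  fs.delete(stagingDir, true)
}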
Why are the changes needed?
Without this PR, dynamic partition overwrite can fail with a FileAlreadyExistsException when task attempts collide (e.g. when speculation is enabled).
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added a UT.