[SPARK-33298][CORE] Decouple file naming from FileCommitProtocol #32881
Conversation
cc @cloud-fan could you help take a look when you have time?
Test build #139693 has finished for PR 32881 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test starting
Kubernetes integration test status success
Test build #139777 has finished for PR 32881 at commit
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #139796 has finished for PR 32881 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Test build #139805 has finished for PR 32881 at commit
I'd like to understand more about backward compatibility. If there is an old impl of the file commit protocol, in Spark 3.2 that impl will commit nothing, and this silent "noop" is even worse than an explicit compile error.
This is a developer API; if there is really no way to keep backward compatibility, let's just break the API and force people to correct their impl according to the new API.
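To make the hazard concrete, here is a minimal sketch assuming hypothetical old/new method shapes, not the actual Spark signatures: if the new overload ships with a default body for source compatibility, an old impl's override is never invoked.

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext

abstract class FileCommitProtocol {
  // Old-shape method that an existing third-party impl overrides (hypothetical).
  def newTaskTempFile(ctx: TaskAttemptContext, dir: Option[String], ext: String): String =
    throw new UnsupportedOperationException("old API")

  // New-shape method given a default body for source compatibility. Spark 3.2
  // would call only this overload, so an impl that overrides just the old one
  // silently contributes nothing -- the "noop" described above.
  def newTaskTempFile(ctx: TaskAttemptContext, relativePath: String): String =
    relativePath
}
```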
@cloud-fan - I think it's impossible for me to keep backward compatibility cleanly. Updated here to just introduce the new method and break the API.
do we really need to allow users to customize the file naming?
@cloud-fan - yes, I think so. Before this PR, file naming is part of the commit protocol, and each commit protocol checked into the Spark code base has its own naming specification: HadoopMapReduceCommitProtocol, PathOutputCommitProtocol, and ManifestFileCommitProtocol. So we should expect that external commit protocols might already have their own custom naming, and we should allow them to implement their own naming protocol.
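For illustration, a hedged sketch of what such a pluggable naming protocol could look like; the trait name follows this PR, but the method shape and the Hive-style impl are hypothetical:

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext

// Context fields assumed from the FileContext discussion later in this thread.
final case class FileContext(ext: String, relativeDir: Option[String], prefix: Option[String])

trait FileNamingProtocol {
  def getTaskTempFilePath(ctx: TaskAttemptContext, fileContext: FileContext): String
}

// A hypothetical external protocol with its own convention, e.g. a
// Hive/Trino-style bucket file name such as "00003_0", which differs from
// Spark's default "part-00003-<uuid>" scheme.
class HiveStyleNamingProtocol extends FileNamingProtocol {
  override def getTaskTempFilePath(ctx: TaskAttemptContext, fc: FileContext): String = {
    val taskId = ctx.getTaskAttemptID.getTaskID.getId
    val name = f"${fc.prefix.getOrElse("")}$taskId%05d_0${fc.ext}"
    fc.relativeDir.map(d => s"$d/$name").getOrElse(name)
  }
}
```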
I agree that file naming shouldn't be part of the commit protocol, but I think the callers in Spark should decide the file name (e.g. the file format writer, or something similar for Hive tables), not users.
Updated per offline discussion: keep the naming protocol internal, not user-facing via config.
…ated as PathOutputCommitProtocol depends on it
Test build #139958 has finished for PR 32881 at commit
Test build #139959 has finished for PR 32881 at commit
Kubernetes integration test starting
Kubernetes integration test starting
Test build #139964 has finished for PR 32881 at commit
Kubernetes integration test status success
Kubernetes integration test status success
Kubernetes integration test starting
Test build #139968 has finished for PR 32881 at commit
Kubernetes integration test status success
Kubernetes integration test starting
Test build #139970 has finished for PR 32881 at commit
Kubernetes integration test status success
Kubernetes integration test starting
Kubernetes integration test status success
Co-authored-by: Wenchen Fan <[email protected]>
@cloud-fan - thank you for the offline discussion; I updated the PR to use the discussed approach. Thanks.
Test build #139985 has finished for PR 32881 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test starting
Kubernetes integration test status success
Test build #140012 has finished for PR 32881 at commit
  }
- def addFilesWithAbsolutePathUnsupportedError(commitProtocol: String): Throwable = {
+ def addFilesWithAbsolutePathUnsupportedError(protocol: String): Throwable = {
why this change?
@cloud-fan - sorry, this change isn't needed. I was calling it from the naming protocol as well in one iteration. Will change.
      plan: SparkPlan,
      fileFormat: FileFormat,
-     committer: FileCommitProtocol,
+     protocols: (FileCommitProtocol, FileNamingProtocol),
why not pass 2 parameters?
From my side, whenever I add an 11th parameter to a Scala method, IntelliJ marks it as a lint error. Do we have a rule on the number of parameters in Spark?
   */
  final case class FileContext(
      ext: String,
      relativeDir: Option[String],
I'm not very sure about it. It seems clearer to me to let FileNamingProtocol only generate the file name, and have the caller side construct the proper relative path w.r.t. the partition dir.
Same for ext and prefix: can we let the caller side prepend/append the prefix/ext to the generated file name?
All the information in FileContext is not something the impl can customize: the generated file name must have ext at the end, prefix at the beginning, and relativeDir as the parent dir. Then it's better to let the caller side guarantee it.
And the API can simply be named getTaskFileName.
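A sketch of that simplification, assuming (not quoting) a bare-name API where the caller assembles prefix, extension, and directory:

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext

// Hypothetical trait name; only the bare file name is protocol-specific.
trait TaskFileNaming {
  def getTaskFileName(ctx: TaskAttemptContext): String
}

object CallerSideNaming {
  // The caller (e.g. the file format writer) enforces the invariants itself:
  // prefix at the beginning, ext at the end, relativeDir as the parent dir.
  def buildTaskFilePath(
      naming: TaskFileNaming,
      ctx: TaskAttemptContext,
      prefix: String,
      ext: String,
      relativeDir: Option[String]): String = {
    val name = prefix + naming.getTaskFileName(ctx) + ext
    relativeDir.map(d => s"$d/$name").getOrElse(name)
  }
}
```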
This API is only to abstract the naming differences between batch and streaming file writing. If that's not necessary, maybe we can remove this abstraction entirely.
jobId is generated with a UUID as well; I don't see why streaming write needs to generate a new UUID per file instead of using the job id.
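For contrast, a sketch of the two naming schemes being compared here; the helper names are made up:

```scala
import java.util.UUID

object NamingSchemes {
  // Batch-style: a single job-level id (itself a UUID) shared by every file of the job.
  def batchName(split: Int, jobId: String, ext: String): String =
    f"part-$split%05d-$jobId$ext"

  // Streaming-style (as in the snippet below): a fresh UUID per file.
  def streamingName(split: Int, ext: String): String =
    f"part-$split%05d-${UUID.randomUUID.toString}$ext"
}
```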
I'm cleaning this up in #33002
  val split = taskContext.getTaskAttemptID.getTaskID.getId
  val uuid = UUID.randomUUID.toString
  val ext = fileContext.ext
  val filename = f"part-$split%05d-$uuid$ext"
shall we fail here if prefix is not None? or support prefix here?
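Both options from that question, sketched against an assumed FileContext shape (ext: String, prefix: Option[String]); neither is necessarily what the PR does:

```scala
import java.util.UUID
import org.apache.hadoop.mapreduce.TaskAttemptContext

object BatchNamingSketch {
  def batchFileName(ctx: TaskAttemptContext, ext: String, prefix: Option[String]): String = {
    // Option 2 (alternative): fail fast instead of silently dropping the prefix:
    // require(prefix.isEmpty, "prefix is not supported by the batch naming protocol")
    val split = ctx.getTaskAttemptID.getTaskID.getId
    val uuid = UUID.randomUUID.toString
    // Option 1: support the prefix by prepending it to the generated name.
    f"${prefix.getOrElse("")}part-$split%05d-$uuid$ext"
  }
}
```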
   */
  def newTaskTempFileAbsPath(
-     taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String
+     taskContext: TaskAttemptContext, relativePath: String, finalPath: String): String
note: I can see that this API makes the code simpler, but it makes the semantics a bit more complicated. What if the final path doesn't have the same file name as the relativePath? Maybe it's better to have fileName: String, targetDir: String. Then the semantics are clear: the impl should commit the new file to the target dir.
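A sketch of that suggested alternative, with a hypothetical trait name, just to make the proposed contract concrete:

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext

trait CommitProtocolSketch {
  /**
   * Contract: the impl stages a temp file for writing and, on commit, places
   * a file named `fileName` directly under `targetDir`. Returns the temp path.
   */
  def newTaskTempFileAbsPath(
      taskContext: TaskAttemptContext,
      fileName: String,
      targetDir: String): String
}
```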
Update: we decided to go with #33012 instead of this PR, as we know some other projects (
What changes were proposed in this pull request?

This PR proposes to decouple the file naming functionality from FileCommitProtocol. Currently FileCommitProtocol mainly does three things: committing task output, committing job output, and naming the output files. A FileCommitProtocol should cover the first two functionalities, but not the third (naming output files). The file commit protocol (by its name and design) should take care of committing output (e.g. renaming files and directories), but it should not also control the file names. It should leave callers the flexibility to specify file names (e.g. a Hive/Presto/Trino-compatible bucket file name is different from what Spark has now - #30003). So this PR decouples file naming from FileCommitProtocol.

The changes are:
- Introduce FileNamingProtocol to specify how output files are named, with the implementations BatchFileNamingProtocol for batch queries and StreamingFileNamingProtocol for streaming queries.
- Change newTaskTempFile and newTaskTempFileAbsPath in FileCommitProtocol so that the commit protocol is notified when a new task output file is added; the input is the relative file name/path, and the output is the full file path.
- Change FileFormatDataWriter to call FileNamingProtocol for the relative file path and FileCommitProtocol for the full file path (see the sketch below).
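Roughly, the write path described above would look like the following; the types and method names follow the PR description, but the exact signatures are assumptions:

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext

// Minimal stand-ins shaped after the description above; the real signatures
// in the PR may differ.
final case class FileContext(ext: String, relativeDir: Option[String], prefix: Option[String])

trait FileNamingProtocol {
  def getTaskTempFilePath(ctx: TaskAttemptContext, fileContext: FileContext): String
}

trait FileCommitProtocol {
  def newTaskTempFile(ctx: TaskAttemptContext, relativePath: String): String
}

object WritePathSketch {
  // FileFormatDataWriter-style flow: the naming protocol yields the relative
  // path, and the commit protocol maps it to the actual path the writer opens.
  def newOutputFilePath(
      naming: FileNamingProtocol,
      committer: FileCommitProtocol,
      ctx: TaskAttemptContext,
      fileContext: FileContext): String = {
    val relativePath = naming.getTaskTempFilePath(ctx, fileContext)
    committer.newTaskTempFile(ctx, relativePath)
  }
}
```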
Why are the changes needed?

To make the commit protocol clearer and to allow future flexibility in specifying Spark output file names.
Pre-requisite of #30003.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing unit tests, e.g. DataFrameReaderWriterSuite.scala and InsertSuite.scala. Will add more unit tests if required. This PR is anyway a code refactoring.