Spark 3.3: Add RemoveDanglingDeletes action #6581
Conversation
    .toDF("partition", "spec_id", "min_data_sequence_number");

// Dangling position delete files
Column joinCond =
I can't reproduce it on my own machine, but on the build machine I get an error if I try the cleaner Dataset.join(ds, Seq columns) form:
error: no suitable method found for join(Dataset<Row>,Buffer<String>); see: https://github.com/apache/iceberg/actions/runs/3913210764/jobs/6688831726
So I made an explicit join condition instead, like RewriteManifestFileSparkAction.
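For reference, a minimal sketch of the workaround, assuming hypothetical left/right datasets that both carry partition and spec_id columns; the join condition is built as a Column rather than a Seq of column names, which is what failed to compile:

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class ExplicitJoinCondition {
  // Build the join condition explicitly; avoids the join(Dataset, Seq) overload that
  // javac rejected on the build machine.
  static Dataset<Row> joinOnPartitionAndSpec(Dataset<Row> left, Dataset<Row> right) {
    Column joinCond =
        left.col("partition")
            .equalTo(right.col("partition"))
            .and(left.col("spec_id").equalTo(right.col("spec_id")));
    return left.join(right, joinCond, "left_outer");
  }
}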
import org.apache.iceberg.DeleteFile;

/**
 * An action that removes dangling delete files from the current snapshot. A delete file is dangling
I think we need a note that this removes delete files only if they don't apply to any non-expired data file, just to make it clear that this isn't only about "live" delete files.
 * <p>The following dangling delete files are removed:
 *
 * <ul>
 *   <li>Position delete files with a sequence number less than that of any data file in the same
This is more of a technical detail, right? Not sure we need it in the Javadoc.
public class RemoveDanglingDeleteFilesActionResult implements RemoveDanglingDeleteFiles.Result {

  private static final RemoveDanglingDeleteFilesActionResult EMPTY =
Do we really need this? Seems like in the code we can just use new RemoveDanglingDeleteFileActionResult(Collections.emptyList()). It doesn't seem like we are creating that many of these objects.
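To illustrate the suggestion, a small sketch (the constructor shape is assumed from the PR's naming, so treat it as hypothetical):

import java.util.Collections;
import java.util.List;

import org.apache.iceberg.DeleteFile;

class RemoveDanglingDeleteFilesActionResult {
  private final List<DeleteFile> removedDeleteFiles;

  RemoveDanglingDeleteFilesActionResult(List<DeleteFile> removedDeleteFiles) {
    this.removedDeleteFiles = removedDeleteFiles;
  }

  // Suggested alternative to a shared EMPTY constant: construct the empty result where
  // it is needed, since very few of these objects are ever created.
  static RemoveDanglingDeleteFilesActionResult empty() {
    return new RemoveDanglingDeleteFilesActionResult(Collections.emptyList());
  }
}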
if (useCaching) {
  reusableDS = ds.cache();
} else {
  int parallelism = SQLConf.get().numShufflePartitions();
This is a bit internal to Spark. Probably fine, but I'm not a fan of touching this over spark.conf().
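A small sketch of the spark.conf() alternative, assuming the action has a SparkSession in scope; it reads the public setting instead of the internal SQLConf:

import org.apache.spark.sql.SparkSession;

class ShuffleParallelism {
  // Read the shuffle parallelism through the public RuntimeConfig rather than
  // SQLConf.get().numShufflePartitions(); "200" is Spark's default when unset.
  static int shufflePartitions(SparkSession spark) {
    return Integer.parseInt(spark.conf().get("spark.sql.shuffle.partitions", "200"));
  }
}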
} else {
  int parallelism = SQLConf.get().numShufflePartitions();
  reusableDS =
      ds.repartition(parallelism).map((MapFunction<T, T>) value -> value, ds.exprEnc());
Why do we repartition here (and encode)?
After chatting with @aokolnychyi and @RussellSpitzer, here is an analysis of when this can be useful. There will be two types of operations that can remove delete files:

Analysis: RemoveDanglingDeleteFiles is cheaper and simpler, and can work across both types of files out of the box. However, for it to work exactly, we need the following conditions: RewriteDataFiles being run with:

Note that RemoveDanglingDeleteFiles can still remove some delete files even if these conditions are not met; it just may not do so for all dangling delete files, because an old data file (one with a low sequence number) that is not rewritten in a partition will prevent delete files in that partition from being removed. For example, if a partition still has a data file at sequence number 1, a position delete at sequence number 5 is still considered applicable and will not be removed, even if every file it actually referenced has been rewritten. So I'm open to whether there is a good use case for this. One idea is to bundle this with RewriteDataFiles and either trigger it optimistically when these conditions are met, or trigger it in any case in the hope that it removes some delete files, as it is relatively cheap. Otherwise, the complete solution (all to be developed) would be:
| "data_file.spec_id as spec_id", | ||
| "data_file.file_path as file_path", | ||
| "data_file.content as content", | ||
| "data_file.file_size_in_bytes as file_size_in_bytes", |
Sorry if it's a naive question: do we need to project file_size_in_bytes for this action?
Nvm, we need to serialize the result to a delete file so it's needed.
/** The action result that contains a summary of the execution. */
interface Result {
  /** Removes representation of removed delete files. */
Should this comment just be "List of removed delete files"?
for (int i = 0; i < partitionRow.length(); i++) {
  partition.set(i, partitionRow.get(i));
}
Style nit: newline after the loop
@szehon-ho Thanks for the detailed analysis. On the surface it does make sense to combine this with existing compaction mechanisms like RewriteDataFiles, since the metadata-only RemoveDanglingDeleteFiles is cheap and many users are already running RewriteDataFiles periodically anyway. Is there a case we're missing where users would want to run remove dangling delete files separately? I can't really think of one.
}

String desc = String.format("Remove dangling delete files for %s", table.name());
JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
[minor] Should we also mention in the name that it rewrites manifests after removing dangling delete files?
Good point.
Dataset<Row> minDataSeqNumberPerPartition =
    entries
        .filter("content == 0") // data files
        .groupBy("partition", "spec_id")
        .agg(min("sequence_number"))
        .toDF("partition", "spec_id", "min_data_sequence_number");
Can we cache this DF as well, since we need it in the two joins below?
Yeah, I'm not so sure; caching seems a bit controversial. I think the general approach in Iceberg Spark actions is not to use cache?
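For what it's worth, a rough sketch of what caching the grouped DF and reusing it across the two joins could look like. Column names follow the diff above; the exact dangling conditions (strictly older for position deletes, at least as old for equality deletes) are my reading of the sequence-number rules, so treat them as an assumption rather than the PR's final logic.

import static org.apache.spark.sql.functions.min;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class DanglingDeleteSketch {

  static Dataset<Row> danglingDeletes(Dataset<Row> entries) {
    // Minimum data sequence number per partition and spec, cached because both joins
    // below read it.
    Dataset<Row> minSeq =
        entries
            .filter("content == 0") // data files
            .groupBy("partition", "spec_id")
            .agg(min("sequence_number"))
            .toDF("partition", "spec_id", "min_data_sequence_number")
            .cache();

    Dataset<Row> posDeletes = entries.filter("content == 1"); // position delete files
    Dataset<Row> eqDeletes = entries.filter("content == 2"); // equality delete files

    // Position deletes are dangling when strictly older than every remaining data file
    // in the same partition; equality deletes also when they are exactly as old.
    Dataset<Row> danglingPos =
        posDeletes.join(
            minSeq,
            samePartitionAndSpec(posDeletes, minSeq)
                .and(posDeletes.col("sequence_number").lt(minSeq.col("min_data_sequence_number"))),
            "inner");
    Dataset<Row> danglingEq =
        eqDeletes.join(
            minSeq,
            samePartitionAndSpec(eqDeletes, minSeq)
                .and(eqDeletes.col("sequence_number").leq(minSeq.col("min_data_sequence_number"))),
            "inner");

    return danglingPos.union(danglingEq);
  }

  private static Column samePartitionAndSpec(Dataset<Row> left, Dataset<Row> right) {
    return left.col("partition")
        .equalTo(right.col("partition"))
        .and(left.col("spec_id").equalTo(right.col("spec_id")));
  }
}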
@szehon-ho Have you considered
Thanks all for the reviews. After thinking about it, this will be good to have, but the complete (100%) position delete removal will be done via minor compaction of delete files (the first part of that effort is here: #6365, more to come), so I will work on that first. @amogh-jahagirdar yea, I initially thought it would be useful as a standalone action, but after some chats with @aokolnychyi, it may be a bit too tricky for users to know when to run it. Still open though. @manuzhang do you mean integrating this commit with existing partial commits? Initially I was envisioning a separate commit altogether at the end of RewriteDataFiles.
@szehon-ho yes, integrating removing delete files per partition with partial commits. When new position delete files are generated during RewriteDataFiles with partial commits, all following commits will fail and no delete files will be removed at the end.
Is this action currently available? Is there any usage documentation?
We have already deployed v2 tables to production, and having to wait for snapshot expiration to remove delete files is painful. When will this PR be merged into master?
I am +1 for integrating this into the existing rewrite action.
Hi, can you please check #7389? It is already merged. It is a way to do this manually for now (and also optimize position deletes); we can come back later and integrate this action into RewriteDataFiles automatically if we need to. There are also some ongoing post-PR improvements in review, like #7582 and #7572.
return builder
    .withPath(path)
    .withPartition(partition)
The partition data here doesn't comply with the builder's spec, so it will build incorrect partition data.
Is it a bit of overwork to rebuild the DeleteFile? Can't we just delete using paths? A lot of overhead could be avoided.
Usually the copy needs to assert that the specs are the same:

public Builder copy(DeleteFile toCopy) {
  if (isPartitioned) {
    Preconditions.checkState(
        specId == toCopy.specId(), "Cannot copy a DeleteFile with a different spec");
    this.partitionData = DataFiles.copyPartitionData(spec, toCopy.partition(), partitionData);
  }
It would be great if we could delete a DeleteFile by file path only, but judging from MergingSnapshotProducer, path-based deletion was never made to work for delete files:

iceberg/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java, lines 193 to 202 at d32abe8:

protected void delete(DeleteFile file) {
  deleteFilterManager.delete(file);
}

/** Add a specific data path to be deleted in the new snapshot. */
protected void delete(CharSequence path) {
  // this is an old call that never worked for delete files and can only be used to remove data
  // files.
  filterManager.delete(path);
}
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
This adds an action to clean up dangling (invalid) DeleteFiles that may otherwise keep getting carried over with the table's current snapshot and may negatively impact read performance.
The problem and design doc is here: https://docs.google.com/document/d/11d-cIUR_89kRsMmWnEoxXGZCvp7L4TUmPJqUC60zB5M/edit#
In a nutshell, the current table-wide mechanism is crude and may miss many opportunities to age off DeleteFiles, even after they become invalid through compaction. This implements a Spark action to perform a partition-by-partition removal using the same rules.
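For reference, a hypothetical usage sketch, assuming the action is wired into SparkActions the way this PR proposes (the removeDanglingDeleteFiles entry point and the Result accessor name are not part of a released API):

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RemoveDanglingDeleteFiles;
import org.apache.iceberg.spark.actions.SparkActions;

class RemoveDanglingDeletesExample {
  static void run(Table table) {
    RemoveDanglingDeleteFiles.Result result =
        SparkActions.get()
            .removeDanglingDeleteFiles(table) // hypothetical entry point added by this PR
            .execute();
    // removedDeleteFiles() is an assumed accessor on the proposed Result interface.
    System.out.println("Removed dangling delete files: " + result.removedDeleteFiles());
  }
}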