Core: rewrite should drop delete files by data sequence number partition wise #9454
Conversation
Review comment on the following lines of the change:

```java
        table, dataFile.partition(), dataFile.path().toString(), outputDeleteFiles);
  }

  private List<DeleteFile> writePosDeletes(
```
In the same folder, we already have some code that writes position and equality delete files. Can it be moved to a util class and reused here?
Lines 979 to 1036 in 4d34398:

```java
private DeleteFile newDeleteFile(Table table, String partitionPath) {
  return FileMetadata.deleteFileBuilder(table.spec())
      .ofPositionDeletes()
      .withPath("/path/to/pos-deletes-" + UUID.randomUUID() + ".parquet")
      .withFileSizeInBytes(5)
      .withPartitionPath(partitionPath)
      .withRecordCount(1)
      .build();
}

private List<Pair<CharSequence, Long>> generatePosDeletes(String predicate) {
  List<Row> rows =
      spark
          .read()
          .format("iceberg")
          .load(tableLocation)
          .selectExpr("_file", "_pos")
          .where(predicate)
          .collectAsList();

  List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
  for (Row row : rows) {
    deletes.add(Pair.of(row.getString(0), row.getLong(1)));
  }

  return deletes;
}

private Pair<DeleteFile, CharSequenceSet> writePosDeletes(
    Table table, List<Pair<CharSequence, Long>> deletes) throws IOException {
  return writePosDeletes(table, null, deletes);
}

private Pair<DeleteFile, CharSequenceSet> writePosDeletes(
    Table table, StructLike partition, List<Pair<CharSequence, Long>> deletes)
    throws IOException {
  OutputFile outputFile = Files.localOutput(File.createTempFile("junit", null, temp.toFile()));
  return FileHelpers.writeDeleteFile(table, outputFile, partition, deletes);
}

private DeleteFile writeEqDeletes(Table table, String key, Object... values) throws IOException {
  return writeEqDeletes(table, null, key, values);
}

private DeleteFile writeEqDeletes(Table table, StructLike partition, String key, Object... values)
    throws IOException {
  List<Record> deletes = Lists.newArrayList();
  Schema deleteSchema = table.schema().select(key);
  Record delete = GenericRecord.create(deleteSchema);

  for (Object value : values) {
    deletes.add(delete.copy(key, value));
  }

  OutputFile outputFile = Files.localOutput(File.createTempFile("junit", null, temp.toFile()));
  return FileHelpers.writeDeleteFile(table, outputFile, partition, deletes, deleteSchema);
}
```
I am just worried that we may be duplicating the test code for these functionalities.
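A minimal sketch of what such a shared util class could look like, built only from the helpers quoted above. The class name `DeleteFileTestHelpers` and the extra `tempDir` parameter are assumptions for illustration, not existing code:

```java
// Hypothetical sketch of a shared test util consolidating the quoted helpers.
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;

public class DeleteFileTestHelpers {

  private DeleteFileTestHelpers() {}

  // writes a position delete file for the given (file path, position) tuples
  public static Pair<DeleteFile, CharSequenceSet> writePosDeletes(
      Table table, StructLike partition, List<Pair<CharSequence, Long>> deletes, File tempDir)
      throws IOException {
    OutputFile outputFile = Files.localOutput(File.createTempFile("junit", null, tempDir));
    return FileHelpers.writeDeleteFile(table, outputFile, partition, deletes);
  }

  // writes an equality delete file matching the given values of a single column
  public static DeleteFile writeEqDeletes(
      Table table, StructLike partition, String key, Object[] values, File tempDir)
      throws IOException {
    Schema deleteSchema = table.schema().select(key);
    Record delete = GenericRecord.create(deleteSchema);
    List<Record> deletes = Lists.newArrayList();
    for (Object value : values) {
      deletes.add(delete.copy(key, value));
    }

    OutputFile outputFile = Files.localOutput(File.createTempFile("junit", null, tempDir));
    return FileHelpers.writeDeleteFile(table, outputFile, partition, deletes, deleteSchema);
  }
}
```

Usage would mirror the existing private helpers, with the tests passing their own temp directory (e.g. `temp.toFile()`).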
Review comment on the following lines of the change:

```java
    cleanUncommitted(SnapshotProducer.EMPTY_SET);
  }

  private void recordPartitionMinDataSequenceNumber(ManifestFile manifest) {
```
The existing approach only looks at manifest metadata to understand the min data sequence number across all partitions. This is really cheap, as we don't have to open manifests (which can be a really expensive operation). That leads to the problem that if one partition is significantly behind, it prevents garbage collection of delete files in other partitions. We have solved that for position deletes via the rewritePositionDeletes action, but it still remains open for equality deletes.
I am not convinced opening these manifests during commits is a good idea. Can we explore the option of leveraging the partition stats spec added recently? We are still building an action to generate those stats, but let's think through whether it can help us. One option can be to check if the partition stats file is present and use it to populate the min data sequence numbers, opening just a single Parquet file vs potentially tons of manifests.
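For reference, a minimal sketch of the table-wide check described above (method and class names are illustrative, not the exact ones in core): the minimum data sequence number is derived purely from manifest-list metadata, so a single partition with old data files keeps the table-wide minimum low and delete files in other partitions are never considered expired.

```java
// Illustrative sketch of the current table-wide check; names are assumptions.
import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestFile;

class TableWideDeleteFileCheck {

  // cheap: minSequenceNumber() is read from the manifest list, no manifest is opened
  static long minDataSequenceNumber(List<ManifestFile> dataManifests) {
    long min = Long.MAX_VALUE;
    for (ManifestFile manifest : dataManifests) {
      min = Math.min(min, manifest.minSequenceNumber());
    }
    return min;
  }

  // a delete file older than every live data file in the table can never apply and
  // can be dropped; the exact comparison differs slightly for position vs equality deletes
  static boolean canDrop(DeleteFile deleteFile, long tableWideMinDataSequenceNumber) {
    return deleteFile.dataSequenceNumber() < tableWideMinDataSequenceNumber;
  }
}
```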
Thoughts, @szehon-ho @ajantha-bhat @zinking?
> Can we explore the option of leveraging the partition stats spec added recently?
Wow. That sounds like a nice use case 👍
For each partition, we do keep the snapshot id that last updated that partition. Using the snapshot id we can extract the data sequence numbers from the snapshot.
@zinking: The current status of the partition stats project can be tracked here: #8450
Another alternative approach is to convert equality deletes to position deletes (this work is pending), so we can reuse rewritePositionDeletes. But it is a long route.
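A small sketch of that lookup, assuming a (not yet available) partition stats reader supplies the last-updated snapshot id per partition; only the metadata lookup below uses existing APIs. Note this yields the sequence number of the partition's last update, not a minimum; deriving a usable per-partition minimum from it is part of what would still need to be worked out.

```java
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class PartitionSequenceLookup {
  // lastUpdatedSnapshotId is assumed to come from a partition stats entry; the
  // snapshot's sequence number is then resolved from table metadata without
  // opening any manifests
  static long sequenceNumberOfLastUpdate(Table table, long lastUpdatedSnapshotId) {
    Snapshot snapshot = table.snapshot(lastUpdatedSnapshotId);
    return snapshot.sequenceNumber();
  }
}
```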
Agree with the above. I think there were some other attempts to do this before too, but the concern here is that you don't want to do a lot of things in the commit critical path (here, potentially opening an unlimited number of manifest files). Yeah, if it's something cheaper, like reading one partition stats file, it may be better. Also, yes, the plan has always been to implement converting equality deletes to position deletes (which can then be cleaned up by rewritePositionDeletes), though I'm not sure if any progress is being made there.
Agree that adding this on the general write path is too heavy; that's why I prefer it enabled during rewrites, or probably just some of the rewrites.
On the other hand, it sounds reasonable to track this in partition metadata, but it still has to be calculated somewhere anyway.
I think what Szehon meant is to call the cleanup from the finally block of the rewrite action, not expose it as a new cleanup action.
I'll need to think about this a bit more, but I do like the idea of using partition stats in one way or another. I'll get back next week.
Yeah, for me @zinking raised a good question about using partition stats. They are optional, so if the user hasn't analyzed the table, the behavior will differ in whether dangling deletes are removed or not, which may not be so obvious to users.
Okay, I think we all agree that we should use partition stats if they are available and read manifests otherwise. We may think about extending our regular writes to check if there is a partition stats file available and drop the delete files per partition rather than globally, as is done today. We shouldn't open manifests during writes; we can only do that in a distributed fashion, meaning it has to be part of an action. There we can either add a new action or integrate this logic into the existing action.
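A minimal sketch of that decision logic, under the assumption that a partition stats reader exists (the `PartitionMinSequence` interface below is hypothetical); when no stats are available, it falls back to the table-wide minimum used today:

```java
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.StructLike;

class PartitionAwareDeleteFileCheck {

  // hypothetical stand-in for a partition stats reader, which does not exist yet
  interface PartitionMinSequence {
    Long minDataSequenceNumber(StructLike partition); // null if no entry for the partition
  }

  static boolean canDrop(
      DeleteFile deleteFile, PartitionMinSequence stats, long tableWideMinDataSequenceNumber) {
    Long partitionMin =
        stats == null ? null : stats.minDataSequenceNumber(deleteFile.partition());
    // prefer the per-partition minimum when partition stats are present,
    // otherwise fall back to the table-wide minimum derived from manifest metadata
    long threshold = partitionMin != null ? partitionMin : tableWideMinDataSequenceNumber;
    return deleteFile.dataSequenceNumber() < threshold;
  }
}
```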
@szehon-ho and I talked a little bit about this offline. I think we can try extending the action for rewriting data files to also attempt to remove dangling deletes in partitions that were successfully compacted. Separately, we may integrate the cleanup using partition stats during regular commits under a flag (off by default).
As discussed, this approach burdens the write path with per-partition checks, so the preferred approach is to use an async process.
[issue](#9383)
Currently, when determining whether a delete file should be dropped, the delete file's sequence number is compared against the table-wide minimum data file sequence number, in the worst case retaining all delete files.
It has been observed in production tables that position delete files within a partition could not be deleted even though they cannot act on any of the data files within that partition.
The proposed solution is to record a partition-level minimum data sequence number instead of a table-wide one, and use it to determine whether a delete file is expired.
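A rough sketch of the proposed partition-wise bookkeeping (illustrative only, assuming a single partition spec; names do not match the actual change): a minimum data sequence number is recorded per partition from the live data files, and each delete file is compared against its own partition's minimum rather than the table-wide one.

```java
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;

class PartitionWiseDeleteFileFilter {

  private final Map<StructLike, Long> minDataSeqByPartition;

  PartitionWiseDeleteFileFilter(Types.StructType partitionType) {
    // StructLikeMap compares partition tuples by value, not by identity
    this.minDataSeqByPartition = StructLikeMap.create(partitionType);
  }

  // record the minimum data sequence number seen per partition among live data files
  void recordDataFile(DataFile dataFile) {
    minDataSeqByPartition.merge(dataFile.partition(), dataFile.dataSequenceNumber(), Math::min);
  }

  // a delete file is expired if it is older than every live data file in its partition
  // (or its partition has no live data files at all); the exact comparison differs
  // slightly for position vs equality deletes
  boolean isExpired(DeleteFile deleteFile) {
    Long partitionMin = minDataSeqByPartition.get(deleteFile.partition());
    return partitionMin == null || deleteFile.dataSequenceNumber() < partitionMin;
  }
}
```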