-
Notifications
You must be signed in to change notification settings - Fork 3k
Flink: Incrementally rewrite data files in streaming. #3323
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
I just finish the streaming rewrite function, and I will complete the unittest in these few days. Maybe you can take a quick look and leave some comments, I will keep improve this PR in this week. @rdblue @openinx @stevenzwu @jackye1995 @kbendick :) |
|
I this PR is ok, could you take a look of this? 😄 @rdblue @openinx @stevenzwu @jackye1995 @kbendick |
flink/src/main/java/org/apache/iceberg/flink/sink/CommitResult.java
Outdated
Show resolved
Hide resolved
flink/src/main/java/org/apache/iceberg/flink/sink/DataFileGroup.java
Outdated
Show resolved
Hide resolved
flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
Outdated
Show resolved
Hide resolved
flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
Outdated
Show resolved
Hide resolved
|
@Reo-LEI would you mind to fix this conflicts ? I will plan to go through the whole PR, thanks for the contribution ! |
|
had a lot of thinking around this PR, I decide to write down in a doc, @openinx @rdblue @stevenzwu please let me know what you think! https://docs.google.com/document/d/18N-xwZasXLNEl2407xT1oD08Mv9Kd3p0xMX7ZJFyV20/edit?usp=sharing |
|
@openinx Thanks for your review! I have been resolve the conflict, please go ahead. 😄 |
|
I‘m very grateful to @jackye1995 for his excellent work. I think Jack summarized this PR from the background, implementation, benefits very well in this document. And Jack proposed two approach in the document to improve the defects of this PR in CDC case. I would like to make some additions and summaries to this document, and continue to discuss the approach about how to deal with the equality deletes compaction in CDC case at here. @rdblue @openinx At first, I want to discuss the effectiveness of current implementation of streaming rewrite(apply delta deletes to delta data). We can measure this effectiveness from the type of data stream and whether the table partition type. For the data stream type, we can be divided into append stream and update stream(include upsert and retract/cdc stream). For the table partition type, we can devided into partitioned and unpartitioned, and partitioned can be further divided into partitions that contain time and partitions that do not contain time. For effectiveness, we use
For append stream, streaming rewrite can work well in all partition type because there are no delete files and we only need to bin pack data files. For update stream(upsert and retract stream), streaming rewrite can work well with table of partitioned with time. Because the result will become certain over time, such as we have an day and hour partition table and T partition result will be certain in T+1 moment, so we can rewrite T partition after T. However, streaming rewrite can not work well with table of partitioned without time, as Jack mentioned in the doc, the high seqNum equality delete file still in storage and when we read the table, they will still apply to the low seqNum data files even if this data file has been rewritten before. So, I agree with Jack that we should first find a way to deal with the remaining equality deletes. And then, I want to discuss the approach about how to deal with the remaining equality deletes. Jack list two approach in the doc. The approach A is gathering the equality delete files and then perform a table scan to rewrite the affected data files. And the approach B is to convert the equality deletes to position deletes and then perform a table scan to apply the position deletes and rewrite the affected data files. Both approach require extra table scan and data rewrite, I think these approach is work, but will be complicated and we need to reconsider the cost and benefit. Finally, I think we can have approach C that we can simply rewrite the delta equality delete files to the new one and replace them by |
|
@jackye1995, thanks for taking the time to write up your thoughts. That doc is really helpful. I had previously been thinking about this mostly in terms of an append stream and v1 tables. Like @Reo-LEI notes, inline compaction for append streams is safe (for v1 tables) and is a good idea if you want frequent checkpoints but don't want a ton of small files. For v1 tables, we know that there won't be equality or position delete files, so we can easily compact. For v2 tables, append streams are pretty much like CDC streams because we may have concurrent writers adding equality or position delete files. That brings us back to Jack's points about adapting this idea to CDC streams. First, equality deletes create sequence number boundaries that make it difficult to compact. But I think there are still some use cases where this is valuable even if we only plan on compacting within a sequence number. Write tasks necessarily align with partitions, so compacting across tasks may still be valuable. For example, CDC writes to an unpartitioned table from multiple tasks will create a data file per task (per checkpoint) that is probably not full size. While there may be multiple files per partition, I think that the larger use case is compacting across checkpoints and that will require addressing compaction across sequence numbers. Let's assume that we have a fairly regular CDC stream for a bucketed table so that the commit for each bucket has one equality delete and one data file per checkpoint, and optionally a position delete file for the data file. That's the worst case that Jack describes, where there is nothing to compact within each sequence number except position deletes. Here are the files committed by sequence number for just one bucket (bucket_id = 0):
I think that we actually can do some compaction in this situation. Say we want to compact
The position delete files can be removed because they only reference data-103.parquet and data-106.parquet. The equality deletes must remain in the table in case they deleted data in other files. The new compacted data file should be data-107-compacted.parquet and should be committed at sequence number 107 so that future equality deletes are still applied correctly. Another thing to keep in mind is that we may have equality and position deletes coming from concurrent writers. But in this case all we would need to do is update the task emitting committed data files to emit all delete files, even those from concurrent writers. For example, maybe sequence number 5 was written by a concurrent commit. Then the delete files for that commit, eq-105.parquet, should be used. But the data file should not be part of the compaction (it would be compacted by a parallel Flink job). I think that this compaction strategy would address Jack's concern and is a safe way to compact across sequence numbers with equality deletes. The key idea is that we leave the equality delete files in the table so we don't have to worry about existing data files that aren't part of the stream we are compacting. I think that what I'm proposing is similar to @Reo-LEI's comment, except that it limits the scope of the compaction differently. Using time-based partitions would make us reasonably confident that the compaction is targeted, but the drawback is that we would still need to plan a scan, and it may still be impacted by concurrent deletes. But my strategy of keeping the compaction scoped to just files written by one Flink job and incrementally including delete files is a clean way to handle the issues and ensure that compaction won't conflict with one another. |
@rdblue If there is too much equality delete files will cause memory pressure when read table. |
…rite # Conflicts: # core/src/main/java/org/apache/iceberg/BaseSnapshot.java
…y StructLikeWrapper.
# Conflicts: # core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
|
I finish the reactor of this PR according the comment of @rdblue and have been updated the description of this PR. Briefly explain the change, I add the I test this in our prod env and has been running for a while, and that is work for v1 and v2 table. And for the time-based partitioned v1 table, we can use streaming rewrite to replace the batct rewrite action. @rdblue @jackye1995 @openinx @stevenzwu @kbendick Could you please take another look when you free? |
|
a quick question, can this pr be used in flink1.13.1? |
I have been port this featuer to flink 1.13 in branch ‘flink-streaming-rewrite-for-flink0.13’ before, but there have some conflicts need to be resloved, maybe you can use that. @Initial-neko |
|
Hello, may I ask if the rewrite is asynchronous or synchronous |
The rewrite is performed asynchronously. @lurnagao |
Thank you so much! |
|
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions. |
|
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
Description
This PR is base on the approach of @rdblue comment and trying to incrementally rewrite committed data files.
Implementation
In this PR, the parallel
IcebergStreamRewriterand single-parallelismIcebergRewriteFilesCommitterwill append toIcebergFilesCommitter.At the beginning, the committed data files and related delete files will be pack as
CommitResultwhich has sequence number and snapshot id and send toIcebergRewriteTaskEmitter.The
IcebergRewriteTaskEmitterreceive commit results and group data files and delete files by partition. And then each partitioned file groups will be packaged into one or moreRewriteFileGroupaccording theStreamingBinPackStrategy. Once aRewriteFileGroupreach a rewrite condition(file number / file size), it will be split and combine to one or moreCombinedScanTaskand then emitted asRewriteTask.IcebergStreamRewriterwill simply rewrite each receivedRewriteTaskand emit the added data files and the rewritten data files asRewriteResult.IcebergRewriteFilesCommitterwill collect allRewriteResultand group rewrite results by starting snapshot id and partition. the grouped rewrite results will be committed in next checkpoint.Note
IcebergRewriteTaskEmitterandIcebergRewriteFilesCommitterparallelism will be always 1, andIcebergStreamRewriterparallelism is configurable.IcebergRewriteTaskEmitterwill compare the received snapshot id of commit result with the last received snapshot id. If the last received snapshot id is not the parent of the received snapshot id, that is mean there are concurrent writers commit other snapshot to this table. And theIcebergRewriteTaskEmitterwill collect all delete files between the last received snapshot and received snapshot toRewriteFileGroupto avoid missing any equality delete files when doing compaction.IcebergRewriteTaskEmitterwill collect all data files and delete files which are committed by the restored flink job and collect all eq-delete files which are committed by other writer from the last received snapshot to the last committed snapshot of the restored flink job.IcebergStreamRewriterwill cause job fail.IcebergRewriteFilesCommitterwill using starting snapshot id and sequence number when commit rewrite results in next checkpoint.Configuration
flink.rewrite.enable: Configuring whether to enable the rewrite operator, default is false.flink.rewrite.parallelism: Configuring the rewrite operator parallel number, default is same as job parallelism.flink.rewrite.target-file-size-bytes: The output file size attempt to generate when rewriting files. the rewrite will be triggered when the rewrite files size ofRewriteFileGroupreach this value. Default is same aswrite.delete.target-file-size-bytes(512MB).flink.rewrite.min-file-size-bytes: Files smaller than this value will be considered for rewriting.flink.rewrite.max-file-size-bytes: Files larger than this value will be considered for rewriting.flink.rewrite.min-group-files: The minimum number of files that need to be in a file group for it to be considered for rewriting. Defaults to 2 files, which is mean at less 2 files in a file group will be considered for rewriting.flink.rewrite.max-group-files: The maximum number of files that allow to be in a file group for it to be considered for rewriting. The rewrite will be triggered when the total files count ofRewriteFileGroupreach this value. Default isInteger.MAX_VALUE.flink.rewrite.nums-of-commit-after-append:The maximum number of commits will be wait for a file group. If no more file append to this file group after this number of commits, this file group will be rewritten regardless of whether the total size of that group reaches theflink.rewrite.target-file-size-bytes. Defaults toInteger.MAX_VALUE, which means this feature is not enabled by default.