Flink: Auto compact file #2867
Conversation
Hi @hameizi! Thank you for your contribution and interest in Iceberg! This is a non-trivial functionality addition. Would it be possible for you to create a GitHub issue for us to track and discuss this? GitHub issues are how we normally track new work, and since this functionality has not been discussed yet, I think it would benefit from going through the standard process of having an issue. By no means do you need to close the PR, but it would help to follow the normal workflow (and it also provides the benefit that people can search the issues to see the discussion). 🙂
Possibly I'm missing something, but I don't see any accounting for files that might be already near, or close to the optimal size. It's late and my eyes may deceive me, but this appears to be compacting all files to be ideally the target file size bytes, regardless of their existing size etc. In some cases, the cost of opening and rewriting provides less value than leaving the data as is. Can we account for this like we do in some other places? Or am I just missing the fact that that functionality is hidden elsewhere? This would be a good topic to consider discussing in the mentioned GitHub issue :)
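For illustration, a minimal sketch (not code from this PR) of the kind of size check suggested above; the class, the helper name, and the ratio are hypothetical, and only `DataFile.fileSizeInBytes()` is existing Iceberg API:

```java
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;

public class SmallFileSelector {
  // Keep only files that are clearly below the target size, so files that are
  // already near the target are not rewritten for little benefit.
  public static List<DataFile> selectSmallFiles(List<DataFile> candidates,
                                                long targetFileSizeBytes,
                                                double smallFileRatio) {
    long threshold = (long) (targetFileSizeBytes * smallFileRatio); // e.g. 0.75 * target
    return candidates.stream()
        .filter(file -> file.fileSizeInBytes() < threshold)
        .collect(Collectors.toList());
  }
}
```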
I added an issue: #2869
It will compact the files selected by the partition filter, so it will compact all files when the table has no partitions. But in my work scenario there is no problem, because we compact on every transaction and each transaction only generates a few small files. So compacting files on every transaction is quick and cheap, and the compaction time does not exceed the transaction time either.
@openinx Could you help review?
@hameizi, could you update the PR description with details about this feature? Auto-compaction is not very specific so I'd like to hear how you implemented it and what that means.
@hameizi, could you please be more specific? How does this determine which files to rewrite? In what tasks are they rewritten? Does this introduce new operators? Are files rewritten before initial commit or afterward in a replace commit? There are a lot of details for a feature like this that need to be clear.
@rdblue Hi, I have updated the PR description.
From the description, it sounds like the rewrite happens in the committer task rather than in parallel. Is there a good way to make this happen in parallel instead? What we discussed elsewhere was doing a compaction by adding a new parallel stage and second committer after the initial committer. The current commit task would output committed … I think that we should plan on having some parallelism here, or else this is not going to be a very useful feature.
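To make that idea concrete, here is a rough topology sketch (not code from this PR) of a parallel rewrite stage plus second committer; the operator instances, intermediate types (CommittedFiles, RewriteResult) and parallelism variables are placeholders, and only the writer/committer stage mirrors the existing Iceberg Flink sink:

```java
// Stage 1: existing writer + first committer (commits the newly written data files).
DataStream<CommittedFiles> committed = input
    .transform("IcebergStreamWriter", writeResultTypeInfo, streamWriter)
    .setParallelism(writerParallelism)
    .transform("IcebergFilesCommitter", committedTypeInfo, filesCommitter)
    .setParallelism(1);

// Stage 2: parallel rewrite of the files that were just committed.
DataStream<RewriteResult> rewritten = committed
    .transform("FileRewriter", rewriteResultTypeInfo, rewriteOperator)
    .setParallelism(rewriteParallelism);

// Stage 3: second committer that commits the rewrite as a replace operation.
rewritten
    .transform("RewriteCommitter", voidTypeInfo, rewriteCommitter)
    .setParallelism(1);
```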
I made the compaction parallel at first, but I abandoned that approach because I think it could lead to more than one compaction executing at the same time when the current compaction runs over time. In addition, in this PR the compaction does not block data processing in the Flink job, because the data channel is unblocked as soon as the snapshot function completes; so from the point of view of data processing, the compaction in this PR already runs in parallel.
I share the same concern as @rdblue. It seems to me that this implementation basically has a single committer task/thread that reads all rows from a CombinedScanTask (files batched by BaseRewriteDataFilesAction) and writes them out. How is this different from just configuring the StreamFileWriter with a parallelism of 1? If we make it a truly parallel rewrite/compaction action, I am a little concerned about the complexity we are adding to the Flink streaming ingestion path.
@stevenzwu
If the writers are parallel (like 100) and the compactor has a parallelism of 1, it is likely the compactor can't keep up with the workload. Even though compaction is running asynchronously with snapshotState(), it will eventually back up/block the threads executing notifyCheckpointComplete(). In the streaming ingestion path, there are a few things we can do or improve to mitigate the small files problem.
Even with the above changes, Flink streaming ingestion can still generate small files. The parallel compactors and 2nd committer that Ryan mentioned might be able to keep up with the throughput. However, personally I would rather not over-complicate the streaming ingestion path and make it less stable. Let's get the data into long-term data storage (like Iceberg tables) first. Other optimizations (like compaction or sorting) can happen in the background with scheduled (Spark) batch jobs.
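For reference, a separate scheduled compaction job with the existing Iceberg actions API looks roughly like this (a sketch; the partition column, its value, and how the Table is loaded are placeholders, and the exact Actions class depends on the Spark module/version in use):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.expressions.Expressions;

public class ScheduledCompaction {
  // Runs outside the streaming job, e.g. from a cron-scheduled batch application.
  public static void compactHourlyPartition(Table table, String hour) {
    Actions.forTable(table)
        .rewriteDataFiles()
        .filter(Expressions.equal("hour", hour))   // limit the rewrite to one partition
        .targetSizeInBytes(512 * 1024 * 1024L)     // ~512 MB target file size
        .execute();
  }
}
```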
@stevenzwu In theory that could happen, but the implementation in this PR is similar to the custom policy in Flink's Hive committer, and users (including me) often use a custom policy to compact Hive files. In most of my scenarios there is no problem, and I am doing stress testing for this PR and have not found clear errors so far.
@hameizi can you try some setup like this for the stress testing with your auto compaction change here?
I would also echo @kbendick's comment above. Currently, we are reading everything in (regardless of the file sizes). This assumes all/most files are small and can benefit from a compaction rewrite, but I am not sure that assumption is valid for broad use cases.
I'm sorry that my test maybe doesn't meet the standard you describe. I can run another test later.
@stevenzwu I tested one case: parallelism of 10, 30,000+ records/sec (max 60,000+/sec), partitioned by hour, 3 GB of memory per taskmanager. It works without problems for me.
I have also been following this thread although I did not make any comments. Let me add some thoughts since I see you are making some new changes. I am mostly on the same line of thought as @stevenzwu: I am a bit worried about the scalability of the current implementation, and I think the parallel commit proposal that @rdblue made could work, but in the end running compaction in a streaming pipeline is likely unnecessary complication. So far we have been advocating for streaming pipelines to just commit new files to storage, and to use a separate process to handle compaction at the same time. Having the streaming pipeline also do compaction would mean that there might be 2 compaction processes competing with each other. This becomes especially complicated and prone to error when you have both batch jobs and streaming pipelines running at the same time (e.g. normal streaming + daily loading of corrected and late data). I understand it is likely a good optimization for simple use cases, but I would expect it to be a feature that requires a lot of in-depth knowledge to use safely and correctly if we open it for general usage. I wonder what the initial drive behind this implementation is. Do you just want to avoid a separate Spark cluster to run compaction in Spark? If we have Flink actions specifically for …
@jackye1995 In this PR the Flink rewrite action runs in parallel with data processing and will not slow it down, because once the snapshot function succeeds Flink continues processing data without waiting for the result of notifyCheckpointComplete.
Auto-compacting files on every checkpoint in Flink solves several problems.
@hameizi by parallel, we mean multiple executors/tasks executing the rewrite. Last time I checked, this PR runs the whole rewrite action synchronously in the single committer task; that is the main scalability concern we have. Also, notifyCheckpointComplete (and snapshotState) executes in the mailbox thread; if the notifyCheckpointComplete/rewrite takes a long time to finish, it can delay checkpoint execution. I share the same philosophy as Jack on keeping the streaming ingestion simple and stable. It is critical to reliably ingest data into long-term data storage (like Iceberg) first, as streaming input (like Kafka) typically has short retention.
Regarding this issue, I agree that lock-stepping commit + compaction can avoid the problem. But it is not a solution for the general problem, because other users probably have separate compaction jobs (e.g. in Spark). There are other, more sophisticated compaction/rewrite actions that probably can't be supported by a single-task rewrite action at scale.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
This adds a feature so that Flink automatically compacts small files when writing to Iceberg, controlled by a new config option "write.auto-compact-files".
Inserting data into Iceberg can generate many small files, so this PR tries to automatically compact those small files when Flink inserts into Iceberg.
In this PR, the Flink function IcebergFilesCommitter.snapshotState (the first step of a Flink checkpoint) generates a rewrite action. We collect all partitions touched by the transaction to build a partition filter and set it on the rewrite action, so that files are compacted grouped by partition.
The rewrite action then runs in the Flink function IcebergFilesCommitter.notifyCheckpointComplete (the last step of a Flink checkpoint): notifyCheckpointComplete commits the transaction first, and then executes the rewrite action.
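As a condensed sketch of that flow (illustrative only, not the actual diff: buildPartitionFilter, pendingDataFiles, pendingRewrite, autoCompactEnabled and targetFileSizeBytes are hypothetical names, and the rewrite uses the Flink Actions API from org.apache.iceberg.flink.actions):

```java
// Inside IcebergFilesCommitter (sketch).

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
  super.snapshotState(context);
  // ... existing logic stages the data files collected for this checkpoint ...

  // Restrict the rewrite to the partitions touched by this checkpoint so the
  // compaction only scans partitions that just received new files.
  Expression partitionFilter = buildPartitionFilter(pendingDataFiles);
  this.pendingRewrite = Actions.forTable(table)
      .rewriteDataFiles()
      .filter(partitionFilter)
      .targetSizeInBytes(targetFileSizeBytes);
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
  // ... existing logic commits the transaction for this checkpoint first ...
  if (autoCompactEnabled && pendingRewrite != null) {
    pendingRewrite.execute();   // then compact the files written by that transaction
    pendingRewrite = null;
  }
}
```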