Core: Fix delete file index with non-equality column filter losing equality delete files #4311
Conversation
kbendick left a comment
Thanks for providing a patch @xloya.
It would help if you could provide more context. Is this something that needs to be special-cased for Flink upsert mode in some way? Modifying core and removing tests causes me concern at first sight.
It's possible I missed something on the mailing list or some other discussion, forgive me if so. Is it possible there's an example scenario that can be given to help me understand?
Of course. We have a scenario where we write data into Iceberg's v2 tables through Flink CDC, and there are non-primary-key query scenarios. The current implementation in … This is currently the easiest way to fix the problem. If we want to optimize on the Flink upsert side instead, I think we may need to read the latest records whose primary keys already exist in the table and write those to the equality delete file during the write, instead of writing the newly inserted data to the equality delete file.
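
For context, a minimal sketch of the upsert write pattern being described; the class and writer names here are hypothetical stand-ins, not Iceberg's actual Flink writer classes:

```java
import java.util.List;

// Sketch only: hypothetical names, not Iceberg's actual writer API.
interface FileWriter<T> {
  void write(T record);
}

class UpsertTaskWriter {
  private final FileWriter<List<Object>> equalityDeleteWriter; // matches rows on the key fields
  private final FileWriter<List<Object>> dataWriter;

  UpsertTaskWriter(FileWriter<List<Object>> equalityDeleteWriter,
                   FileWriter<List<Object>> dataWriter) {
    this.equalityDeleteWriter = equalityDeleteWriter;
    this.dataWriter = dataWriter;
  }

  // Behavior under discussion: each upsert writes the *incoming* row to the
  // equality delete file (so the delete file's column bounds reflect the new
  // values) and then writes the same row as an insert. The proposal above
  // would instead look up the latest existing row for the key and write that
  // to the equality delete file.
  void upsert(List<Object> row) {
    equalityDeleteWriter.write(row);
    dataWriter.write(row);
  }
}
```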
Thanks for adding the test and explaining the situation more @xloya.
However, I don't think this is necessarily the appropriate solution. There are several places that call into the delete file index.
Additionally, this removes the ability to have non-conflicting delete transactions go through.
I think we should reconsider how the filter is built in the case of Flink upsert mode (or anything that is affected by this), and likely check for any other places where this issue exists and do the same.
Possibly we can add the seq number into the filter or something? I'd have to take a closer look, but throwing that out there as a possible idea.
I can confirm, though, that the test provided will fail if filterRows is added back into the DeleteFileIndex.
cc @RussellSpitzer @flyrain @aokolnychyi who have all worked on the delete file index or helped review it.
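
For reference, a minimal sketch of the sequence-number rule that such a filter could build on. The types below are simplified stand-ins for Iceberg's internal manifest entries, assuming the v2 spec rule that an equality delete applies only to data files with a strictly smaller data sequence number:

```java
// Simplified stand-ins for Iceberg's internal manifest entries; sketch only.
record DataFileEntry(long dataSequenceNumber) {}

record EqualityDeleteEntry(long dataSequenceNumber) {}

final class SequenceNumberFilter {
  private SequenceNumberFilter() {}

  // Per the v2 spec, an equality delete applies to a data file only when the
  // delete was committed at a strictly higher data sequence number, so a
  // sequence-number-aware index could keep or drop delete files without
  // consulting column metrics at all.
  static boolean applies(EqualityDeleteEntry delete, DataFileEntry data) {
    return delete.dataSequenceNumber() > data.dataSequenceNumber();
  }
}
```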
I agree with @kbendick. Changing core will break other engines like Spark that depend on this for things like the validation check for serializable isolation mode. I also agree with his suggestion to reconsider how the Flink side uses the filter, and to see whether we can add some other option to DeleteFileIndex to support this use case.
We seem to have the same issue, and it only happens on our Parquet tables (it doesn't happen on our Avro tables). After analysis, we found that the problem occurs in the metric (such as …). Our solution is different from this PR: we try to trim the row filter fields used for metric filtering. For a delete manifest file, only the metric field ids in the … are used. I'm not sure which way is better, or whether there is another better solution. I'm sorry if your PR addresses a different issue. Thanks. 😄
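
A minimal sketch of that trimming idea, using a simplified predicate model rather than Iceberg's Expression API: any predicate on a column outside the delete file's equality field ids is dropped from the metric filter (treated as always true), so it can no longer prune the delete file.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Stand-in for a conjunctive row filter; not Iceberg's Expression API.
record ColumnPredicate(int fieldId, String op, Object literal) {}

final class DeleteMetricFilter {
  private DeleteMetricFilter() {}

  // Keep only predicates whose field id belongs to the delete file's equality
  // field ids; predicates on other columns are dropped, so a filter on a
  // non-key column can no longer prune an equality delete file whose metrics
  // describe different values.
  static List<ColumnPredicate> trimForDeleteFile(List<ColumnPredicate> rowFilter,
                                                 Set<Integer> equalityFieldIds) {
    return rowFilter.stream()
        .filter(p -> equalityFieldIds.contains(p.fieldId()))
        .collect(Collectors.toList());
  }
}
```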
flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java (outdated)
```java
}

@Test
public void testConcurrentNonConflictingEqualityDeletes() {
```
What is invalid about this test? It is suspicious that it is removed.
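
For context on what "non-conflicting delete transactions" exercise, a hedged sketch using Iceberg's RowDelta API; the table setup, snapshot id, and key values are made up, and this is not the removed test's actual body:

```java
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;

// Hedged sketch, not the removed test's body: two row deltas commit equality
// deletes for disjoint keys against the same base snapshot. Because each
// commit's conflict detection is scoped to its own key, neither should fail,
// which is the kind of concurrency the delete file index has to support.
class ConcurrentEqualityDeleteSketch {
  static void commitNonConflicting(Table table, long baseSnapshotId,
                                   DeleteFile deletesForId1, DeleteFile deletesForId2) {
    table.newRowDelta()
        .addDeletes(deletesForId1) // equality deletes keyed on id = 1
        .validateFromSnapshot(baseSnapshotId)
        .conflictDetectionFilter(Expressions.equal("id", 1))
        .validateNoConflictingDeleteFiles()
        .commit();

    table.newRowDelta()
        .addDeletes(deletesForId2) // equality deletes keyed on id = 2
        .validateFromSnapshot(baseSnapshotId)
        .conflictDetectionFilter(Expressions.equal("id", 2))
        .validateNoConflictingDeleteFiles()
        .commit();
  }
}
```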
Thank you for your attention to the same issue! I will take a look at the patch you committed later. In fact, I think my patch is only a temporary solution; I just want to get more opinions from the community to find the most appropriate way to fix the problem.
When we write through Flink with the upsert option enabled and then specify non-equality-column conditions in the query, some of the equality delete files will be lost due to wrong usage of the filter.
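
To make the failure mode concrete, a hedged sketch with made-up table values: the equality delete file's column bounds reflect the new row, so pruning it with a scan filter on the old value of a non-key column silently drops the delete.

```java
import java.util.Map;

// Hedged sketch with made-up values; not Iceberg's actual metrics evaluator.
// Commit 1 inserts (id=1, data="a"); commit 2 upserts it to (id=1, data="b"),
// writing an equality delete row carrying the new values plus a new insert.
final class LostDeleteDemo {

  // Inclusive metric check: a file might match an equality predicate only if
  // the literal falls inside the file's [lower, upper] bounds for the column.
  static boolean mightMatch(Map<String, String[]> bounds, String column, String literal) {
    String[] range = bounds.get(column);
    return range == null
        || (range[0].compareTo(literal) <= 0 && literal.compareTo(range[1]) <= 0);
  }

  public static void main(String[] args) {
    // Bounds of the equality delete file: they reflect the *new* row values.
    Map<String, String[]> deleteBounds =
        Map.of("id", new String[] {"1", "1"}, "data", new String[] {"b", "b"});

    // Query: SELECT * FROM t WHERE data = 'a' (a non-equality-column filter).
    // Evaluating the scan filter against the delete file's bounds prunes it,
    // so the delete of the old row (id=1, data="a") never applies and the
    // stale row reappears in the scan results.
    System.out.println(mightMatch(deleteBounds, "data", "a")); // false -> delete file lost
  }
}
```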