-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[HUDI-3559] fix flink Bucket Index with COW table type NoSuchElementException cause of deduplicateRecords method in FlinkWriteHelper out of order
#5018
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| if (hasInsert) { | ||
| recordList.get(0).getCurrentLocation().setInstantTime("I"); | ||
| } | ||
| return recordList; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In line 114, we already reset the location, so each records list under the same key after reduction should have the same instant time type as before, so why the set is needed ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wrote a test in local and found out the order of the list was changed after reduction. <id1, id2> became <id2,id1> somehow, so it's not related to a single record.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the Map::values does not guarantee the sequence, state index based writer has no problem because it assigns the instant "I" and "U" based on the buckets of last checkpoint, and reuse the buckets within one checkpoint.
This fix is necessary for it to be more robust.
garyli1019
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wxplovecc thanks for your contribution! I can reproduce this bug. left some minor comments. we should merge this before the next release
| @Override | ||
| public List<HoodieRecord<T>> deduplicateRecords( | ||
| List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) { | ||
| final boolean hasInsert = records.get(0).getCurrentLocation().getInstantTime().equals("I"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about renaming this as isInsertBucket and add a comment to explain why we need this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The keyedRecords can be made more efficient:
Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
.collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()))| JobClient client = execEnv.executeAsync(execEnv.getStreamGraph()); | ||
| if (client.getJobStatus().get() != JobStatus.FAILED) { | ||
| try { | ||
| TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this sleep still needed if we test for COW?
|
|
||
| @ParameterizedTest | ||
| @ValueSource(strings = {"BUCKET"}) | ||
| public void testCopyOnWriteBucketIndex(String indexType) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use this test for the COW table? include state index as well
…eption Actually method FlinkWriteHelper#deduplicateRecords does not guarantee the records sequence, but there is a implicit constraint: all the records in one bucket should have the same bucket type(instant time here), the BucketStreamWriteFunction breaks the rule and fails to comply with this constraint. close apache#5018
…eption Actually method FlinkWriteHelper#deduplicateRecords does not guarantee the records sequence, but there is a implicit constraint: all the records in one bucket should have the same bucket type(instant time here), the BucketStreamWriteFunction breaks the rule and fails to comply with this constraint. close apache#5018
Tips
What is the purpose of the pull request
This pull request avoid deduplicateRecords method in FlinkWriteHelper run out of order
Brief change log
(for example:)
Verify this pull request
(Please pick either of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.