Flink 1.14: Fix the flaky testHashDistributeMode by ingesting all rows in one checkpoint cycle. #4189
Conversation
…in one checkpoint precisely.
I ran this 20 times on my host, and everything seems OK:
List<Row> dataSet = ImmutableList.of(
    Row.of(1, "aaa"), Row.of(1, "bbb"), Row.of(1, "ccc"),
    Row.of(2, "aaa"), Row.of(2, "bbb"), Row.of(2, "ccc"),
    Row.of(3, "aaa"), Row.of(3, "bbb"), Row.of(3, "ccc"));
String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
Shall we produce more than one checkpoint, and add enough records in each part instead of enumerating them?
I think a single checkpoint is good enough to validate the PartitionKeySelector. In my mind, more checkpoints would make the unit test more complex while validating the same thing.
Mocking more records for the test data set sounds good to me.
sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
Should we check the records based on dataSet?
yittg left a comment
LGTM
@openinx I have run the test hundreds of times locally in a test loop, like you did before, and was never able to reproduce it. Thinking again about the root cause we discussed in the issue: we may miss the notifyCheckpointComplete callback, so two checkpoint cycles get squashed into one Iceberg commit, and hence there are 2 files for a partition in one Iceberg commit. I misunderstood the PR earlier. It looks like the change is to make sure we have one checkpoint cycle for all rows, to bypass the potential problem from multiple checkpoint cycles.
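The failure mode described above can be sketched in plain Java. This is a hypothetical illustration only (not Iceberg's actual committer code, and the file names are made up): a commit normally happens on every notifyCheckpointComplete, but when one callback is missed, the next commit carries the pending files of both cycles.

```java
import java.util.ArrayList;
import java.util.List;

public class SquashedCommitSketch {
    public static void main(String[] args) {
        // Hypothetical: each checkpoint cycle produced one data file for
        // partition data=aaa; files stay pending until a commit happens.
        List<String> pendingFiles = new ArrayList<>();

        // Checkpoint 1 completes, but its notifyCheckpointComplete callback
        // is missed, so its file is never committed on its own.
        pendingFiles.add("ckpt-1/data=aaa/file-0.parquet");

        // Checkpoint 2 completes and its callback does arrive: the commit
        // now carries the pending files of BOTH checkpoint cycles.
        pendingFiles.add("ckpt-2/data=aaa/file-1.parquet");

        long filesForPartitionAaa = pendingFiles.stream()
            .filter(f -> f.contains("data=aaa"))
            .count();

        // Two files for one partition in a single commit, which is what the
        // flaky assertion in testHashDistributeMode trips on.
        System.out.println(filesForPartitionAaa);
    }
}
```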
stevenzwu left a comment
LGTM.
nit: can we change the description to "by ingesting all rows in one checkpoint cycle"? Earlier, I misunderstood the PR: I mistakenly thought we were still doing multiple checkpoint cycles and were just precisely controlling the rows in each checkpoint cycle.
@stevenzwu The root cause is: the previous design could not guarantee that a single checkpoint would commit all rows in a given transaction. Here is another example. That's why we are now trying to guarantee this in this PR. The new description looks good to me if you think it's clearer.
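For context on what the test's single checkpoint is meant to validate: under hash distribution, rows with the same partition key are routed to the same writer subtask, so one checkpoint cycle should yield at most one data file per partition. A minimal, hypothetical sketch of that routing in plain JDK Java (the `writerFor` helper is a made-up stand-in, not Flink's PartitionKeySelector):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class HashDistributeSketch {
    // Hypothetical stand-in for the keyBy routing that hash distribute-mode
    // sets up: the same partition key always hashes to the same writer.
    static int writerFor(String partitionKey, int parallelism) {
        return Math.floorMod(partitionKey.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // Same shape as the test's data set: (id, data), partitioned by data.
        String[][] rows = {
            {"1", "aaa"}, {"1", "bbb"}, {"1", "ccc"},
            {"2", "aaa"}, {"2", "bbb"}, {"2", "ccc"},
            {"3", "aaa"}, {"3", "bbb"}, {"3", "ccc"},
        };
        Map<String, Set<Integer>> writersPerPartition = new HashMap<>();
        for (String[] row : rows) {
            String partition = row[1];
            writersPerPartition
                .computeIfAbsent(partition, k -> new HashSet<>())
                .add(writerFor(partition, parallelism));
        }
        // One writer per partition means a single checkpoint cycle produces
        // at most one data file per partition in the resulting commit.
        boolean oneWriterPerPartition = writersPerPartition.values().stream()
            .allMatch(writers -> writers.size() == 1);
        System.out.println(oneWriterPerPartition);
    }
}
```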
@openinx Looks good. Can you merge this? It should be safe.
Thanks for fixing the flaky test, @openinx!
… one checkpoint cycle (apache#4189)
This PR tries to fix the flaky testHashDistributeMode unit test fundamentally. The following is the explanation of the current fix.