@yittg
Contributor

@yittg yittg commented Apr 12, 2022

The second part mentioned in #4532 (review)
cc @kbendick

@github-actions github-actions bot added the flink label Apr 12, 2022
@yittg yittg force-pushed the dynamic-upsert-test branch from 0d86c42 to 5445037 Compare April 12, 2022 01:45
@yittg
Contributor Author

yittg commented Apr 12, 2022

Tests fail due to #4533; waiting for it to be merged.

TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7)));
testUpsert(tableName, row -> Row.of(row.getField(1), dt, row.getField(0)));
Contributor

In my experience, when tests have failed, one of the indicators is that one or more of the resulting Rows has extra fields.

So could mapping into specific Row fields make it more likely that we miss those cases?
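
To make the concern concrete, here is a minimal sketch of a projection that checks the field count before picking fields, so an unexpected extra field fails loudly instead of being dropped. It uses plain `Object[]` rows as a stand-in for Flink's `Row` (where `Row.getArity()` would give the field count); `ArityCheckDemo` and `projectChecked` are hypothetical names for illustration, not part of the Iceberg test helpers.

```java
import java.util.Arrays;

public class ArityCheckDemo {
    // Hypothetical helper: project the given positions out of a row, but
    // reject the row first if its field count differs from what is expected.
    static Object[] projectChecked(Object[] row, int expectedArity, int... positions) {
        if (row.length != expectedArity) {
            throw new IllegalStateException(
                "expected " + expectedArity + " fields but got " + row.length
                    + ": " + Arrays.toString(row));
        }
        Object[] out = new Object[positions.length];
        for (int i = 0; i < positions.length; i++) {
            out[i] = row[positions[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] good = {6, "aaa", "2022-03-01"};
        // Reorder (id, data, dt) into (data, dt, id).
        System.out.println(Arrays.toString(projectChecked(good, 3, 1, 2, 0)));

        Object[] extra = {6, "aaa", "2022-03-01", "unexpected"};
        try {
            projectChecked(extra, 3, 1, 2, 0);
        } catch (IllegalStateException e) {
            // The extra field is reported rather than silently ignored.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A plain index-based mapping without the arity check would return the same projected row for both inputs, which is the blind spot raised above.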

Contributor

@kbendick kbendick left a comment

One thing we discussed in the original PR was changing the tests to use id as the “main” primary key, as “data” is somewhat confusing / less common.

Let me find the discussion, but if you were willing to take on that task that would be greatly appreciated.

Also @openinx had discussed abstracting some of the test setup into its own class / shared utility, as some of the code to set up the test environment is seen quite frequently.

That's only if you have the time and interest; either way, the research and work so far are greatly appreciated.

@kbendick
Contributor

Here's a link to the unit test shared code that openinx had mentioned would be good to clean up / reuse across several tests: #4364 (comment)

@yittg
Contributor Author

yittg commented Apr 12, 2022

@kbendick After making the tests fuzzier locally, I found that they become unstable again. I'll try to figure out whether these are real failures caused by the UPSERT logic or something else, or whether the cases are simply too fuzzy to be tested.

I'll update this change if they turn out to be valid test cases, and mark this PR as WIP for now.
Maybe the current static cases are enough to cover the UPSERT-related logic; I'll add other cases if the failures turn out to have other causes.

As for the test architecture / utilities, I noticed that too. I'm very interested in improving them, making the tests more accurate with wider coverage, and ultimately making Iceberg more robust.

@yittg yittg marked this pull request as draft April 12, 2022 05:38
@yittg
Contributor Author

yittg commented Apr 12, 2022

@kbendick After digging deeper into Flink: for batch-mode jobs, the records emitted to a keyBy node are sorted. In Iceberg, records written to a table with identifier fields are always distributed with a keyBy on those identifier fields. I believe the sorter is unstable, so records with the same key can be swapped.

So here all records are sorted before being emitted to IcebergStreamWriter, and records with the same key can arrive out of order.

So dynamically generated cases can break these tests, whereas static records will always be sorted the same way, I think.
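
To illustrate why the order of same-key records matters for upsert, here is a minimal sketch in plain Java. It does not use Flink or Iceberg; `UpsertOrderDemo`, `Rec`, and `applyUpserts` are hypothetical names, and the "swap" is written out by hand to stand in for what an unstable sort by key might do.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpsertOrderDemo {
    // Hypothetical record type (key, value); not a Flink or Iceberg class.
    static final class Rec {
        final String key;
        final int value;

        Rec(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    // Apply upserts in arrival order: the last record seen for a key wins.
    static Map<String, Integer> applyUpserts(List<Rec> records) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Rec r : records) {
            table.put(r.key, r.value);
        }
        return table;
    }

    public static void main(String[] args) {
        // Original arrival order: ("aaa", 1) is later overwritten by ("aaa", 3).
        List<Rec> inOrder = Arrays.asList(
            new Rec("aaa", 1), new Rec("bbb", 2), new Rec("aaa", 3));

        // Same records, still grouped correctly by key, but the two "aaa"
        // records have traded places, as an unstable sort is allowed to do.
        List<Rec> swapped = Arrays.asList(
            new Rec("aaa", 3), new Rec("bbb", 2), new Rec("aaa", 1));

        System.out.println(applyUpserts(inOrder));  // {aaa=3, bbb=2}
        System.out.println(applyUpserts(swapped));  // {aaa=1, bbb=2}
    }
}
```

Both inputs are valid orderings by key, yet the final table differs, which is why a test with several updates to the same key can only assert a fixed result when the relative order of those updates is preserved.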

@kbendick
Contributor

kbendick commented Apr 12, 2022

> @kbendick After digging deeper into Flink: for batch-mode jobs, the records emitted to a keyBy node are sorted. In Iceberg, records written to a table with identifier fields are always distributed with a keyBy on those identifier fields. I believe the sorter is unstable, so records with the same key can be swapped.
>
> So here all records are sorted before being emitted to IcebergStreamWriter, and records with the same key can arrive out of order.
>
> So dynamically generated cases can break these tests, whereas static records will always be sorted the same way, I think.

I really appreciate the thorough research.

We (at least I) are hoping to migrate to some of the newer Flink Table API after we deprecate 1.12. Hopefully this will make things a bit more straightforward. (So using ResolvedSchema and LogicalSchema vs just Schema, things like that).

I’m opening a PR to support Flink 1.15 tomorrow (the release candidate, anyway). It’s just a port with the minimum changes required to make it work. After that, the process of upgrading to the newer APIs can begin.

However, there’s one test case somewhat in this realm (change data capture) that isn’t passing in the 1.15 rc0. I’ll tag you and hopefully if you have time you can take a look? I’d greatly appreciate it.

Also feel free to reach out on Slack. Not sure what time zone you’re in, but I’d love to ask you a few questions (even if asynchronously).

Thanks again for all the work you’ve put in, especially recently! @yittg

@yittg
Contributor Author

yittg commented Apr 12, 2022

@kbendick Feel free to ping me; I'll be glad to help if I can.

Closing this PR.

@yittg yittg closed this Apr 12, 2022
@yittg yittg deleted the dynamic-upsert-test branch April 13, 2022 04:51