Flink: Add unit test to write CDC events by SQL. #1978
Conversation
}

private static List<String> toEqualityColumns(TableSchema schema) {
  List<String> equalityColumns = Lists.newArrayList();
  schema.getPrimaryKey().ifPresent(uniqueConstraint -> equalityColumns.addAll(uniqueConstraint.getColumns()));
It seems like we should track primary key columns in the table format rather than in properties. If we are going to add primary key columns, then we should add them before v2 so that v2 writers are required not to drop them.
}

@Test
public void testSqlChangeLogOnIdKey() throws Exception {
These tests look the same as the non-SQL tests. Is there a way to set up the test and expected data for both?
I read the source code of the flink cdc test class and found that it uses docker to start a mysql service to generate the cdc data. It seems that there really is no good way to produce cdc data within flink itself.
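Following up on the question above, a hypothetical sketch of how the change-log input data could be shared between the SQL and non-SQL tests; the class and method names here are placeholders, not code from this patch:

```java
import java.util.Arrays;
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// Hypothetical shared fixture: both the SQL and the non-SQL change-log tests could pull
// their per-checkpoint input rows (and expected records) from one place like this.
final class ChangeLogTestData {
  private ChangeLogTestData() {
  }

  // Input rows for the "id" key case, grouped by checkpoint.
  static List<List<Row>> idKeyInputRowsPerCheckpoint() {
    return Arrays.asList(
        Arrays.asList(
            Row.ofKind(RowKind.INSERT, 1, "aaa"),
            Row.ofKind(RowKind.DELETE, 1, "aaa"),
            Row.ofKind(RowKind.INSERT, 1, "bbb")),
        Arrays.asList(
            Row.ofKind(RowKind.UPDATE_BEFORE, 1, "bbb"),
            Row.ofKind(RowKind.UPDATE_AFTER, 1, "ccc")));
  }
}
```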
    List<List<Row>> inputRowsPerCheckpoint,
    List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
  String partitionByCause = partitioned ? "PARTITIONED BY (data)" : "";
  sql("CREATE TABLE %s (id INT, data VARCHAR, PRIMARY KEY(%s) NOT ENFORCED) %s WITH %s",
Instead of creating v2 tables using the table property, I think it would be better to load the table just after creation and update it using the API. That way you can test with a v2 table, but we don't need to change any of the core classes or expose the format before it is finalized.
OK, I saw there's an upgradeToFormatVersion API that does the upgrade in TestSparkReaderDeletes.java; I can use a similar approach.
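For reference, a sketch of that upgrade path, assuming the table is loaded right after the CREATE TABLE statement and following the TableOperations pattern used in TestSparkReaderDeletes.java:

```java
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;

// Upgrade a freshly created table to format v2 through the API instead of exposing a
// table property; `table` is the Table instance loaded after creation.
static void upgradeToFormatV2(Table table) {
  TableOperations ops = ((BaseTable) table).operations();
  TableMetadata current = ops.current();
  ops.commit(current, current.upgradeToFormatVersion(2));
}
```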
if (schema.getPrimaryKey().isPresent()) {
  throw new UnsupportedOperationException("Creating table with primary key is not supported yet.");
}

private static List<String> toEqualityColumns(TableSchema schema) {
In the format, I think that equality columns should be tracked by ID rather than by name so that renaming columns doesn't break the primary key metadata.
Yes, it should track IDs here. I was thinking of this as a temporary solution before introducing the primary key specification. Considering the renaming issue, it does not seem like a good idea to track column names in iceberg properties.
I will create another PR to introduce the primary key specification based on the discussion here.
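As a hypothetical sketch (not part of this patch), tracking by ID could mean resolving the configured names through the iceberg schema, so that a later rename does not break the metadata:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Resolve equality column names to field IDs; IDs survive column renames, names do not.
static List<Integer> toEqualityFieldIds(Schema schema, List<String> equalityColumnNames) {
  List<Integer> fieldIds = new ArrayList<>();
  for (String name : equalityColumnNames) {
    Types.NestedField field = schema.findField(name);
    if (field == null) {
      throw new IllegalArgumentException("Cannot find equality column in schema: " + name);
    }
    fieldIds.add(field.fieldId());
  }
  return fieldIds;
}
```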
List<String> equalityFieldColumns = toEqualityColumns(table.getSchema());
if (!equalityFieldColumns.isEmpty()) {
  properties.put(TableProperties.EQUALITY_FIELD_COLUMNS,
      org.apache.commons.lang.StringUtils.join(equalityFieldColumns, ","));
We try to avoid using StringUtils and other commons classes because we don't want to leak the dependency. Could you use Guava's Joiner instead?
Sounds great. I did not realize that there's a Joiner in Guava. Thanks.
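A sketch of the suggested replacement, where `properties` and `equalityFieldColumns` come from the quoted snippet above (the exact import may be the relocated Guava package, depending on the module):

```java
import com.google.common.base.Joiner;

properties.put(TableProperties.EQUALITY_FIELD_COLUMNS,
    Joiner.on(",").join(equalityFieldColumns));
```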
String concatColumns = PropertyUtil.propertyAsString(table.properties(),
    TableProperties.EQUALITY_FIELD_COLUMNS,
    TableProperties.DEFAULT_EQUALITY_FIELD_COLUMNS);
String[] columns = org.apache.commons.lang3.StringUtils.split(concatColumns, ",");
Same here. Could you use Splitter instead?
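Likewise, a sketch of reading the property back with Guava's Splitter instead of commons-lang3, where `concatColumns` comes from the quoted snippet above:

```java
import com.google.common.base.Splitter;
import java.util.List;

List<String> columns = Splitter.on(",")
    .omitEmptyStrings()
    .trimResults()
    .splitToList(concatColumns);
```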
Closed via #2410.
To verify that flink sql can write cdc/upsert events to an iceberg table correctly, I made a few changes to make it work:

1. Map the flink CREATE TABLE DDL's primary key to iceberg's equality field columns. That means when creating a flink table with a primary key, it will create an iceberg table with the equality.field.columns property indicating which columns are equality columns (see the sketch after this list). Apache Flink does not support altering the primary key in DDL, so we don't support altering equality field columns in SQL; the iceberg java API will be needed to accomplish that schema evolution.
2. I exposed the iceberg format v2 to end users in this patch because I found that the iceberg catalog has no way to share the TableTestBase. It's not the correct time to expose format v2, so I will try to find another way to make the unit tests work.
3. Provide a basic unit test to verify the sql job works; more unit tests still need to be added, this is in progress.
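A hypothetical illustration of the mapping described in item 1; the table name, the sql(...) helper, and the catalog/loadTable calls are placeholders for whatever the test harness provides, not code from this patch:

```java
// Creating a Flink table with a primary key ...
sql("CREATE TABLE cdc_table (id INT, data VARCHAR, PRIMARY KEY(id) NOT ENFORCED)");

// ... is expected to record the key as equality columns on the created Iceberg table,
// e.g. equality.field.columns = id
Table table = catalog.loadTable(TableIdentifier.of("default", "cdc_table"));
String equalityColumns = table.properties().get(TableProperties.EQUALITY_FIELD_COLUMNS);
```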