Flink: Add ChangeLog DataStream end-to-end unit tests. #1974
Conversation
 * @param columns defines the iceberg table's key.
 * @return {@link Builder} to connect the iceberg table.
 */
public Builder equalityFieldColumns(List<String> columns) {
Do you think that we should consider adding primary key columns to the spec?
In the next PR https://github.com/openinx/incubator-iceberg/commit/a863c66eb3d72dd975ea64c75ed2ac35984c17fe, the Flink table SQL's primary key will act as the equality field columns. The semantics of Iceberg's equality columns are almost the same as a primary key; the one difference I can think of is that the uniqueness of the key is not enforced. In this discussion, we don't guarantee uniqueness when writing a key that was also written in a previously committed txn. That means if:
Txn-1: INSERT key1, txn commit;
Txn-2: INSERT key1, txn commit;
then the table will have two records with the same key.
If people really need Iceberg to maintain the key's uniqueness, they will need to transform every INSERT into an UPSERT, which means DELETE first and then INSERT the new value.
That introduces another issue: each INSERT is regarded as an UPSERT, so it writes a DELETE and an INSERT. In the end, the size of the delete files will be almost the same as the size of the data files, and merging on read will be quite inefficient because there are too many useless DELETEs to JOIN.
The direct way to reduce the useless DELETEs is a bloom filter: we would generate a bloom filter binary for each committed data file. When bootstrapping the Flink/Spark job, we would prefetch all the bloom filter binaries from the Parquet/Avro data files' metadata. Before writing an equality delete, we would check the bloom filters, and if they indicate that none of the committed data files contain the given key, we could skip appending that equality delete. That would avoid a lot of useless DELETEs in the delete files. Of course, bloom filters have false positives, but the probability is less than 1%, which means we may append a small amount of deletes whose keys don't exist in the current table. In my view, that should be OK.
In summary, I think it's reasonable to regard those equality fields as the primary key of an Iceberg table. People could choose between UNIQUENESS ENFORCED and UNIQUENESS NOT-ENFORCED, so they could trade off between strong semantics and performance.
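To make the UPSERT path concrete, here is a minimal, hypothetical sketch (not code from this PR) of treating every INSERT as a DELETE followed by an INSERT, with a per-data-file bloom filter used to skip equality deletes for keys that no committed file can contain. The class and interface names are assumptions for illustration only.

import com.google.common.hash.BloomFilter;
import java.util.List;

// Illustration only: every INSERT is treated as an UPSERT (delete-by-key, then insert),
// and the delete is skipped when the bloom filters say the key cannot exist yet.
class UpsertSketch {
  private final List<BloomFilter<CharSequence>> committedFileFilters; // one filter per committed data file
  private final EqualityDeleteSink deleteSink;  // hypothetical writer for equality deletes
  private final DataSink dataSink;              // hypothetical writer for data files

  UpsertSketch(List<BloomFilter<CharSequence>> filters, EqualityDeleteSink deleteSink, DataSink dataSink) {
    this.committedFileFilters = filters;
    this.deleteSink = deleteSink;
    this.dataSink = dataSink;
  }

  void upsert(String key, Object value) {
    // A false positive (< ~1%) only costs one redundant equality delete, never correctness.
    boolean mightExist = committedFileFilters.stream().anyMatch(filter -> filter.mightContain(key));
    if (mightExist) {
      deleteSink.deleteByKey(key);
    }
    dataSink.insert(key, value);
  }

  interface EqualityDeleteSink { void deleteByKey(String key); }
  interface DataSink { void insert(String key, Object value); }
}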
For the bloom filter idea, @wangmiao1981 has been working on a proposal for secondary indexes. I think that could be used for the check you're suggesting here.
people could choose to use UNIQUENESS ENFORCED or UNIQUENESS NOT-ENFORCED, in this way they could trade off between strong semantic and performance.
Are you saying that if uniqueness is enforced, each insert becomes an upsert, but if uniqueness is not enforced, the sink would assume that whatever is emitting records will correctly delete before inserting? That sounds reasonable to me.
Finally the size of delete files will be almost same as the size of data files. The process of merging on read will be quite inefficient because there are too many useless DELETE to JOIN.
I think that even if uniqueness is not enforced, tables will quickly require compaction to rewrite the equality deletes. I think we should spend some time making sure that we have good ways to maintain tables and compact equality deletes into position deletes, and position deletes into data files.
Are you saying that if uniqueness is enforced, each insert becomes an upsert, but if uniqueness is not enforced, the sink would assume that whatever is emitting records will correctly delete before inserting?
Yes. If someone is exporting a relational database's change log events to an Apache Iceberg table and can guarantee exactly-once semantics (for example, the flink-cdc-connector can guarantee that), then uniqueness is always correct when we just write the INSERT/DELETE/UPDATE_BEFORE/UPDATE_AFTER events to Iceberg. In some other cases, for example a Flink aggregate job refreshing a metrics count value, we will write the same key several times without deleting first, so we should regard every INSERT as an UPSERT.
even if uniqueness is not enforced, tables will quickly require compaction to rewrite the equality deletes.
That is planned for the second phase, which includes:
- Use a bloom filter to reduce the large number of useless deletes;
- Minor compaction to convert part of the equality deletes to pos-deletes;
- Major compaction to eliminate all the deletes;
- Make the whole read path & write path more stable. For example, a cache policy to reduce duplicated delete-file loading when merging on read in the same tasks; spill to disk if the insertedRowMap exceeds the task's memory threshold, etc. I will evaluate the read & write & compaction paths on a large dataset, making this a stable solution for production.
It would be good to have a document collecting all of those things for review.
I’d vote for not ensuring uniqueness, as it is really hard at scale. If we are to ensure this at write time, we have to join the incoming data with the target table, making it really expensive. Doing this at read time would require sorting the data not only by the sort key but also by the sequence number.
    column, table.schema());
  equalityFieldIds.add(field.fieldId());
  }
}
Why not do this conversion in equalityFieldColumns and keep the column ids in the builder instead of the source column names?
Because FlinkSink is an API exposed to Flink's DataStream users, the concept of an equality field id is harder for those users to understand. Equality field column names are more user-friendly.
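For comparison, a minimal sketch (an assumption about how it could look, not this PR's code) of the suggested alternative: the builder keeps the user-facing column names, and they are resolved to field ids against the table schema only when the sink is built. The helper name toEqualityFieldIds is hypothetical.

import java.util.List;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Resolve user-facing equality field column names to Iceberg field ids once the
// table Schema is known (e.g. inside build()), so the public builder API can keep
// accepting names.
private static List<Integer> toEqualityFieldIds(Schema schema, List<String> columns) {
  List<Integer> ids = Lists.newArrayList();
  for (String column : columns) {
    Types.NestedField field = schema.findField(column);
    Preconditions.checkNotNull(field,
        "Missing required equality field column '%s' in table schema %s", column, schema);
    ids.add(field.fieldId());
  }
  return ids;
}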
DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);

// Shuffle by the equality key, so that different operations from the same key could be wrote in order when
// executing tasks in parallelism.
Nit: I think you mean "executing tasks in parallel" rather than "parallelism".
Thanks for pointing it out, I will address it in the next update.
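For readers following the quoted snippet above, a hedged sketch of what that comment describes: keying the changelog stream by the equality field so that all operations on the same key land on the same parallel subtask in arrival order. The field position used as the key is an assumption here, and KeySelector/KeyedStream are the standard Flink DataStream API types.

// A possible key selector (assuming the id sits at field position 0): it routes all
// changelog events for the same id to the same parallel subtask, preserving per-key order.
KeyedStream<Row, Integer> keyedStream = dataStream.keyBy(new KeySelector<Row, Integer>() {
  @Override
  public Integer getKey(Row row) {
    return (Integer) row.getField(0);
  }
});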
| "+I", RowKind.INSERT, | ||
| "-D", RowKind.DELETE, | ||
| "-U", RowKind.UPDATE_BEFORE, | ||
| "+U", RowKind.UPDATE_AFTER); |
Could this be a private static map instead of defining it each time a row is created?
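A small sketch of that suggestion, assuming the test's row(...) helper currently rebuilds this map on every call; the constant name below is hypothetical.

import java.util.Map;
import com.google.common.collect.ImmutableMap;
import org.apache.flink.types.RowKind;

// Build the short-name-to-RowKind lookup once, instead of once per created row.
private static final Map<String, RowKind> ROW_KIND_BY_SHORT_NAME = ImmutableMap.of(
    "+I", RowKind.INSERT,
    "-D", RowKind.DELETE,
    "-U", RowKind.UPDATE_BEFORE,
    "+U", RowKind.UPDATE_AFTER);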
@Test
public void testChangeLogOnIdKey() throws Exception {
  List<String> equalityFieldIds = ImmutableList.of("id");
Should this be equalityFieldNames?
Yes.
List<String> equalityFieldIds = ImmutableList.of("id");
List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
    ImmutableList.of(
        row("+I", 1, "aaa"),
Minor: This makes it look like the row has an operation as its first column, but that doesn't align with the key selector below that uses row.getField(0) to get the ID. I think it would make tests easier to read if row passed the row kind at the end. That way the fields align.
I'm not sure if it is worth changing all of the rows. Up to you.
The current way is correct because the row kind is maintained in a separate field (rather than in the shared fields array), see here.
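A tiny sketch of that point, assuming Flink's Row API: the row kind is stored in the Row's dedicated kind slot, so it never occupies a positional field and getField(0) still returns the id.

Row row = Row.of(1, "aaa");    // positional fields: [id, data]
row.setKind(RowKind.INSERT);   // the kind lives outside the fields array
// so a key selector using row.getField(0) still keys on the id, not on the row kind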
rdblue left a comment
I left a few minor comments, but nothing is a blocker.
All checks passed. I've merged this patch to the repo so that I can create the next PR for the Flink table CDC e2e unit tests. Thanks @rdblue for reviewing.
Add unit tests to prove that a Flink DataStream job can write CDC events correctly. Will open a separate PR to address the Flink SQL unit tests.