[HUDI-5503] Optimize flink table factory option check #7608
Conversation
Force-pushed from 0024bdd to 04a9f9c
@hudi-bot run azure
Force-pushed from 04a9f9c to 0765120
```java
if (this.simpleRecordKey) {
if (!hasRecordKey) {
  return DEFAULT_RECORD_KEY;
} else if (this.simpleRecordKey) {
```
Not sure whether we should use the empty string for the pk-less scenario: all the records would then share the same primary key value, which breaks the pk-less semantics. By pk-less we actually mean that all the records are unique, so there is no need to define a primary key.
Another solution is to use a UUID as the primary key, WDYT?
Not sure if removing the pk field would cause an error somewhere. Writing an identical value should use very little storage in a columnar file format like Parquet, while a UUID would use much more space since unique values compress poorly, and I don't know where we could use the UUID, so I think storing an identical value for the pk is better.
I changed the default key value to RowDataKeyGen.EMPTY_RECORDKEY_PLACEHOLDER, since an empty row key reports an error.
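For reference, a minimal standalone sketch of the key resolution being discussed (plain Java, not the actual RowDataKeyGen; the placeholder value, class name, and composite-key format are assumptions):

```java
import java.util.Map;

// Sketch: a pk-less table gets a shared placeholder key rather than an empty
// string, which Hudi rejects when building the HoodieKey.
public class RecordKeySketch {
  static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty_record_key__"; // assumed value

  final String[] keyFields;
  final boolean hasRecordKey;    // false when the table defines no primary key
  final boolean simpleRecordKey; // true when exactly one key field is defined

  RecordKeySketch(String... keyFields) {
    this.keyFields = keyFields;
    this.hasRecordKey = keyFields.length > 0;
    this.simpleRecordKey = keyFields.length == 1;
  }

  String getRecordKey(Map<String, String> row) {
    if (!hasRecordKey) {
      return EMPTY_RECORDKEY_PLACEHOLDER; // pk-less: identical key for every record
    } else if (simpleRecordKey) {
      return row.get(keyFields[0]);       // single-field key
    }
    StringBuilder sb = new StringBuilder(); // composite key: f1:v1,f2:v2,...
    for (String f : keyFields) {
      if (sb.length() > 0) {
        sb.append(',');
      }
      sb.append(f).append(':').append(row.get(f));
    }
    return sb.toString();
  }
}
```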
In #7622, the empty string is also used for keyless primary keys, so it's okay here if we reach an agreement and never use the primary key.
```java
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
```
We cannot remove the sanity check for the MOR table source; the MOR table relies on the pk and preCombine key for merging payloads.
Oh, I missed that the pk field is used to emit delete records.
I added a sanity check for stream-reading a MOR table.
Yeah, merging happens for a MOR table no matter whether the consuming is streaming or batch.
But as in the code of MergeOnReadInputFormat.MergeIterator.hasNext(), the record key is read from the meta field, and in the constructor of AbstractHoodieLogRecordReader the preCombineField is read from the table config.
So it seems that the passed-in record key is only used to emit delete records when stream-reading MOR logs, and the preCombine field is not used at all when reading a Hudi table?
You are right
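To illustrate the point about the read path, a small sketch (hypothetical class, not the actual MergeOnReadInputFormat code) of taking the record key from the _hoodie_record_key meta column rather than from the user-facing option:

```java
import org.apache.flink.table.data.RowData;

public class MetaKeySketch {
  // Assumed meta-column layout: Hudi prepends _hoodie_commit_time,
  // _hoodie_commit_seqno, _hoodie_record_key, ... so the key sits at index 2.
  static final int RECORD_KEY_META_POS = 2;

  static String recordKeyOf(RowData baseRow) {
    // the key comes from the persisted meta column, not the table option
    return baseRow.getString(RECORD_KEY_META_POS).toString();
  }
}
```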
```java
if (!conf.contains(FlinkOptions.RECORD_KEY_FIELD) && !schema.getPrimaryKey().isPresent()
    && propsMap.containsKey(HoodieTableConfig.RECORDKEY_FIELDS.key())
    && writeColumnNames.contains(propsMap.get(HoodieTableConfig.RECORDKEY_FIELDS.key()))) {
  conf.set(FlinkOptions.RECORD_KEY_FIELD, propsMap.get(HoodieTableConfig.RECORDKEY_FIELDS.key()));
```
The writeColumnNames.contains check does not work when there are multiple pk fields.
Right, will fix this. Thanks.
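A possible shape for the fix, as a standalone sketch (class and method names are illustrative): split the comma-separated RECORDKEY_FIELDS value and verify every field is present:

```java
import java.util.Arrays;
import java.util.List;

public class KeyFieldCheck {
  static boolean allKeyFieldsWritten(String recordKeyFields, List<String> writeColumnNames) {
    // RECORDKEY_FIELDS may be "id" or "id,region"; every field must be written
    return Arrays.stream(recordKeyFields.split(","))
        .map(String::trim)
        .allMatch(writeColumnNames::contains);
  }

  public static void main(String[] args) {
    List<String> cols = Arrays.asList("id", "region", "ts");
    System.out.println(allKeyFieldsWritten("id,region", cols)); // true
    System.out.println(allKeyFieldsWritten("id,name", cols));   // false
  }
}
```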
Force-pushed from 158f8a9 to 3ca99dc
@hudi-bot run azure
danny0405
left a comment
I see there is key-less support for Spark here: #7622. We can do similar changes on the Flink side, so that in append mode there is no need to define primary keys.
You can actually do similar work in this patch, but I guess the change in SimpleKeyGenerator is a blocker; we can merge this patch after #7622 is merged.
OK, so let's wait for #7622 and do a similar change for append-only write.
In #7295, we decided to fix the Hoodie Hive table by merging the table options for the Flink read/write path, which has some overlap with this PR. Just a kind reminder.
Thanks for the contribution. I have reviewed and applied a patch:
Force-pushed from 3ca99dc to 6feac73
Thanks for the patch; I applied it with small changes: separated the sanity checks for source and sink (when creating a source, the pk is only checked when stream-reading a MOR table) and made the corresponding changes to TestHoodieTableFactory.java.
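A hedged sketch of what that source-side check could look like (this is not the merged code; the FlinkOptions constants are assumed from memory):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.hudi.configuration.FlinkOptions;

public class SourceCheckSketch {
  static void sourceSanityCheck(Configuration conf, ResolvedSchema schema) {
    // only a streaming read of a MOR table needs the pk, to key the emitted deletes
    boolean streamReadMor = conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
        && FlinkOptions.TABLE_TYPE_MERGE_ON_READ.equals(conf.getString(FlinkOptions.TABLE_TYPE));
    boolean hasPk = schema.getPrimaryKey().isPresent()
        || conf.contains(FlinkOptions.RECORD_KEY_FIELD);
    if (streamReadMor && !hasPk) {
      throw new ValidationException(
          "Primary key is required when stream-reading a MERGE_ON_READ table.");
    }
  }
}
```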
```java
setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sourceSanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
```
The sanity check for the source can be removed. When the primary key definition is missing, the streaming source for a MOR table distinguishes the case as pk-less, so no deletes are emitted.
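A tiny illustration of that behavior (hypothetical helper, not the actual source code): delete emission is simply gated on whether a pk is defined:

```java
public class DeleteEmissionSketch {
  static boolean shouldEmitDelete(boolean pkDefined, boolean isDeleteRecord) {
    // pk-less: there is no key to address a retraction with, so drop deletes
    return pkDefined && isDeleteRecord;
  }

  public static void main(String[] args) {
    System.out.println(shouldEmitDelete(true, true));  // keyed table: delete emitted
    System.out.println(shouldEmitDelete(false, true)); // pk-less table: delete skipped
  }
}
```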
Force-pushed from 6feac73 to 530e8a4
danny0405
left a comment
+1, LGTM.
Co-authored-by: hbg <[email protected]>
Change Logs
1. Remove the pk and pre-combine field checks for the source and for the append-mode sink.
2. Fall back to the table config if the pk or pre-combine field is not set.
Impact
1. In Flink SQL, there is no need to set the pk and pre-combine field when reading a Hudi table or writing in append mode.
2. If the pk or pre-combine field is not set in the options, fall back to the table config if possible (see the sketch below).
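As a sketch of the fallback in point 2 (plain Java; the config keys hoodie.datasource.write.recordkey.field and hoodie.table.recordkey.fields correspond to the options quoted in the review above, while the class and method shape are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class TableConfigFallback {
  static String resolveRecordKey(Map<String, String> sqlOptions,
                                 Map<String, String> tableConfig,
                                 List<String> writeColumnNames) {
    String fromSql = sqlOptions.get("hoodie.datasource.write.recordkey.field");
    if (fromSql != null) {
      return fromSql; // an explicit option always wins
    }
    String persisted = tableConfig.get("hoodie.table.recordkey.fields");
    if (persisted != null
        && Arrays.stream(persisted.split(","))
            .map(String::trim)
            .allMatch(writeColumnNames::contains)) {
      return persisted; // fall back to the persisted table config
    }
    return null; // neither source defines a key: pk-less
  }
}
```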
Risk level: low
Contributor's checklist