[HUDI-4612][RFC-59] RFC-59 Materials (RFC Proposal) Submission: "Multiple event_time Fields Latest Verification in a Single Table" #6382
Conversation
Hi Sivabalan @nsivabalan, could you please take some time to give us a brief review? If you don't have time, may I ask you to recommend someone else? Thanks a lot! :) Wish you all the best.
Hi Shiyan @xushiyan,
@XinyaoTian Thanks for the RFC submission. Merge logic is going to be well abstracted and custom-implementable once RFC-46 lands (it is very close to landing). I suggest you rethink the problem after the HoodieMerge interface is in place and see if this combining logic is generic enough to implement within Hudi.
@prasannarajaperumal Hi Prasanna, thanks for your review :) To my understanding, RFC-46 intends to improve the entire design of HoodieRecordPayload, which is awesome and will provide quite a lot of benefits. However, it does not by itself give Hudi the ability to verify multiple event-time fields in a single table (the new payload design may make this feature easier to implement, but HoodieRecordPayload is just one part of it). What we would like to achieve is to give Hudi the ability to JOIN multiple tables in stream-consuming mode without multiple event-time disordering. Therefore, I think we still need to propose this feature, because it really matters to have multiple event-time fields verified in a single Hudi table (currently we have ONLY one, i.e. precombine.field='ts'; what we want to achieve is precombine.field='ts1, ts2, ts3, ts4'). For your convenience, we can wait for the final landing of RFC-46 and then implement the feature proposed in this RFC. This feature is in real demand: people ask for it in many places (including the Hudi Slack, e.g. Thread, and the dev mailing list, e.g. Discussion) almost every week. We really need MORE THAN ONE event-time field so that we can ensure the accuracy of events even when many JOIN operations sink into ONE Hudi table. If there is anything worth noting, please contact me! Looking forward to your further feedback.
Hi @yihua, thanks for your review.
The feature we implemented works as shown below. Here is a simple but useful example to illustrate directly what this RFC does. Suppose we have a table whose configuration contains multiple event-time fields.
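Such a table definition might look like the following minimal sketch, in Spark SQL. This is illustrative only: the schema and types are inferred from the INSERT statements below, and the multi-event-time option is the key proposed later in this RFC, not final syntax.

```sql
CREATE TABLE test_db.hudi_payload_test_03 (
  public_id INT,
  a_info STRING,
  a_ts BIGINT,
  b_info STRING,
  b_ts BIGINT,
  ts BIGINT,
  pt STRING
) USING hudi
PARTITIONED BY (pt)
TBLPROPERTIES (
  'primaryKey' = 'public_id',
  'preCombineField' = 'ts',
  -- illustrative only: the multi-event-time option proposed in this RFC
  'hoodie.eventime.latest.verification.fields' = 'a_ts,b_ts'
);
```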
We check the table and see that it has one record, whose schema is simple:

```
spark-sql> select * from test_db.hudi_payload_test_03;
20220622111029695  20220622111029695_0_1  public_id:1  pt=DD  214f6985-fee5-4091-a65d-d52e9eb20634-0_0-67-4219_20220622111029695.parquet  1  a_101  101  b_101  101  0  DD
Time taken: 0.858 seconds, Fetched 1 row(s)
```

We upsert this record with a bigger value in the b_ts field, while every field related to a is null:

```sql
INSERT INTO test_db.hudi_payload_test_03
SELECT 1 AS public_id, null AS a_info, null AS a_ts, 'b_105_New_record' AS b_info, 105 AS b_ts, 0 AS ts, 'DD' AS pt;
```

The result looks like this: only the columns related to b have been updated, and the a_ columns remain unchanged.

```
spark-sql> select * from test_db.hudi_payload_test_03;
20220622111939468  20220622111939468_0_1  public_id:1  pt=DD  214f6985-fee5-4091-a65d-d52e9eb20634-0_0-30-2209_20220622111939468.parquet  1  a_101  101  b_105_New_record  105  0  DD
Time taken: 0.496 seconds, Fetched 1 row(s)
```

If we upsert a smaller value in the b_ts field, nothing happens: neither the null fields nor the fields containing values are changed.

```sql
INSERT INTO test_db.hudi_payload_test_03
SELECT 1 AS public_id, null AS a_info, null AS a_ts, 'b_99_Some_Record' AS b_info, 99 AS b_ts, 0 AS ts, 'DD' AS pt;
```

```
spark-sql> select * from test_db.hudi_payload_test_03;
20220622112743351  20220622112743351_0_1  public_id:1  pt=DD  214f6985-fee5-4091-a65d-d52e9eb20634-0_0-69-4422_20220622112743351.parquet  1  a_101  101  b_105_New_record  105  0  DD
Time taken: 0.501 seconds, Fetched 1 row(s)
```

By using this feature, a Hudi table gains the ability to upsert only part of a record. Data developers can thus combine several tables into one (and keep everything up-to-date through streaming ingestion) and use that single table for further work like ML algorithms, AI training, or big-screen visualization. This feature will make many things really fast and simple. Hope my example is useful for understanding the feature provided by our RFC :) @yihua @prasannarajaperumal @alexeykudinkin
nsivabalan left a comment
Nicely written RFC w/ a good amount of detail. Appreciate it!
I have left some suggestions.
> (Describe the problem you are trying to solve and a brief description of why it's needed.)
> A new feature temporarily named "Multiple event-time fields latest verification" is being proposed in this RFC.
Maybe we can name the RFC "Multiple Ordering Fields".
> In general, there are primarily THREE essentials to help Hudi achieve the above feature in cooperation with Flink.
> 1. Create a new Payload class, temporarily named `hudi-common/src/main/java/org/apache/hudi/common/model/PickAndCombineMultipleFieldsPayload.java`. This Payload class is the core of the entire new feature. It has two key functions and one crucial property that keep the entire feature running properly.
I align w/ the proposal. But adding a new payload means that other payload impls don't get this feature, right?
Is it not possible to evolve existing payload impls?
For e.g., make the ordering value an array instead of just one element. By default the array would contain just one element (backwards compatible); for interested users it might contain more entries, and the ordering would be based on the values in order.
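For illustration, the array-based ordering described in this suggestion could be compared element by element; a minimal Java sketch, with all names hypothetical:

```java
/**
 * Minimal sketch of comparing two ordering-value arrays element by element,
 * in declared field order. A one-element array degenerates to today's single
 * ordering field, which keeps the change backwards compatible.
 */
final class OrderingValueArrays {

  @SuppressWarnings({"unchecked", "rawtypes"})
  static int compare(Comparable[] left, Comparable[] right) {
    if (left.length != right.length) {
      throw new IllegalArgumentException("ordering arrays must have equal length");
    }
    for (int i = 0; i < left.length; i++) {
      // Treat null as "no event observed yet", i.e. older than any value.
      if (left[i] == null && right[i] == null) {
        continue;
      }
      if (left[i] == null) {
        return -1;
      }
      if (right[i] == null) {
        return 1;
      }
      int cmp = left[i].compareTo(right[i]);
      if (cmp != 0) {
        return cmp;
      }
    }
    return 0; // all ordering fields are equal
  }
}
```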
Also, if we plan to go this route, we might need to think about how to support upgrades for existing tables.
For e.g., let's say we add multiple ordering fields to OverwriteWithLatestPayload.
For a table created in 0.12.0, when users upgrade to 0.13.0, is it possible to switch this old table to multiple ordering fields? If yes, would it work for all cases?
Or is it that we can only support multiple ordering fields for new tables?
> Firstly, the `preCombineWithMultipleFields` function helps Hudi deduplicate records with the same primary key(s) inside a buffered bucket by multiple event-time fields, keeping only the record whose every event-time field is the latest.
> This function would be invoked in `hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java` # `deduplicateRecordsWithMultipleCombineFields` to deduplicate records. The deduplication process will also be described later.
Can we make the impl generic so that all engines can leverage it?
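For illustration, the per-field "latest wins" combine described above could live in engine-agnostic code; a minimal Java sketch, assuming the event-time values have already been extracted into a map (all names hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, engine-agnostic sketch: for each event-time field, keep the
 * value from whichever side has the newer event time. The real payload would
 * also carry over the data columns associated with each event-time group.
 */
final class MultiEventTimeCombine {

  static Map<String, Comparable<?>> combineLatest(Map<String, Comparable<?>> current,
                                                  Map<String, Comparable<?>> previous) {
    Map<String, Comparable<?>> latest = new HashMap<>(previous);
    for (Map.Entry<String, Comparable<?>> entry : current.entrySet()) {
      Comparable<?> old = latest.get(entry.getKey());
      // A null event time means "no new event"; it never overwrites a value.
      if (entry.getValue() != null && (old == null || compareRaw(entry.getValue(), old) > 0)) {
        latest.put(entry.getKey(), entry.getValue());
      }
    }
    return latest;
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  private static int compareRaw(Comparable a, Comparable b) {
    return a.compareTo(b);
  }
}
```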
```java
// A string to save the event-time field name of orderingVal.
// This is a compromise designed to be compatible with the current single event-time
// (property 'orderingVal' provided by BaseAvroPayload).
private String orderingValFieldName;
```
Why can't we store this entry in the hashmap?
```java
public VerifyMultipleLatestEventTimesPayload(GenericRecord record, Comparable orderingVal, String orderingValFieldName);

// Constructor for the multiple event-time latest verification
public VerifyMultipleLatestEventTimesPayload(GenericRecord record, HashMap<String, Comparable<?>> eventTimeFieldNamesAndValues);
```
Can you help me understand something: when reading a record from storage (already stored in Hudi), how would we get all the values for the multiple ordering fields? Do we deserialize the entire record and then fetch the respective values? Can you check the flow and ensure it works that way.
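For illustration, under the assumption that the stored record is fully deserialized first, building the map the second constructor expects might look like this hypothetical helper:

```java
import java.util.HashMap;
import org.apache.avro.generic.GenericRecord;

/** Hypothetical helper for building the eventTimeFieldNamesAndValues map. */
final class EventTimeExtractor {

  // After deserializing the stored record, fetch each configured event-time
  // field into the map the payload constructor above expects.
  static HashMap<String, Comparable<?>> extract(GenericRecord record, String[] eventTimeFields) {
    HashMap<String, Comparable<?>> values = new HashMap<>();
    for (String field : eventTimeFields) {
      Object value = record.get(field);          // null if not set in this record
      values.put(field, (Comparable<?>) value);  // event times (e.g. longs) are Comparable
    }
    return values;
  }
}
```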
```java
/**
 * @param previousValue another instance of {@link VerifyMultipleLatestEventTimesPayload} to be compared
 *                      with this one, choosing the one whose every event-time field is the latest
 *                      (null values must be handled carefully)
 * @return the chosen record payload (every event-time field is the latest)
 */
public VerifyMultipleLatestEventTimesPayload preCombineWithMultipleFields(VerifyMultipleLatestEventTimesPayload previousValue);
```
Let's not add new methods; we can add overloaded methods, but not new methods w/ new names. It achieves the same purpose as the existing method, just with N ordering fields instead of one.
```java
/**
 * This function deduplicates records having the same primary keys by multiple event-time fields.
 * Only the record with the latest event-time fields will be retained.
 * It is very similar to another function in this class (FlinkWriteHelper#deduplicateRecords);
 * the difference is that this function uses multiple event-time fields to deduplicate records.
 */
public List<HoodieRecord<T>> deduplicateRecordsWithMultipleCombineFields(List<HoodieRecord<T>> records, HoodieIndex<T, ?, ?, ?> index, int parallelism);
```
I don't have much context on the Flink classes' design and layering, but it would be good to avoid adding new methods and just add overloads of existing ones.
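Whatever the final method name, the dedup step itself could be a small fold over records grouped by key; a minimal, engine-agnostic Java sketch (names hypothetical, with the multi-field preCombine from this RFC assumed as the merge function):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Hypothetical sketch of deduplicating a buffered batch by record key. */
final class DedupSketch {

  static <R> List<R> deduplicate(List<R> records,
                                 Function<R, String> recordKey,
                                 BinaryOperator<R> preCombine) {
    Map<String, R> latestByKey = new HashMap<>();
    for (R record : records) {
      // merge() applies preCombine when two records share the same key,
      // so only the combined "all event times latest" record survives.
      latestByKey.merge(recordKey.apply(record), record, preCombine);
    }
    return new ArrayList<>(latestByKey.values());
  }
}
```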
```java
/**
 * The configuration would look like the below when creating a Hudi table:
 * 'hoodie.eventime.latest.verification.fields' = 'event_a_ts,event_b_ts,event_c_ts'
 */
public static final String PAYLOAD_COMBINE_DEPENDING_ON_FIELDS = "hoodie.eventime.latest.verification.fields";
```
Is it possible to re-use the same config, PAYLOAD_ORDERING_FIELD_PROP_KEY? If the value is a comma-separated list, we deduce that there are multiple ordering fields; if not, it's a single ordering field.
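A minimal sketch of that deduction, assuming the value of PAYLOAD_ORDERING_FIELD_PROP_KEY is read as a plain string (hypothetical helper, not the actual config plumbing):

```java
/** Hypothetical sketch: one config key covers both single and multiple ordering fields. */
final class OrderingFieldsConfig {

  static String[] parseOrderingFields(String configValue) {
    // "ts"          -> single ordering field (existing behavior, backwards compatible)
    // "ts1,ts2,ts3" -> multiple ordering fields (new behavior)
    String[] fields = configValue.split(",");
    for (int i = 0; i < fields.length; i++) {
      fields[i] = fields[i].trim();
    }
    return fields;
  }

  static boolean hasMultipleOrderingFields(String configValue) {
    return parseOrderingFields(configValue).length > 1;
  }
}
```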
```java
// ------------------------------------------------------------------------

public static final ConfigOption<Boolean> IS_USING_MULTIPLE_COMBINE_FIELDS = ConfigOptions
    .key("multiple.eventime.latest.verification.enable")
```
We could avoid adding this new config if we automatically deduce single vs. multiple ordering fields.

Let's review/revisit this after 1.1 and see if it is still needed. It may not be.
Change Logs
According to the Hudi RFC Process, once the RFC number has been assigned, proposers should submit their RFC materials as soon as possible. We are very glad that we finished writing a high-quality proposal in a short time. We would like to invite anyone who is interested in this feature to read it.
The RFC proposal can be read in rfc/rfc-59/rfc-59.md
Impact
Just adds a new markdown file with some pictures in the folder rfc/rfc-59.
No impact on running code, but it will have a BIG IMPACT in the brainstorming area.
Risk level: None
Contributor's checklist