
Conversation

@Karl-WangSK
Contributor


What is the purpose of the pull request

preCombine all HoodieRecords and update all fields (which are not the default value) according to orderingVal.

Brief change log

When more than one HoodieRecord has the same HoodieKey, this function combines all fields (which are not the default value)
before attempting to insert/upsert (if combining is turned on in HoodieClientConfig).

Example (suppose the default value is null):

In preCombine:
id   name   age    money   ts
1    Karl   null   30      0.0   (orderingVal=1)
1    null   18     40      0.0   (orderingVal=2)
After:
id   name   age    money   ts
1    Karl   18     40      0.0
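The merge described above can be sketched in plain Java. This is only an illustration: the real payload operates on Avro GenericRecords inside OverwriteWithLatestAvroPayload, and the Map-based record plus the treatment of null as "the default value" here are simplifying assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MergeSketch {

  // Start from the record with the higher orderingVal, then fill every field
  // that still holds the default value (null here) from the older record.
  static Map<String, Object> preCombine(Map<String, Object> latest,
                                        Map<String, Object> older) {
    Map<String, Object> merged = new LinkedHashMap<>(latest);
    for (Map.Entry<String, Object> field : older.entrySet()) {
      if (merged.get(field.getKey()) == null) {
        merged.put(field.getKey(), field.getValue());
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> older = new LinkedHashMap<>();   // orderingVal=1
    older.put("id", 1); older.put("name", "Karl"); older.put("age", null);
    older.put("money", 30); older.put("ts", 0.0);

    Map<String, Object> latest = new LinkedHashMap<>();  // orderingVal=2
    latest.put("id", 1); latest.put("name", null); latest.put("age", 18);
    latest.put("money", 40); latest.put("ts", 0.0);

    // prints {id=1, name=Karl, age=18, money=40, ts=0.0}
    System.out.println(preCombine(latest, older));
  }
}
```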

Verify this pull request

Added one test in TestOverwriteWithLatestAvroPayload to verify the change

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@Karl-WangSK
Contributor Author

cc @yanghua @leesf

@leesf
Contributor

leesf commented Sep 26, 2020

@Karl-WangSK Hi, sorry for the late reply. I don't fully get the point of these changes; could you please describe in more detail in which cases we need them?

@Karl-WangSK
Contributor Author

@Karl-WangSK Hi, sorry for the late reply. I don't fully get the point of these changes; could you please describe in more detail in which cases we need them?

For example, when we have two records with the same id in a batch, we need to retain all properties (like name and age in my brief change log). But by default, Hudi just chooses the record with the biggest orderingVal.

@Karl-WangSK Karl-WangSK reopened this Sep 26, 2020
@Karl-WangSK
Contributor Author

Oops! In PR #2116, a period is missing at line 60 in src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue, so the test failed.

Member

@vinothchandar vinothchandar left a comment


@Karl-WangSK Should this kind of merging belong in a separate payload class? I am not sure overloading the existing payload is the right way to go. It has the specific purpose of ingesting full change log records and picking the latest record based on orderingVal. What I am trying to say is: there may be users who just want null, 18, i.e. the latest values for name and age, instead of the merged ones.

Happy to take this contribution as a separate payload class.

@Karl-WangSK
Contributor Author

@Karl-WangSK Should this kind of merging belong in a separate payload class? I am not sure overloading the existing payload is the right way to go. It has the specific purpose of ingesting full change log records and picking the latest record based on orderingVal. What I am trying to say is: there may be users who just want null, 18, i.e. the latest values for name and age, instead of the merged ones.

Happy to take this contribution as a separate payload class.

In HoodieWriteConfig I added COMBINE_ALL_FIELDS_BEFORE_UPSERT_PROP with DEFAULT_COMBINE_ALL_FIELDS_BEFORE_UPSERT = "false", so by default it won't merge (i.e. it will choose the latest values for name and age). To enable the behavior described in this PR, just set the property to "true". I don't think we need to add a new payload. wdyt?
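A minimal sketch of how such a flag could be read, assuming the constant names from the comment above; the actual property key string and the way HoodieWriteConfig exposes it are assumptions, not the PR's exact code.

```java
import java.util.Properties;

public class CombineAllFieldsConfig {

  // Hypothetical key string mirroring the constant names proposed in this PR;
  // the real key in HoodieWriteConfig may differ.
  static final String COMBINE_ALL_FIELDS_BEFORE_UPSERT_PROP =
      "hoodie.combine.all.fields.before.upsert";
  static final String DEFAULT_COMBINE_ALL_FIELDS_BEFORE_UPSERT = "false";

  // With the default ("false"), Hudi keeps its existing behavior and simply
  // picks the record with the highest orderingVal.
  static boolean combineAllFieldsBeforeUpsert(Properties props) {
    return Boolean.parseBoolean(props.getProperty(
        COMBINE_ALL_FIELDS_BEFORE_UPSERT_PROP,
        DEFAULT_COMBINE_ALL_FIELDS_BEFORE_UPSERT));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println(combineAllFieldsBeforeUpsert(props)); // prints false
    props.setProperty(COMBINE_ALL_FIELDS_BEFORE_UPSERT_PROP, "true");
    System.out.println(combineAllFieldsBeforeUpsert(props)); // prints true
  }
}
```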

@yanghua yanghua assigned leesf and unassigned yanghua Oct 10, 2020
@yanghua
Contributor

yanghua commented Oct 10, 2020

@Karl-WangSK Can you fix the conflicts? Thanks.

- T reducedData = (T) rec1.getData().preCombine(rec2.getData());
+ T reducedData;
+ // To prevent every record from parsing the schema
+ if (rec2.getData() instanceof UpdatePrecombineAvroPayload) {
Contributor Author


Can this prevent old payload implementations from calling preCombine(_, schema)?

@vinothchandar
Member

vinothchandar commented Oct 24, 2020

@Karl-WangSK We need some more thinking to resolve the pending issues, I think.

Only the classes that I just added in this PR will take advantage of it.

I am thinking about how we can safely make this the only API the Hudi code calls. Thinking out loud: existing payload classes will have just preCombine(payload) defined and not the new preCombine(payload, schema) API method. But the default method will keep existing payloads working without breaking? (I think so; it would be good to confirm.)

But one problem is that every payload will parse the schema for every record, whether it needs to or not, which will affect performance.

Now, we can think about how to fix the perf issue. We can parse it once on the driver and then send it across, but the issue is that Schema is not serializable. I think we can solve this by wrapping Schema in a SerializableAvroSchema class (see how it's done in SerializableConfiguration). This way we send the Schema as a string to the executor and then reconstruct the object again on the executor.

Are you able to attempt this? (Else I'll try; it might take a bit of time. Please let me know.)
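The wrapper pattern suggested above can be sketched in plain Java. FakeSchema is a hypothetical stand-in for org.apache.avro.Schema (which, in the Avro versions discussed here, is not Serializable); the real SerializableConfiguration-style wrapper in Hudi may differ in detail, but the idea is the same: serialize only the string form and re-parse on the receiving side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableSchemaSketch implements Serializable {

  // Hypothetical stand-in for org.apache.avro.Schema: built from a JSON
  // string and not Serializable itself.
  static final class FakeSchema {
    private final String json;
    FakeSchema(String json) { this.json = json; }
    String toJson() { return json; }
    static FakeSchema parse(String json) { return new FakeSchema(json); }
  }

  private transient FakeSchema schema; // transient: never default-serialized

  SerializableSchemaSketch(FakeSchema schema) { this.schema = schema; }
  FakeSchema get() { return schema; }

  // Ship only the string form across the wire...
  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();
    out.writeObject(schema.toJson());
  }

  // ...and re-parse the schema once on the receiving (executor) side.
  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    schema = FakeSchema.parse((String) in.readObject());
  }

  public static void main(String[] args) throws Exception {
    SerializableSchemaSketch wrapped =
        new SerializableSchemaSketch(new FakeSchema("{\"type\":\"record\"}"));
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(wrapped);
    }
    try (ObjectInputStream in =
             new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      SerializableSchemaSketch restored = (SerializableSchemaSketch) in.readObject();
      System.out.println(restored.get().toJson()); // prints {"type":"record"}
    }
  }
}
```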

@Karl-WangSK
Contributor Author

@Karl-WangSK We need some more thinking to resolve the pending issues, I think.

Only the classes that I just added in this PR will take advantage of it.

I am thinking about how we can safely make this the only API the Hudi code calls. Thinking out loud: existing payload classes will have just preCombine(payload) defined and not the new preCombine(payload, schema) API method. But the default method will keep existing payloads working without breaking? (I think so; it would be good to confirm.)

But one problem is that every payload will parse the schema for every record, whether it needs to or not, which will affect performance.

Now, we can think about how to fix the perf issue. We can parse it once on the driver and then send it across, but the issue is that Schema is not serializable. I think we can solve this by wrapping Schema in a SerializableAvroSchema class (see how it's done in SerializableConfiguration). This way we send the Schema as a string to the executor and then reconstruct the object again on the executor.

Are you able to attempt this? (Else I'll try; it might take a bit of time. Please let me know.)

I will try.

@vinothchandar
Member

@Karl-WangSK any updates on this? Happy to help with any open-ended issues here.

@Karl-WangSK
Contributor Author

@Karl-WangSK any updates on this? Happy to help with any open-ended issues here.

I met some trouble with SerializableSchema.
Maybe it would be more convenient if we upgraded Avro to 1.9.x.

@vinothchandar
Member

@Karl-WangSK The Avro upgrade is a non-trivial task. We need to ensure parquet-avro etc. still work the same, and all that.

@n3nash wdyt?

@nsivabalan
Contributor

@Karl-WangSK: we have introduced new APIs for all methods in HoodieRecordPayload with a Properties arg to assist with special needs like this. Do you think we can leverage that instead of adding a schema arg to preCombine?
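The backward-compatible shape of such an API can be sketched with a default method: the Payload interface and LatestWins class below are hypothetical stand-ins for HoodieRecordPayload and its implementations, not Hudi's actual signatures.

```java
import java.util.Properties;

public class PayloadApiSketch {

  // Simplified stand-in for HoodieRecordPayload. The new Properties-based
  // method gets a default implementation that delegates to the old one, so
  // existing payload classes keep compiling and behaving as before.
  interface Payload<T extends Payload<T>> {
    T preCombine(T oldValue);

    default T preCombine(T oldValue, Properties props) {
      return preCombine(oldValue);
    }
  }

  // An "old-style" payload that only implements the original method.
  static final class LatestWins implements Payload<LatestWins> {
    final long orderingVal;
    LatestWins(long orderingVal) { this.orderingVal = orderingVal; }

    @Override
    public LatestWins preCombine(LatestWins oldValue) {
      return orderingVal >= oldValue.orderingVal ? this : oldValue;
    }
  }

  public static void main(String[] args) {
    LatestWins newer = new LatestWins(2);
    LatestWins older = new LatestWins(1);
    // Callers can uniformly use the new API; old payloads still work.
    System.out.println(newer.preCombine(older, new Properties()).orderingVal); // prints 2
  }
}
```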

@nsivabalan
Contributor

@vinothchandar: do you think we need to make this a release blocker?

@nsivabalan
Contributor

@Karl-WangSK @vinothchandar: since it's been open for some time, I wanted to see how we can get it to closure. Let me summarize the state of the PR.

Requirement:
We wish to support new preCombine logic to preCombine two records within the same incoming batch (if the config is enabled).
Logic: any field with a null value in the record with the latest ordering value will pick its value from the other record.
A few clarifying questions:

  • Is this just for preCombine, or for combineAndGetUpdateValue as well?
  • If there are more than 2 records, we apply the same logic, right?

So, the initial proposal was to change the signature of preCombine to take in the schema, but feedback was given to try a serializable version so that we don't need to change the API or incur a perf impact.

@Karl-WangSK: feel free to clarify my questions and give us the latest update on this PR.

@nsivabalan
Contributor

FYI: we have another PR open to support a similar feature for combineAndUpdate.

@Karl-WangSK
Contributor Author

@Karl-WangSK @vinothchandar: since it's been open for some time, I wanted to see how we can get it to closure. Let me summarize the state of the PR.

Requirement:
We wish to support new preCombine logic to preCombine two records within the same incoming batch (if the config is enabled).
Logic: any field with a null value in the record with the latest ordering value will pick its value from the other record.
A few clarifying questions:

  • Is this just for preCombine, or for combineAndGetUpdateValue as well?
  • If there are more than 2 records, we apply the same logic, right?

So, the initial proposal was to change the signature of preCombine to take in the schema, but feedback was given to try a serializable version so that we don't need to change the API or incur a perf impact.

@Karl-WangSK: feel free to clarify my questions and give us the latest update on this PR.

  • Is this just for preCombine, or for combineAndGetUpdateValue as well?
    Both of them.
  • If there are more than 2 records, we apply the same logic, right?
    Yes.
    The problem now is that parsing the schema for every record will affect performance.

@nsivabalan
Contributor

@Karl-WangSK: we have another PR being reviewed right now for partial update support: #2666.
Please check it out. We don't need to parse the schema for every record; you can get some inspiration from the linked PR.


Labels

priority:high Significant impact; potential bugs
