[HUDI-115] Enhance OverwriteWithLatestAvroPayload to also respect ordering value of record in storage #1704
Conversation
Can you describe any more use-cases, apart from the ordering field, for the Map<> introduced? Why can't the user simply perform this during the old combineAndGetUpdateValue by comparing the incoming ordering field value vs the old one (in the GenericRecord)? What am I missing?
The Map was meant as a more generic structure to send properties; for now, it's only the ordering field that is required here. For your second question, I am not sure I am understanding it right. Are you suggesting we send in the ordering field value itself instead of the field name?
Let me explain. The current interface combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) doesn't allow the user to pass the ordering field names. You're trying to introduce this and have added a Map<> to allow the user to do so. (The description link in the JIRA ticket doesn't work, so I needed more context.)
Now, since users can have their own implementation of combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema), they can keep the ordering field name in their own class and maintain it themselves, thus not needing to explicitly pass these names through a method.
I see that the intention of this PR is to support this functionality specifically for OverwriteWithLatestAvroPayload. For preCombine, will we just continue to support natural ordering, which may not be the same as for combineAndGetUpdateValue, correct?
Thanks for the clarification @n3nash. I am not able to open the link either; it could be because the initial reporter had different JIRA ids, or something else. But the comments section has good context on what this ticket is about. Also, this has come up multiple times in Slack channels from different users. Based on that, I can summarize as follows.
Users want OverwriteWithLatestAvroPayload to consider the records already on disk. In the preCombine step, we pick the latest value for every key from a batch of input data based on some ordering field; at this step we don't know what is in storage yet. When we are ready to write the records, we iterate over the records in storage and, for each record, determine if there is a matching entry in the input batch. If so, we invoke combineAndGetUpdateValue and pass the record on disk as the currentValue param. In this step specifically, OverwriteWithLatestAvroPayload can overwrite an already stored record with an older update, since we weren't comparing the ordering value of the new data against the data on disk. This PR provides that capability instead of asking users to write their own implementation. As for preCombine and combineAndGetUpdateValue, both will use the same ordering field(s) to determine the latest record.
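As a toy illustration of the two steps above (plain Java with made-up names; not the actual Hudi APIs): preCombine picks the latest record per key within the incoming batch, and the merge step then compares the surviving incoming record against the record already in storage.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the two combine steps (illustrative only, not Hudi code).
public class CombineFlowDemo {

    record Rec(String key, long orderingVal, String payload) {}

    // Step 1 (preCombine): within the incoming batch, keep the record
    // with the highest ordering value for each key.
    static Map<String, Rec> preCombine(List<Rec> batch) {
        Map<String, Rec> latest = new HashMap<>();
        for (Rec r : batch) {
            latest.merge(r.key(), r, (a, b) -> a.orderingVal() >= b.orderingVal() ? a : b);
        }
        return latest;
    }

    // Step 2 (merge against storage): only overwrite when the incoming
    // record's ordering value is at least as high as the stored one.
    static Rec combineWithStored(Rec stored, Rec incoming) {
        return incoming.orderingVal() >= stored.orderingVal() ? incoming : stored;
    }

    public static void main(String[] args) {
        Map<String, Rec> deduped = preCombine(List.of(
            new Rec("k1", 2, "v2"),
            new Rec("k1", 1, "v1")));
        Rec stored = new Rec("k1", 3, "stored");
        // Without the ordering check in step 2, "v2" would clobber the
        // newer record already in storage.
        System.out.println(combineWithStored(stored, deduped.get("k1")).payload()); // prints "stored"
    }
}
```

The point of the sketch: step 2 is where the old OverwriteWithLatestAvroPayload blindly took the incoming record.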
In order not to disrupt other payload classes, I deprecated the existing combineAndGetUpdateValue, extended it, and provided a default implementation that ignores the Map param and internally follows the old path. Only OverwriteWithLatestAvroPayload overrides this default to achieve the above purpose.
Hope that helps!
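The compatibility approach described above can be sketched with a stripped-down interface (illustrative names and simplified signatures, not the real Hudi ones): the new props-aware method is a default method that ignores the props and delegates to the deprecated one, so existing payload implementations keep working unchanged.

```java
import java.util.Map;

// Stripped-down sketch of the default-method compatibility trick
// (illustrative only, not the actual HoodieRecordPayload interface).
public class PayloadCompatDemo {

    interface RecordPayload {
        @Deprecated
        String combineAndGetUpdateValue(String currentValue);

        // New API: the default implementation ignores props and falls
        // back to the deprecated method, so old implementations compile
        // and behave exactly as before.
        default String combineAndGetUpdateValue(String currentValue, Map<String, String> props) {
            return combineAndGetUpdateValue(currentValue);
        }
    }

    // An "old-style" payload that only implements the deprecated method.
    static class LegacyPayload implements RecordPayload {
        @Override
        public String combineAndGetUpdateValue(String currentValue) {
            return "incoming"; // legacy behavior: incoming always wins
        }
    }

    public static void main(String[] args) {
        RecordPayload p = new LegacyPayload();
        // Calling the new API on a legacy implementation just works.
        System.out.println(p.combineAndGetUpdateValue("stored",
            Map.of("hoodie.payload.ordering.field", "ts"))); // prints "incoming"
    }
}
```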
@bhasudha thanks for explaining, although my meta-point is still not resolved. If you look at the log scanner -> https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java#L114, the preCombine method is used to combine records written to the log (over multiple batches). Ideally, per your use-case, since during logging we don't know what the order of updates is, we simply write to disk and leave the resolution for later. In this scenario, since combineAndGetUpdateValue is not called, how will preCombine honor the orderingVal?
@n3nash preCombine uses the same orderingVal as combineAndGetUpdateValue would. And in the HoodieMergedLogRecordScanner class, we are trying to resolve records that are already logged to the system, using the orderingVal to identify the most recent one. I don't understand why we would need to use combineAndGetUpdateValue here.
Just my two cents:
The interface change looks like a bit of overkill for this type of new feature.
If we are still relying on the Avro schema, we could leverage schema.getProp(orderingFieldKey) to get the ordering field for this record.
@advancedxy I guess we wouldn't know which is the orderingFieldKey when calling this method. That is why we need to send that info in as another param, so we can fetch it from the schema later.
Is this change expected? Has the style changed to import the static methods instead of the class?
same as above.
Same static-imports style question: has this changed recently?
Nothing changed. Added a static import to avoid repeating the class reference in the code.
@n3nash could you PTAL when you can?
@bhasudha The PR looks good to me. Looks like the same ordering field will be honored in all places. One high-level question before I accept it -> If
@bhasudha let me know if you need any clarification
@n3nash Thanks! Yes, the user needs to ensure that the Map uses the same orderingVal as the one configured for precombine. We are abstracting this logic for users by passing in the precombine field ourselves in HoodieMergeHandle, where this method is invoked. In DeltaStreamer this constructor invocation already happens in hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java (line 341, commit 0cb24e4).
@xushiyan This PR fails CI due to log output over the threshold - https://travis-ci.org/github/apache/hudi/builds/711685577. I see this error message consistently -
Hi @bhasudha, this means too many logs are printed out during the tests. In the ideal scenario, a test does not output anything unless there is a failure. Could you check if any log or print statements can be removed?
created a Jira issue to track this separately - https://issues.apache.org/jira/browse/HUDI-1127
Force-pushed 7f22cc1 to 002da8c
@bhasudha: I haven't reviewed the PR, but a high-level question: do we have any guards if the user changes the ordering field prop along the way? Basically, I expect we fail any operation if the ordering field prop is changed.
If the schema has evolved and the ordering field of the incoming record does not exist in existing storage, this might throw an NPE.
@bhasudha: I haven't reviewed the PR, but a high-level question: do we have any guards if the user changes the ordering field prop along the way? Basically, I expect we fail any operation if the ordering field prop is changed.
This will be handled the same way our ingestion works when the ordering field changes: in the merge phase, the new ordering field is used to compare the incoming and persisted records.
If the schema has evolved and the ordering field of the incoming record does not exist in existing storage, this might throw an NPE.
Thanks @nsivabalan, nice catch. If the incoming records' schema evolved to add new columns and those are also used for ordering, the persisted record will not have that ordering field, and that will throw an NPE. I'll handle that here.
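One possible shape of that null-guard (a hypothetical helper, shown here only to illustrate the fallback; the actual fix lives in OverwriteWithLatestAvroPayload): if the persisted record has no value for the ordering field, keep the old overwrite-with-incoming behavior instead of throwing an NPE.

```java
// Hypothetical null-guard sketch: decide whether the incoming record
// should win when the persisted record may predate the ordering column.
public class OrderingGuardDemo {

    static <T extends Comparable<T>> boolean incomingWins(T persistedOrderingVal, T incomingOrderingVal) {
        if (persistedOrderingVal == null) {
            // Persisted record was written before the ordering column
            // existed: fall back to the old behavior (incoming wins).
            return true;
        }
        return incomingOrderingVal.compareTo(persistedOrderingVal) >= 0;
    }

    public static void main(String[] args) {
        System.out.println(incomingWins(null, 5L)); // true: missing field, overwrite
        System.out.println(incomingWins(7L, 5L));   // false: storage is newer, keep it
        System.out.println(incomingWins(5L, 7L));   // true: incoming is newer
    }
}
```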
nsivabalan left a comment
leaving one comment.
… in disk when combining:
- Deprecate the existing combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) method
- Add a new API combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Map<String, String> props) in the HoodieRecordPayload interface
- The new API is a default method in the HoodieRecordPayload interface that internally calls the deprecated method, so existing implementations of HoodieRecordPayload are not broken
- OverwriteWithLatestAvroPayload overrides the new default API to include logic that considers records on disk when merging
- HoodieWriteConfig introduces "hoodie.payload.ordering.field", passed in as props to the new combineAndGetUpdateValue(), which lets it extract the ordering field from the record on disk
- Add corresponding changes to support the Spark datasource writer
- Add unit tests for the datasource
@bhasudha let me know once you've addressed @nsivabalan's comment and the build is passing; then we can merge this.
I am working on https://issues.apache.org/jira/browse/HUDI-1058, which needs similar modifications. It is better to work on HUDI-1058 after this PR is merged, but if this PR has not progressed, I can resolve HUDI-1058 first.
Looks like this is almost ready. I will rebase, review, and try to land this first. Thanks for flagging, @shenh062326!
@bhasudha as discussed, should we try and have another payload for this behavior, to avoid overwriting in backfill scenarios?
Yeah, makes sense. I can put up a PR with another payload @vinothchandar
Closing this for now.
@bhasudha What is the new PR (as you mentioned "I can put up a PR with another payload @vinothchandar")? Or is this feature implemented already? Which PR tracks late-arriving latest records?
@FelixKJose I don't think there is one up yet. We can get to this in the next few weeks if you are in need of it.
@vinothchandar Thank you for the reply. Yes, this will be a very important feature for us: since we rely on async messaging as the source, out-of-order messages are a real scenario, and this feature will handle them.
I think this is also a big issue for my cases here. @bhasudha or @vinothchandar, is it possible the new payload will be ready in the next week? I can contribute the new implementation if you are short on time.
@advancedxy That would be very useful. I can help you if you have any questions. Please feel free to send a PR with the new payload. |
@vinothchandar: may I know what the consensus is here? Let me take a stab, but do confirm. Will this extend the existing HoodieRecordPayload? Because this is the interface used everywhere, right? Or did you intend a class as follows
public static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
public static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;

private static final String PAYLOAD_ORDERING_FIELD_PROP = "hoodie.payload.ordering.field";
@vinothchandar @bhasudha: can you throw some light on why a new config rather than reusing the preCombine field value? I am working on putting up a new PR, so I would like to know whether this was intentional.
That's because you don't know what the ordering field value is, when reading the data from storage right?
OK, so for preCombine (combining two records within the same ingestion batch) we use one config, and for determining ordering between an incoming record and one in storage (in other words, to combine an incoming record with one in storage) we use a different config?
I see both as one and the same; what differs is just whether we resolve records within the same batch or across batches.
You are right, they are one and the same. If we can avoid the additional config, we should
Gotcha, here is my take.
Will introduce a config called "honorOrderingToCombineRecordsAcrossBatches" (we can work out a good name), with a default value of false. Will send in orderingFieldKey (same as preCombineFieldKey) as an arg to OverwriteWithLatestAvroPayload:
public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal,
    String orderingFieldKey, boolean honorOrderingToCombineRecordsAcrossBatches) {
  ...
}
Based on the "honorOrderingToCombineRecordsAcrossBatches" value, the combineAndGetUpdateValue() impl will decide whether to go with the existing impl or the new one (as we see in this patch).
I know we have a gap here: if the ordering field has changed over time, we can't do much. But we can't afford to store the ordering field as a separate column in the dataset either, as different commits could theoretically have different ordering fields. I guess we can call out that we may not support such evolution of the ordering field.
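Roughly, the flag-based proposal could look like this (toy sketch; the class shape and field names are placeholders taken from the comment above, not committed code):

```java
// Toy sketch of the flag-based proposal (placeholder names, not real code).
public class FlagPayloadDemo {

    final long orderingVal;                   // ordering value of the incoming record
    final boolean honorOrderingAcrossBatches; // proposed config, default false

    FlagPayloadDemo(long orderingVal, boolean honorOrderingAcrossBatches) {
        this.orderingVal = orderingVal;
        this.honorOrderingAcrossBatches = honorOrderingAcrossBatches;
    }

    // currentOrderingVal is the ordering value read from the record in storage.
    String combine(long currentOrderingVal) {
        if (honorOrderingAcrossBatches && currentOrderingVal > orderingVal) {
            return "keep-stored";  // storage is newer: do not overwrite
        }
        return "take-incoming";    // existing behavior: incoming always wins
    }

    public static void main(String[] args) {
        System.out.println(new FlagPayloadDemo(5L, false).combine(9L)); // prints "take-incoming"
        System.out.println(new FlagPayloadDemo(5L, true).combine(9L));  // prints "keep-stored"
    }
}
```

With the flag off, the class behaves exactly like today's payload; with it on, it keeps the stored record when storage has the higher ordering value.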
I prefer we don't do this, but instead separate the existing impl and the new one into separate payload classes.
I actually don't understand how we plan to obtain the ordering field from the record in DFS. Could you please clarify? @nsivabalan
Similar to how this patch already does it: we look up the preCombine field in the existing generic record.
Object persistedOrderingVal = getNestedFieldVal((GenericRecord) currentValue, props.get(ORDERING_FIELD_OPT_KEY), true);
The reason I proposed tweaking the existing class: even if we go with two classes, ideally I would want to rename the existing class OverwriteWithIncomingPayload and introduce a new class called OverwriteWithLatestAvroPayload, because the existing one overwrites with the incoming record and we are introducing a class that will actually overwrite with the latest payload.
But since users configure via class names, we can't go with this approach; hence my proposal to introduce a config and change the existing class.
What is the purpose of the pull request
This enhances the default payload class to respect records on disk when combining. If the record on disk is more recent, it is rewritten with the current commit time.
Brief change log
Verify this pull request
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR