[HUDI-2815] add partial overwrite payload to support partial overwrit… #4724
Conversation
@hudi-bot run azure
danny0405 left a comment
Thanks for the contribution ~ I kind of think we'd better do this after id-based schema evolution is supported; only then will we have a more lightweight solution to support per-record schemas.
Generally, carrying the schema with the record does not seem like a good solution.
boolean choosePrev = data1.equals(reducedData);
boolean choosePrev = data2.compareTo(data1) < 0;
HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
Why do we need a compareTo here?
The previous logic of data2.preCombine(data1) returns either data1 or data2, ordered by their orderingVal. But if we merge/combine data1 and data2 into a new payload (reducedData), data1.equals(reducedData) is always false. In order to get the HoodieKey and HoodieOperation for the new HoodieRecord with reducedData, we need to take the latest HoodieKey and HoodieOperation from data1 and data2; compareTo replaces #preCombine for comparing their orderingVal.
@Override
public int compareTo(OverwriteWithLatestAvroPayload oldValue) {
return orderingVal.compareTo(oldValue.orderingVal);
}
@Test
public void testCompareFunction() {
GenericRecord record = new GenericData.Record(schema);
record.put("id", "1");
record.put("partition", "partition1");
record.put("ts", 0L);
record.put("_hoodie_is_deleted", false);
record.put("city", "NY0");
record.put("child", Arrays.asList("A"));
PartialOverwriteWithLatestAvroPayload payload1 = new PartialOverwriteWithLatestAvroPayload(record, 1);
PartialOverwriteWithLatestAvroPayload payload2 = new PartialOverwriteWithLatestAvroPayload(record, 2);
assertEquals(payload1.compareTo(payload2), -1);
assertEquals(payload2.compareTo(payload1), 1);
assertEquals(payload1.compareTo(payload1), 0);
}
Actually, rec1 and rec2 should have the same HoodieKey here, right? But the HoodieOperation might differ.
return new PartialOverwriteWithLatestAvroPayload(currentRecord, chooseCurrent ? this.orderingVal : oldValue.orderingVal, this.schema);
} else {
  return isDeleteRecord(insertRecord) ? this : oldValue;
}
We should be cautious with the DELETEs; should we still merge DELETE messages?
Yeah, if either record is a DELETE record, we just return it directly, no merge needed; the DELETE message deletes the old record during the Hudi write. Only when neither record is a DELETE record do we need to merge them.
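A minimal, self-contained sketch of that rule (illustrative names, not the PR's actual payload code), using plain Avro and the _hoodie_is_deleted flag that the test snippet earlier also uses:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class DeleteAwareCombineSketch {

  // Treat a record as a delete when its _hoodie_is_deleted flag is true.
  static boolean isDeleteRecord(GenericRecord record) {
    Object flag = record.get("_hoodie_is_deleted");
    return flag instanceof Boolean && (Boolean) flag;
  }

  // If either side is a DELETE, skip the field-wise merge entirely and let the
  // record pass through unchanged (the diff quoted above picks between this and
  // oldValue); only two non-delete records would actually be merged.
  static GenericRecord combine(GenericRecord incoming, GenericRecord old) {
    if (isDeleteRecord(incoming) || isDeleteRecord(old)) {
      return incoming;
    }
    // ... field-wise partial merge of two non-delete records would happen here
    return incoming;
  }

  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("Rec").fields()
        .requiredString("id")
        .requiredBoolean("_hoodie_is_deleted")
        .endRecord();

    GenericRecord upsert = new GenericData.Record(schema);
    upsert.put("id", "1");
    upsert.put("_hoodie_is_deleted", false);

    GenericRecord delete = new GenericData.Record(schema);
    delete.put("id", "1");
    delete.put("_hoodie_is_deleted", true);

    System.out.println(isDeleteRecord(combine(delete, upsert))); // true: the delete was not merged away
  }
}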
@Override
public void initializeState(FunctionInitializationContext context) {
  ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =
  ValueStateDescriptor<HoodieRecord> indexStateDesc =
This would increase the state size significantly. We'd better avoid this with a better solution. Why must we store the full record instead of the index here?
Thanks @danny0405 for reviewing this.
Yeah, as mentioned in this issue #4030.
The reason why we need the full record here is to handle the case where the record's partition path has changed.
In the original logic, once the record partition path changes, we sink a delete record to the old partition file to delete the old record, and then sink the incoming record into the new partition file; the final record then only contains the info from the incoming record and misses the info from the old record. (Note that OverwriteNonDefaultsWithLatestAvroPayload also has this issue.)
So we need to retrieve the old/existing record from the base file and then merge/combine it with the incoming record. As we currently don't support looking up a record from the base file, we have to store the old/existing record somewhere, e.g. Flink state. BucketAssignFunction is the only place where we can store the old/existing record and change its location from the old partition file to the new partition file.
So the new logic is:
- store the old record (from the source, or from the base file when bootstrap is enabled) in Flink state
- once a new record arrives with the same record key but a changed partition:
  - sink a delete record to the old partition file to delete the old record
  - retrieve & copy the old record from state, change its location to the new partition, and sink it to the new partition file
  - the copied record and the incoming record will be merged by #preCombine
The drawback here is that it will increase the state size, but if we don't use state to store the full record, it seems we have no way to merge the incoming record with the existing record in the base file when the partition changes.
I also considered this problem. What I'm thinking, to avoid impacting the current logic (overwrite with latest payload), is to create an updateState abstract method and treat indexState as an abstract field; different subclasses would implement the logic with ValueState<HoodieRecordGlobalLocation> or ValueState<HoodieRecord>, or use a StateHelper to handle the state operations.
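A rough sketch of the two state layouts being discussed, with hypothetical class and state names (not the PR's actual BucketAssignFunction), just to make the trade-off concrete:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;

public class IndexStateLayoutSketch {

  // Current layout: only the record's global location, small and fixed-size per key.
  private ValueState<HoodieRecordGlobalLocation> locationState;

  // Proposed layout: the full record, so the old data can be copied to the new
  // partition and merged with the incoming record; state size grows with payload size.
  @SuppressWarnings("rawtypes")
  private ValueState<HoodieRecord> recordState;

  @SuppressWarnings({"unchecked", "rawtypes"})
  public void initializeState(FunctionInitializationContext context) {
    locationState = context.getKeyedStateStore().getState(
        new ValueStateDescriptor<>("indexLocationState", TypeInformation.of(HoodieRecordGlobalLocation.class)));
    recordState = context.getKeyedStateStore().getState(
        new ValueStateDescriptor<>("indexRecordState", TypeInformation.of(HoodieRecord.class)));
  }
}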
Yeah, I agree that carrying the schema with the record is not a good solution; it couples the byte data with the schema and makes it hard to evolve the schema later. But it seems that we cannot merge two payloads without knowing the schema.
CC @xushiyan @alexeykudinkin, who might be working on refactoring payload interfaces, and @xiarixiaoyao, who might be working on the schema evolution story. Can you folks take a look at the patch?
Thanks @nsivabalan for following up. To summarize, the current implementation has two drawbacks:
And there are two challenges that need to be solved to support partial update.
Any thoughts on these two challenges?
nsivabalan left a comment
Left some high-level comments. Will let Danny follow up on reviews since this touches the Flink code base.
/**
 * the schema of generic record
 */
public final String schema;
This might be confusing w/ the schema arg of combineAndGetUpdateValue. Can you fix either of the names?
But in general, storing the schema along w/ the payload might have an impact on performance, and that's why the initial payload was designed that way. So do add a line here noting that payload implementations setting this schema field might have to watch out for performance.
removed this field.
GenericRecord currentRecord = (GenericRecord) currentValue;
List<Schema.Field> fields = schema.getFields();
Guess this has to be "this.schema.getFields()". As I commented earlier, it's confusing :) . Can we fix the naming of either of them?
fixed the schema name.
GenericRecord currentRecord = (GenericRecord) currentValue;
List<Schema.Field> fields = schema.getFields();
fields.forEach(field -> {
  Object value = incomingRecord.get(field.name());
Do we need to deal w/ nested fields here?
The current logic will overwrite the whole nested field if the incoming field is not null.
And I think we don't need to support partial update inside nested fields, for example for Map, List, etc. We should not merge map(1 -> 'a', 2 -> 'b') & map(1 -> '', 3 -> 'c') into map(1 -> '', 2 -> 'b', 3 -> 'c'), in case the upstream wants to delete the key '2'; if we merge them together, they cannot delete some elements. The same goes for List.
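A tiny illustration of that choice, using a hypothetical field-level helper in plain Java: a non-null incoming map replaces the old map wholesale instead of being merged key by key, so an element dropped upstream stays dropped.

import java.util.HashMap;
import java.util.Map;

public class NestedFieldOverwriteDemo {

  // Hypothetical field-level rule: take the incoming value unless it is null.
  static Object overwriteNonNull(Object oldValue, Object incomingValue) {
    return incomingValue != null ? incomingValue : oldValue;
  }

  public static void main(String[] args) {
    Map<String, String> oldMap = new HashMap<>();
    oldMap.put("1", "a");
    oldMap.put("2", "b");

    Map<String, String> incomingMap = new HashMap<>();
    incomingMap.put("1", "");
    incomingMap.put("3", "c");

    // The whole map field is replaced: the result only has keys "1" and "3",
    // so the upstream deletion of key "2" is preserved rather than merged back in.
    System.out.println(overwriteNonNull(oldMap, incomingMap));
  }
}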
@Override
public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
Instead of storing the schema with the payload, did you think about adding a new preCombine method as follows:
OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Schema schema);
This would make it a lot simpler, right? Since preCombine is used only to dedup records within a single batch, both records should have the same schema.
Hi @nsivabalan, thanks a lot for reviewing this.
Regarding adding a new preCombine method with Schema: I considered this, but it means the method caller needs to get the schema info first, and currently it seems we can only get the schema info from the Configuration (from the hoodie.avro.schema field). Sometimes the caller might find it hard to get the schema info, especially for FlinkWriteHelper.deduplicateRecords(List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism).
But comparing the performance, it seems that passing the schema into the method might be a better approach.
BTW, since we already have the method preCombine(T oldValue, Properties properties), how about putting the schema string into the properties and then parsing it into a Schema later, so that we don't need to create a new method any more? Otherwise, I cannot imagine when we would ever use Properties.
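A minimal sketch of that idea, assuming the caller reuses the hoodie.avro.schema key mentioned above (the class and helper names are hypothetical):

import java.util.Properties;
import org.apache.avro.Schema;

public class SchemaFromPropertiesSketch {

  // The caller stashes the writer schema string in the Properties that are
  // already handed to preCombine(T oldValue, Properties properties).
  static Properties withSchema(Schema schema) {
    Properties props = new Properties();
    props.setProperty("hoodie.avro.schema", schema.toString());
    return props;
  }

  // Inside the payload, the schema is recovered from the same Properties.
  static Schema schemaFrom(Properties props) {
    return new Schema.Parser().parse(props.getProperty("hoodie.avro.schema"));
  }

  public static void main(String[] args) {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
    Properties props = withSchema(schema);
    System.out.println(schemaFrom(props).getField("id") != null); // true
  }
}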
try {
  Schema schema = new Schema.Parser().parse(this.schema);
  Option<IndexedRecord> incomingOption = getInsertValue(new Schema.Parser().parse(this.schema));
  Option<IndexedRecord> insertRecordOption = oldValue.getInsertValue(new Schema.Parser().parse(oldValue.schema));
insertRecordOption -> oldRecordOption
solved.
try {
  Schema schema = new Schema.Parser().parse(this.schema);
Argh, this again clashes w/ the instance variable "schema". Can we fix the naming?
solved
…payload. 2. revert change for storing full HoodieRecord in the flink state to support the case the partition path changed
Hi @danny0405, regarding the two changes:
For the first one, I removed the schema field from the payload class; instead, the schema is now passed in. BTW, do you think it is worth implementing a new bucket assign function (controlled by a feature toggle/configuration) that stores the full record to fully support partial update?
 * @return the combined value
 */
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
default T preCombine(T oldValue, Properties properties, Schema schema) {
I think the currently established semantic for preCombine is: you select either A or B, but you don't produce a new record based on those 2, since it's mostly used to de-dupe records in the incoming batch. I can hardly imagine the case where 2 incoming records need to be fused into something third. Can you help me understand what use-case you have in mind here?
Thanks @alexeykudinkin for reviewing this.
What we are trying to do is implement partial update. For example, let's assume the record schema is (f0 int, f1 int, f2 int). The first record value is (1, 2, 3), and the second record value is (4, 5, null), with the field f2 value being null. We hope that the result after running preCombine is (4, 5, 3), which means we need to combine/merge the two records into a third one, not only choose one of them.
Actually, what we want to implement is similar to #combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema), which is used to combine the incoming record with the existing record from the base/log file.
But #preCombine will be used for combining/merging two incoming records in a batch.
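A small self-contained demo of that expectation, using plain Avro and a hypothetical mergeNonNull loop rather than the PR's actual payload code:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class PartialMergeExpectationDemo {

  // Hypothetical merge rule: a non-null field from the later record wins,
  // a null field falls back to the earlier record's value.
  static GenericRecord mergeNonNull(Schema schema, GenericRecord earlier, GenericRecord later) {
    GenericRecord merged = new GenericData.Record(schema);
    for (Schema.Field field : schema.getFields()) {
      Object laterValue = later.get(field.name());
      merged.put(field.name(), laterValue != null ? laterValue : earlier.get(field.name()));
    }
    return merged;
  }

  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("Rec").fields()
        .optionalInt("f0").optionalInt("f1").optionalInt("f2")
        .endRecord();

    GenericRecord first = new GenericData.Record(schema);
    first.put("f0", 1);
    first.put("f1", 2);
    first.put("f2", 3);

    GenericRecord second = new GenericData.Record(schema);
    second.put("f0", 4);
    second.put("f1", 5);
    second.put("f2", null);

    // Prints the expected partial-update result: f0=4, f1=5, f2=3
    System.out.println(mergeNonNull(schema, first, second));
  }
}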
Right, that's exactly my question: why do you want to implement such a semantic w/in preCombine? What use-case are you trying to accommodate here?
Essentially with this change you will introduce a way for 2 records w/in the batch to be combined into 1. But why do you need this?
After all, you can achieve the same goal if you just stop de-duping your records and then subsequently merge them against what is on disk.
Hi @alexeykudinkin, I get your point: if we have to combine two records into one, we'd better implement the combine logic somewhere else, maybe in some util or helper classes, or skip the de-duping logic, right?
Here are some reasons from my side why #preCombine might be a better place to implement this logic, or alternatively we could create a new merge method in the HoodieRecordPayload interface.
- First, from the description of the preCombine method, it is used for combining multiple records with the same HoodieKey before attempting to insert/upsert to disk. "Combine multiple records" might not mean only choosing one of them; we could also combine & merge them into a new one, it just depends on how the subclass implements the preCombine logic (please correct me if my understanding is wrong :) ). Yeah, it might be a little bit confusing that we need a Schema if we are trying to merge them.
- Second, I checked where we call the preCombine method: it is used to de-duplicate records with the same HoodieKey before insert/update to disk, especially in the Flink write case. Even though the de-dup logic is to choose the latest record, we need to ensure that one HoodieKey only maps to one record before comparing with the existing record and writing to disk, otherwise some records will be missed. For example, in HoodieMergeHandle.init(fileId, newRecordsIter), it converts the record iterator to a map and treats the recordKey as the key. So we might not be able to stop the de-duping logic and merge against what is on disk unless we change the logic there. And if we implement another class/method to handle the merge logic and switch the existing de-duping logic from calling preCombine to the new class/method, we have to add a condition to control whether we should call preCombine or not, which I don't think is a good way. Instead, we should handle it in the preCombine method via different payload implementations.
That's my thought here, and I'm glad to hear your suggestions. :)
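A small sketch of the constraint described in the second point, with generic stand-ins rather than Hudi's actual HoodieMergeHandle code: once records are keyed by record key, only one payload per key survives, so duplicates have to be combined beforehand, and preCombine is that hook.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class DedupByKeySketch {

  // Collapse all payloads that share a record key; the combiner (the preCombine
  // hook) decides, or builds, the surviving payload whenever a key repeats.
  static <P> Map<String, P> dedupByRecordKey(List<Map.Entry<String, P>> keyedPayloads,
                                             BinaryOperator<P> preCombine) {
    Map<String, P> byKey = new HashMap<>();
    for (Map.Entry<String, P> entry : keyedPayloads) {
      byKey.merge(entry.getKey(), entry.getValue(), preCombine);
    }
    return byKey;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> batch = List.of(
        Map.entry("key1", "recordA"),
        Map.entry("key1", "recordB"));
    // String concatenation stands in for a partial merge of two same-key payloads.
    System.out.println(dedupByRecordKey(batch, (older, newer) -> older + "+" + newer));
  }
}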
Let me try to clarify a few things:
preCombine has a very specific semantic: it de-duplicates by picking the "most recent" among records in the batch. The expectation always is that, being handed 2 records, it will return either of them. It could not produce a new record. If we want to revisit this semantic, it is a far larger change that will surely require writing an RFC and a broader discussion regarding the merits of such a migration. Please also keep in mind that, as of RFC-46, there's an effort underway to abstract the whole "record combination/merging" semantic out of the RecordPayload hierarchy into a standalone Combination/Merge Engine API.
First, from the description of the preCombine method, it is used for combining multiple records with the same HoodieKey before attempting to insert/upsert to disk. "Combine multiple records" might not mean only choosing one of them; we could also combine & merge them into a new one, it just depends on how the subclass implements the preCombine logic (please correct me if my understanding is wrong :) ). Yeah, it might be a little bit confusing that we need a Schema if we are trying to merge them.
Please see my comment regarding the preCombine semantic above. I certainly agree with you that the name is confusing, but I've tried to clear up that confusion. Let me know if you have more questions about it.
Second, I checked where we call the preCombine method: it is used to de-duplicate records with the same HoodieKey before insert/update to disk, especially in the Flink write case. Even though the de-dup logic is to choose the latest record, we need to ensure that one HoodieKey only maps to one record before comparing with the existing record and writing to disk, otherwise some records will be missed. For example, in HoodieMergeHandle.init(fileId, newRecordsIter), it converts the record iterator to a map and treats the recordKey as the key. So we might not be able to stop the de-duping logic and merge against what is on disk unless we change the logic there. And if we implement another class/method to handle the merge logic and switch the existing de-duping logic from calling preCombine to the new class/method, we have to add a condition to control whether we should call preCombine or not, which I don't think is a good way. Instead, we should handle it in the preCombine method via different payload implementations.
You're bringing up good points; let's dive into them one by one. Currently we have 2 mechanisms:
- preCombine, which allows selecting the "most recent" record among those having the same key w/in the batch
- combineAndGetUpdateValue, which allows combining the previous or "historical" record (on disk) with the new incoming one (all partial merging semantics are currently implemented in this method)
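For reference, a paraphrased sketch of those two hooks (signatures simplified from the HoodieRecordPayload interface referenced in this thread, not the literal source):

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.util.Option;

// Paraphrased sketch, not the literal interface source.
interface RecordPayloadHooksSketch<T> {

  // Batch-level de-duplication: given two payloads sharing a key within the
  // incoming batch, return the "most recent" one (historically: one of the two).
  T preCombine(T oldValue);

  // Merge against storage: combine the record already on disk with this
  // incoming payload; partial-merge semantics currently live here.
  Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException;
}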
You rightfully mention that one of the current invariants is that the batch gets de-duped at a certain level (b/c we have to maintain PK uniqueness on disk), and so we might need to shift that to accommodate the case that you have. And that's exactly what my question was: if you can elaborate on the use-case you have at hand and are trying to solve w/ this PR, I would be able to better understand where you're coming from and what the best path forward is for us here.
The questions I'm looking for answers to are basically the following:
- What's the nature of your use-case? (domain, record types, frequency, size, etc.)
- Where are the requirements for partial updates coming from?
Etc. I'm happy to set up 30 min to talk in person regarding this, or connect on Slack and discuss it there.
Hi @alexeykudinkin, thanks a lot for your detailed clarification.
- Regarding the design of preCombine, I'm clear now. I'm sorry, I don't know the details of RFC-46, and I also didn't find a link to RFC-46 from here; could you please share the link?
- Regarding the requirements for partial updates/overwrites, I have seen the same requirement from the community. In my case, generally, we want to build a customer profile with multiple attributes; these attributes might come from different systems, one system might only provide some attributes in an event/record, and two systems might send events/records with different attributes. We should not only choose the most recent one; we need to merge them before writing to disk. Otherwise, we have to keep all change logs and then start a new job to dedup & merge these attributes among the change logs. For example, we have 10 attributes a1-a10 (all of them optional): source system A only has a1-a5, source system B only has a6-a10, and the result we expect is that the final record contains a1-a10, not only a1-a5 or a6-a10. And because we might receive two events/records at the same time, they might be in the same batch; that's why we want to merge them before combineAndGetUpdateValue.
BTW, thanks a lot for your time, will ping you on Slack.
CC @rmahindra123, who encountered a need in preCombine to combine bits and pieces from both records and return a new one. Rajesh: do you want to go over your use-case, maybe?
@alexeykudinkin, I'm sorry that I still haven't found a suitable time to align online; may I check whether you have any thoughts or suggestions on this PR?
Option<IndexedRecord> incomingOption = getInsertValue(schema);
Option<IndexedRecord> oldRecordOption = oldValue.getInsertValue(schema);
if (incomingOption.isPresent() && oldRecordOption.isPresent()) {
In general it's better to express common functionality in a way that allows it to be re-used and adopted in other places: here, for example, we can reuse the same routine of combining 2 records into one across the 2 methods if we properly abstract it.
abstracted the merge method, but it's still in the current class.
.withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
    + "This will render any value set for the option in-effective");

public static final ConfigOption<Boolean> PARTIAL_OVERWRITE_ENABLED = ConfigOptions
What's the idea for this additional configuration (besides the record payload class)?
This was the feature toggle to control another change in BucketAssignFunction, to support the case where the record partition path changes, but I have removed that change, so this feature toggle can be removed as well.
Do I need to modify the preCombine in the HoodieMergedLogRecordScanner.processNextRecord method? What I understand is that when we read a log file, we need to do deduplication and also call preCombine.
if (incomingOption.isPresent() && oldRecordOption.isPresent()) {
  GenericRecord incomingRecord = (GenericRecord) incomingOption.get();
  GenericRecord oldRecord = (GenericRecord) oldRecordOption.get();
  boolean chooseIncomingRecord = this.orderingVal.compareTo(oldValue.orderingVal) > 0;
This place needs to be changed to >=, because when we do not set the preCombine field, the first record will always be used instead of the latest one.
Good point here, thx.
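A tiny illustration of the tie case with hypothetical values: when no precombine field is set, both records carry the same default ordering value, so > keeps the first-seen record while >= keeps the latest one.

public class OrderingTieDemo {
  public static void main(String[] args) {
    Integer incomingOrderingVal = 0; // default ordering value when no precombine field is set
    Integer oldOrderingVal = 0;

    boolean keepIncomingStrict = incomingOrderingVal.compareTo(oldOrderingVal) > 0;   // false: the old (first-seen) record wins
    boolean keepIncomingOrEqual = incomingOrderingVal.compareTo(oldOrderingVal) >= 0; // true: the latest record wins
    System.out.println(keepIncomingStrict + " " + keepIncomingOrEqual);
  }
}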
As I know, if we also want to achieve the partial update purpose, we just need to pass the …
Just FYI for all interested folks.
Yeah, @LinMingQiang has mentioned this one above. From my understanding, if we want to enable the "partial update" feature by defining a customized payload class, it should run "partial update" in these three cases:
So I also updated the … The current situation is that we will treat …
alvarolemos left a comment
Just added a comment, suggesting a generalization of the proposed approach :)
 * @param secondRecord the new record provide new field value
 * @return merged records
 */
private GenericRecord overwriteWithNonNullValue(Schema schema, GenericRecord firstRecord, GenericRecord secondRecord) {
I really liked the idea of having a record payload that does partial merging. However, if I understood it correctly, what's proposed here does so in a very specific way: you're favoring the incoming record's field values, unless they are null (in which case you keep the existing one). I'm not saying this is not valuable, but the idea of doing partial merging is so good that maybe we could have something more generic. I'm going to suggest a few changes in order to accomplish that:
- Make PartialOverwriteWithLatestAvroPayload an abstract class
- Instead of having mergeFunc as a parameter of the mergeRecord method, it could become an abstract method. This would lead to the removal of the overwriteWithNonNullValue method, which makes this implementation specific to your merging logic
- For the original use case (partial merge favoring non-null values), implement the proposed abstract class and implement the mergeFunc method with what you have in overwriteWithNonNullValue: (first, second) -> Objects.isNull(second) ? first : second
It's just an idea that could make what you proposed useful for many more use cases. Hope this makes sense, and thanks for bringing this idea!
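A minimal sketch of that generalization with hypothetical class names; the record-walking loop would stay in the base class, and only the per-field rule varies:

import java.util.Objects;

// Hypothetical base class: it would own the schema-driven loop over the record's
// fields (omitted here) and delegate each pair of field values to the rule below.
abstract class AbstractPartialMergePayloadSketch {
  protected abstract Object mergeField(Object oldValue, Object incomingValue);
}

// The behavior proposed in this PR (favor the incoming value unless it is null)
// then becomes just one concrete implementation of that rule.
class OverwriteNonNullPayloadSketch extends AbstractPartialMergePayloadSketch {
  @Override
  protected Object mergeField(Object oldValue, Object incomingValue) {
    return Objects.isNull(incomingValue) ? oldValue : incomingValue;
  }
}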
Hi @alvarolemos, thanks a lot for your useful suggestion. Yeah, I also considered abstracting the merge logic either by using an abstract merge method or by passing a merge function into a generic method, and I chose the latter. The reasons are as follows:
- preCombine and combineAndGetUpdateValue might have different merge/combine logic, so implementing only one abstract merge function might not be enough for both cases. For example, these two methods in OverwriteWithLatestAvroPayload have different merge/combine logic.
- In the current implementation, mergeRecord is actually a generic method (even though it's private for now); it doesn't care about the detailed merge logic and can be changed to protected/public scope if needed. Instead, overwriteWithNonNullValue is the merge implementation of the current payload, which is a wrapper of mergeFunc, and we can create two wrappers for the preCombine and combineAndGetUpdateValue scenarios if needed, which is similar to what you mentioned about implementing the detailed mergeFunc logic in a subclass. We can still inherit this class, implement the detailed mergeFunc logic, and pass it to the mergeRecord method.
- Another reason why I didn't choose to create an abstract class for now is that there would only be one subclass; we can refactor it if many cases need to inherit this class. Right now, I just want to keep it as simple as possible.
@hudi-bot run azure
@stayrascal: We landed partial payload support via #4676.
What is the purpose of the pull request
Add a customized payload PartialOverwriteWithLatestAvroPayload to support partially overwriting record fields. For example, two incoming records A(1, null, 1, 1) and B(2, 2, null, 2) will be combined into the result (2, 2, 1, 2).
Brief change log
- Add PartialOverwriteWithLatestAvroPayload to support partially overwriting records
- Add partial.overwrite.enabled to control whether partial overwrite is enabled
- Change the state type in BucketAssignFunction from HoodieRecordGlobalLocation to HoodieRecord
- Copy the HoodieRecord before storing the record in ValueState
- Add compareTo to compare two HoodieRecords and choose their sequence, because the sequence cannot be obtained from the #preCombine method if two records are merged in #preCombine
- Add a schema field (String) in the new AvroPayload, which will be used to #preCombine two records
- Update BootstrapOperator to load HoodieRecord with data instead of only HoodieKey
- Update RowDataToHoodieFunction to create the payload with the schema
Verify this pull request
- Added unit tests for the preCombine and combineAndGetUpdateValue methods
Committer checklist
- Has a corresponding JIRA in PR title & commit
- Commit message is descriptive of the change
- CI is green
- Necessary doc changes done or have another open PR
- For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.