[HUDI-6784] Support deletion logic in merger #9593
beyond1920
left a comment
Thanks for the contribution. I left some comments.
Some places still do not call the new merge API.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
// NOTE: This have to stay lazy to make sure it's initialized only at the point where it's
// going to be used, since we modify `logRecords` before that and therefore can't do it any earlier
private lazy val logRecordsIterator: Iterator[Option[HoodieRecord[_]]] =
private lazy val logRecordsIterator: Iterator[scala.Option[HoodieRecord[_]]] =
Why introduce the scala prefix?
Because Hudi has its own Option class and Scala has its own. There are two ways to differentiate them: add the scala prefix to Scala's Option, or use an alias for Hudi's Option, such as HOption. I will probably do the latter and will update.
protected def removeLogRecord(key: String): scala.Option[HoodieRecord[_]] = logRecords.remove(key)

protected def doHasNext: Boolean = hasNextInternal
We should also call the merger API here to check whether the record needs to be dropped before loading it as a query result.
Hi @beyond1920, I don't quite understand why we need to call merge here, since the function is meant to remove the record. Can you explain a bit?
serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]

private def merge(curRow: InternalRow, newRecord: HoodieRecord[_]): Option[InternalRow] = {
private def merge(curRow: InternalRow, newRecord: HoodieRecord[_]): scala.Option[InternalRow] = {
Don't forget to call this merge method even if updatedRecordOpt.isEmpty is true.
@beyond1920, I modified the logic in the function hasNextInternal. Please see if it works correctly.
Force-pushed from f906f3b to d1fa8c6.
@beyond1920 Thanks for the comments. I will update soon.
Force-pushed from d1fa8c6 to 6d4fb04.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
  ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK);
  ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK);
public Option<Pair<HoodieRecord, Schema>> merge(
As a follow-up, could you implement the logic of DefaultHoodieRecordPayload and a custom merging strategy mentioned in #9430 in new merger strategies and make sure the new merge API covers all the functionality we support with HoodieRecordPayload?
Sounds good. Will do.
} else {
  val mergedRecordOpt = merge(curRow, updatedRecordOpt.get)
  if (mergedRecordOpt.isEmpty) {
  val mergedRecordOpt = merge(curRow, updatedRecordOpt)
I think you still need to check updatedRecordOpt.isEmpty and if there is no update for the record key, return the record from the base file.
Will update.
The original code did that. So I will remove my change here.
Force-pushed from b8c3581 to 1fa522e.
Force-pushed from 2c341b9 to 6f810f4.
@linliu-code could you check the test failure in GH actions?
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
public void write(HoodieRecord<T> oldRecord) {
  Schema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields : writeSchema;
  Schema newSchema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema;

nit: let's avoid unnecessary cosmetic changes
Yeah, I will start another PR for these changes.
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
for (HoodieAvroRecord record : records) {
  Option<Pair<HoodieRecord, Schema>> r = MERGER.merge(
      Option.empty(),
Could you also add test cases where the invalid record is represented as HoodieEmptyRecord or HoodieRecord.SENTINEL used by HoodieAvroRecord?
import static org.apache.hudi.model.TestUtil.SCHEMA;
import static org.apache.hudi.model.TestUtil.generateData;

public class TestHoodieAvroRecordMerger {
We should also add a functional/integration test which uses the Avro/Spark record merger to implement the same payload/merging logic as @beyond1920 mentioned and make sure that the upsert operation generates the same set of records for inserts, updates, and deletes (which is the criterion for approval of this PR), covering all new functionality of the new merge API.
I will add that in a separate PR.
    Option<HoodieRecord> newerIgnored,
    Schema newSchemaIgnored,
    TypedProperties propsIgnored) throws IOException {
  return Option.empty();
For the delete case, should the HoodieEmptyRecord be returned?
Either one should work, but checking option.isPresent() should be more efficient than an instanceof check. What do you think?
It depends on the merging case. When merging log records that have an ordering field, we have to use HoodieEmptyRecord so that the record key and ordering field are included; this marks the record as a delete and lets it be merged with deletes/updates from other log files.
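To illustrate the ordering-field point, here is a minimal, self-contained sketch. It does not use Hudi classes: `Rec`, `tombstone`, `merge`, and `finalizeForWrite` are hypothetical names, and the rule "higher ordering value wins, ties go to the newer record" is an assumption made for the example. It shows why a delete coming from one log file needs to keep its key and ordering value until all log files have been merged.

```java
import java.util.Optional;

// Hypothetical stand-in for a keyed record; not a Hudi class.
final class Rec {
    final String key;
    final long orderingValue;
    final String payload; // null marks a delete (tombstone)

    Rec(String key, long orderingValue, String payload) {
        this.key = key;
        this.orderingValue = orderingValue;
        this.payload = payload;
    }

    static Rec tombstone(String key, long orderingValue) {
        return new Rec(key, orderingValue, null);
    }

    boolean isDelete() {
        return payload == null;
    }
}

final class OrderedMergeSketch {
    // Assumed rule: the higher ordering value wins; ties go to the newer record.
    static Rec merge(Rec older, Rec newer) {
        return newer.orderingValue >= older.orderingValue ? newer : older;
    }

    // Only after all merging is done is a surviving tombstone turned into "no record".
    static Optional<Rec> finalizeForWrite(Rec merged) {
        return merged.isDelete() ? Optional.<Rec>empty() : Optional.of(merged);
    }

    public static void main(String[] args) {
        Rec delete = Rec.tombstone("k1", 5);
        Rec lateUpdate = new Rec("k1", 3, "stale");
        // The tombstone outranks the late update because it kept its ordering value.
        Rec merged = merge(delete, lateUpdate);
        System.out.println(finalizeForWrite(merged).isPresent()); // prints false
    }
}
```

If the delete had been collapsed to an empty Option right away, the late update with ordering value 3 would have nothing to be compared against and could resurrect the key.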
I could not reproduce some issues locally; e.g., some failing tests cannot be found. I think there should be some trick to using the "-pl" option in
Force-pushed from 6f810f4 to 05d9424.
@linliu-code are you still resolving the test failures? Are they related to production code or just test issues?
In GH actions, some tests run in a specific Spark version so you need to check the exact setup where the test failed. Or it may be flakiness.
Force-pushed from 05d9424 to bb4e1c9.
Right, I have to check the exact mvn options to reproduce the issues. Generally I can reproduce so far; now I am still chasing these bugs.
Force-pushed from 8b1fb2c to d98bb35.
@hudi-bot run azure
danny0405
left a comment
Block on me for a final review, cc @beyond1920 to review if you have spare time.
@hudi-bot run azure
@linliu-code there's a conflict with master. Could you resolve it?
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
  return writeRecord(newRecord, Option.of(processedRecord.get().getLeft()), writerSchema, config.getPayloadConfig().getProps(), isDelete);
}

protected boolean is_delete_record(HoodieRecord record, Schema schema, TypedProperties props) throws IOException {
protected boolean is_delete_record(HoodieRecord record, Schema schema, TypedProperties props) throws IOException {
protected boolean isDeleteRecord(HoodieRecord record, Schema schema, TypedProperties props) throws IOException {
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
Option<IndexedRecord> newData = newer.toIndexedRecord(schema, props).map(HoodieAvroIndexedRecord::getData);
return newData;
Was this a bug before?
Yeah I think this covers HoodieRecordPayload#getInsertValue, because previousAvroData not being present means no such record exists on storage. Ideally, we shouldn't even be inside this if-block because the older record is empty (not valid) and hence the code will return from line 60 of merge method.
@yihua, @codope, under the old logic, when the old record is not valid, the merge result is empty; the merge logic in MergeHandle then tries to keep the old data, so the invalid old data is written to disk again. So I changed it: when the old data is invalid but the new data is valid, we output the new data, and the MergeHandle checks whether the new data is a delete.
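The behavior described above can be sketched as a small decision table. This is only an illustration under stated assumptions: `Row` and `ValidityMergeSketch` are hypothetical names, `java.util.Optional` stands in for Hudi's own Option, and the `valid` flag stands in for whatever validity check the merger applies. The case where both records are valid is simplified to "prefer the newer record".

```java
import java.util.Optional;

// Hypothetical record type; "valid" stands in for the merger's validity check.
final class Row {
    final String key;
    final String value;
    final boolean valid;

    Row(String key, String value, boolean valid) {
        this.key = key;
        this.value = value;
        this.valid = valid;
    }
}

final class ValidityMergeSketch {
    static Optional<Row> merge(Optional<Row> older, Optional<Row> newer) {
        boolean validOld = older.isPresent() && older.get().valid;
        boolean validNew = newer.isPresent() && newer.get().valid;
        if (!validOld && !validNew) {
            return Optional.empty(); // nothing meaningful on either side
        } else if (validOld && !validNew) {
            return older;            // keep the old record for data safety
        } else if (!validOld) {
            return newer;            // invalid old data must not be written back
        }
        return newer;                // both valid: this sketch simply prefers the newer record
    }
}
```

The third branch is the change under discussion: returning the new record, instead of an empty result, keeps the caller from falling back to the invalid old data.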
hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePreCombineAvroRecordMerger.java
Pasting my deep dive into the code, based on release 0.12.2 (pre-landing of RFC-46 PR #7345), here. Here is how different APIs,
To be fully compatible with before, the places where
To make sure the handling of inserts, updates, and deletes is correct, here are a few scenarios you should test end-to-end with Spark datasource read:
The goal is to make sure all complex logic around deletes and updates is handled correctly now. Note that some of these scenarios may not be covered by unit/functional tests.
Ok, working on the comments now.
return record.isDelete(schema, props)
    || record instanceof HoodieEmptyRecord
    || (record.getData() != null && record.getData() instanceof EmptyHoodieRecordPayload)
    || HoodieOperation.isDelete(record.getOperation());
What do we gain by binding all these checks together? Empty records and delete records are both meant to be dropped, but were they originally handled by the same logic?
Based on my understanding, Hudi uses empty records and delete records interchangeably to mean delete, so I combined the checks here. I will modify this PR based on our discussion, so this logic should be gone. But we should find a way to standardize this logic; otherwise, the scattered checks are really confusing.
Revert all the changes to HoodieMergeHandle, then add a new interface to the Merger API:
/**
 * Checks the validity of the merged record before flushing it to disk; if this returns false, the given record is ignored.
 * In some scenarios, the business logic needs to check the validity of the merged record, so this interface gives
 * the user a chance to do a sanity check.
 *
 * <p> This interface is experimental and might evolve in the future.
 */
@Experimental
default boolean isValid(HoodieRecord record, Schema schema) {
  return true;
}
This interface would be invoked before each merged record is flushed. Only merged records need this check currently.
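A rough sketch of how such a hook could be consumed, assuming a heavily simplified merger that works on plain strings. `SimpleMerger`, `IsValidHookSketch`, and `mergeAndFlush` are hypothetical names; the real interface would take HoodieRecord and Schema as in the proposal above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified merger; not the Hudi interface.
interface SimpleMerger {
    String merge(String older, String newer);

    // Mirrors the proposed hook: accept every merged record unless overridden.
    default boolean isValid(String merged) {
        return true;
    }
}

final class IsValidHookSketch {
    // Merges each (old, new) pair and skips results that the merger rejects.
    static List<String> mergeAndFlush(SimpleMerger merger, List<String[]> pairs) {
        List<String> flushed = new ArrayList<>();
        for (String[] pair : pairs) {
            String merged = merger.merge(pair[0], pair[1]);
            if (merger.isValid(merged)) {
                flushed.add(merged);
            }
        }
        return flushed;
    }

    public static void main(String[] args) {
        SimpleMerger dropEmpty = new SimpleMerger() {
            @Override public String merge(String older, String newer) { return newer; }
            @Override public boolean isValid(String merged) { return !merged.isEmpty(); }
        };
        List<String[]> pairs = new ArrayList<>();
        pairs.add(new String[] {"a1", "a2"});
        pairs.add(new String[] {"b1", ""});
        System.out.println(mergeAndFlush(dropEmpty, pairs)); // prints [a2]
    }
}
```

Because the default returns true, existing merger implementations would keep their current behavior without any change.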
danny0405
left a comment
I kind of feel the isDelete check is now scattered everywhere, which is error-prone to maintain, and the complexity is all handed over to the merger API. If you take a look at the original HoodieAvroRecordMerger, it looks much simpler, so can we keep it as it is now?
It just does not make sense to check for validity in each merge call; a separate interface like #insert makes more sense to me.
Remove the Option.of from the param or add a new interface.
 * of the single record, both orders of operations applications have to yield the same result)
 */
Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException;
Option<Pair<HoodieRecord, Schema>> merge(Option<HoodieRecord> older, Schema oldSchema, Option<HoodieRecord> newer, Schema newSchema, TypedProperties props) throws IOException;
Let's update the javadoc to define the parameters as well.
Will revert the change here, but will add the javadoc.
/**
 * Check if a DELETE operation is intended.
 */
private boolean isDelete(Option<HoodieRecord> record, Schema schema, TypedProperties props) throws IOException {
Can we move this method and the one above to a suitable util class in hudi-common, or maybe define them as static methods in the HoodieRecordMerger interface? We can reuse them across mergers and/or the write handle, as the implementation does not differ.
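As a sketch of the static-method option, the check could live on the interface itself so that mergers and write handles share one definition. Everything below is hypothetical (`SketchOp`, `SketchRecord`, `SketchMerger`); a null payload is used here as a stand-in for an empty record.

```java
// Hypothetical types for illustration only; these are not Hudi classes.
enum SketchOp { INSERT, UPDATE, DELETE }

final class SketchRecord {
    final String key;
    final Object data; // null models an empty payload
    final SketchOp op;

    SketchRecord(String key, Object data, SketchOp op) {
        this.key = key;
        this.data = data;
        this.op = op;
    }
}

interface SketchMerger {
    SketchRecord merge(SketchRecord older, SketchRecord newer);

    // Single shared definition of "this record means delete".
    static boolean isDelete(SketchRecord record) {
        return record.data == null || record.op == SketchOp.DELETE;
    }
}

final class DeleteCheckDemo {
    public static void main(String[] args) {
        SketchRecord tombstone = new SketchRecord("k1", null, SketchOp.UPDATE);
        SketchRecord flagged = new SketchRecord("k2", "v", SketchOp.DELETE);
        SketchRecord live = new SketchRecord("k3", "v", SketchOp.UPDATE);
        System.out.println(SketchMerger.isDelete(tombstone)); // prints true
        System.out.println(SketchMerger.isDelete(flagged));   // prints true
        System.out.println(SketchMerger.isDelete(live));      // prints false
    }
}
```

A static method cannot be overridden by implementations, which fits the observation that the check does not differ between mergers.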
When we add "insert" method in the interface, the logic of the merger will be as simple as before. So we don't need these checks at all.
if (!isValidOld && !isValidNew) { // No meaningful information found.
  return Option.empty();
} else if (isValidOld && !isValidNew) { // Return old record for data safety.
  return Option.of(Pair.of(older.get(), oldSchema));
} else if (!isValidOld) { // Either insert or delete case, return the new record.
  return Option.of(Pair.of(newer.get(), newSchema));
} else {
  if (isDeleteNew) { // delete case
    return Option.of(Pair.of(newer.get(), newSchema));
  }
I guess this is all the complication that @danny0405 mentioned. I think we should still check for validity before calling combineAndGetUpdateValue. However, we can move the handling of insert/delete to a separate method that can be overridden. The default would be to return the new record.
IMO, one method is cleaner and easier to use. Also, I checked what Kafka and Flink do. They also provide one method and leave the handling of empty/null keys to the concrete implementation.
Kafka Merger - https://kafka.apache.org/35/javadoc/org/apache/kafka/streams/kstream/Merger.html
Flink InternalRowMerger - https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.html
@danny0405 What do you think?
If you look at the APIs from Kafka and Flink, they use either a T or a specific RowData, and I guess neither the old nor the new value is nullable. But take a look at our API:
- We make the parameters Option, so users need to handle the empty case and even distinguish UPSERTs from DELETEs, which is totally unnecessary.
- We have a HoodieRecord there, but there is a lot of delete-specific logic inside Hudi that should not be exposed to users at all.
- We have many write-path ramifications for the different representations of a delete HoodieRecord, such as a HoodieRecord with D as its operation, a HoodieAvroEmptyRecord, or an IGNORE record. I don't think any normal user should see that complexity; even if we provide default impls, users still need to understand it well to use it correctly.
Force-pushed from 88a73cb to f85abc4.
// Inject custom logic for the record.
Option<Pair<HoodieRecord, Schema>> processedRecord = config.getRecordMerger().insert(record, schema, config.getProps());
if (!processedRecord.isPresent()
    || HoodieOperation.isDelete(processedRecord.get().getLeft().getOperation())
We can revert all the changes to the create handle; the delete messages are already handled in line 137.
Should we allow the custom logic?
Force-pushed from f85abc4 to 4f59993.
The solution is to (1) add a new interface method called "insert" to HoodieRecordMerger to support custom logic, (2) apply the merge function in the create/merge handles to enable potential custom logic, and (3) add various unit tests to confirm that the current merger API works.
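The shape of that solution can be sketched as follows. This is a simplified illustration, not the Hudi API: `InsertAwareMerger`, `WriteHandleSketch`, and `upsert` are hypothetical names, records are plain strings, and `java.util.Optional` stands in for Hudi's Option. Keys with no stored value go through `insert`, keys with a stored value go through `merge`, and an empty result drops the key.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified merger; signatures here are assumptions, not the Hudi API.
interface InsertAwareMerger {
    // Combines an existing value with an incoming one; empty means "drop the key".
    Optional<String> merge(String older, String newer);

    // Called for keys with no existing value; the default passes the record through.
    default Optional<String> insert(String incoming) {
        return Optional.of(incoming);
    }
}

final class WriteHandleSketch {
    // Applies incoming records to the stored ones, routing each key to insert() or merge().
    static Map<String, String> upsert(Map<String, String> stored,
                                      Map<String, String> incoming,
                                      InsertAwareMerger merger) {
        Map<String, String> result = new LinkedHashMap<>(stored);
        for (Map.Entry<String, String> e : incoming.entrySet()) {
            String existing = result.get(e.getKey());
            Optional<String> out = existing == null
                ? merger.insert(e.getValue())
                : merger.merge(existing, e.getValue());
            if (out.isPresent()) {
                result.put(e.getKey(), out.get());
            } else {
                result.remove(e.getKey());
            }
        }
        return result;
    }
}
```

With this split, a merger that only cares about updates implements `merge` and inherits the pass-through `insert`, while custom insert-time logic is an opt-in override.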
Force-pushed from 4f59993 to f9651ab.
Change Logs
The goal of this PR is to support custom logic during data writes through the Merger API. Our current solution is to wrap the older and newer parameters of the merge function in Option. This way, the update, delete, and combine logic is all merged into one API. We also apply this merge function in HoodieMergeHandle to handle both the update and insert write paths.
TESTS:
Unit tests are added for existing merger implementations.
Impact
Users can now implement the merge API to support their own deletion logic. Previously, deletion was not supported.
Risk level (write none, low medium or high below)
Low.
Contributor's checklist