
[HUDI-9424] Extract and test merger logic for FileGroupReader #13242

Closed
the-other-tim-brown wants to merge 31 commits into apache:master from the-other-tim-brown:merger-and-refactor

Conversation

@the-other-tim-brown
Contributor

Change Logs

  • Adds a class EngineBasedMerger that handles the merging logic that was previously duplicated in the FileGroupReader code. This class performs commit time and event time ordering based merges without constructing HoodieRecords, which reduces overhead.
  • Standardizes how deletes are translated into HoodieRecords
  • Reduces duplicate code in FileGroupRecordBuffer implementations to ensure consistent behavior
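The two ordering modes described above can be illustrated with a rough sketch (hypothetical names and simplified types, not the actual EngineBasedMerger API): under commit time ordering the later-arriving record wins, while under event time ordering the record with the higher ordering value wins, without ever wrapping the payload in a HoodieRecord.

```java
import java.io.Serializable;

public class MergeSketch {
  public enum MergeMode { COMMIT_TIME_ORDERING, EVENT_TIME_ORDERING }

  /** Simplified stand-in for Hudi's BufferedRecord; a null payload models a delete. */
  public static final class Rec implements Serializable {
    public final String key;
    public final long orderingValue;
    public final Object payload;

    public Rec(String key, long orderingValue, Object payload) {
      this.key = key;
      this.orderingValue = orderingValue;
      this.payload = payload;
    }

    public boolean isDelete() {
      return payload == null;
    }
  }

  /** Picks the surviving record; {@code newer} is the record that arrived later in the log. */
  public static Rec merge(MergeMode mode, Rec older, Rec newer) {
    if (mode == MergeMode.COMMIT_TIME_ORDERING) {
      return newer; // the later arrival always wins under commit time ordering
    }
    // Event time ordering: the higher ordering value wins; ties go to the newer record.
    return newer.orderingValue >= older.orderingValue ? newer : older;
  }
}
```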

Impact

  • Makes the merging logic easier to test and consistent whether merging two log records or a log record and a base file record
  • Reduces maintenance overhead by deduplicating the code

Risk level (write none, low, medium or high below)

Low, increases coverage and fixes some minor issues in the merge logic

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of a config is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:XL PR with lines of changes > 1000 label Apr 30, 2025
Contributor

@jonvex jonvex left a comment

Seems pretty good, but need to look at this again at least once more

public class BufferedRecord<T> implements Serializable {
// the key of the record
private final String recordKey;
// the ordering value of the record to be used for even time based ordering
Contributor

*event

  } else if (isSkipMerge) {
    return new UnmergedFileGroupRecordBuffer<>(
-       readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
+       readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats, merger);
Contributor

do we still need to be passing in the merge mode here?

Contributor Author

The merge mode is used internally for determining whether the ordering field should be set.

Contributor

+1 to @jonvex, why can't we just initialize the merger inside each specific record buffer?


private static Stream<Arguments> commitTimeOrdering() {
return Stream.of(
// Validate commit time does not impact the ordering
Contributor

I think this is correct?:

// Validate event time does not impact the ordering

Contributor

@jonvex jonvex left a comment

lgtm

* @return the latest record as a delete record
*/
private BufferedRecord<T> getLatestAsDeleteRecord(BufferedRecord<T> newer, BufferedRecord<T> older) {
if (recordMerger.map(merger -> merger.getMergingStrategy().equals(HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)).orElse(false)) {
Contributor

So the record merge mode does not work perfectly here?

Contributor Author

This is to support the retraction message you had brought up. It avoids setting the value to null so you can send a retraction for the content. I never got a concrete example from you so I just went with my best understanding of the topic.

Contributor

It's tricky that you empty the msg payload for deletes first, before the merger runs, and then try to recover it here. Let's remove the special handling in constructHoodieRecord for deletes: just construct the record like the others, the merger will return the correct record based on the orderingVal, and the BufferedRecord can be recovered correctly with the right HoodieOperation and payload data (the row).

Contributor Author

The issue here is that the mergers will return an empty option when there is a delete: https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java#L50

I think that this can cause some unexpected issues when using event time ordering since you will lose the ordering value that should be used when comparing to the next record.

For example, consider a delete at T2 followed by an insert at T1; that insert should be ignored, but how will we keep track of that when the output drops this context?
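A tiny sketch of that scenario (hypothetical, simplified types): if the delete keeps its ordering value, the out-of-order insert is correctly discarded; if the delete were collapsed to an empty result, that context would be lost.

```java
public class DeleteOrderingSketch {
  /** Simplified buffered record; a null payload models a delete. */
  public static final class Rec {
    public final long orderingValue;
    public final Object payload;

    public Rec(long orderingValue, Object payload) {
      this.orderingValue = orderingValue;
      this.payload = payload;
    }
  }

  /** Event time merge that keeps deletes as first-class records rather than dropping them. */
  public static Rec mergeEventTime(Rec older, Rec newer) {
    // The higher ordering value wins, even when the winner is a delete.
    return newer.orderingValue >= older.orderingValue ? newer : older;
  }
}
```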

Contributor Author

I've updated the code a bit: if either of the records is a delete, we handle it first. The case where the merger outputs an empty option should not happen anymore, but I am not sure of the safest way to handle this since a developer can provide any implementation of the merger logic.

* The class takes in {@link HoodieReaderContext<T>} for the engine specific operations such as fetching the value representing the event time when {@link RecordMergeMode#EVENT_TIME_ORDERING} is used.
* @param <T> The type of the engine's row
*/
public class EngineBasedMerger<T> {
Contributor

We already have RecordMerger abstraction, maybe we rename it as MergeEngine or something?

Contributor Author

I'm open to changing the name. The idea behind EngineBased was that we do not need to convert into HoodieRecords for the event and commit time ordering paths. The hope is that we can keep optimizing along this path toward a single implementation for all merging that relies on some basic functionality provided by each engine, such as selecting ordering fields or handling partial merging.

   * @return A new instance of {@link HoodieRecord}.
   */
- public abstract HoodieRecord<T> constructHoodieRecord(BufferedRecord<T> bufferedRecord);
+ protected abstract HoodieRecord<T> constructHoodieDataRecord(BufferedRecord<T> bufferedRecord);
Contributor

I don't think this makes sense, we still need to keep the delete payloads for streaming retraction scenarios.

Contributor Author

Can you link to these scenarios so I can get a better understanding of how they are used?

Contributor

For example, the schema is (id, name, val) and id is the record key. There is an insert and then an update to its value, so you have msg events like below, and the operator sends one msg at a time to the downstream:

[+I] [1, "a", 1]
[-U] [1, "a", 1]
[+U] [1, "a", 3]

The -U msg is a retraction msg to the downstream; when a downstream operator receives it, it subtracts the old value 1 from its current value 1, so it becomes 0.

The point here is to keep the data payload of the delete msg so the downstream can figure out the value to subtract.
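A minimal sketch of such a downstream aggregation (hypothetical, not an actual Hudi or Flink API): the running sum can only be maintained because the -U retraction message still carries the old value to subtract.

```java
import java.util.HashMap;
import java.util.Map;

public class RetractionSketch {
  public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER } // +I, -U, +U

  private final Map<Integer, Long> sums = new HashMap<>();

  /** Applies one changelog message to the running sum for the given record key. */
  public void apply(RowKind kind, int id, long val) {
    long current = sums.getOrDefault(id, 0L);
    if (kind == RowKind.UPDATE_BEFORE) {
      sums.put(id, current - val); // retraction: subtract the old payload value
    } else {
      sums.put(id, current + val); // +I / +U: add the new payload value
    }
  }

  public long sumFor(int id) {
    return sums.getOrDefault(id, 0L);
  }
}
```

If the delete/retraction message were stripped of its payload, the UPDATE_BEFORE step would have nothing to subtract.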

Contributor Author

Ok, seems like this is related to the question here


@Override
public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable recordKey) {
BufferedRecord<T> existingRecord = records.get(recordKey);
Contributor

I don't think it makes sense to merge the handling of processNextDataRecord and processNextDeletedRecord

Contributor Author

There can be deletions inside of data records as well, so we need to make sure these are handled in a uniform way.

Contributor

There can be deletions inside of data records as well

It's true, but the payload data is there while for the DeleteRecord it is not. I know unifying the code is good, but it would mess up the BufferedRecord -> HoodieRecord conversion, because the latter can only be constructed as an empty hoodie record.

Contributor Author

I don't understand why this would mess up the conversion. How is it different from what is in place today, where we handle comparing BufferedRecords that are deletes to BufferedRecords that are not deletes?

Contributor

@yihua yihua left a comment

Blocked on my review

*/
public HoodieRecord<T> constructHoodieRecord(BufferedRecord<T> bufferedRecord) {
if (bufferedRecord.isDelete()) {
return new HoodieEmptyRecord<>(
Contributor

It's dangerous to do this because many mergers just return empty if the payload data is null; then the event time merging semantics would be lost, and we would also lose the payload data that was stored in the BufferedRecord.

Contributor Author

This is how the code is currently written; I am just doing some refactoring.
See these references: Spark and Flink.

Are you saying that this is already wrong and needs to be fixed? If so, is the solution to check whether the data is present instead of simply whether it is a delete, so we carry through as much context as possible?

Contributor Author

I've updated this to check for null instead of isDelete
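A sketch of that adjustment (hypothetical, simplified types): the empty-record path is taken only when there is no payload data, so a delete that still carries a row retains it.

```java
public class ConstructRecordSketch {
  /** Simplified buffered record; the delete flag is independent of payload presence. */
  public static final class BufferedRec<T> {
    public final T data;          // may be null for key-only deletes
    public final boolean delete;

    public BufferedRec(T data, boolean delete) {
      this.data = data;
      this.delete = delete;
    }
  }

  /** Describes which kind of record would be constructed from the buffered record. */
  public static <T> String describeConstruction(BufferedRec<T> rec) {
    if (rec.data == null) {       // was: if (rec.delete) — which dropped delete payloads
      return "empty";
    }
    return rec.delete ? "delete-with-payload" : "data";
  }
}
```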

Contributor

@danny0405 danny0405 left a comment

block on my final review

@the-other-tim-brown the-other-tim-brown changed the title [DRAFT] Extract and test merger logic for FileGroupReader [HUDI-9424] Extract and test merger logic for FileGroupReader May 20, 2025
@the-other-tim-brown the-other-tim-brown marked this pull request as ready for review May 20, 2025 01:52
@danny0405
Contributor

I see some negative changes that I don't really like:

unnecessary overhead:

  1. BufferedRecord#forRecordWithContext -> readerContext.convertValueToEngineType(orderingValue)

unnecessary complexity exposed:

  1. KeyBasedFileGroupRecordBuffer#processNextDataRecord -> !existing.equals(merged), enablePartialMerging
  2. FileGroupRecordBuffer#hasNextBaseRecord -> readerContext.projectRecord(

interface that does not make sense:

  1. BufferedRecord#asDeleteRecord
  2. EngineBasedMerger#getLatestAsDeleteRecord

Maybe we just put the merging related logic together and do not touch the logic/interface for now, so you can test each of them separately:

  1. FileGroupRecordBuffer#merge;
  2. FileGroupRecordBuffer#doProcessNextDataRecord;
  3. FileGroupRecordBuffer#doProcessNextDeletedRecord.

return readerSchema;
}
return readerContext.getSchemaFromBufferRecord(bufferedRecord);
private BufferedRecord<T> merge(BufferedRecord<T> baseRecord, BufferedRecord<T> logRecord) throws IOException {
Contributor Author

The signature changes because creating a Pair per record is unnecessary overhead. We already have the BufferedRecord, which gives us the context of whether or not it is a delete. The Pair also uses a Boolean, so you have the autoboxing overhead.
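A rough sketch of the allocation difference (hypothetical types, with SimpleEntry standing in for the Pair): the old style allocates a pair and a boxed Boolean per merged record, while carrying a primitive delete flag on the buffered record avoids both.

```java
public class SignatureSketch {
  /** Simplified buffered record carrying a primitive delete flag (no Boolean boxing). */
  public static final class BufferedRec<T> {
    public final T data;
    public final boolean delete;

    public BufferedRec(T data, boolean delete) {
      this.data = data;
      this.delete = delete;
    }
  }

  /** Old style: wrap the winner and a boxed delete flag in a pair per merged record. */
  public static <T> java.util.AbstractMap.SimpleEntry<Boolean, T> mergeOldStyle(T winner, boolean isDelete) {
    return new java.util.AbstractMap.SimpleEntry<>(isDelete, winner); // boxes isDelete, allocates a pair
  }

  /** New style: the buffered record already knows whether it is a delete. */
  public static <T> BufferedRec<T> mergeNewStyle(BufferedRec<T> winner) {
    return winner; // nothing extra is allocated
  }
}
```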

@hudi-bot
Collaborator

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@danny0405
Contributor

I tried to push some high-level changes to the code:

  • move the enablePartialMerging inside the EngineRecordMerger and add an enable method for it so that there is no need to pass it every time for #merge;
  • add merge(Option, T) for log/log merging and merge(T, Option) for base/log merging; the assumptions around null are different for these two scenarios.
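The proposed overloads might look roughly like this (hypothetical signatures sketching the comment above, with a placeholder latest-wins policy): for log/log merging the existing record may be absent, while for base/log merging the base record is always present.

```java
import java.util.Optional;

public class MergerOverloadsSketch<T> {
  /** log/log merging: the existing buffered record may be absent (first record for the key). */
  public T merge(Optional<T> existing, T incoming) {
    return existing.isPresent() ? pickWinner(existing.get(), incoming) : incoming;
  }

  /** base/log merging: the base record is always present; the log record may be absent. */
  public T merge(T base, Optional<T> logRecord) {
    return logRecord.isPresent() ? pickWinner(base, logRecord.get()) : base;
  }

  // Placeholder merge policy: the later record (second argument) wins.
  private T pickWinner(T older, T newer) {
    return newer;
  }
}
```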

Then I tried to inspect the logic changes and found several issues that made me quit:

  1. for log/log merging with CUSTOM merge mode, before the change, there is no need to do the combined record -> buffered record conversion if the new record has a lower ordering value;
  2. for log/log merging with CUSTOM merge mode, before the change, the log records would be evolved uniformly in FileGroupRecordBuffer#getRecordsIterator; now they are evolved in the merging phase, which incurs an unnecessary performance regression;
  3. the getNewerRecordWithEventTimeOrdering semantics has been changed and is now wrong, because a newer record with a bigger ordering value would be ignored;
  4. in BufferedRecord#forDeleteRecord, the ordering value is converted to the engine type, which is wrong because we should use the Java type to stay engine agnostic.

It looks like you do not have good knowledge of these nuances; my suggestion is that we do not change the logic and API first and just move them together for easy testing.


5 participants