Conversation

@linliu-code (Collaborator) commented Jul 19, 2025

Change Logs

This PR introduces:

  1. An input-record-based record buffer that builds the internal buffer directly from the record iterator.
  2. A new constructor in HoodieFileGroupReader that uses the above record buffer (see the sketch after this list).
  3. Enabling FileGroupReaderBasedMergeHandle to use this FG reader API.
  4. Secondary index (SI) support for FileGroupReaderBasedMergeHandle through callbacks.
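
As a rough illustration of items 1 and 2, here is a minimal sketch of how a COW merge could feed its incoming records straight into the file group reader. The builder calls are taken from this PR's diff and review comments (withRecordIterator in particular is quoted from a review suggestion), so the final API may differ:

// Hypothetical sketch; assumes readerContext, metaClient, incomingRecords,
// and the schema/props variables are in scope at the call site.
HoodieFileGroupReader<T> fileGroupReader = HoodieFileGroupReader.<T>newBuilder()
    .withReaderContext(readerContext)
    .withHoodieTableMetaClient(metaClient)
    .withLatestCommitTime(maxInstantTime)
    .withPartitionPath(partitionPath)
    .withBaseFileOption(Option.ofNullable(baseFileToMerge))
    .withRecordIterator(incomingRecords)  // new: buffer is built from the input iterator instead of log files
    .withDataSchema(writeSchemaWithMetaFields)
    .withRequestedSchema(writeSchemaWithMetaFields)
    .withProps(props)
    .build();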

Impact

Unify COW write path using FG reader.

Risk level (write none, low, medium, or high below)

Medium.

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of a config is changed.
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here, and follow the instructions to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:M (PR with lines of changes in (100, 300]) label on Jul 19, 2025
@linliu-code linliu-code force-pushed the HUDI-9591-FGReader branch 2 times, most recently from caa9dea to d76ce34 on July 19, 2025 17:00
@nsivabalan (Contributor) left a comment

High level seems to be OK.
We need to plug all stats generation (RLI-related stats and secondary index stats) into the FG reader.
Once you have that wired in, I can review in detail.

return new HoodieMergeHandleWithChangeLog<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
} else {
if (readerContextOpt.isPresent()) {
return new FileGroupReaderBasedMergeHandle<>(
Contributor

After this patch, is the regular HoodieMergeHandle even used anywhere?
Why can't we remove it?

Collaborator Author

We can remove HoodieMergeHandle from this factory after this patch. I just want to be safe and plug it into Spark for now. I am trying to fix the failures and make it work for Spark first.

@linliu-code linliu-code marked this pull request as ready for review on July 20, 2025 15:19
@github-actions github-actions bot added the size:L (PR with lines of changes in (300, 1000]) label and removed the size:M (PR with lines of changes in (100, 300]) label on Jul 20, 2025
@nsivabalan (Contributor) left a comment

High-level feedback:

  • The FG reader merge handle is not yet wired up for callers on the COW merge code paths.
  • We need to fix BaseMergeHelper for the COW merge code paths.
  • Secondary index stats generation looks OK.

private HoodieReadStats readStats;
private final HoodieRecord.HoodieRecordType recordType;
private final Option<HoodieCDCLogger> cdcLogger;
private final Iterator<HoodieRecord<T>> recordIterator;
Contributor

Can we make this optional instead of dealing with nulls?

public class InputBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupRecordBuffer<T> {
private final Iterator<T> inputRecordIterator;

public InputBasedFileGroupRecordBuffer(HoodieReaderContext readerContext,
Contributor

Suggest renaming to RecordIteratorBasedFileGroupRecordBuffer.

@nsivabalan (Contributor) left a comment

Feedback is not yet fully addressed :(
I did call out that BaseMergeHelper needs to be fixed so that the COW merge uses the new way of merging, i.e. instead of calling

mergeHandle.write(HoodieRecord<T> oldRecord)

we need to leverage

mergeHandle.write()

so that the FG reader will be used.
How did you even validate that the new FG reader is used for COW merges?
Can you point me to the test case where you validated this?
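
For clarity, the contrast being requested looks roughly like this; a minimal sketch, assuming the no-arg write() shape described in this thread rather than a confirmed final signature:

// Legacy COW merge: BaseMergeHelper iterates the old base file and asks the
// handle to merge one old record at a time.
while (oldRecords.hasNext()) {
  mergeHandle.write(oldRecords.next());
}

// FG-reader-based merge: the handle drives HoodieFileGroupReader internally,
// so the caller triggers the whole merge with a single call.
mergeHandle.write();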

try (HoodieFileGroupReader<T> fileGroupReader = HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
.withLatestCommitTime(maxInstantTime).withPartitionPath(partitionPath).withBaseFileOption(Option.ofNullable(baseFileToMerge)).withLogFiles(logFiles)
.withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields).withInternalSchema(internalSchemaOption).withProps(props)
try (HoodieFileGroupReader<T> fileGroupReader = HoodieFileGroupReader.<T>newBuilder()
Contributor

Can we instantiate the builder outside, and only call one of the following:

if (operation.isEmpty()) {
   fileGroupReaderBuilder.setLogFiles(logFiles);
} else {
   fileGroupReaderBuilder.withRecordIterator(engineRecordIterator); 
}

fileGroupReaderBuilder.build(); 

Collaborator Author

I think I did something similar. Anyway, I am grouping some of their attributes separately.

@Override
public void onUpdate(String recordKey, T previousRecord, T mergedRecord) {
HoodieKey hoodieKey = new HoodieKey(recordKey, partitionPath);
BufferedRecord<T> bufferedPrevousRecord = BufferedRecord.forRecordWithContext(
Contributor

Minor typo: it should be bufferedPreviousRecord; the "i" is missing in "previous".

Collaborator Author

Done.

mergedRecord, writeSchemaWithMetaFields, readerContext, Option.empty(), false);
SecondaryIndexStreamingTracker.trackSecondaryIndexStats(
hoodieKey,
Option.of(readerContext.constructHoodieRecord(bufferedMergedRecord)),
Contributor

So, internally (within the FG reader), we translate HoodieRecord to the engine-specific representation and wrap it using BufferedRecord (FG reader processing). And here, we get notifications on the engine-specific record, create a BufferedRecord, and then construct a HoodieRecord from that BufferedRecord.

Why not directly create the HoodieRecord from the engine-specific representation here?

Contributor

Instead of converting to HoodieRecord, can we just directly use the field accessors provided by the ReaderContext?

Collaborator Author

The secondary index key can contain multiple columns. AFAIK, readerContext.getValue is only for a single column. We can revisit this.

Why not directly create the HoodieRecord from the engine-specific representation here?

I think the reason is that HoodieRecord provides the API to fetch column values, while the engine-specific representation does not.
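
A minimal sketch of the multi-column case under discussion, assuming a composite key is assembled by calling readerContext.getValue once per secondary-key column; the delimiter and null handling are illustrative only:

// Illustrative sketch; assumes engineRecord, readerContext, the write schema,
// and secondaryKeyColumns are in scope.
StringBuilder secondaryKey = new StringBuilder();
for (String column : secondaryKeyColumns) {
  Object value = readerContext.getValue(engineRecord, writeSchemaWithMetaFields, column);
  if (secondaryKey.length() > 0) {
    secondaryKey.append('.');  // assumed delimiter between key parts
  }
  secondaryKey.append(value == null ? "__null__" : value.toString());
}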

HoodieReaderContext<T> readerContext = table.getContext().<T>getReaderContextFactory(table.getMetaClient()).getContext();
mergeHandle = HoodieMergeHandleFactory.create(
operationType, config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt,
readerContext, HoodieRecord.HoodieRecordType.SPARK);
Contributor

We can't blindly choose Spark as the record type here.
HoodieWriteConfig exposes getRecordMerger(), which in turn exposes getRecordType().
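
A one-line sketch of the suggested fix, using the two accessors named above:

// Derive the record type from the configured merger instead of hardcoding SPARK.
HoodieRecord.HoodieRecordType recordType = config.getRecordMerger().getRecordType();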

Collaborator Author

Right.

try {
invoker.invoke(callback);
} catch (Exception e) {
LOG.error(String.format("Callback %s failed: ", callback.getName()), e);
Contributor

Can we use {} here?
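
The suggestion refers to SLF4J parameterized logging, i.e. something like:

// {} placeholders defer formatting until the log level is enabled; passing the
// exception as the last argument preserves the stack trace.
LOG.error("Callback {} failed", callback.getName(), e);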

@the-other-tim-brown (Contributor) left a comment

@linliu-code can you ensure that there are tests that include validation of the commit stats for this path? In the past we have seen a lot of deviations in how the stats are computed with the new FGReader-based handles.

@linliu-code linliu-code changed the title from "[HUDI-9591] Add support of record iterator input for FG reader based merge handle" to "[HUDI-9591] FG reader based merge handle for COW merge" on Jul 23, 2025
@vinothchandar (Member)

Is this ready for the final review?

HoodieMergeHandle mergeHandle = HoodieMergeHandleFactory.create(operationType, config, instantTime, table, recordItr, partitionPath, fileId,
taskContextSupplier, keyGeneratorOpt);
HoodieMergeHandle mergeHandle;
if (config.getMergeHandleClassName().equals(FileGroupReaderBasedMergeHandle.class.getName())) {
Contributor

This does not take effect unless we switch the default value of the HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME config property.
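
As a hedged sketch (assuming MERGE_HANDLE_CLASS_NAME is a standard ConfigProperty), a job could opt in explicitly until the default is switched:

// Illustrative only: opt into the FG-reader-based handle via the write config.
Properties props = new Properties();
props.put(HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.key(),
    FileGroupReaderBasedMergeHandle.class.getName());
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withProperties(props)
    .build();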

Collaborator Author

Yes, I updated it.

@nsivabalan (Contributor) commented Jul 24, 2025

Feedback is not yet fully addressed :( I did call out that BaseMergeHelper needs to be fixed so that the COW merge uses the new way of merging, i.e. instead of calling

mergeHandle.write(HoodieRecord<T> oldRecord)

we need to leverage

mergeHandle.write()

so that the FG reader will be used. How did you even validate that the new FG reader is used for COW merges? Can you point me to the test case where you validated this?

I tried executing a simple COW merge with this patch, and I do see we are hitting the BaseMergeHelper.consume() method :(
Then I added a breakpoint in the FileGroupReaderBasedMergeHandle constructor and I don't see it being invoked. With this patch, we are still using HoodieWriteMergeHandle. :(

I left a comment above here #13580 wrt the fix we might need to make.

@linliu-code (Collaborator Author)

@linliu-code can you ensure that there are tests that include validation of the commit stats for this path? In the past we have seen a lot of deviations in how the stats are computed with the new FGReader-based handles.

Will add tests properly after the functional tests are runnable.

+ "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e);
}
}
if (config.getMergeHandleClassName().equals(FileGroupReaderBasedMergeHandle.class.getName())) {
Contributor

We should fix this in HoodieMergeHandleFactory instead of here.

}
return new BufferedRecord<>(recordKey, record.getOrderingValue(schema, props), record.getData(), schemaId, isDelete);
T row = record.getData();
return new BufferedRecord<>(recordKey, readerContext.getOrderingValue(row, schema, orderingFieldName), row, schemaId, isDelete);
@danny0405 (Contributor) commented Jul 25, 2025

We should use readerContext.getOrderingValue() instead of record.getOrderingValue() to unify the ordering value format in BufferedRecord; record.getOrderingValue() is engine-agnostic (it may be used for serialized values such as DeleteRecord).

@hudi-bot (Collaborator)

CI report:

Bot commands supported by @hudi-bot:
  • @hudi-bot run azure: re-run the last Azure build

}
}

private static class CompositeCallback<T> implements BaseFileUpdateCallback<T> {
Contributor

We can implement RLI tracking in a similar way.
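
A hedged sketch of the fan-out idea: CompositeCallback delegates each notification to every registered callback, so an RLI tracker could be registered next to the SI one. Only onUpdate (the method visible in this diff) is shown; any other BaseFileUpdateCallback methods would delegate the same way:

// Sketch based on the CompositeCallback declared in this diff.
private static class CompositeCallback<T> implements BaseFileUpdateCallback<T> {
  private final List<BaseFileUpdateCallback<T>> delegates;

  CompositeCallback(List<BaseFileUpdateCallback<T>> delegates) {
    this.delegates = delegates;
  }

  @Override
  public void onUpdate(String recordKey, T previousRecord, T mergedRecord) {
    // Fan out to SI, RLI, or any other per-record stats tracker.
    for (BaseFileUpdateCallback<T> delegate : delegates) {
      delegate.onUpdate(recordKey, previousRecord, mergedRecord);
    }
  }
}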

@yihua (Contributor) left a comment

The changes are covered by #13699, which is merged.

@yihua yihua closed this Nov 29, 2025