Conversation

@the-other-tim-brown the-other-tim-brown commented Aug 9, 2025

Change Logs

The goal of this PR is to ensure consistent behavior while reading and writing data across our Merge-on-Read and Copy-on-Write tables by leveraging the existing HoodieFileGroupReader to manage the merging of records. The FileGroupReaderBasedMergeHandle that is currently used for compaction is updated to allow merging with an incoming stream of records.

Summary of changes:

  • FileGroupReaderBasedMergeHandle.java is updated to accept incoming records directly as an iterator instead of reading changes exclusively from log files (see the sketch after this list). New callbacks are added to support creating the required outputs for updates to the Record Level and Secondary Indexes.
  • The merge handle now preserves the metadata of records that are not updated while generating new metadata for updated records. This does not impact the compaction workflow, which continues to preserve record metadata.
  • The FileGroupReaderBasedMergeHandle is set as the default merge handle.
  • New test cases are added for RLI, including a test where records move between partitions and deletes are sent to partitions that do not contain the original record.
  • The delete record ordering value is now converted to the engine-specific type so there are no issues when performing comparisons.
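
For illustration only, a minimal sketch (not code from this PR) of how an upsert path could drive the updated handle, based on the constructor and review discussion quoted below; all variables are assumed to be in scope, and close() is assumed to return the write statuses:

```java
// Illustrative sketch: construct the handle with an incoming record iterator,
// let HoodieFileGroupReader merge against the base file, then flush and collect statuses.
FileGroupReaderBasedMergeHandle<T, I, K, O> mergeHandle = new FileGroupReaderBasedMergeHandle<>(
    writeConfig, instantTime, table,
    incomingRecordIterator,      // incoming records supplied directly, not read from log files
    partitionPath, fileId,
    taskContextSupplier, Option.empty(), readerContext);
mergeHandle.doMerge();           // HoodieFileGroupReader merges incoming records with the base file
List<WriteStatus> statuses = mergeHandle.close();  // flush the new base file and collect write statuses
```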

Differences between FileGroupReaderBasedMergeHandle and HoodieWriteMergeHandle

  1. Currently the HoodieWriteMergeHandle can apply a single update to multiple records with the same key. This functionality does not exist in the FileGroupReaderBasedMergeHandle.
  2. The FileGroupReaderBasedMergeHandle does not support the shouldFlush functionality in the HoodieRecordMerger.

Impact

Provides a unified path for handling updates to records in Hudi.

Risk level (write none, low medium or high below)

High. This touches our core writer flows.

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here and follow the instructions to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:XL PR with lines of changes > 1000 label Aug 9, 2025
} else {
mergeHandle.doMerge();
if (mergeHandle instanceof FileGroupReaderBasedMergeHandle) {
mergeHandle.close();
Contributor Author:

Open question: Is there any reason to avoid calling close on the other merge handles?

Contributor:

It seems we should just return mergeHandle.close() at line 132 instead of closing here, for all the merge handles.
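
A minimal sketch of this suggestion (assuming close() returns the write statuses): close every merge handle and return the result, rather than special-casing FileGroupReaderBasedMergeHandle.

```java
// Suggested shape: no instanceof check; every handle is closed and its statuses returned.
mergeHandle.doMerge();
return mergeHandle.close();
```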

}

public void manuallyTrackSuccess() {
this.manuallyTrackIndexUpdates = true;
Contributor:

We can just set trackSuccessRecords to false here.
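
A small sketch of the reviewer's suggestion; trackSuccessRecords is assumed to be an existing field on this handle that controls per-record success tracking:

```java
public void manuallyTrackSuccess() {
  this.manuallyTrackIndexUpdates = true;
  this.trackSuccessRecords = false; // the caller now tracks index updates itself
}
```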

*/
public FileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt, HoodieReaderContext<T> readerContext) {
Contributor:

Do we need to pass the readerContext around explicitly here? Can we use hoodieTable.getContext.getReaderContextFactoryForWrite instead?

Contributor Author:

The issue is that the merge handles are created on the executors in Spark, so hoodieTable.getContext will always return a local engine context instead of a Spark engine context when one is required.

Contributor:

> always return a local engine context instead of a spark engine context when required

Can we fix that, e.g. with hoodieTable.getContextForWrite or something similar?

Contributor Author:

Is getContextForWrite returning an EngineContext here?

Contributor:

Yeah, not sure if it is feasible.

init(operation, this.partitionPath);
this.props = TypedProperties.copy(config.getProps());
this.isCompaction = true;
initRecordIndexCallback();
Contributor:

Do we even need to track RLI for compactions?

Contributor Author:

Removed this; it was causing some issues with existing tests, so it was good to see we have solid coverage here :)

initRecordTypeAndCdcLogger(enginRecordType);
init(operation, this.partitionPath);
this.props = TypedProperties.copy(config.getProps());
this.isCompaction = true;
@danny0405 (Contributor) commented Aug 11, 2025:

We already have the flag preserveMetadata to distinguish table services from regular writes; can we continue to use that? Some functions like SI tracing already rely on the flag preserveMetadata, and it seems clustering also uses this constructor.

Contributor Author:

This boolean was removed in later commits.

}

private void initRecordIndexCallback() {
if (this.writeStatus.isTrackingSuccessfulWrites()) {
Contributor:

The isTrackingSuccessfulWrites flag in the write status comes from hoodieTable.shouldTrackSuccessRecords(), which is true when RLI or partitioned RLI is enabled. We should skip the location tracing for compaction, where it is redundant.

Contributor Author:

Yes, this is no longer called from the compaction path.

*/
public ReaderContextFactory<?> getReaderContextFactoryForWrite(HoodieTableMetaClient metaClient, HoodieRecord.HoodieRecordType recordType,
TypedProperties properties) {
TypedProperties properties, boolean outputsCustomPayloads) {
Contributor:

This flag is only meaningful for the Avro reader context; is there any way we can constrain it to just AvroReaderContextFactory?

Contributor Author:

I didn't find a good way right now. This flag really represents two different stages of the writer path: the dedupe/indexing stages and the final write. In the final write, we never want to use the payload-based records, since we just want the final indexed representation of the record.

Contributor:

We have a plan to abandon the payload-based records in the writer path, right? So this should just be a temporary solution?

Contributor Author:

We'll still need it for ExpressionPayload and for any user-provided payload, so it is not temporary, but these restrictions may allow us to clean things up further.


@Override
public void onUpdate(String recordKey, BufferedRecord<T> previousRecord, BufferedRecord<T> mergedRecord) {
writeStatus.addRecordDelegate(HoodieRecordDelegate.create(recordKey, partitionPath, fileRecordLocation, fileRecordLocation, mergedRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE));
Contributor:

Do we even need to add the delegate when mergedRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE is true?

Contributor Author:

The write status is still updated with this record delegate in the current code, even though ignoreIndexUpdate is true. This keeps parity with the old system, but I am not sure of the context for this behavior.

Contributor:

The flag is used when all the delegates are collected on the driver and used to calculate the RLI index items for the MDT; delegates with ignoreIndexUpdate set to true are simply dropped, so there is no need to even generate and collect them.
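
A sketch of the reviewer's point, assuming the callback signature shown above: skip creating delegates that the driver would drop anyway.

```java
@Override
public void onUpdate(String recordKey, BufferedRecord<T> previousRecord, BufferedRecord<T> mergedRecord) {
  if (mergedRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE) {
    return; // nothing to track; the index entry for this key is owned by another file group
  }
  writeStatus.addRecordDelegate(
      HoodieRecordDelegate.create(recordKey, partitionPath, fileRecordLocation, fileRecordLocation, false));
}
```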


@Override
public void onInsert(String recordKey, BufferedRecord<T> newRecord) {
writeStatus.addRecordDelegate(HoodieRecordDelegate.create(recordKey, partitionPath, null, fileRecordLocation, newRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE));
Contributor:

newRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE is always false.

Contributor Author:

It's always false today, but do we want to keep this in case it is not in some future scenario?

Contributor:

> but do we want to keep this in case there is some future case

I don't think so; the hoodie operation is designed to be force-set there.

public void onDelete(String recordKey, BufferedRecord<T> previousRecord, HoodieOperation hoodieOperation) {
// The update before operation is used when a deletion is being sent to the old File Group in a different partition.
// In this case, we do not want to delete the record metadata from the index.
writeStatus.addRecordDelegate(HoodieRecordDelegate.create(recordKey, partitionPath, fileRecordLocation, null, hoodieOperation == HoodieOperation.UPDATE_BEFORE));
Contributor:

hoodieOperation == HoodieOperation.UPDATE_BEFORE is always false.

Contributor Author:

I simplified this in a recent commit.

} else {
Schema readerSchema = readerContext.getSchemaHandler().getRequestedSchema();
// If the record schema is different from the reader schema, rewrite the record using the payload methods to ensure consistency with legacy writer paths
if (!readerSchema.equals(recordSchema)) {
@danny0405 (Contributor) commented Aug 11, 2025:

This could be super costly. Can it be simplified by checking the number of fields?

Contributor Author:

Checking the number of fields will not be enough to guarantee safety. This case is currently limited to the payload-based mergers where there is an update in the incoming records and no record in the base file for that key, so it should not be very common.

static <T> StreamingFileGroupRecordBufferLoader<T> getInstance() {
return INSTANCE;
StreamingFileGroupRecordBufferLoader(Schema recordSchema) {
this.recordSchema = recordSchema;
Contributor:

There is no need to pass the schema around explicitly; it is actually the write schema, which equals schemaHandler.requestedSchema minus the metadata fields. We already have a utility method for that: HoodieAvroUtils.removeMetadataFields.
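
A small sketch of this suggestion: derive the write schema inside the loader from the reader context instead of passing it through the constructor (assumes the schema handler is reachable at that point, as it is in the handle code quoted earlier).

```java
private Schema resolveWriteSchema(HoodieReaderContext<T> readerContext) {
  Schema requestedSchema = readerContext.getSchemaHandler().getRequestedSchema();
  return HoodieAvroUtils.removeMetadataFields(requestedSchema); // strips the _hoodie_* metadata fields
}
```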

Contributor Author:

Nice, this will simplify the changeset.

* @param writeStatus The Write status
* @param secondaryIndexDefns Definitions for secondary index which need to be updated
*/
static <T> void trackSecondaryIndexStats(HoodieKey hoodieKey, Option<BufferedRecord<T>> combinedRecordOpt, @Nullable BufferedRecord<T> oldRecord, boolean isDelete,
Contributor Author:

This method mirrors the one above it but operates directly on BufferedRecord instead of converting to HoodieRecord.

Contributor:

Cool. Have we added UTs for this?

Contributor:

If we have not added UTs for this, can you track it in a follow-up Jira?

@the-other-tim-brown the-other-tim-brown force-pushed the cow-merge-handle-to-fgr-3 branch from 50022dc to 47f8303 Compare August 11, 2025 16:18
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;

abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> {
protected final Set<String> usedKeys = new HashSet<>();
Contributor Author:

There is a possibility of duplicate keys in a file, and there is an expectation that updates are applied to both rows. See the test "Test only insert for source table in dup key without preCombineField" for an example. We need to figure out whether there is a better way to handle this.

Contributor:

I'm wondering if this is a valid case. Duplicates in the base file only occur in a pk-less table, while in this test case the table has a primary key but is intentionally set up to allow duplicates for the first commit, which seems incorrect. We should limit "allow duplicates" for incoming records to just the INSERT operation on pk-less tables, as the doc in HoodieConcatHandle says.

Contributor:

Fired a discussion here: #6824 (comment)

Contributor:

The behavior is incorrect; I filed a JIRA to trace it: https://issues.apache.org/jira/browse/HUDI-9708. Let's abandon the support for duplicates in the base file and fix it in a separate PR.

Contributor Author:

For the test case that fails without this, should we just update it to be PK-less?

Contributor:

We can skip the whole test first and fix the MIT in HUDI-9708; the original test is meant to test a PK table, I think.

Contributor Author:

@danny0405 there is one more case that fails without this change: testUpsertWithoutPrecombineFieldAndCombineBeforeUpsertDisabled. I am marking it as disabled for now.

writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt, readerContext);
} catch (Exception e) {
// Fallback to legacy constructor if the new one fails
LOG.warn("Failed to instantiate HoodieMergeHandle with new constructor, falling back to legacy constructor: {}", e.getMessage());
Contributor:

The catch-exception fallback is hacky; can we match on the class name (FileGroupReaderBasedMergeHandle.class.getName()) to invoke different constructors?

Contributor Author:

Another option I was considering is checking whether the constructor exists first. Then users can still provide a custom merge handle with either the newer or the older set of constructor args.
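
A sketch of that alternative using plain reflection: probe for the new constructor before choosing how to instantiate the handle. The parameter list mirrors the constructor shown in this PR; the legacy branch is left out.

```java
private static boolean hasIteratorConstructor(Class<?> mergeHandleClass) {
  try {
    mergeHandleClass.getConstructor(HoodieWriteConfig.class, String.class, HoodieTable.class,
        Iterator.class, String.class, String.class, TaskContextSupplier.class,
        Option.class, HoodieReaderContext.class);
    return true;
  } catch (NoSuchMethodException e) {
    return false; // user-provided handle only exposes the legacy argument list
  }
}
```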

@Override
protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> previousRecord, BufferedRecord<T> mergedRecord) {
try {
if (merger.shouldFlush(readerContext.getRecordContext().constructHoodieRecord(mergedRecord), readerContext.getRecordContext().getSchemaFromBufferRecord(mergedRecord), properties)) {
@danny0405 (Contributor) commented Aug 13, 2025:

We have discussed dropping the shouldFlush functionality for now; it is not a good design, and the original fix does not cover MOR merging scenarios, as mentioned here: #9809 (comment). Let's just drop this support first and file a JIRA to trace it instead.

Contributor:

JIRA created to trace the follow-up fixes: https://issues.apache.org/jira/browse/HUDI-9709

Contributor Author:

What should we do about the failing tests? Override the write handle to use the old class?

Contributor:

Skip the whole tests first and fix them in HUDI-9709.


private SerializableBiFunction<T, Schema, String> metadataKeyExtractor() {
return (record, schema) -> getValue(record, schema, RECORD_KEY_METADATA_FIELD).toString();
return (record, schema) -> typeConverter.castToString(getValue(record, schema, RECORD_KEY_METADATA_FIELD));
Contributor:

Wondering why this did not error out before the patch?

Contributor Author:

This may not be needed anymore. There was a bug where someone updated the BufferedRecord creation from HoodieRecord to read the values from the data instead of from the HoodieKey, so I was getting some NPEs.


private transient FileSystemViewManager viewManager;
protected final transient HoodieEngineContext context;
private final ReaderContextFactory<T> readerContextFactoryForWrite;
Contributor Author:

@danny0405 I don't think this is the right way to approach this. Now every usage of HoodieTable will incur the cost of generating the factory, which requires broadcasting on Spark. Why can't we just pass this in when it is required like I had before?

Contributor:

The hoodie table holds all the info required there: the engine context, the meta client, and the write config. It does not look right to pass around a "reader" context factory for the whole write path, even for write paths that are not COW table merging scenarios.

There are also discrepancies in the write path: only Spark needs this factory; Flink and Java can get the factory directly from the engine context in the hoodie table.

The broadcast was already there for all the write executors before my change. If we have some way to resolve the serialization issue of the engine context or the reader context itself, that would be the best. Or if we can limit the factory to being initialized only for COW table updates in write handles, that would be great.

@danny0405 (Contributor) commented Aug 13, 2025:

We can remove the reader context factory from the hoodie table and instantiate a reader context factory in SparkUpsertCommitActionExecutor. There, override the method BaseSparkCommitActionExecutor#getUpdateHandle to set up the reader context explicitly; we may also need to add a setter in FileGroupReaderBasedMergeHandle, since only this handle needs it currently. This is just like HoodieMergeHandle.setPartitionFields and HoodieMergeHandle.setPartitionValues, which are specific to the Spark COW table.
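
A loose sketch of that wiring, for illustration only: the setReaderContext setter, the createMergeHandle helper, and readerContextFactory.getContext() are all assumptions standing in for whatever the final API looks like.

```java
// Inside the Spark-side commit action executor (hypothetical shape):
// the factory lives here rather than on HoodieTable, and the context is injected into the handle.
FileGroupReaderBasedMergeHandle<T, I, K, O> handle = createMergeHandle(partitionPath, fileId, recordItr); // hypothetical helper
handle.setReaderContext(readerContextFactory.getContext()); // hypothetical setter, Spark COW path only
return handle;
```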

@the-other-tim-brown the-other-tim-brown force-pushed the cow-merge-handle-to-fgr-3 branch from 0db72ca to a01ce74 Compare August 13, 2025 22:40
}
return id;
}).collect(Collectors.toList());
}).filter(Objects::nonNull).collect(Collectors.toList());
Contributor:

> cannot prune col: %s which does not exist in hudi table

The original logic makes more sense; what is the use case for pruning the schema with a non-existing field?

Contributor Author:

If the internal schema has fewer fields than the requested schema. In the writer path, the writer schema can have new columns that are not in the file's existing schema.

Contributor Author:

It looks like we get around this in the current flows by using AvroSchemaEvolutionUtils.reconcileSchema before setting the InternalSchema. I will test this out with the schema evolution cases.

@nsivabalan (Contributor) commented Aug 14, 2025:

I assume this is the patch being worked on for the COW merge handle migration. Can we fix the PR title and PR description as well, and revert the draft state?

@the-other-tim-brown the-other-tim-brown changed the title [DRAFT][DNM] CoW write handle with file group reader [DRAFT][DNM] Add implementation of MergeHandle backed by the HoodieFileGroupReader Aug 14, 2025
@the-other-tim-brown the-other-tim-brown changed the title [DRAFT][DNM] Add implementation of MergeHandle backed by the HoodieFileGroupReader [HUDI-9622] Add implementation of MergeHandle backed by the HoodieFileGroupReader Aug 14, 2025
@the-other-tim-brown the-other-tim-brown marked this pull request as ready for review August 14, 2025 14:46
@the-other-tim-brown the-other-tim-brown force-pushed the cow-merge-handle-to-fgr-3 branch from 500b4f5 to 539cf8b Compare August 15, 2025 01:36
* @return an Option containing the writer payload override class name if present, otherwise an empty Option
*/
static Option<String> getWriterPayloadOverride(Properties properties) {
if (properties.containsKey("hoodie.datasource.write.payload.class")) {
@cshuo (Collaborator) commented Aug 15, 2025:

if (containsKey()..) {Option.of()} else {} -> Option.ofNullable().map() ?

Contributor Author:

Thanks! I'm realizing I can also just call Option.ofNullable on the result of the get.
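
A minimal sketch of that simplification, assuming org.apache.hudi.common.util.Option and the config key shown in the snippet above:

```java
static Option<String> getWriterPayloadOverride(Properties properties) {
  // getProperty returns null when the key is absent, so ofNullable collapses the containsKey branch
  return Option.ofNullable(properties.getProperty("hoodie.datasource.write.payload.class"));
}
```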

@danny0405 (Contributor) left a review comment:

+1, overall looks great.

@nsivabalan (Contributor) left a review comment:

Very few minor comments/clarifications.
Let's align on duplicate handling before and after this patch.

@nsivabalan (Contributor) commented Aug 15, 2025:

Pending comments to resolve before I can give it a go.

#13699 (comment)
#13699 (comment)

and getting CI to green.

Also, I suggested tracking a few minor follow-ups (UTs) in a Jira.

private static String addStrsAsInt(String a, String b) {
return String.valueOf(Integer.parseInt(a) + Integer.parseInt(b));
private static String addStrsAsLong(String a, String b) {
return String.valueOf(Long.parseLong(a) + Long.parseLong(b));
Contributor:

What is this fix about?

Contributor:

I triaged this. It looks like the event-time metadata tracking would have been broken if not for this fix. I am not sure if anyone was ever using this feature, though.

Contributor Author:

Right, this is a test class, so it may cause some test failures.
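
For context, a small illustration of why the int-to-long change matters, assuming the ordering/event-time values here are epoch milliseconds (an assumption, not stated in the diff):

```java
String eventTimeMillis = "1755216000000";      // an epoch-millis timestamp, larger than Integer.MAX_VALUE
long ok = Long.parseLong(eventTimeMillis);     // parses fine
int bad = Integer.parseInt(eventTimeMillis);   // throws NumberFormatException: value out of int range
```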

@nsivabalan (Contributor) left a review comment:

Appreciate your perseverance navigating through all test failures and getting this to landable shape.

@hudi-bot (Collaborator) commented:

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@nsivabalan (Contributor) commented:

[image attachment]

@nsivabalan nsivabalan merged commit dedd4e0 into apache:master Aug 15, 2025
60 of 61 checks passed
// the operation will be null. Records that are being updated or records being added to the file group for the first time will have an operation set and must generate new metadata.
boolean shouldPreserveRecordMetadata = preserveMetadata || record.getOperation() == null;
Schema recordSchema = shouldPreserveRecordMetadata ? writeSchemaWithMetaFields : writeSchema;
writeToFile(record.getKey(), record, recordSchema, config.getPayloadConfig().getProps(), shouldPreserveRecordMetadata);
Contributor:

Currently we collect SI and RLI updates regardless of whether the record was written successfully; this could incur inconsistencies.

Contributor Author:

Created a JIRA ticket to track this and will take it up in the next day or so.
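
A loose sketch of the concern tracked in that follow-up: only surface RLI/SI updates for records that were actually written. The trackIndexUpdates hook is hypothetical, and the markFailure call assumes the usual WriteStatus#markFailure(record, throwable, metadata) shape.

```java
try {
  writeToFile(record.getKey(), record, recordSchema, config.getPayloadConfig().getProps(), shouldPreserveRecordMetadata);
  trackIndexUpdates(record);                            // hypothetical hook: emit RLI/SI delegates only on success
} catch (IOException e) {
  writeStatus.markFailure(record, e, Option.empty());   // failed record: no index updates are emitted
}
```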

alexr17 pushed a commit to alexr17/hudi that referenced this pull request Aug 25, 2025
…eGroupReader (apache#13699)

Co-authored-by: Sivabalan Narayanan <[email protected]>
Co-authored-by: Lokesh Jain <[email protected]>
Co-authored-by: Lokesh Jain <[email protected]>
Co-authored-by: danny0405 <[email protected]>