Skip to content

Commit dedd4e0

Browse files
the-other-tim-brown, nsivabalan, Lokesh Jain, and danny0405
authored
[HUDI-9622] Add implementation of MergeHandle backed by the HoodieFileGroupReader (#13699)
The goal of this PR is to ensure consistent behavior while reading and writing data across our Merge-on-Read and Copy-on-Write tables by leveraging the existing HoodieFileGroupReader to manage the merging of records. The FileGroupReaderBasedMergeHandle that is currently used for compaction is updated to allow merging with an incoming stream of records. Summary of changes: - FileGroupReaderBasedMergeHandle.java is updated to allow incoming records in the form of an iterator of records directly instead of reading changes exclusively from log files. New callbacks are added to support creating the required outputs for updates to Record Level and Secondary indexes. - The merge handle is also updated to account for preserving the metadata of records that are not updated while also generating the metadata for updated records. This does not impact the compaction workflow which will preserve the metadata of the records. - The FileGroupReaderBasedMergeHandle is set as the default merge handle - New test cases are added for RLI including a test where records move between partitions and deletes are sent to partitions that do not contain the original record - The delete record ordering value is now converted to the engine specific type so there are no issues when performing comparisons Differences between FileGroupReaderBasedMergeHandle and HoodieWriteMergeHandle - Currently the HoodieWriteMergeHandle can handle applying a single update to multiple records with the same key. This functionality does not exist in the FileGroupReaderBasedMergeHandle - The FileGroupReaderBasedMergeHandle does not support the shouldFlush functionality in the HoodieRecordMerger --------- Co-authored-by: Sivabalan Narayanan <[email protected]> Co-authored-by: Lokesh Jain <[email protected]> Co-authored-by: Lokesh Jain <[email protected]> Co-authored-by: danny0405 <[email protected]>
1 parent fc41c22 commit dedd4e0

File tree

70 files changed

+1770
-386
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+1770
-386
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public class WriteStatus implements Serializable {
7373
private long totalErrorRecords = 0;
7474

7575
private final double failureFraction;
76-
private final boolean trackSuccessRecords;
76+
private boolean trackSuccessRecords;
7777
private final transient Random random;
7878
private IndexStats indexStats = new IndexStats();
7979

@@ -109,6 +109,17 @@ public void markSuccess(HoodieRecord record, Option<Map<String, String>> optiona
109109
updateStatsForSuccess(optionalRecordMetadata);
110110
}
111111

112+
/**
113+
* Allows the writer to manually add record delegates to the index stats.
114+
*/
115+
public void manuallyTrackSuccess() {
116+
this.trackSuccessRecords = false;
117+
}
118+
119+
public void addRecordDelegate(HoodieRecordDelegate recordDelegate) {
120+
indexStats.addHoodieRecordDelegate(recordDelegate);
121+
}
122+
112123
/**
113124
* Used by native write handles like HoodieRowCreateHandle and HoodieRowDataCreateHandle.
114125
*

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@
7373
import org.apache.hudi.index.HoodieIndex;
7474
import org.apache.hudi.io.FileGroupReaderBasedMergeHandle;
7575
import org.apache.hudi.io.HoodieConcatHandle;
76-
import org.apache.hudi.io.HoodieWriteMergeHandle;
7776
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
7877
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
7978
import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -858,7 +857,7 @@ public class HoodieWriteConfig extends HoodieConfig {
858857

859858
public static final ConfigProperty<String> MERGE_HANDLE_CLASS_NAME = ConfigProperty
860859
.key("hoodie.write.merge.handle.class")
861-
.defaultValue(HoodieWriteMergeHandle.class.getName())
860+
.defaultValue(FileGroupReaderBasedMergeHandle.class.getName())
862861
.markAdvanced()
863862
.sinceVersion("1.1.0")
864863
.withDocumentation("The merge handle class that implements interface{@link HoodieMergeHandle} to merge the records "

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -430,16 +430,11 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecordWithEx
430430
//the record was deleted
431431
return Option.empty();
432432
}
433-
if (mergeResult.getRecord() == null) {
434-
// SENTINEL case: the record did not match and merge case and should not be modified
433+
if (mergeResult.getRecord() == null || mergeResult == existingBufferedRecord) {
434+
// SENTINEL case: the record did not match the merge case and should not be modified or there is no update to the record
435435
return Option.of((HoodieRecord<R>) new HoodieAvroIndexedRecord(HoodieRecord.SENTINEL));
436436
}
437437

438-
if (mergeResult.getRecord().equals(HoodieRecord.SENTINEL)) {
439-
//the record did not match and merge case and should not be modified
440-
return Option.of(existingRecordContext.constructHoodieRecord(mergeResult, incoming.getPartitionPath()));
441-
}
442-
443438
//record is inserted or updated
444439
String partitionPath = inferPartitionPath(incoming, existing, writeSchemaWithMetaFields, keyGenerator, existingRecordContext, mergeResult);
445440
HoodieRecord<R> result = existingRecordContext.constructHoodieRecord(mergeResult, partitionPath);
@@ -526,14 +521,14 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdatesIfNeeded(
526521
.distinct(updatedConfig.getGlobalIndexReconcileParallelism());
527522
// define the buffered record merger.
528523
ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>) hoodieTable.getContext()
529-
.getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), config.getProps());
524+
.getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), config.getProps(), true);
530525
HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
531526
RecordContext<R> incomingRecordContext = readerContext.getRecordContext();
532527
readerContext.initRecordMergerForIngestion(config.getProps());
533528
// Create a reader context for the existing records. In the case of merge-into commands, the incoming records
534529
// can be using an expression payload so here we rely on the table's configured payload class if it is required.
535530
ReaderContextFactory<R> readerContextFactoryForExistingRecords = (ReaderContextFactory<R>) hoodieTable.getContext()
536-
.getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), hoodieTable.getMetaClient().getTableConfig().getProps());
531+
.getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), hoodieTable.getMetaClient().getTableConfig().getProps(), true);
537532
RecordContext<R> existingRecordContext = readerContextFactoryForExistingRecords.getContext().getRecordContext();
538533
// merged existing records with current locations being set
539534
SerializableSchema writerSchema = new SerializableSchema(hoodieTable.getConfig().getWriteSchema());
@@ -551,7 +546,7 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdatesIfNeeded(
551546
false,
552547
readerContext.getRecordMerger(),
553548
orderingFieldNames,
554-
writerSchema.get(),
549+
isExpressionPayload ? writerSchema.get() : writerSchemaWithMetaFields.get(),
555550
Option.ofNullable(Pair.of(hoodieTable.getMetaClient().getTableConfig().getPayloadClass(), hoodieTable.getConfig().getPayloadClass())),
556551
hoodieTable.getConfig().getProps(),
557552
hoodieTable.getMetaClient().getTableConfig().getPartialUpdateMode());

0 commit comments

Comments
 (0)