Merged

56 commits
e1166be
Squash: get baseline testing and COW handle setup
the-other-tim-brown Aug 8, 2025
b3ecd1a
fix conflicts
the-other-tim-brown Aug 8, 2025
7ab9e93
allow incoming records loader to specify the record schema which may …
the-other-tim-brown Aug 8, 2025
b508a13
move conversion to loader
the-other-tim-brown Aug 9, 2025
3e441d3
fix handling of auto-keygen flow, update test which requires errors
the-other-tim-brown Aug 9, 2025
929aa84
fix schema used in buffered record after projection, remove unnecessa…
the-other-tim-brown Aug 9, 2025
e4997d6
update test setup to recreate table if populate meta fields is false
the-other-tim-brown Aug 9, 2025
9c41a8f
pass in the reader context factory so we can use engine specific reco…
the-other-tim-brown Aug 9, 2025
f4b86e7
cleanup
the-other-tim-brown Aug 10, 2025
5e957ba
fix expression payload handling (still 2 test failures)
the-other-tim-brown Aug 10, 2025
a14f916
add temporary shouldIgnore step
the-other-tim-brown Aug 10, 2025
735eec1
fix sentinel case for index utils
the-other-tim-brown Aug 10, 2025
e6fab6e
Add custom merger test
Aug 6, 2025
a3edd13
Change custom merger logic to accept lower ordering value records
Aug 7, 2025
3400d07
fix test setup
the-other-tim-brown Aug 10, 2025
22dc097
move logic to update processor for skipping in payload case, add comm…
the-other-tim-brown Aug 10, 2025
4da3472
fix update processor check
the-other-tim-brown Aug 10, 2025
ded87dc
clean up repeated code
the-other-tim-brown Aug 10, 2025
10259e2
fix delete context in buffer loader to match incoming record schema, …
the-other-tim-brown Aug 10, 2025
70ace82
handle expression payload field rewrite when shouldIgnore is false an…
the-other-tim-brown Aug 11, 2025
5c62e8c
remove changes to compaction flow for index update, ensure consistent…
the-other-tim-brown Aug 11, 2025
fb8218d
add support for merger shouldFlush
the-other-tim-brown Aug 11, 2025
c182418
fix multi-format writes, fix test serialization issues
the-other-tim-brown Aug 11, 2025
0dbe048
use new static instance in test
the-other-tim-brown Aug 11, 2025
c90c00f
fix multi-format on java reader
the-other-tim-brown Aug 11, 2025
404dac8
add concept of used keys to allow duplicate updates
the-other-tim-brown Aug 11, 2025
3fff643
move addKey to common place with null check
the-other-tim-brown Aug 11, 2025
6e74365
fix handle factory expectations to match new defaults, update TestCus…
the-other-tim-brown Aug 12, 2025
8604cbc
address feedback on StreamingFileGroupRecordBufferLoader taking in sc…
the-other-tim-brown Aug 12, 2025
9cf49e9
cleanup FileGroupReaderBasedMergeHandle initialization
the-other-tim-brown Aug 13, 2025
83b63a2
add hasLogFiles method to InputSplit
the-other-tim-brown Aug 13, 2025
ba96270
move some utility methods to OrderingValues, fix IOUtils
danny0405 Aug 13, 2025
706ba1e
move reader context factory to hoodie table
danny0405 Aug 13, 2025
fd30e59
fix close to return existing write status if already closed instead o…
the-other-tim-brown Aug 13, 2025
3adf0b9
fix test setups after changes
the-other-tim-brown Aug 13, 2025
f18ff16
limit the reader context factory to just spark COW upsert scenarios
danny0405 Aug 13, 2025
51b615d
refresh timeline in test
the-other-tim-brown Aug 13, 2025
2cc89df
handle small file updates on MoR
the-other-tim-brown Aug 13, 2025
8a083cb
remove indentation only changes to files
the-other-tim-brown Aug 13, 2025
3d2b485
update close to return write status if already closed, handle schema …
the-other-tim-brown Aug 13, 2025
cea6238
update schema evolution check
the-other-tim-brown Aug 14, 2025
4503d4e
fix cache bug
the-other-tim-brown Aug 14, 2025
f6572e3
fix query schema and undo change to InternalSchemaUtils
the-other-tim-brown Aug 14, 2025
b150753
fix close sequence on flink if already closed
the-other-tim-brown Aug 14, 2025
ff39bef
remove dupe key handling
the-other-tim-brown Aug 14, 2025
00569c2
remove shouldFlush handling and test cases
the-other-tim-brown Aug 14, 2025
4f3f3af
use legacy writer class for duplicate data cases
the-other-tim-brown Aug 14, 2025
af3a4b6
style
the-other-tim-brown Aug 14, 2025
d8c20e5
fix schema used in gcs test
the-other-tim-brown Aug 14, 2025
a234757
add requested comments, add unit tests for update processor, fix NPE w…
the-other-tim-brown Aug 15, 2025
539cf8b
update comment on shouldPreserveRecordMetadata case
the-other-tim-brown Aug 15, 2025
85139a1
mark test as ignored
the-other-tim-brown Aug 15, 2025
cc430c9
Adding tests for event time metadata
nsivabalan Aug 15, 2025
04242c5
Address minor comments, fix schema for merger in index utils
the-other-tim-brown Aug 15, 2025
2a9cad3
make operation -> compactionOperation to avoid overlap in naming in F…
the-other-tim-brown Aug 15, 2025
64d3575
use write schema without meta fields for expression payload merge
the-other-tim-brown Aug 15, 2025
WriteStatus.java

@@ -73,7 +73,7 @@ public class WriteStatus implements Serializable {
   private long totalErrorRecords = 0;
 
   private final double failureFraction;
-  private final boolean trackSuccessRecords;
+  private boolean trackSuccessRecords;
   private final transient Random random;
   private IndexStats indexStats = new IndexStats();
 
@@ -109,6 +109,17 @@ public void markSuccess(HoodieRecord record, Option<Map<String, String>> optionalRecordMetadata) {
     updateStatsForSuccess(optionalRecordMetadata);
   }
 
+  /**
+   * Allows the writer to manually add record delegates to the index stats.
+   */
+  public void manuallyTrackSuccess() {
+    this.trackSuccessRecords = false;
+  }
+
+  public void addRecordDelegate(HoodieRecordDelegate recordDelegate) {
+    indexStats.addHoodieRecordDelegate(recordDelegate);
+  }
+
   /**
    * Used by native write handles like HoodieRowCreateHandle and HoodieRowDataCreateHandle.
    *
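Taken together, the two additions let a write handle opt out of automatic success tracking and register record delegates itself. A minimal sketch of that calling pattern, assuming the Hudi import paths shown below (the wrapper class and method names are hypothetical):

import java.util.List;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecordDelegate;

// Hypothetical helper: a writer that manages index stats itself rather than
// relying on markSuccess() to track each record.
public class ManualTrackingSketch {
  static void recordDelegates(WriteStatus status, List<HoodieRecordDelegate> delegates) {
    // Disable automatic success tracking so delegates are only added by hand.
    status.manuallyTrackSuccess();
    for (HoodieRecordDelegate delegate : delegates) {
      // Register each delegate directly on the status's IndexStats.
      status.addRecordDelegate(delegate);
    }
  }
}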
HoodieWriteConfig.java

@@ -73,7 +73,6 @@
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.FileGroupReaderBasedMergeHandle;
 import org.apache.hudi.io.HoodieConcatHandle;
-import org.apache.hudi.io.HoodieWriteMergeHandle;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -858,7 +857,7 @@ public class HoodieWriteConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> MERGE_HANDLE_CLASS_NAME = ConfigProperty
       .key("hoodie.write.merge.handle.class")
-      .defaultValue(HoodieWriteMergeHandle.class.getName())
+      .defaultValue(FileGroupReaderBasedMergeHandle.class.getName())
       .markAdvanced()
       .sinceVersion("1.1.0")
       .withDocumentation("The merge handle class that implements interface{@link HoodieMergeHandle} to merge the records "
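Since this flips the default merge handle to FileGroupReaderBasedMergeHandle, jobs that need the previous behavior can pin the legacy class through the same config key. A minimal sketch, assuming plain java.util.Properties are handed through to the write config (the wrapper class is illustrative):

import java.util.Properties;

public class LegacyMergeHandleSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Pin the pre-1.1.0 default merge handle instead of the new
    // file group reader based handle.
    props.setProperty("hoodie.write.merge.handle.class",
        "org.apache.hudi.io.HoodieWriteMergeHandle");
  }
}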
HoodieIndexUtils.java

@@ -430,16 +430,11 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecordWithEx…
       //the record was deleted
       return Option.empty();
     }
-    if (mergeResult.getRecord() == null) {
-      // SENTINEL case: the record did not match and merge case and should not be modified
+    if (mergeResult.getRecord() == null || mergeResult == existingBufferedRecord) {
+      // SENTINEL case: the record did not match the merge case and should not be modified or there is no update to the record
       return Option.of((HoodieRecord<R>) new HoodieAvroIndexedRecord(HoodieRecord.SENTINEL));
     }
 
-    if (mergeResult.getRecord().equals(HoodieRecord.SENTINEL)) {
-      //the record did not match and merge case and should not be modified
-      return Option.of(existingRecordContext.constructHoodieRecord(mergeResult, incoming.getPartitionPath()));
-    }
-
     //record is inserted or updated
     String partitionPath = inferPartitionPath(incoming, existing, writeSchemaWithMetaFields, keyGenerator, existingRecordContext, mergeResult);
     HoodieRecord<R> result = existingRecordContext.constructHoodieRecord(mergeResult, partitionPath);
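The collapsed branch treats two outcomes as the same SENTINEL no-op: no merge case matched (null record), or the merger returned the existing buffered record unchanged. A standalone restatement of that predicate (the BufferedRecord stand-in below is a sketch, not the real Hudi type):

class NoOpMergeSketch {
  // Stand-in for the buffered record type referenced in the diff, reduced to
  // the single accessor the predicate needs.
  interface BufferedRecord<T> {
    T getRecord();
  }

  // A merge is a no-op when no merge case matched (null record) or when the
  // merger handed back the existing buffered record instance unchanged.
  static <T> boolean isNoOpMerge(BufferedRecord<T> mergeResult, BufferedRecord<T> existing) {
    return mergeResult.getRecord() == null || mergeResult == existing;
  }
}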
@@ -526,14 +521,14 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdatesIfNeeded(
         .distinct(updatedConfig.getGlobalIndexReconcileParallelism());
     // define the buffered record merger.
     ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>) hoodieTable.getContext()
-        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), config.getProps());
+        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), config.getProps(), true);
     HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
     RecordContext<R> incomingRecordContext = readerContext.getRecordContext();
     readerContext.initRecordMergerForIngestion(config.getProps());
     // Create a reader context for the existing records. In the case of merge-into commands, the incoming records
     // can be using an expression payload so here we rely on the table's configured payload class if it is required.
     ReaderContextFactory<R> readerContextFactoryForExistingRecords = (ReaderContextFactory<R>) hoodieTable.getContext()
-        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), hoodieTable.getMetaClient().getTableConfig().getProps());
+        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), hoodieTable.getMetaClient().getTableConfig().getProps(), true);
     RecordContext<R> existingRecordContext = readerContextFactoryForExistingRecords.getContext().getRecordContext();
     // merged existing records with current locations being set
     SerializableSchema writerSchema = new SerializableSchema(hoodieTable.getConfig().getWriteSchema());
@@ -551,7 +546,7 @@
         false,
         readerContext.getRecordMerger(),
         orderingFieldNames,
-        writerSchema.get(),
+        isExpressionPayload ? writerSchema.get() : writerSchemaWithMetaFields.get(),
         Option.ofNullable(Pair.of(hoodieTable.getMetaClient().getTableConfig().getPayloadClass(), hoodieTable.getConfig().getPayloadClass())),
         hoodieTable.getConfig().getProps(),
         hoodieTable.getMetaClient().getTableConfig().getPartialUpdateMode());
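On the schema change in the last hunk: expression-payload merges keep the plain write schema, while other merges now use the writer schema widened with the Hudi meta fields. A short sketch of producing that widened variant with HoodieAvroUtils.addMetadataFields, a utility in hudi-common (the record schema literal here is made up for illustration):

import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;

public class MetaFieldSchemaSketch {
  public static void main(String[] args) {
    // Toy write schema with one field; real jobs derive this from the table.
    Schema writerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Row\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
    // Prepends the _hoodie_* meta fields (commit time, seqno, record key,
    // partition path, file name), mirroring writerSchemaWithMetaFields above.
    Schema withMetaFields = HoodieAvroUtils.addMetadataFields(writerSchema);
    System.out.println(withMetaFields.getFields());
  }
}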