diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java index 6ce99aae2138d..9ce241bc7822f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java @@ -70,7 +70,7 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue if (!overwriteField(value, defaultValue)) { builder.set(field, value); } else { - builder.set(field, currentRecord.get(field.pos())); + builder.set(field, currentRecord.get(field.name())); } }); return Option.of(builder.build()); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java index 9e3405b304111..0807b41f610c9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java @@ -23,6 +23,7 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.avro.HoodieAvroUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -54,13 +55,15 @@ public void setUp() throws Exception { @Test public void testActiveRecords() throws IOException { + Schema writerSchema = HoodieAvroUtils.addMetadataFields(schema); + GenericRecord record1 = new GenericData.Record(schema); record1.put("id", "1"); record1.put("partition", "partition1"); record1.put("ts", 0L); record1.put("_hoodie_is_deleted", false); record1.put("city", "NY0"); - record1.put("child", Arrays.asList("A")); + record1.put("child", Collections.singletonList("A")); GenericRecord record2 = new GenericData.Record(schema); record2.put("id", "2"); @@ -76,11 +79,38 @@ public void testActiveRecords() throws IOException { record3.put("ts", 1L); record3.put("_hoodie_is_deleted", false); record3.put("city", "NY0"); - record3.put("child", Arrays.asList("A")); - + record3.put("child", Collections.singletonList("A")); + + // same content with record1 plus metadata fields + GenericRecord record4 = createRecordWithMetadataFields(writerSchema, "1", "partition1"); + record4.put("id", "1"); + record4.put("partition", "partition1"); + record4.put("ts", 0L); + record4.put("_hoodie_is_deleted", false); + record4.put("city", "NY0"); + record4.put("child", Collections.singletonList("A")); + + // same content with record2 plus metadata fields + GenericRecord record5 = createRecordWithMetadataFields(writerSchema, "2", ""); + record5.put("id", "2"); + record5.put("partition", ""); + record5.put("ts", 1L); + record5.put("_hoodie_is_deleted", false); + record5.put("city", "NY"); + record5.put("child", Collections.emptyList()); + + // same content with record3 plus metadata fields + GenericRecord record6 = createRecordWithMetadataFields(writerSchema, "2", ""); + record6.put("id", "2"); + record6.put("partition", "partition1"); + record6.put("ts", 1L); + record6.put("_hoodie_is_deleted", false); + record6.put("city", "NY0"); + record6.put("child", Collections.singletonList("A")); OverwriteNonDefaultsWithLatestAvroPayload payload1 = new OverwriteNonDefaultsWithLatestAvroPayload(record1, 1); OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 2); + OverwriteNonDefaultsWithLatestAvroPayload payload5 = new OverwriteNonDefaultsWithLatestAvroPayload(record5, 2); assertEquals(payload1.preCombine(payload2), payload2); assertEquals(payload2.preCombine(payload1), payload2); @@ -94,6 +124,19 @@ public void testActiveRecords() throws IOException { IndexedRecord combinedVal2 = payload2.combineAndGetUpdateValue(record1, schema).get(); assertEquals(combinedVal2, record3); assertNotSame(combinedVal2, record3); + + // the real case in production is: the current record to be combined includes the metadata fields, + // the payload record could include the metadata fields (for compaction) or not (for normal writer path). + + // case1: validate normal writer path + IndexedRecord combinedVal3 = payload2.combineAndGetUpdateValue(record4, schema).get(); + assertEquals(combinedVal3, record3); + assertNotSame(combinedVal3, record3); + + // case2: validate compaction path + IndexedRecord combinedVal4 = payload5.combineAndGetUpdateValue(record4, writerSchema).get(); + assertEquals(combinedVal4, record6); + assertNotSame(combinedVal4, record6); } @Test @@ -164,4 +207,14 @@ public void testNullColumn() throws IOException { OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 1); assertEquals(payload2.combineAndGetUpdateValue(record1, avroSchema).get(), record3); } + + private static GenericRecord createRecordWithMetadataFields(Schema schema, String recordKey, String partitionPath) { + GenericRecord record = new GenericData.Record(schema); + record.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, "001"); + record.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, "123"); + record.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey); + record.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPath); + record.put(HoodieRecord.FILENAME_METADATA_FIELD, "file1"); + return record; + } }