diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java index 93ac96cb42fed..6ce99aae2138d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java @@ -20,6 +20,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.common.util.Option; @@ -60,16 +61,19 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue if (isDeleteRecord(insertRecord)) { return Option.empty(); } else { + final GenericRecordBuilder builder = new GenericRecordBuilder(schema); List fields = schema.getFields(); fields.forEach(field -> { Object value = insertRecord.get(field.name()); value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value; Object defaultValue = field.defaultVal(); if (!overwriteField(value, defaultValue)) { - currentRecord.put(field.name(), value); + builder.set(field, value); + } else { + builder.set(field, currentRecord.get(field.pos())); } }); - return Option.of(currentRecord); + return Option.of(builder.build()); } } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java index c6eee05b87e6d..9e3405b304111 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java @@ -22,6 +22,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -31,6 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotSame; /** * Unit tests {@link TestOverwriteNonDefaultsWithLatestAvroPayload}. @@ -85,8 +87,13 @@ public void testActiveRecords() throws IOException { assertEquals(record1, payload1.getInsertValue(schema).get()); assertEquals(record2, payload2.getInsertValue(schema).get()); - assertEquals(payload1.combineAndGetUpdateValue(record2, schema).get(), record1); - assertEquals(payload2.combineAndGetUpdateValue(record1, schema).get(), record3); + IndexedRecord combinedVal1 = payload1.combineAndGetUpdateValue(record2, schema).get(); + assertEquals(combinedVal1, record1); + assertNotSame(combinedVal1, record1); + + IndexedRecord combinedVal2 = payload2.combineAndGetUpdateValue(record1, schema).get(); + assertEquals(combinedVal2, record3); + assertNotSame(combinedVal2, record3); } @Test