diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java index 8c78209788d23..4be2e3e093e90 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.model; +import org.apache.avro.JsonProperties; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.util.Option; @@ -99,6 +100,9 @@ protected boolean isDeleteRecord(GenericRecord genericRecord) { * Return true if value equals defaultValue otherwise false. */ public Boolean overwriteField(Object value, Object defaultValue) { + if (JsonProperties.NULL_VALUE.equals(defaultValue)) { + return value == null; + } return Objects.equals(value, defaultValue); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java index a8c0321929563..c6eee05b87e6d 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.model; +import org.apache.avro.JsonProperties; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -126,4 +127,34 @@ public void testDeletedRecord() throws IOException { assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema).get(), record2); assertFalse(payload2.combineAndGetUpdateValue(record1, schema).isPresent()); } + + @Test + public void testNullColumn() throws IOException { + Schema avroSchema = Schema.createRecord(Arrays.asList( + new Schema.Field("id", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE), + new Schema.Field("name", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE), + new Schema.Field("age", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE), + new Schema.Field("job", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE) + )); + GenericRecord record1 = new GenericData.Record(avroSchema); + record1.put("id", "1"); + record1.put("name", "aa"); + record1.put("age", "1"); + record1.put("job", "1"); + + GenericRecord record2 = new GenericData.Record(avroSchema); + record2.put("id", "1"); + record2.put("name", "bb"); + record2.put("age", "2"); + record2.put("job", null); + + GenericRecord record3 = new GenericData.Record(avroSchema); + record3.put("id", "1"); + record3.put("name", "bb"); + record3.put("age", "2"); + record3.put("job", "1"); + + OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 1); + assertEquals(payload2.combineAndGetUpdateValue(record1, avroSchema).get(), record3); + } }