diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java index 20a20fb62999b..7153ea069d8d9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java @@ -69,8 +69,7 @@ private Option handleDeleteOperation(IndexedRecord insertValue) t @Override public Option getInsertValue(Schema schema, Properties properties) throws IOException { - IndexedRecord insertValue = super.getInsertValue(schema, properties).get(); - return handleDeleteOperation(insertValue); + return getInsertValue(schema); } @Override @@ -82,8 +81,7 @@ public Option getInsertValue(Schema schema) throws IOException { @Override public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException { - IndexedRecord insertValue = super.getInsertValue(schema, properties).get(); - return handleDeleteOperation(insertValue); + return combineAndGetUpdateValue(currentValue, schema); } @Override diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java index 5ba537269ea19..a60f4ff6a763b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java @@ -25,6 +25,8 @@ import org.apache.hudi.common.util.Option; import org.junit.jupiter.api.Test; +import java.util.Properties; + import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -42,13 +44,14 @@ public void testInsert() { Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); GenericRecord record = new GenericData.Record(avroSchema); + Properties properties = new Properties(); record.put("field1", 0); record.put("Op", "I"); AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(record)); try { - Option outputPayload = payload.getInsertValue(avroSchema); + Option outputPayload = payload.getInsertValue(avroSchema, properties); assertTrue((int) outputPayload.get().get(0) == 0); assertTrue(outputPayload.get().get(1).toString().equals("I")); } catch (Exception e) { @@ -61,6 +64,7 @@ public void testInsert() { public void testUpdate() { Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); GenericRecord newRecord = new GenericData.Record(avroSchema); + Properties properties = new Properties(); newRecord.put("field1", 1); newRecord.put("Op", "U"); @@ -71,7 +75,7 @@ public void testUpdate() { AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(newRecord)); try { - Option outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema); + Option outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema, properties); assertTrue((int) outputPayload.get().get(0) == 1); assertTrue(outputPayload.get().get(1).toString().equals("U")); } catch (Exception e) { @@ -84,6 +88,7 @@ public void testUpdate() { public void testDelete() { Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); GenericRecord deleteRecord = new GenericData.Record(avroSchema); + Properties properties = new Properties(); deleteRecord.put("field1", 2); deleteRecord.put("Op", "D"); @@ -94,7 +99,7 @@ public void testDelete() { AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord)); try { - Option outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema); + Option outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema, properties); // expect nothing to be committed to table assertFalse(outputPayload.isPresent()); } catch (Exception e) { @@ -107,6 +112,7 @@ public void testDelete() { public void testPreCombineWithDelete() { Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); GenericRecord deleteRecord = new GenericData.Record(avroSchema); + Properties properties = new Properties(); deleteRecord.put("field1", 4); deleteRecord.put("Op", "D"); @@ -119,7 +125,7 @@ public void testPreCombineWithDelete() { try { OverwriteWithLatestAvroPayload output = payload.preCombine(insertPayload); - Option outputPayload = output.getInsertValue(avroSchema); + Option outputPayload = output.getInsertValue(avroSchema, properties); // expect nothing to be committed to table assertFalse(outputPayload.isPresent()); } catch (Exception e) {