diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java index eae2f58af9440..daa1dcb0207ff 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.avro.Schema; @@ -33,6 +34,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; /** * {@link HoodieRecordPayload} impl that honors ordering field in both preCombine and combineAndGetUpdateValue. @@ -44,6 +46,8 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload { public static final String DELETE_KEY = "hoodie.payload.delete.field"; public static final String DELETE_MARKER = "hoodie.payload.delete.marker"; private Option eventTime = Option.empty(); + private AtomicBoolean isDeleteComputed = new AtomicBoolean(false); + private boolean isDefaultRecordPayloadDeleted = false; public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal) { super(record, orderingVal); @@ -72,10 +76,13 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue */ eventTime = updateEventTime(incomingRecord, properties); + if (!isDeleteComputed.getAndSet(true)) { + isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord, properties); + } /* * Now check if the incoming record is a delete record. */ - return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord); + return isDefaultRecordPayloadDeleted ? Option.empty() : Option.of(incomingRecord); } @Override @@ -86,7 +93,25 @@ public Option getInsertValue(Schema schema, Properties properties GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); eventTime = updateEventTime(incomingRecord, properties); - return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord); + if (!isDeleteComputed.getAndSet(true)) { + isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord, properties); + } + return isDefaultRecordPayloadDeleted ? Option.empty() : Option.of(incomingRecord); + } + + public boolean isDeleted(Schema schema, Properties props) { + if (recordBytes.length == 0) { + return true; + } + try { + if (!isDeleteComputed.getAndSet(true)) { + GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); + isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord, props); + } + return isDefaultRecordPayloadDeleted; + } catch (IOException e) { + throw new HoodieIOException("Deserializing bytes to avro failed ", e); + } } /** diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java index 1cb146ec97e70..6fdb85c29f1c7 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java @@ -109,6 +109,8 @@ public void testDeletedRecord(String key) throws IOException { DefaultHoodieRecordPayload payload1 = new DefaultHoodieRecordPayload(record1, 1); DefaultHoodieRecordPayload payload2 = new DefaultHoodieRecordPayload(delRecord1, 2); + assertFalse(payload1.isDeleted(schema, props)); + assertTrue(payload2.isDeleted(schema, props)); assertEquals(payload1.preCombine(payload2, props), payload2); assertEquals(payload2.preCombine(payload1, props), payload2); @@ -145,9 +147,13 @@ public void testDeleteKey() throws IOException { DefaultHoodieRecordPayload deletePayload = new DefaultHoodieRecordPayload(delRecord, 2); DefaultHoodieRecordPayload defaultDeletePayload = new DefaultHoodieRecordPayload(defaultDeleteRecord, 2); + assertFalse(payload.isDeleted(schema, props)); + assertTrue(deletePayload.isDeleted(schema, props)); + assertFalse(defaultDeletePayload.isDeleted(schema, props)); // if custom marker is present, should honor that irrespective of hoodie_is_deleted + assertEquals(record, payload.getInsertValue(schema, props).get()); - assertEquals(defaultDeleteRecord, defaultDeletePayload.getInsertValue(schema, props).get()); assertFalse(deletePayload.getInsertValue(schema, props).isPresent()); + assertTrue(defaultDeletePayload.getInsertValue(schema, props).isPresent()); // if custom marker is present, should honor that irrespective of hoodie_is_deleted assertEquals(delRecord, payload.combineAndGetUpdateValue(delRecord, schema, props).get()); assertEquals(defaultDeleteRecord, payload.combineAndGetUpdateValue(defaultDeleteRecord, schema, props).get()); @@ -174,6 +180,7 @@ public void testDeleteKeyConfiguration() throws IOException { } try { + payload = new DefaultHoodieRecordPayload(record, 1); payload.combineAndGetUpdateValue(record, schema, props).get(); fail("Should fail"); } catch (IllegalArgumentException e) {