From ea3efa0db6b2a2e88508641d6ffb7eec9c33bf00 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 21 Nov 2023 01:55:35 -0800 Subject: [PATCH 1/5] [HUDI-6961] Fixing DefaultHoodieRecordPayload to honor deletion based on meta field as well as custom delete marker --- .../model/DefaultHoodieRecordPayload.java | 42 +++++++++++-------- .../model/TestDefaultHoodieRecordPayload.java | 5 ++- 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java index eae2f58af9440..415610a5ecc20 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java @@ -75,7 +75,7 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue /* * Now check if the incoming record is a delete record. */ - return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord); + return isDeleteRecord(incomingRecord, properties, schema) ? Option.empty() : Option.of(incomingRecord); } @Override @@ -86,30 +86,38 @@ public Option getInsertValue(Schema schema, Properties properties GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); eventTime = updateEventTime(incomingRecord, properties); - return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord); + return isDeleteRecord(incomingRecord, properties, schema) ? Option.empty() : Option.of(incomingRecord); } /** * @param genericRecord instance of {@link GenericRecord} of interest. * @param properties payload related properties + * @param schema schema of interest. * @returns {@code true} if record represents a delete record. {@code false} otherwise. 
*/ - protected boolean isDeleteRecord(GenericRecord genericRecord, Properties properties) { - final String deleteKey = properties.getProperty(DELETE_KEY); - if (StringUtils.isNullOrEmpty(deleteKey)) { - return isDeleteRecord(genericRecord); - } - - ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(properties.getProperty(DELETE_MARKER)), - () -> DELETE_MARKER + " should be configured with " + DELETE_KEY); - // Modify to be compatible with new version Avro. - // The new version Avro throws for GenericRecord.get if the field name - // does not exist in the schema. - if (genericRecord.getSchema().getField(deleteKey) == null) { - return false; + protected boolean isDeleteRecord(GenericRecord genericRecord, Properties properties, Schema schema) { + // call super.isDeleted to account of deletion based on _hoodie_is_deleted + boolean isDeletedBaseAvroPayload = super.isDeleted(schema, properties); + if (isDeletedBaseAvroPayload) { + return true; + } else { + // check for deletion based on custom delete marker. + final String deleteKey = properties.getProperty(DELETE_KEY); + if (StringUtils.isNullOrEmpty(deleteKey)) { + return isDeleteRecord(genericRecord); + } + + ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(properties.getProperty(DELETE_MARKER)), + () -> DELETE_MARKER + " should be configured with " + DELETE_KEY); + // Modify to be compatible with new version Avro. + // The new version Avro throws for GenericRecord.get if the field name + // does not exist in the schema. 
+ if (genericRecord.getSchema().getField(deleteKey) == null) { + return false; + } + Object deleteMarker = genericRecord.get(deleteKey); + return deleteMarker != null && properties.getProperty(DELETE_MARKER).equals(deleteMarker.toString()); } - Object deleteMarker = genericRecord.get(deleteKey); - return deleteMarker != null && properties.getProperty(DELETE_MARKER).equals(deleteMarker.toString()); } private static Option updateEventTime(GenericRecord record, Properties properties) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java index 1cb146ec97e70..1227fe63130d0 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java @@ -109,6 +109,8 @@ public void testDeletedRecord(String key) throws IOException { DefaultHoodieRecordPayload payload1 = new DefaultHoodieRecordPayload(record1, 1); DefaultHoodieRecordPayload payload2 = new DefaultHoodieRecordPayload(delRecord1, 2); + assertFalse(payload1.isDeleted(schema, props)); + assertTrue(payload2.isDeleted(schema, props)); assertEquals(payload1.preCombine(payload2, props), payload2); assertEquals(payload2.preCombine(payload1, props), payload2); @@ -145,8 +147,9 @@ public void testDeleteKey() throws IOException { DefaultHoodieRecordPayload deletePayload = new DefaultHoodieRecordPayload(delRecord, 2); DefaultHoodieRecordPayload defaultDeletePayload = new DefaultHoodieRecordPayload(defaultDeleteRecord, 2); + assertFalse(payload.isDeleted(schema, props)); assertEquals(record, payload.getInsertValue(schema, props).get()); - assertEquals(defaultDeleteRecord, defaultDeletePayload.getInsertValue(schema, props).get()); + assertFalse(defaultDeletePayload.getInsertValue(schema, props).isPresent()); assertFalse(deletePayload.getInsertValue(schema, 
props).isPresent()); assertEquals(delRecord, payload.combineAndGetUpdateValue(delRecord, schema, props).get()); From c586dc82aa4c791cabd8f3172ee1f982f71433da Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 21 Nov 2023 12:46:53 -0800 Subject: [PATCH 2/5] Fixing isDelete with DefaultHoodieRecordPayload --- .../model/DefaultHoodieRecordPayload.java | 49 +++++++++++-------- .../model/TestDefaultHoodieRecordPayload.java | 5 +- 2 files changed, 32 insertions(+), 22 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java index 415610a5ecc20..4d6bd3a3757ad 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.avro.Schema; @@ -89,6 +90,18 @@ public Option getInsertValue(Schema schema, Properties properties return isDeleteRecord(incomingRecord, properties, schema) ? Option.empty() : Option.of(incomingRecord); } + public boolean isDeleted(Schema schema, Properties props) { + if (recordBytes.length == 0) { + return true; + } + try { + GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); + return isDeleteRecord(incomingRecord, props, schema); + } catch (IOException e) { + throw new HoodieIOException("Deserializing bytes to avro failed ", e); + } + } + /** * @param genericRecord instance of {@link GenericRecord} of interest. 
* @param properties payload related properties @@ -96,28 +109,22 @@ public Option getInsertValue(Schema schema, Properties properties * @returns {@code true} if record represents a delete record. {@code false} otherwise. */ protected boolean isDeleteRecord(GenericRecord genericRecord, Properties properties, Schema schema) { - // call super.isDeleted to account of deletion based on _hoodie_is_deleted - boolean isDeletedBaseAvroPayload = super.isDeleted(schema, properties); - if (isDeletedBaseAvroPayload) { - return true; - } else { - // check for deletion based on custom delete marker. - final String deleteKey = properties.getProperty(DELETE_KEY); - if (StringUtils.isNullOrEmpty(deleteKey)) { - return isDeleteRecord(genericRecord); - } - - ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(properties.getProperty(DELETE_MARKER)), - () -> DELETE_MARKER + " should be configured with " + DELETE_KEY); - // Modify to be compatible with new version Avro. - // The new version Avro throws for GenericRecord.get if the field name - // does not exist in the schema. - if (genericRecord.getSchema().getField(deleteKey) == null) { - return false; - } - Object deleteMarker = genericRecord.get(deleteKey); - return deleteMarker != null && properties.getProperty(DELETE_MARKER).equals(deleteMarker.toString()); + // check for deletion based on custom delete marker. + final String deleteKey = properties.getProperty(DELETE_KEY); + if (StringUtils.isNullOrEmpty(deleteKey)) { + return isDeleteRecord(genericRecord); + } + + ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(properties.getProperty(DELETE_MARKER)), + () -> DELETE_MARKER + " should be configured with " + DELETE_KEY); + // Modify to be compatible with new version Avro. + // The new version Avro throws for GenericRecord.get if the field name + // does not exist in the schema. 
+ if (genericRecord.getSchema().getField(deleteKey) == null) { + return false; } + Object deleteMarker = genericRecord.get(deleteKey); + return deleteMarker != null && properties.getProperty(DELETE_MARKER).equals(deleteMarker.toString()); } private static Option updateEventTime(GenericRecord record, Properties properties) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java index 1227fe63130d0..efb153673423b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java @@ -148,9 +148,12 @@ public void testDeleteKey() throws IOException { DefaultHoodieRecordPayload defaultDeletePayload = new DefaultHoodieRecordPayload(defaultDeleteRecord, 2); assertFalse(payload.isDeleted(schema, props)); + assertTrue(deletePayload.isDeleted(schema, props)); + assertFalse(defaultDeletePayload.isDeleted(schema, props)); // if custom marker is present, should honor that irrespective of hoodie_is_deleted + assertEquals(record, payload.getInsertValue(schema, props).get()); - assertFalse(defaultDeletePayload.getInsertValue(schema, props).isPresent()); assertFalse(deletePayload.getInsertValue(schema, props).isPresent()); + assertTrue(defaultDeletePayload.getInsertValue(schema, props).isPresent()); // if custom marker is present, should honor that irrespective of hoodie_is_deleted assertEquals(delRecord, payload.combineAndGetUpdateValue(delRecord, schema, props).get()); assertEquals(defaultDeleteRecord, payload.combineAndGetUpdateValue(defaultDeleteRecord, schema, props).get()); From 1f7f7cdaf5be480a170169e5d97bc6ec76aa5d6f Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 21 Nov 2023 16:45:02 -0800 Subject: [PATCH 3/5] reverting unintended changes --- .../hudi/common/model/DefaultHoodieRecordPayload.java | 
10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java index 4d6bd3a3757ad..539880bd4bb34 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java @@ -76,7 +76,7 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue /* * Now check if the incoming record is a delete record. */ - return isDeleteRecord(incomingRecord, properties, schema) ? Option.empty() : Option.of(incomingRecord); + return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord); } @Override @@ -87,7 +87,7 @@ public Option getInsertValue(Schema schema, Properties properties GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); eventTime = updateEventTime(incomingRecord, properties); - return isDeleteRecord(incomingRecord, properties, schema) ? Option.empty() : Option.of(incomingRecord); + return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord); } public boolean isDeleted(Schema schema, Properties props) { @@ -96,7 +96,7 @@ public boolean isDeleted(Schema schema, Properties props) { } try { GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); - return isDeleteRecord(incomingRecord, props, schema); + return isDeleteRecord(incomingRecord, props); } catch (IOException e) { throw new HoodieIOException("Deserializing bytes to avro failed ", e); } @@ -105,11 +105,9 @@ public boolean isDeleted(Schema schema, Properties props) { /** * @param genericRecord instance of {@link GenericRecord} of interest. * @param properties payload related properties - * @param schema schema of interest. * @returns {@code true} if record represents a delete record. 
{@code false} otherwise. */ - protected boolean isDeleteRecord(GenericRecord genericRecord, Properties properties, Schema schema) { - // check for deletion based on custom delete marker. + protected boolean isDeleteRecord(GenericRecord genericRecord, Properties properties) { final String deleteKey = properties.getProperty(DELETE_KEY); if (StringUtils.isNullOrEmpty(deleteKey)) { return isDeleteRecord(genericRecord); From 2db7b8bee13140a4756427aeb802bad13822e5af Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 21 Nov 2023 18:05:36 -0800 Subject: [PATCH 4/5] Caching isDeleted --- .../model/DefaultHoodieRecordPayload.java | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java index 539880bd4bb34..daa1dcb0207ff 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; /** * {@link HoodieRecordPayload} impl that honors ordering field in both preCombine and combineAndGetUpdateValue. 
@@ -45,6 +46,8 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload { public static final String DELETE_KEY = "hoodie.payload.delete.field"; public static final String DELETE_MARKER = "hoodie.payload.delete.marker"; private Option eventTime = Option.empty(); + private AtomicBoolean isDeleteComputed = new AtomicBoolean(false); + private boolean isDefaultRecordPayloadDeleted = false; public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal) { super(record, orderingVal); @@ -73,10 +76,13 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue */ eventTime = updateEventTime(incomingRecord, properties); + if (!isDeleteComputed.getAndSet(true)) { + isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord, properties); + } /* * Now check if the incoming record is a delete record. */ - return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord); + return isDefaultRecordPayloadDeleted ? Option.empty() : Option.of(incomingRecord); } @Override @@ -87,7 +93,10 @@ public Option getInsertValue(Schema schema, Properties properties GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); eventTime = updateEventTime(incomingRecord, properties); - return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord); + if (!isDeleteComputed.getAndSet(true)) { + isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord, properties); + } + return isDefaultRecordPayloadDeleted ? 
Option.empty() : Option.of(incomingRecord); } public boolean isDeleted(Schema schema, Properties props) { @@ -95,8 +104,11 @@ public boolean isDeleted(Schema schema, Properties props) { return true; } try { - GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); - return isDeleteRecord(incomingRecord, props); + if (!isDeleteComputed.getAndSet(true)) { + GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); + isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord, props); + } + return isDefaultRecordPayloadDeleted; } catch (IOException e) { throw new HoodieIOException("Deserializing bytes to avro failed ", e); } From 7a678c8f26e7b94fca3812d29e9ddca59b083127 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 21 Nov 2023 19:11:47 -0800 Subject: [PATCH 5/5] Fixing tests --- .../apache/hudi/common/model/TestDefaultHoodieRecordPayload.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java index efb153673423b..6fdb85c29f1c7 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java @@ -180,6 +180,7 @@ public void testDeleteKeyConfiguration() throws IOException { } try { + payload = new DefaultHoodieRecordPayload(record, 1); payload.combineAndGetUpdateValue(record, schema, props).get(); fail("Should fail"); } catch (IllegalArgumentException e) {