diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java index 6a93e5499c4dc..07eea1e7a29e3 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.model.AWSDmsAvroPayload; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -29,6 +30,8 @@ import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.PartialUpdateAvroPayload; +import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload; +import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator; import org.apache.hudi.common.util.Option; @@ -75,10 +78,16 @@ private static Stream avroPayloadClasses() { Arguments.of(COPY_ON_WRITE, OverwriteNonDefaultsWithLatestAvroPayload.class), Arguments.of(COPY_ON_WRITE, PartialUpdateAvroPayload.class), Arguments.of(COPY_ON_WRITE, DefaultHoodieRecordPayload.class), + Arguments.of(COPY_ON_WRITE, AWSDmsAvroPayload.class), + Arguments.of(COPY_ON_WRITE, MySqlDebeziumAvroPayload.class), + Arguments.of(COPY_ON_WRITE, PostgresDebeziumAvroPayload.class), Arguments.of(MERGE_ON_READ, OverwriteWithLatestAvroPayload.class), Arguments.of(MERGE_ON_READ, OverwriteNonDefaultsWithLatestAvroPayload.class), Arguments.of(MERGE_ON_READ, PartialUpdateAvroPayload.class), - Arguments.of(MERGE_ON_READ, DefaultHoodieRecordPayload.class) + Arguments.of(MERGE_ON_READ, DefaultHoodieRecordPayload.class), + Arguments.of(MERGE_ON_READ, AWSDmsAvroPayload.class), + Arguments.of(MERGE_ON_READ, MySqlDebeziumAvroPayload.class), + Arguments.of(MERGE_ON_READ, PostgresDebeziumAvroPayload.class) ); } @@ -127,8 +136,12 @@ private static Stream avroPayloadClassesThatHonorOrdering() { return Stream.of( Arguments.of(COPY_ON_WRITE, PartialUpdateAvroPayload.class), Arguments.of(COPY_ON_WRITE, DefaultHoodieRecordPayload.class), + Arguments.of(COPY_ON_WRITE, MySqlDebeziumAvroPayload.class), + Arguments.of(COPY_ON_WRITE, PostgresDebeziumAvroPayload.class), Arguments.of(MERGE_ON_READ, PartialUpdateAvroPayload.class), - Arguments.of(MERGE_ON_READ, DefaultHoodieRecordPayload.class) + Arguments.of(MERGE_ON_READ, DefaultHoodieRecordPayload.class), + Arguments.of(MERGE_ON_READ, MySqlDebeziumAvroPayload.class), + Arguments.of(MERGE_ON_READ, PostgresDebeziumAvroPayload.class) ); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java index 451008257a535..f22cfc083340a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java @@ -62,7 +62,7 @@ private Option handleDeleteOperation(IndexedRecord insertValue) t boolean delete = false; if (insertValue instanceof GenericRecord) { GenericRecord record = (GenericRecord) insertValue; - delete = record.get(OP_FIELD) != null && record.get(OP_FIELD).toString().equalsIgnoreCase("D"); + delete = isDMSDeleteRecord(record); } return delete ? Option.empty() : Option.of(insertValue); @@ -94,4 +94,13 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue } return handleDeleteOperation(insertValue.get()); } + + @Override + protected boolean isDeleteRecord(GenericRecord record) { + return isDMSDeleteRecord(record) || super.isDeleteRecord(record); + } + + private static boolean isDMSDeleteRecord(GenericRecord record) { + return record.get(OP_FIELD) != null && record.get(OP_FIELD).toString().equalsIgnoreCase("D"); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java index aaafe61abff9a..85e46690287d3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java @@ -83,7 +83,7 @@ public boolean canProduceSentinel() { * @param genericRecord instance of {@link GenericRecord} of interest. * @returns {@code true} if record represents a delete record. {@code false} otherwise. */ - protected static boolean isDeleteRecord(GenericRecord genericRecord) { + protected boolean isDeleteRecord(GenericRecord genericRecord) { final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED_FIELD; // Modify to be compatible with new version Avro. // The new version Avro throws for GenericRecord.get if the field name diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java index 72bfaebaf9218..69cefc73d723f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java @@ -64,7 +64,8 @@ public Option getInsertValue(Schema schema) throws IOException { public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { // Step 1: If the time occurrence of the current record in storage is higher than the time occurrence of the // insert record (including a delete record), pick the current record. - Option insertValue = getInsertRecord(schema); + Option insertValue = (recordBytes.length == 0) + ? Option.empty() : Option.of((IndexedRecord) HoodieAvroUtils.bytesToAvro(recordBytes, schema)); if (!insertValue.isPresent()) { return Option.empty(); } @@ -81,8 +82,7 @@ private Option handleDeleteOperation(IndexedRecord insertRecord) boolean delete = false; if (insertRecord instanceof GenericRecord) { GenericRecord record = (GenericRecord) insertRecord; - Object value = HoodieAvroUtils.getFieldVal(record, DebeziumConstants.FLATTENED_OP_COL_NAME); - delete = value != null && value.toString().equalsIgnoreCase(DebeziumConstants.DELETE_OP); + delete = isDebeziumDeleteRecord(record); } return delete ? Option.empty() : Option.of(insertRecord); @@ -91,4 +91,14 @@ private Option handleDeleteOperation(IndexedRecord insertRecord) private Option getInsertRecord(Schema schema) throws IOException { return super.getInsertValue(schema); } -} \ No newline at end of file + + @Override + protected boolean isDeleteRecord(GenericRecord record) { + return isDebeziumDeleteRecord(record) || super.isDeleteRecord(record); + } + + private static boolean isDebeziumDeleteRecord(GenericRecord record) { + Object value = HoodieAvroUtils.getFieldVal(record, DebeziumConstants.FLATTENED_OP_COL_NAME); + return value != null && value.toString().equalsIgnoreCase(DebeziumConstants.DELETE_OP); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java index 2d0fd5b094e62..9f578e27a462c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java @@ -28,6 +28,7 @@ import java.util.Properties; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -106,6 +107,7 @@ public void testDelete() { Option outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema, properties); // expect nothing to be committed to table assertFalse(outputPayload.isPresent()); + assertTrue(payload.isDeleted(avroSchema, properties)); } catch (Exception e) { fail("Unexpected exception"); } @@ -142,19 +144,13 @@ public void testPreCombineWithDelete() { deleteRecord.put("Op", "D"); GenericRecord oldRecord = new GenericData.Record(avroSchema); - oldRecord.put("field1", 4); + oldRecord.put("field1", 3); oldRecord.put("Op", "I"); AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord)); AWSDmsAvroPayload insertPayload = new AWSDmsAvroPayload(Option.of(oldRecord)); - try { - OverwriteWithLatestAvroPayload output = payload.preCombine(insertPayload); - Option outputPayload = output.getInsertValue(avroSchema, properties); - // expect nothing to be committed to table - assertFalse(outputPayload.isPresent()); - } catch (Exception e) { - fail("Unexpected exception"); - } + OverwriteWithLatestAvroPayload output = payload.preCombine(insertPayload); + assertEquals(payload, output); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java index 55badd4b4a3d2..f5c3563f06426 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java @@ -33,10 +33,12 @@ import java.io.IOException; import java.util.Arrays; import java.util.Objects; +import java.util.Properties; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Tests {@link MySqlDebeziumAvroPayload}. @@ -100,6 +102,7 @@ public void testMergeWithUpdate() throws IOException { public void testMergeWithDelete() throws IOException { GenericRecord deleteRecord = createRecord(2, Operation.DELETE, "00002.11"); MySqlDebeziumAvroPayload payload = new MySqlDebeziumAvroPayload(deleteRecord, "00002.11"); + assertTrue(payload.isDeleted(avroSchema, new Properties())); GenericRecord existingRecord = createRecord(2, Operation.UPDATE, "00001.111"); Option mergedRecord = payload.combineAndGetUpdateValue(existingRecord, avroSchema); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestPostgresDebeziumAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestPostgresDebeziumAvroPayload.java index acb8120dc7b9c..54eca3c6d05d9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestPostgresDebeziumAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestPostgresDebeziumAvroPayload.java @@ -41,11 +41,13 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Objects; +import java.util.Properties; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Tests {@link PostgresDebeziumAvroPayload}. @@ -108,6 +110,7 @@ public void testMergeWithUpdate() throws IOException { public void testMergeWithDelete() throws IOException { GenericRecord deleteRecord = createRecord(2, Operation.DELETE, 100L); PostgresDebeziumAvroPayload payload = new PostgresDebeziumAvroPayload(deleteRecord, 100L); + assertTrue(payload.isDeleted(avroSchema, new Properties())); GenericRecord existingRecord = createRecord(2, Operation.UPDATE, 99L); Option mergedRecord = payload.combineAndGetUpdateValue(existingRecord, avroSchema); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala index ad1cb3b353565..0989b8b09aee4 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala @@ -26,7 +26,7 @@ import org.apache.hudi.SparkAdapterSupport.sparkAdapter import org.apache.hudi.avro.AvroSchemaUtils.{isNullable, resolveNullableSchema} import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro -import org.apache.hudi.common.model.{BaseAvroPayload, DefaultHoodieRecordPayload, HoodiePayloadProps, HoodieRecord} +import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodiePayloadProps, HoodieRecord} import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.common.util.{BinaryUtil, ValidationUtils, Option => HOption} import org.apache.hudi.config.HoodieWriteConfig @@ -236,7 +236,7 @@ class ExpressionPayload(@transient record: GenericRecord, val recordSchema = getRecordSchema(properties) val incomingRecord = ConvertibleRecord(bytesToAvro(recordBytes, recordSchema)) - if (BaseAvroPayload.isDeleteRecord(incomingRecord.asAvro)) { + if (super.isDeleteRecord(incomingRecord.asAvro)) { HOption.empty[IndexedRecord]() } else if (isMORTable(properties)) { // For the MOR table, both the matched and not-matched record will step into the getInsertValue() method.