Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.AWSDmsAvroPayload;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator;
import org.apache.hudi.common.util.Option;
Expand Down Expand Up @@ -75,10 +78,16 @@ private static Stream<Arguments> avroPayloadClasses() {
Arguments.of(COPY_ON_WRITE, OverwriteNonDefaultsWithLatestAvroPayload.class),
Arguments.of(COPY_ON_WRITE, PartialUpdateAvroPayload.class),
Arguments.of(COPY_ON_WRITE, DefaultHoodieRecordPayload.class),
Arguments.of(COPY_ON_WRITE, AWSDmsAvroPayload.class),
Arguments.of(COPY_ON_WRITE, MySqlDebeziumAvroPayload.class),
Arguments.of(COPY_ON_WRITE, PostgresDebeziumAvroPayload.class),
Arguments.of(MERGE_ON_READ, OverwriteWithLatestAvroPayload.class),
Arguments.of(MERGE_ON_READ, OverwriteNonDefaultsWithLatestAvroPayload.class),
Arguments.of(MERGE_ON_READ, PartialUpdateAvroPayload.class),
Arguments.of(MERGE_ON_READ, DefaultHoodieRecordPayload.class)
Arguments.of(MERGE_ON_READ, DefaultHoodieRecordPayload.class),
Arguments.of(MERGE_ON_READ, AWSDmsAvroPayload.class),
Arguments.of(MERGE_ON_READ, MySqlDebeziumAvroPayload.class),
Arguments.of(MERGE_ON_READ, PostgresDebeziumAvroPayload.class)
);
}

Expand Down Expand Up @@ -127,8 +136,12 @@ private static Stream<Arguments> avroPayloadClassesThatHonorOrdering() {
return Stream.of(
Arguments.of(COPY_ON_WRITE, PartialUpdateAvroPayload.class),
Arguments.of(COPY_ON_WRITE, DefaultHoodieRecordPayload.class),
Arguments.of(COPY_ON_WRITE, MySqlDebeziumAvroPayload.class),
Arguments.of(COPY_ON_WRITE, PostgresDebeziumAvroPayload.class),
Arguments.of(MERGE_ON_READ, PartialUpdateAvroPayload.class),
Arguments.of(MERGE_ON_READ, DefaultHoodieRecordPayload.class)
Arguments.of(MERGE_ON_READ, DefaultHoodieRecordPayload.class),
Arguments.of(MERGE_ON_READ, MySqlDebeziumAvroPayload.class),
Arguments.of(MERGE_ON_READ, PostgresDebeziumAvroPayload.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertValue) t
boolean delete = false;
if (insertValue instanceof GenericRecord) {
GenericRecord record = (GenericRecord) insertValue;
delete = record.get(OP_FIELD) != null && record.get(OP_FIELD).toString().equalsIgnoreCase("D");
delete = isDMSDeleteRecord(record);
}

return delete ? Option.empty() : Option.of(insertValue);
Expand Down Expand Up @@ -94,4 +94,13 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
}
return handleDeleteOperation(insertValue.get());
}

/**
 * Decides whether the given record should be treated as a delete.
 *
 * <p>The DMS-specific marker is consulted first via {@code isDMSDeleteRecord}; only when that
 * check does not flag the record is the superclass check consulted.
 *
 * @param record the record to inspect.
 * @return {@code true} if either check reports a delete, {@code false} otherwise.
 */
@Override
protected boolean isDeleteRecord(GenericRecord record) {
  if (isDMSDeleteRecord(record)) {
    return true;
  }
  return super.isDeleteRecord(record);
}

/**
 * Checks whether the given record carries the DMS delete marker.
 *
 * <p>A record is a delete when the value stored under {@code OP_FIELD} is non-null and its string
 * form equals {@code "D"}, ignoring case. A record whose schema does not declare {@code OP_FIELD}
 * is never a delete.
 *
 * @param record the record to inspect.
 * @return {@code true} if the record is marked as a delete, {@code false} otherwise.
 */
private static boolean isDMSDeleteRecord(GenericRecord record) {
  // Newer Avro versions throw from GenericRecord.get(String) when the field is absent from the
  // schema, so consult the schema before reading the value.
  if (record.getSchema().getField(OP_FIELD) == null) {
    return false;
  }
  // Read the field once instead of looking it up twice.
  final Object op = record.get(OP_FIELD);
  return op != null && op.toString().equalsIgnoreCase("D");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public boolean canProduceSentinel() {
* @param genericRecord instance of {@link GenericRecord} of interest.
* @return {@code true} if record represents a delete record. {@code false} otherwise.
*/
protected static boolean isDeleteRecord(GenericRecord genericRecord) {
protected boolean isDeleteRecord(GenericRecord genericRecord) {
final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED_FIELD;
// Modify to be compatible with new version Avro.
// The new version Avro throws for GenericRecord.get if the field name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
// Step 1: If the time occurrence of the current record in storage is higher than the time occurrence of the
// insert record (including a delete record), pick the current record.
Option<IndexedRecord> insertValue = getInsertRecord(schema);
Option<IndexedRecord> insertValue = (recordBytes.length == 0)
? Option.empty() : Option.of((IndexedRecord) HoodieAvroUtils.bytesToAvro(recordBytes, schema));
if (!insertValue.isPresent()) {
return Option.empty();
}
Expand All @@ -81,8 +82,7 @@ private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertRecord)
boolean delete = false;
if (insertRecord instanceof GenericRecord) {
GenericRecord record = (GenericRecord) insertRecord;
Object value = HoodieAvroUtils.getFieldVal(record, DebeziumConstants.FLATTENED_OP_COL_NAME);
delete = value != null && value.toString().equalsIgnoreCase(DebeziumConstants.DELETE_OP);
delete = isDebeziumDeleteRecord(record);
}

return delete ? Option.empty() : Option.of(insertRecord);
Expand All @@ -91,4 +91,14 @@ private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertRecord)
/**
 * Obtains the insert record by delegating to the superclass {@code getInsertValue(Schema)}.
 *
 * @param schema the schema handed through to the superclass call.
 * @return the option returned by the superclass.
 * @throws IOException if the superclass call fails with an I/O error.
 */
private Option<IndexedRecord> getInsertRecord(Schema schema) throws IOException {
  final Option<IndexedRecord> insertRecord = super.getInsertValue(schema);
  return insertRecord;
}
}

/**
 * Decides whether the given record should be treated as a delete.
 *
 * <p>The Debezium operation marker is consulted first via {@code isDebeziumDeleteRecord}; only
 * when that check does not flag the record is the superclass check consulted.
 *
 * @param record the record to inspect.
 * @return {@code true} if either check reports a delete, {@code false} otherwise.
 */
@Override
protected boolean isDeleteRecord(GenericRecord record) {
  if (isDebeziumDeleteRecord(record)) {
    return true;
  }
  return super.isDeleteRecord(record);
}

/**
 * Checks whether the given record carries the Debezium delete operation marker.
 *
 * <p>The value is looked up under {@code DebeziumConstants.FLATTENED_OP_COL_NAME}; the record is a
 * delete when that value is non-null and its string form equals
 * {@code DebeziumConstants.DELETE_OP}, ignoring case.
 *
 * @param record the record to inspect.
 * @return {@code true} if the record is marked as a delete, {@code false} otherwise.
 */
private static boolean isDebeziumDeleteRecord(GenericRecord record) {
  final Object op = HoodieAvroUtils.getFieldVal(record, DebeziumConstants.FLATTENED_OP_COL_NAME);
  if (op == null) {
    return false;
  }
  return op.toString().equalsIgnoreCase(DebeziumConstants.DELETE_OP);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -106,6 +107,7 @@ public void testDelete() {
Option<IndexedRecord> outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema, properties);
// expect nothing to be committed to table
assertFalse(outputPayload.isPresent());
assertTrue(payload.isDeleted(avroSchema, properties));
} catch (Exception e) {
fail("Unexpected exception");
}
Expand Down Expand Up @@ -142,19 +144,13 @@ public void testPreCombineWithDelete() {
deleteRecord.put("Op", "D");

GenericRecord oldRecord = new GenericData.Record(avroSchema);
oldRecord.put("field1", 4);
oldRecord.put("field1", 3);
oldRecord.put("Op", "I");

AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord));
AWSDmsAvroPayload insertPayload = new AWSDmsAvroPayload(Option.of(oldRecord));

try {
OverwriteWithLatestAvroPayload output = payload.preCombine(insertPayload);
Option<IndexedRecord> outputPayload = output.getInsertValue(avroSchema, properties);
// expect nothing to be committed to table
assertFalse(outputPayload.isPresent());
} catch (Exception e) {
fail("Unexpected exception");
}
OverwriteWithLatestAvroPayload output = payload.preCombine(insertPayload);
assertEquals(payload, output);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Tests {@link MySqlDebeziumAvroPayload}.
Expand Down Expand Up @@ -100,6 +102,7 @@ public void testMergeWithUpdate() throws IOException {
public void testMergeWithDelete() throws IOException {
GenericRecord deleteRecord = createRecord(2, Operation.DELETE, "00002.11");
MySqlDebeziumAvroPayload payload = new MySqlDebeziumAvroPayload(deleteRecord, "00002.11");
assertTrue(payload.isDeleted(avroSchema, new Properties()));

GenericRecord existingRecord = createRecord(2, Operation.UPDATE, "00001.111");
Option<IndexedRecord> mergedRecord = payload.combineAndGetUpdateValue(existingRecord, avroSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Tests {@link PostgresDebeziumAvroPayload}.
Expand Down Expand Up @@ -108,6 +110,7 @@ public void testMergeWithUpdate() throws IOException {
public void testMergeWithDelete() throws IOException {
GenericRecord deleteRecord = createRecord(2, Operation.DELETE, 100L);
PostgresDebeziumAvroPayload payload = new PostgresDebeziumAvroPayload(deleteRecord, 100L);
assertTrue(payload.isDeleted(avroSchema, new Properties()));

GenericRecord existingRecord = createRecord(2, Operation.UPDATE, 99L);
Option<IndexedRecord> mergedRecord = payload.combineAndGetUpdateValue(existingRecord, avroSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.hudi.SparkAdapterSupport.sparkAdapter
import org.apache.hudi.avro.AvroSchemaUtils.{isNullable, resolveNullableSchema}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
import org.apache.hudi.common.model.{BaseAvroPayload, DefaultHoodieRecordPayload, HoodiePayloadProps, HoodieRecord}
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodiePayloadProps, HoodieRecord}
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.{BinaryUtil, ValidationUtils, Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
Expand Down Expand Up @@ -236,7 +236,7 @@ class ExpressionPayload(@transient record: GenericRecord,
val recordSchema = getRecordSchema(properties)
val incomingRecord = ConvertibleRecord(bytesToAvro(recordBytes, recordSchema))

if (BaseAvroPayload.isDeleteRecord(incomingRecord.asAvro)) {
if (super.isDeleteRecord(incomingRecord.asAvro)) {
HOption.empty[IndexedRecord]()
} else if (isMORTable(properties)) {
// For the MOR table, both the matched and not-matched record will step into the getInsertValue() method.
Expand Down