diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 1d8808a64e55b..fbf3fb7b34c0e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -1139,18 +1139,19 @@ public static HoodieRecord createHoodieRecordFromAvro( Boolean withOperation, Option partitionNameOp, Boolean populateMetaFields, - Option schemaWithoutMetaFields) { + Option schemaWithoutMetaFields, + Properties props) { if (populateMetaFields) { return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) data, - payloadClass, preCombineField, withOperation); + payloadClass, preCombineField, withOperation, props); // Support HoodieFileSliceReader } else if (simpleKeyGenFieldsOpt.isPresent()) { // TODO in HoodieFileSliceReader may partitionName=option#empty return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) data, - payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), withOperation, partitionNameOp, schemaWithoutMetaFields); + payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), withOperation, partitionNameOp, schemaWithoutMetaFields, props); } else { return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) data, - payloadClass, preCombineField, withOperation, partitionNameOp, schemaWithoutMetaFields); + payloadClass, preCombineField, withOperation, partitionNameOp, schemaWithoutMetaFields, props); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java index f22cfc083340a..992b0bac0517a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java @@ -27,6 +27,8 @@ import java.io.IOException; import java.util.Properties; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; + /** * Provides support for seamlessly applying changes captured via Amazon Database Migration Service onto S3. * @@ -45,12 +47,22 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload { public static final String OP_FIELD = "Op"; + @Deprecated public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) { - super(record, orderingVal); + this(record, orderingVal, EMPTY_PROPS); } + @Deprecated public AWSDmsAvroPayload(Option record) { - this(record.isPresent() ? record.get() : null, 0); // natural order + this(record, EMPTY_PROPS); + } + + public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) { + super(record, orderingVal, props); + } + + public AWSDmsAvroPayload(Option record, Properties props) { + this(record.isPresent() ? record.get() : null, 0, props); // natural order } /** @@ -96,8 +108,8 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue } @Override - protected boolean isDeleteRecord(GenericRecord record) { - return isDMSDeleteRecord(record) || super.isDeleteRecord(record); + protected boolean isDeleteRecord(GenericRecord record, Properties props) { + return isDMSDeleteRecord(record) || super.isDeleteRecord(record, props); } private static boolean isDMSDeleteRecord(GenericRecord record) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java index 85e46690287d3..514f05effffe6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java @@ -27,6 +27,8 @@ import java.io.Serializable; import java.util.Properties; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; + /** * Base class for all AVRO record based payloads, that can be ordered based on a field. */ @@ -43,16 +45,22 @@ public abstract class BaseAvroPayload implements Serializable { protected final boolean isDeletedRecord; + @Deprecated + public BaseAvroPayload(GenericRecord record, Comparable orderingVal) { + this(record, orderingVal, EMPTY_PROPS); + } + /** * Instantiate {@link BaseAvroPayload}. * * @param record Generic record for the payload. * @param orderingVal {@link Comparable} to be used in pre combine. + * @param props Configuration in {@link Properties}. */ - public BaseAvroPayload(GenericRecord record, Comparable orderingVal) { + public BaseAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) { this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) : new byte[0]; this.orderingVal = orderingVal; - this.isDeletedRecord = record == null || isDeleteRecord(record); + this.isDeletedRecord = record == null || isDeleteRecord(record, props); if (orderingVal == null) { throw new HoodieException("Ordering value is null for record: " + record); @@ -79,11 +87,17 @@ public boolean canProduceSentinel() { return false; } + @Deprecated + protected boolean isDeleteRecord(GenericRecord genericRecord) { + return isDeleteRecord(genericRecord, EMPTY_PROPS); + } + /** * @param genericRecord instance of {@link GenericRecord} of interest. + * @param props Configuration in {@link Properties}. * @returns {@code true} if record represents a delete record. {@code false} otherwise. */ - protected boolean isDeleteRecord(GenericRecord genericRecord) { + protected boolean isDeleteRecord(GenericRecord genericRecord, Properties props) { final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED_FIELD; // Modify to be compatible with new version Avro. // The new version Avro throws for GenericRecord.get if the field name diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java index eae2f58af9440..d41511244c927 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java @@ -45,12 +45,12 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload { public static final String DELETE_MARKER = "hoodie.payload.delete.marker"; private Option eventTime = Option.empty(); - public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal) { - super(record, orderingVal); + public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal, Properties props) { + super(record, orderingVal, props); } - public DefaultHoodieRecordPayload(Option record) { - this(record.isPresent() ? record.get() : null, 0); // natural order + public DefaultHoodieRecordPayload(Option record, Properties props) { + this(record.isPresent() ? record.get() : null, 0, props); // natural order } @Override @@ -75,7 +75,7 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue /* * Now check if the incoming record is a delete record. */ - return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord); + return isDeleted(schema, properties) ? Option.empty() : Option.of(incomingRecord); } @Override @@ -86,30 +86,26 @@ public Option getInsertValue(Schema schema, Properties properties GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); eventTime = updateEventTime(incomingRecord, properties); - return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord); + return isDeleted(schema, properties) ? Option.empty() : Option.of(incomingRecord); } - /** - * @param genericRecord instance of {@link GenericRecord} of interest. - * @param properties payload related properties - * @returns {@code true} if record represents a delete record. {@code false} otherwise. - */ - protected boolean isDeleteRecord(GenericRecord genericRecord, Properties properties) { - final String deleteKey = properties.getProperty(DELETE_KEY); + @Override + protected boolean isDeleteRecord(GenericRecord record, Properties props) { + final String deleteKey = props.getProperty(DELETE_KEY); if (StringUtils.isNullOrEmpty(deleteKey)) { - return isDeleteRecord(genericRecord); + return super.isDeleteRecord(record, props); } - ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(properties.getProperty(DELETE_MARKER)), + ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(props.getProperty(DELETE_MARKER)), () -> DELETE_MARKER + " should be configured with " + DELETE_KEY); // Modify to be compatible with new version Avro. // The new version Avro throws for GenericRecord.get if the field name // does not exist in the schema. - if (genericRecord.getSchema().getField(deleteKey) == null) { + if (record.getSchema().getField(deleteKey) == null) { return false; } - Object deleteMarker = genericRecord.get(deleteKey); - return deleteMarker != null && properties.getProperty(DELETE_MARKER).equals(deleteMarker.toString()); + Object deleteMarker = record.get(deleteKey); + return deleteMarker != null && props.getProperty(DELETE_MARKER).equals(deleteMarker.toString()); } private static Option updateEventTime(GenericRecord record, Properties properties) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/EmptyHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/EmptyHoodieRecordPayload.java index abcad8d922f0f..bbf9bd15b71fa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/EmptyHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/EmptyHoodieRecordPayload.java @@ -24,6 +24,8 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import java.util.Properties; + /** * Empty payload used for deletions. */ @@ -32,9 +34,13 @@ public class EmptyHoodieRecordPayload implements HoodieRecordPayload record) { - this(record.isPresent() ? record.get() : null, 0); // natural order + public EventTimeAvroPayload(Option record, Properties props) { + this(record.isPresent() ? record.get() : null, 0, props); // natural order } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java index 77f724249c736..bcb9500a20300 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java @@ -156,7 +156,16 @@ public HoodieRecord wrapIntoHoodieRecordPayloadWithParams( Option schemaWithoutMetaFields) { String payloadClass = ConfigUtils.getPayloadClass(props); String preCombineField = ConfigUtils.getOrderingField(props); - return HoodieAvroUtils.createHoodieRecordFromAvro(data, payloadClass, preCombineField, simpleKeyGenFieldsOpt, withOperation, partitionNameOp, populateMetaFields, schemaWithoutMetaFields); + return HoodieAvroUtils.createHoodieRecordFromAvro( + data, + payloadClass, + preCombineField, + simpleKeyGenFieldsOpt, + withOperation, + partitionNameOp, + populateMetaFields, + schemaWithoutMetaFields, + props); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroPayload.java index 3fbcb8a620e0e..e8d8ec589bb33 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroPayload.java @@ -26,6 +26,9 @@ import org.apache.avro.generic.IndexedRecord; import java.io.IOException; +import java.util.Properties; + +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; /** * This is a payload to wrap a existing Hoodie Avro Record. Useful to create a HoodieRecord over existing GenericRecords @@ -39,11 +42,19 @@ public class HoodieAvroPayload implements HoodieRecordPayload private final Comparable orderingVal; public HoodieAvroPayload(GenericRecord record, Comparable orderingVal) { + this(record, orderingVal, EMPTY_PROPS); + } + + public HoodieAvroPayload(Option record) { + this(record, EMPTY_PROPS); + } + + public HoodieAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) { this.recordBytes = record == null ? new byte[0] : HoodieAvroUtils.avroToBytes(record); this.orderingVal = orderingVal; } - public HoodieAvroPayload(Option record) { + public HoodieAvroPayload(Option record, Properties props) { this.recordBytes = record.isPresent() ? HoodieAvroUtils.avroToBytes(record.get()) : new byte[0]; this.orderingVal = 0; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java index 2ba8cc2f0b948..35ed816dfd76f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java @@ -193,7 +193,16 @@ public HoodieRecord wrapIntoHoodieRecordPayloadWithParams( IndexedRecord indexedRecord = (IndexedRecord) data.getInsertValue(recordSchema, props).get(); String payloadClass = ConfigUtils.getPayloadClass(props); String preCombineField = ConfigUtils.getOrderingField(props); - return HoodieAvroUtils.createHoodieRecordFromAvro(indexedRecord, payloadClass, preCombineField, simpleKeyGenFieldsOpt, withOperation, partitionNameOp, populateMetaFields, schemaWithoutMetaFields); + return HoodieAvroUtils.createHoodieRecordFromAvro( + indexedRecord, + payloadClass, + preCombineField, + simpleKeyGenFieldsOpt, + withOperation, + partitionNameOp, + populateMetaFields, + schemaWithoutMetaFields, + props); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java index 626910fdaed58..a6419b667d6d5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java @@ -27,6 +27,9 @@ import java.io.IOException; import java.util.List; +import java.util.Properties; + +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; /** * subclass of OverwriteWithLatestAvroPayload. @@ -39,12 +42,22 @@ */ public class OverwriteNonDefaultsWithLatestAvroPayload extends OverwriteWithLatestAvroPayload { + @Deprecated public OverwriteNonDefaultsWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) { - super(record, orderingVal); + this(record, orderingVal, EMPTY_PROPS); } + @Deprecated public OverwriteNonDefaultsWithLatestAvroPayload(Option record) { - super(record); // natural order + this(record, EMPTY_PROPS); + } + + public OverwriteNonDefaultsWithLatestAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) { + super(record, orderingVal, props); + } + + public OverwriteNonDefaultsWithLatestAvroPayload(Option record, Properties props) { + super(record, props); // natural order } @Override @@ -73,7 +86,7 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue * @return the merged record option */ protected Option mergeRecords(Schema schema, GenericRecord baseRecord, GenericRecord mergedRecord) { - if (isDeleteRecord(baseRecord)) { + if (isDeleteRecord(baseRecord, EMPTY_PROPS)) { return Option.empty(); } else { final GenericRecordBuilder builder = new GenericRecordBuilder(schema); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java index d9fbd4cba05c8..e2cbb94b83953 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java @@ -28,6 +28,9 @@ import java.io.IOException; import java.util.Objects; +import java.util.Properties; + +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; /** * Default payload. @@ -40,12 +43,22 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements HoodieRecordPayload { + @Deprecated public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) { - super(record, orderingVal); + this(record, orderingVal, EMPTY_PROPS); } + @Deprecated public OverwriteWithLatestAvroPayload(Option record) { - this(record.isPresent() ? record.get() : null, 0); // natural order + this(record, EMPTY_PROPS); + } + + public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) { + super(record, orderingVal, props); + } + + public OverwriteWithLatestAvroPayload(Option record, Properties props) { + this(record.isPresent() ? record.get() : null, 0, props); // natural order } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java index 27e744c4925b6..213956f753bc1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -18,11 +18,6 @@ package org.apache.hudi.common.model; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.GenericRecordBuilder; -import org.apache.avro.generic.IndexedRecord; - import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; @@ -30,10 +25,17 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.generic.IndexedRecord; + import java.io.IOException; import java.util.List; import java.util.Properties; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; + /** * Payload clazz that is used for partial update Hudi Table. * @@ -117,12 +119,22 @@ */ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload { + @Deprecated public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) { - super(record, orderingVal); + this(record, orderingVal, EMPTY_PROPS); } + @Deprecated public PartialUpdateAvroPayload(Option record) { - super(record); // natural order + this(record, EMPTY_PROPS); + } + + public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) { + super(record, orderingVal, props); + } + + public PartialUpdateAvroPayload(Option record, Properties props) { + super(record, props); // natural order } @Override @@ -135,10 +147,10 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal final boolean shouldPickOldRecord = oldValue.orderingVal.compareTo(orderingVal) > 0 ? true : false; try { GenericRecord oldRecord = HoodieAvroUtils.bytesToAvro(oldValue.recordBytes, schema); - Option mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord, true); + Option mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord, true, properties); if (mergedRecord.isPresent()) { return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(), - shouldPickOldRecord ? oldValue.orderingVal : this.orderingVal); + shouldPickOldRecord ? oldValue.orderingVal : this.orderingVal, properties); } } catch (Exception ex) { return this; @@ -148,12 +160,12 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal @Override public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { - return this.mergeOldRecord(currentValue, schema, false, false); + return this.mergeOldRecord(currentValue, schema, false, false, EMPTY_PROPS); } @Override public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException { - return mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop), false); + return mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop), false, prop); } /** @@ -176,12 +188,14 @@ public Boolean overwriteField(Object value, Object defaultValue) { * @param isPreCombining flag for deleted record combine logic * 1 preCombine: if delete record is newer, return merged record with _hoodie_is_deleted = true * 2 combineAndGetUpdateValue: if delete record is newer, return empty since we don't need to store deleted data to storage + * @param props Configuration in {@link Properties}. * @return * @throws IOException */ private Option mergeOldRecord(IndexedRecord oldRecord, Schema schema, - boolean isOldRecordNewer, boolean isPreCombining) throws IOException { + boolean isOldRecordNewer, boolean isPreCombining, + Properties props) throws IOException { Option recordOption = getInsertValue(schema, isPreCombining); if (!recordOption.isPresent() && !isPreCombining) { @@ -191,7 +205,8 @@ private Option mergeOldRecord(IndexedRecord oldRecord, if (isOldRecordNewer && schema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD) != null) { // handling disorder, should use the metadata fields of the updating record - return mergeDisorderRecordsWithMetadata(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get(), isPreCombining); + return mergeDisorderRecordsWithMetadata( + schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get(), isPreCombining, props); } else if (isOldRecordNewer) { return mergeRecords(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get()); } else { @@ -220,13 +235,17 @@ public Option getInsertValue(Schema schema, boolean isPreCombinin * @param schema The record schema * @param oldRecord The current record from file * @param updatingRecord The incoming record + * @param isPreCombining Whether in pre-combining phase. + * @param props Configuration in {@link Properties}. * @return the merged record option */ protected Option mergeDisorderRecordsWithMetadata( Schema schema, GenericRecord oldRecord, - GenericRecord updatingRecord, boolean isPreCombining) { - if (isDeleteRecord(oldRecord) && !isPreCombining) { + GenericRecord updatingRecord, + boolean isPreCombining, + Properties props) { + if (isDeleteRecord(oldRecord, props) && !isPreCombining) { return Option.empty(); } else { final GenericRecordBuilder builder = new GenericRecordBuilder(schema); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java index 69cefc73d723f..a5e55c1b9d95d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java @@ -29,6 +29,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Properties; + +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; /** * Base class that provides support for seamlessly applying changes captured via Debezium. @@ -46,12 +49,22 @@ public abstract class AbstractDebeziumAvroPayload extends OverwriteWithLatestAvr private static final Logger LOG = LoggerFactory.getLogger(AbstractDebeziumAvroPayload.class); + @Deprecated public AbstractDebeziumAvroPayload(GenericRecord record, Comparable orderingVal) { - super(record, orderingVal); + this(record, orderingVal, EMPTY_PROPS); } + @Deprecated public AbstractDebeziumAvroPayload(Option record) { - super(record); + this(record, EMPTY_PROPS); + } + + public AbstractDebeziumAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) { + super(record, orderingVal, props); + } + + public AbstractDebeziumAvroPayload(Option record, Properties props) { + super(record, props); } @Override @@ -93,8 +106,8 @@ private Option getInsertRecord(Schema schema) throws IOException } @Override - protected boolean isDeleteRecord(GenericRecord record) { - return isDebeziumDeleteRecord(record) || super.isDeleteRecord(record); + protected boolean isDeleteRecord(GenericRecord record, Properties props) { + return isDebeziumDeleteRecord(record) || super.isDeleteRecord(record, props); } private static boolean isDebeziumDeleteRecord(GenericRecord record) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java index fceafee554cff..adb237a91a981 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java @@ -29,6 +29,9 @@ import java.io.IOException; import java.util.Objects; +import java.util.Properties; + +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; /** * Provides support for seamlessly applying changes captured via Debezium for MysqlDB. @@ -46,12 +49,22 @@ public class MySqlDebeziumAvroPayload extends AbstractDebeziumAvroPayload { private static final Logger LOG = LoggerFactory.getLogger(MySqlDebeziumAvroPayload.class); + @Deprecated public MySqlDebeziumAvroPayload(GenericRecord record, Comparable orderingVal) { - super(record, orderingVal); + this(record, orderingVal, EMPTY_PROPS); } + @Deprecated public MySqlDebeziumAvroPayload(Option record) { - super(record); + this(record, EMPTY_PROPS); + } + + public MySqlDebeziumAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) { + super(record, orderingVal, props); + } + + public MySqlDebeziumAvroPayload(Option record, Properties props) { + super(record, props); } private Option extractSeq(IndexedRecord record) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/PostgresDebeziumAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/PostgresDebeziumAvroPayload.java index 424f51eb13914..c29395a9c89ed 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/PostgresDebeziumAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/PostgresDebeziumAvroPayload.java @@ -34,6 +34,8 @@ import java.util.List; import java.util.Properties; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; + /** * Provides support for seamlessly applying changes captured via Debezium for PostgresDB. *

@@ -51,12 +53,22 @@ public class PostgresDebeziumAvroPayload extends AbstractDebeziumAvroPayload { private static final Logger LOG = LoggerFactory.getLogger(PostgresDebeziumAvroPayload.class); public static final String DEBEZIUM_TOASTED_VALUE = "__debezium_unavailable_value"; + @Deprecated public PostgresDebeziumAvroPayload(GenericRecord record, Comparable orderingVal) { - super(record, orderingVal); + this(record, orderingVal, EMPTY_PROPS); } + @Deprecated public PostgresDebeziumAvroPayload(Option record) { - super(record); + this(record, EMPTY_PROPS); + } + + public PostgresDebeziumAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) { + super(record, orderingVal, props); + } + + public PostgresDebeziumAvroPayload(Option record, Properties props) { + super(record, props); } private Option extractLSN(IndexedRecord record) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java index 9e0655d673490..cee707b523784 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java @@ -58,6 +58,8 @@ public class ConfigUtils { */ public static final String TABLE_SERDE_PATH = "path"; + public static final Properties EMPTY_PROPS = new Properties(); + private static final Logger LOG = LoggerFactory.getLogger(ConfigUtils.class); /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java index 8b5bfaaa91019..14ee537ddf7d1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java @@ -28,14 +28,16 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.avro.generic.GenericRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.InvocationTargetException; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Properties; /** * A utility class for HoodieRecord. @@ -90,16 +92,57 @@ public static HoodieRecordMerger createRecordMerger(String basePath, EngineType } /** - * Instantiate a given class with an avro record payload. + * Creates a payload class via reflection, passing in an ordering/precombine value. + * + * @param payloadClass Payload class name. + * @param record The Avro record. + * @param orderingVal Ordering value. + * @param props Configuration in {@link Properties}. + * @return The payload instance in {@link HoodieRecordPayload}. + * @throws IOException upon error. */ - public static T loadPayload(String recordPayloadClass, - Object[] payloadArgs, - Class... constructorArgTypes) { + public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal, Properties props) + throws IOException { try { - return (T) ReflectionUtils.getClass(recordPayloadClass).getConstructor(constructorArgTypes) - .newInstance(payloadArgs); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new HoodieException("Unable to instantiate payload class ", e); + return (HoodieRecordPayload) ReflectionUtils.getClass(payloadClass) + .getConstructor(new Class[] {GenericRecord.class, Comparable.class, Properties.class}) + .newInstance(record, orderingVal, props); + } catch (NoSuchMethodException nsme) { + try { + return (HoodieRecordPayload) ReflectionUtils.loadClass( + payloadClass, new Class[] {GenericRecord.class, Comparable.class}, record, orderingVal); + } catch (Throwable e) { + throw new IOException("Could not create payload for class: " + payloadClass, e); + } + } catch (Throwable e) { + throw new IOException("Could not create payload for class: " + payloadClass, e); + } + } + + /** + * Creates a payload class via reflection, without ordering/precombine value. + * + * @param payloadClass Payload class name. + * @param record The Avro record. + * @param props Configuration in {@link Properties}. + * @return The payload instance in {@link HoodieRecordPayload}. + * @throws IOException upon error. + */ + public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Properties props) + throws IOException { + try { + return (HoodieRecordPayload) ReflectionUtils.getClass(payloadClass) + .getConstructor(new Class[] {Option.class, Properties.class}) + .newInstance(Option.of(record), props); + } catch (NoSuchMethodException nsme) { + try { + return (HoodieRecordPayload) ReflectionUtils.loadClass( + payloadClass, new Class[] {Option.class}, Option.of(record)); + } catch (Throwable e) { + throw new IOException("Could not create payload for class: " + payloadClass, e); + } + } catch (Throwable e) { + throw new IOException("Could not create payload for class: " + payloadClass, e); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index a4f0dcca369ed..f84e29d76e3c2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -34,9 +34,11 @@ import java.io.IOException; import java.io.RandomAccessFile; +import java.util.Properties; import static org.apache.hudi.avro.HoodieAvroUtils.getNullableValAsString; import static org.apache.hudi.common.util.BinaryUtil.generateChecksum; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; /** * A utility class supports spillable map. @@ -107,20 +109,25 @@ public static long computePayloadSize(R value, SizeEstimator valueSizeEst /** * Utility method to convert bytes to HoodieRecord using schema and payload class. */ - public static HoodieRecord convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, String preCombineField, boolean withOperationField) { + public static HoodieRecord convertToHoodieRecordPayload(GenericRecord rec, + String payloadClazz, + String preCombineField, + boolean withOperationField, + Properties props) { return convertToHoodieRecordPayload(rec, payloadClazz, preCombineField, Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), - withOperationField, Option.empty(), Option.empty()); + withOperationField, Option.empty(), Option.empty(), props); } public static HoodieRecord convertToHoodieRecordPayload(GenericRecord record, String payloadClazz, String preCombineField, boolean withOperationField, Option partitionName, - Option schemaWithoutMetaFields) { + Option schemaWithoutMetaFields, + Properties props) { return convertToHoodieRecordPayload(record, payloadClazz, preCombineField, Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), - withOperationField, partitionName, schemaWithoutMetaFields); + withOperationField, partitionName, schemaWithoutMetaFields, props); } /** @@ -131,7 +138,8 @@ public static HoodieRecord convertToHoodieRecordPayload(GenericRecord rec Pair recordKeyPartitionPathFieldPair, boolean withOperationField, Option partitionName, - Option schemaWithoutMetaFields) { + Option schemaWithoutMetaFields, + Properties props) { final String recKey = record.get(recordKeyPartitionPathFieldPair.getKey()).toString(); final String partitionPath = (partitionName.isPresent() ? partitionName.get() : record.get(recordKeyPartitionPathFieldPair.getRight()).toString()); @@ -149,11 +157,13 @@ public static HoodieRecord convertToHoodieRecordPayload(GenericRecord rec record = recordWithoutMetaFields; } - HoodieRecord hoodieRecord = new HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath), - HoodieRecordUtils.loadPayload(payloadClazz, new Object[] {record, preCombineVal}, GenericRecord.class, - Comparable.class), operation); - - return (HoodieRecord) hoodieRecord; + try { + HoodieRecord hoodieRecord = new HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath), + HoodieRecordUtils.createPayload(payloadClazz, record, (Comparable) preCombineVal, props), operation); + return (HoodieRecord) hoodieRecord; + } catch (IOException e) { + throw new RuntimeException(e); + } } /** @@ -175,8 +185,12 @@ private static Object getPreCombineVal(GenericRecord rec, String preCombineField * Utility method to convert bytes to HoodieRecord using schema and payload class. */ public static R generateEmptyPayload(String recKey, String partitionPath, Comparable orderingVal, String payloadClazz) { - HoodieRecord hoodieRecord = new HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath), - HoodieRecordUtils.loadPayload(payloadClazz, new Object[] {null, orderingVal}, GenericRecord.class, Comparable.class)); - return (R) hoodieRecord; + try { + HoodieRecord hoodieRecord = new HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath), + HoodieRecordUtils.createPayload(payloadClazz, null, orderingVal, EMPTY_PROPS)); + return (R) hoodieRecord; + } catch (IOException e) { + throw new RuntimeException(e); + } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 2bb8e0a59eca1..262fb0fd36225 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -73,6 +73,7 @@ import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FULL_SCAN_LOG_FILES; import static org.apache.hudi.common.util.CollectionUtils.toStream; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES; @@ -393,12 +394,12 @@ private Map> fetchBaseFileRecordsByK private HoodieRecord composeRecord(GenericRecord avroRecord, String partitionName) { if (metadataTableConfig.populateMetaFields()) { return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord, - metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false); + metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false, EMPTY_PROPS); } return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord, metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()), - false, Option.of(partitionName), Option.empty()); + false, Option.of(partitionName), Option.empty(), EMPTY_PROPS); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 3627161559af1..c0e766a49d476 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -185,11 +185,11 @@ public class HoodieMetadataPayload implements HoodieRecordPayload orderingVal) { - this(Option.ofNullable(record)); + public HoodieMetadataPayload(@Nullable GenericRecord record, Comparable orderingVal, Properties props) { + this(Option.ofNullable(record), props); } - public HoodieMetadataPayload(Option recordOpt) { + public HoodieMetadataPayload(Option recordOpt, Properties props) { if (recordOpt.isPresent()) { GenericRecord record = recordOpt.get(); // This can be simplified using SpecificData.deepcopy once this bug is fixed @@ -425,7 +425,7 @@ private HoodieMetadataColumnStats combineColumnStatsMetadata(HoodieMetadataPaylo @Override public Option combineAndGetUpdateValue(IndexedRecord oldRecord, Schema schema, Properties properties) throws IOException { - HoodieMetadataPayload anotherPayload = new HoodieMetadataPayload(Option.of((GenericRecord) oldRecord)); + HoodieMetadataPayload anotherPayload = new HoodieMetadataPayload(Option.of((GenericRecord) oldRecord), properties); HoodieRecordPayload combinedPayload = preCombine(anotherPayload); return combinedPayload.getInsertValue(schema, properties); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java index 9f578e27a462c..469ce6e928fd2 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java @@ -53,7 +53,7 @@ public void testInsert() { record.put("field1", 0); record.put("Op", "I"); - AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(record)); + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(record), properties); try { Option outputPayload = payload.getInsertValue(avroSchema, properties); @@ -77,7 +77,7 @@ public void testUpdate() { oldRecord.put("field1", 0); oldRecord.put("Op", "I"); - AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(newRecord)); + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(newRecord), properties); try { Option outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema, properties); @@ -101,7 +101,7 @@ public void testDelete() { oldRecord.put("field1", 2); oldRecord.put("Op", "U"); - AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord)); + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord), properties); try { Option outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema, properties); @@ -123,7 +123,7 @@ public void testDeleteWithEmptyPayLoad() { oldRecord.put("field1", 2); oldRecord.put("Op", "U"); - AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.empty()); + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.empty(), properties); try { Option outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema, properties); @@ -147,8 +147,8 @@ public void testPreCombineWithDelete() { oldRecord.put("field1", 3); oldRecord.put("Op", "I"); - AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord)); - AWSDmsAvroPayload insertPayload = new AWSDmsAvroPayload(Option.of(oldRecord)); + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord), properties); + AWSDmsAvroPayload insertPayload = new AWSDmsAvroPayload(Option.of(oldRecord), properties); OverwriteWithLatestAvroPayload output = payload.preCombine(insertPayload); assertEquals(payload, output); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java index 1cb146ec97e70..4f1c7c009334c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java @@ -79,8 +79,8 @@ public void testActiveRecords(String key) throws IOException { record2.put("ts", 1L); record2.put("_hoodie_is_deleted", false); - DefaultHoodieRecordPayload payload1 = new DefaultHoodieRecordPayload(record1, 1); - DefaultHoodieRecordPayload payload2 = new DefaultHoodieRecordPayload(record2, 2); + DefaultHoodieRecordPayload payload1 = new DefaultHoodieRecordPayload(record1, 1, props); + DefaultHoodieRecordPayload payload2 = new DefaultHoodieRecordPayload(record2, 2, props); assertEquals(payload1.preCombine(payload2, props), payload2); assertEquals(payload2.preCombine(payload1, props), payload2); @@ -107,8 +107,10 @@ public void testDeletedRecord(String key) throws IOException { delRecord1.put("ts", 1L); delRecord1.put("_hoodie_is_deleted", true); - DefaultHoodieRecordPayload payload1 = new DefaultHoodieRecordPayload(record1, 1); - DefaultHoodieRecordPayload payload2 = new DefaultHoodieRecordPayload(delRecord1, 2); + DefaultHoodieRecordPayload payload1 = new DefaultHoodieRecordPayload(record1, 1, props); + DefaultHoodieRecordPayload payload2 = new DefaultHoodieRecordPayload(delRecord1, 2, props); + assertFalse(payload1.isDeleted(schema, props)); + assertTrue(payload2.isDeleted(schema, props)); assertEquals(payload1.preCombine(payload2, props), payload2); assertEquals(payload2.preCombine(payload1, props), payload2); @@ -141,9 +143,13 @@ public void testDeleteKey() throws IOException { defaultDeleteRecord.put("ts", 2L); defaultDeleteRecord.put("_hoodie_is_deleted", true); - DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(record, 1); - DefaultHoodieRecordPayload deletePayload = new DefaultHoodieRecordPayload(delRecord, 2); - DefaultHoodieRecordPayload defaultDeletePayload = new DefaultHoodieRecordPayload(defaultDeleteRecord, 2); + DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(record, 1, props); + DefaultHoodieRecordPayload deletePayload = new DefaultHoodieRecordPayload(delRecord, 2, props); + DefaultHoodieRecordPayload defaultDeletePayload = new DefaultHoodieRecordPayload(defaultDeleteRecord, 2, props); + + assertFalse(payload.isDeleted(schema, props)); + assertTrue(deletePayload.isDeleted(schema, props)); + assertFalse(defaultDeletePayload.isDeleted(schema, props)); assertEquals(record, payload.getInsertValue(schema, props).get()); assertEquals(defaultDeleteRecord, defaultDeletePayload.getInsertValue(schema, props).get()); @@ -163,18 +169,9 @@ public void testDeleteKeyConfiguration() throws IOException { record.put("ts", 0L); record.put("_hoodie_is_deleted", false); - DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(record, 1); - // Verify failure when DELETE_MARKER is not configured along with DELETE_KEY try { - payload.getInsertValue(schema, props).get(); - fail("Should fail"); - } catch (IllegalArgumentException e) { - // Ignore - } - - try { - payload.combineAndGetUpdateValue(record, schema, props).get(); + DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(record, 1, props); fail("Should fail"); } catch (IllegalArgumentException e) { // Ignore @@ -188,7 +185,7 @@ public void testGetEmptyMetadata() { record.put("partition", "partition0"); record.put("ts", 0L); record.put("_hoodie_is_deleted", false); - DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(Option.of(record)); + DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(Option.of(record), props); assertFalse(payload.getMetadata().isPresent()); } @@ -207,7 +204,7 @@ public void testGetEventTimeInMetadata(long eventTime) throws IOException { record2.put("ts", eventTime); record2.put("_hoodie_is_deleted", false); - DefaultHoodieRecordPayload payload2 = new DefaultHoodieRecordPayload(record2, eventTime); + DefaultHoodieRecordPayload payload2 = new DefaultHoodieRecordPayload(record2, eventTime, props); payload2.combineAndGetUpdateValue(record1, schema, props); assertTrue(payload2.getMetadata().isPresent()); assertEquals(eventTime, @@ -228,7 +225,7 @@ public void testEmptyProperty() throws IOException { record2.put("ts", 1L); record2.put("_hoodie_is_deleted", false); - DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(Option.of(record1)); + DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(Option.of(record1), props); Properties properties = new Properties(); payload.getInsertValue(schema, properties); payload.combineAndGetUpdateValue(record2, schema, properties); @@ -243,7 +240,7 @@ public void testGetEventTimeInMetadataForInserts(long eventTime) throws IOExcept record.put("partition", "partition0"); record.put("ts", eventTime); record.put("_hoodie_is_deleted", false); - DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(record, eventTime); + DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(record, eventTime, props); payload.getInsertValue(schema, props); assertTrue(payload.getMetadata().isPresent()); assertEquals(eventTime, diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java index 97b0ac5c108cd..882a41d192dff 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java @@ -32,6 +32,7 @@ import java.util.Arrays; import java.util.Collections; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotSame; @@ -109,9 +110,9 @@ public void testActiveRecords() throws IOException { record6.put("city", "NY0"); record6.put("child", Collections.singletonList("A")); - OverwriteNonDefaultsWithLatestAvroPayload payload1 = new OverwriteNonDefaultsWithLatestAvroPayload(record1, 1); - OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 2); - OverwriteNonDefaultsWithLatestAvroPayload payload5 = new OverwriteNonDefaultsWithLatestAvroPayload(record5, 2); + OverwriteNonDefaultsWithLatestAvroPayload payload1 = new OverwriteNonDefaultsWithLatestAvroPayload(record1, 1, EMPTY_PROPS); + OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 2, EMPTY_PROPS); + OverwriteNonDefaultsWithLatestAvroPayload payload5 = new OverwriteNonDefaultsWithLatestAvroPayload(record5, 2, EMPTY_PROPS); assertEquals(payload1.preCombine(payload2), payload2); assertEquals(payload2.preCombine(payload1), payload2); @@ -166,8 +167,8 @@ public void testDeletedRecord() throws IOException { record2.put("city", "NY0"); record2.put("child", Collections.emptyList()); - OverwriteNonDefaultsWithLatestAvroPayload payload1 = new OverwriteNonDefaultsWithLatestAvroPayload(record1, 1); - OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(delRecord1, 2); + OverwriteNonDefaultsWithLatestAvroPayload payload1 = new OverwriteNonDefaultsWithLatestAvroPayload(record1, 1, EMPTY_PROPS); + OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(delRecord1, 2, EMPTY_PROPS); assertEquals(payload1.preCombine(payload2), payload2); assertEquals(payload2.preCombine(payload1), payload2); @@ -205,7 +206,7 @@ public void testNullColumn() throws IOException { record3.put("age", "2"); record3.put("job", "1"); - OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 1); + OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 1, EMPTY_PROPS); assertEquals(payload2.combineAndGetUpdateValue(record1, avroSchema).get(), record3); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java index 7c5951a7cac04..f377d0e24a49d 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.util.Arrays; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -62,8 +63,8 @@ public void testActiveRecords() throws IOException { record2.put("ts", 1L); record2.put("_hoodie_is_deleted", false); - OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(record1, 1); - OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(record2, 2); + OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(record1, 1, EMPTY_PROPS); + OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(record2, 2, EMPTY_PROPS); assertEquals(payload1.preCombine(payload2), payload2); assertEquals(payload2.preCombine(payload1), payload2); @@ -88,8 +89,8 @@ public void testDeletedRecord() throws IOException { delRecord1.put("ts", 1L); delRecord1.put("_hoodie_is_deleted", true); - OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(record1, 1); - OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(delRecord1, 2); + OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(record1, 1, EMPTY_PROPS); + OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(delRecord1, 2, EMPTY_PROPS); assertEquals(payload1.preCombine(payload2), payload2); assertEquals(payload2.preCombine(payload1), payload2); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java index 28313f150c81f..1c35abfa066ca 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java @@ -33,6 +33,7 @@ import java.util.Collections; import java.util.Properties; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -106,17 +107,17 @@ public void testActiveRecords() throws IOException { record4.put("child", Arrays.asList("B")); // Test preCombine: since payload2's ordering val is larger, so payload2 will overwrite payload1 with its non-default field's value - PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L); - PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 1L); - assertArrayEquals(payload1.preCombine(payload2, schema, properties).recordBytes, new PartialUpdateAvroPayload(record4, 1L).recordBytes); - assertArrayEquals(payload2.preCombine(payload1, schema, properties).recordBytes, new PartialUpdateAvroPayload(record4, 1L).recordBytes); + PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L, properties); + PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 1L, properties); + assertArrayEquals(payload1.preCombine(payload2, schema, properties).recordBytes, new PartialUpdateAvroPayload(record4, 1L, properties).recordBytes); + assertArrayEquals(payload2.preCombine(payload1, schema, properties).recordBytes, new PartialUpdateAvroPayload(record4, 1L, properties).recordBytes); assertEquals(record1, payload1.getInsertValue(schema).get()); assertEquals(record2, payload2.getInsertValue(schema).get()); // Test combineAndGetUpdateValue: let payload1's ordering val larger than payload2, then payload1 will overwrite payload2 with its non-default field's value record1.put("ts", 2L); - payload1 = new PartialUpdateAvroPayload(record1, 2L); + payload1 = new PartialUpdateAvroPayload(record1, 2L, properties); assertEquals(payload1.combineAndGetUpdateValue(record2, schema, properties).get(), record3); // Test combineAndGetUpdateValue: let payload1's ordering val equal to payload2, then payload2 will be considered to newer record record1.put("ts", 1L); @@ -124,10 +125,10 @@ public void testActiveRecords() throws IOException { // Test preCombine again: let payload1's ordering val larger than payload2 record1.put("ts", 2L); - payload1 = new PartialUpdateAvroPayload(record1, 2L); - payload2 = new PartialUpdateAvroPayload(record2, 1L); - assertArrayEquals(payload1.preCombine(payload2, schema, properties).recordBytes, new PartialUpdateAvroPayload(record3, 2L).recordBytes); - assertArrayEquals(payload2.preCombine(payload1, schema, properties).recordBytes, new PartialUpdateAvroPayload(record3, 2L).recordBytes); + payload1 = new PartialUpdateAvroPayload(record1, 2L, properties); + payload2 = new PartialUpdateAvroPayload(record2, 1L, properties); + assertArrayEquals(payload1.preCombine(payload2, schema, properties).recordBytes, new PartialUpdateAvroPayload(record3, 2L, properties).recordBytes); + assertArrayEquals(payload2.preCombine(payload1, schema, properties).recordBytes, new PartialUpdateAvroPayload(record3, 2L, properties).recordBytes); } @Test @@ -156,9 +157,9 @@ public void testDeletedRecord() throws IOException { record2.put("city", "NY0"); record2.put("child", Collections.emptyList()); - PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L); - PartialUpdateAvroPayload delPayload = new PartialUpdateAvroPayload(delRecord1, 1L); - PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 2L); + PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L, EMPTY_PROPS); + PartialUpdateAvroPayload delPayload = new PartialUpdateAvroPayload(delRecord1, 1L, EMPTY_PROPS); + PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 2L, EMPTY_PROPS); PartialUpdateAvroPayload mergedPayload = payload1.preCombine(delPayload, schema, new Properties()); assertTrue(HoodieAvroUtils.bytesToAvro(mergedPayload.recordBytes, schema).get("_hoodie_is_deleted").equals(true)); @@ -210,8 +211,8 @@ public void testUseLatestRecordMetaValue() throws IOException { record2.put("city", null); record2.put("child", Arrays.asList("B")); - PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L); - PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 1L); + PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L, EMPTY_PROPS); + PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 1L, EMPTY_PROPS); // let payload1 as the latest one, then should use payload1's meta field's value as the result even its ordering val is smaller GenericRecord mergedRecord1 = (GenericRecord) payload1.preCombine(payload2, schema, properties).getInsertValue(schema, properties).get(); @@ -293,8 +294,8 @@ public void testPartialUpdateGotchas() throws IOException { outputWithPreCombineUsed.put("city", "NY1"); outputWithPreCombineUsed.put("child", Arrays.asList("A")); - PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 0L); - PartialUpdateAvroPayload payload3 = new PartialUpdateAvroPayload(record3, 2L); + PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 0L, EMPTY_PROPS); + PartialUpdateAvroPayload payload3 = new PartialUpdateAvroPayload(record3, 2L, EMPTY_PROPS); // query A (no preCombine) IndexedRecord firstCombineOutput = payload2.combineAndGetUpdateValue(record1, schema, properties).get(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java index e257e2bee023e..a6408b4125297 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java @@ -35,6 +35,7 @@ import java.util.Objects; import java.util.Properties; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -63,20 +64,20 @@ void setUp() { @Test public void testInsert() throws IOException { GenericRecord insertRecord = createRecord(0, Operation.INSERT, "00001.111"); - MySqlDebeziumAvroPayload payload = new MySqlDebeziumAvroPayload(insertRecord, "00001.111"); + MySqlDebeziumAvroPayload payload = new MySqlDebeziumAvroPayload(insertRecord, "00001.111", EMPTY_PROPS); validateRecord(payload.getInsertValue(avroSchema), 0, Operation.INSERT, "00001.111"); } @Test public void testPreCombine() { GenericRecord insertRecord = createRecord(0, Operation.INSERT, "00002.111"); - MySqlDebeziumAvroPayload insertPayload = new MySqlDebeziumAvroPayload(insertRecord, "00002.111"); + MySqlDebeziumAvroPayload insertPayload = new MySqlDebeziumAvroPayload(insertRecord, "00002.111", EMPTY_PROPS); GenericRecord updateRecord = createRecord(0, Operation.UPDATE, "00001.111"); - MySqlDebeziumAvroPayload updatePayload = new MySqlDebeziumAvroPayload(updateRecord, "00001.111"); + MySqlDebeziumAvroPayload updatePayload = new MySqlDebeziumAvroPayload(updateRecord, "00001.111", EMPTY_PROPS); GenericRecord deleteRecord = createRecord(0, Operation.DELETE, "00002.11"); - MySqlDebeziumAvroPayload deletePayload = new MySqlDebeziumAvroPayload(deleteRecord, "00002.11"); + MySqlDebeziumAvroPayload deletePayload = new MySqlDebeziumAvroPayload(deleteRecord, "00002.11", EMPTY_PROPS); assertEquals(insertPayload, insertPayload.preCombine(updatePayload)); assertEquals(deletePayload, deletePayload.preCombine(updatePayload)); @@ -86,19 +87,19 @@ public void testPreCombine() { @Test public void testMergeWithUpdate() throws IOException { GenericRecord updateRecord = createRecord(1, Operation.UPDATE, "00002.11"); - MySqlDebeziumAvroPayload payload = new MySqlDebeziumAvroPayload(updateRecord, "00002.11"); + MySqlDebeziumAvroPayload payload = new MySqlDebeziumAvroPayload(updateRecord, "00002.11", EMPTY_PROPS); GenericRecord existingRecord = createRecord(1, Operation.INSERT, "00001.111"); Option mergedRecord = payload.combineAndGetUpdateValue(existingRecord, avroSchema); validateRecord(mergedRecord, 1, Operation.UPDATE, "00002.11"); GenericRecord lateRecord = createRecord(1, Operation.UPDATE, "00000.222"); - payload = new MySqlDebeziumAvroPayload(lateRecord, "00000.222"); + payload = new MySqlDebeziumAvroPayload(lateRecord, "00000.222", EMPTY_PROPS); mergedRecord = payload.combineAndGetUpdateValue(existingRecord, avroSchema); validateRecord(mergedRecord, 1, Operation.INSERT, "00001.111"); GenericRecord originalRecord = createRecord(1, Operation.INSERT, "00000.23"); - payload = new MySqlDebeziumAvroPayload(originalRecord, "00000.23"); + payload = new MySqlDebeziumAvroPayload(originalRecord, "00000.23", EMPTY_PROPS); updateRecord = createRecord(1, Operation.UPDATE, "00000.123"); mergedRecord = payload.combineAndGetUpdateValue(updateRecord, avroSchema); validateRecord(mergedRecord, 1, Operation.UPDATE, "00000.123"); @@ -107,7 +108,7 @@ public void testMergeWithUpdate() throws IOException { @Test public void testMergeWithDelete() throws IOException { GenericRecord deleteRecord = createRecord(2, Operation.DELETE, "00002.11"); - MySqlDebeziumAvroPayload payload = new MySqlDebeziumAvroPayload(deleteRecord, "00002.11"); + MySqlDebeziumAvroPayload payload = new MySqlDebeziumAvroPayload(deleteRecord, "00002.11", EMPTY_PROPS); assertTrue(payload.isDeleted(avroSchema, new Properties())); GenericRecord existingRecord = createRecord(2, Operation.UPDATE, "00001.111"); @@ -116,7 +117,7 @@ public void testMergeWithDelete() throws IOException { assertFalse(mergedRecord.isPresent()); GenericRecord lateRecord = createRecord(2, Operation.DELETE, "00000.222"); - payload = new MySqlDebeziumAvroPayload(lateRecord, "00000.222"); + payload = new MySqlDebeziumAvroPayload(lateRecord, "00000.222", EMPTY_PROPS); mergedRecord = payload.combineAndGetUpdateValue(existingRecord, avroSchema); validateRecord(mergedRecord, 2, Operation.UPDATE, "00001.111"); } @@ -124,7 +125,7 @@ public void testMergeWithDelete() throws IOException { @Test public void testMergeWithBootstrappedExistingRecords() throws IOException { GenericRecord incomingRecord = createRecord(3, Operation.UPDATE, "00002.111"); - MySqlDebeziumAvroPayload payload = new MySqlDebeziumAvroPayload(incomingRecord, "00002.111"); + MySqlDebeziumAvroPayload payload = new MySqlDebeziumAvroPayload(incomingRecord, "00002.111", EMPTY_PROPS); GenericRecord existingRecord = createRecord(3, null, null); Option mergedRecord = payload.combineAndGetUpdateValue(existingRecord, avroSchema); @@ -134,7 +135,7 @@ public void testMergeWithBootstrappedExistingRecords() throws IOException { @Test public void testInvalidIncomingRecord() { GenericRecord incomingRecord = createRecord(4, null, null); - MySqlDebeziumAvroPayload payload = new MySqlDebeziumAvroPayload(incomingRecord, "00002.111"); + MySqlDebeziumAvroPayload payload = new MySqlDebeziumAvroPayload(incomingRecord, "00002.111", EMPTY_PROPS); GenericRecord existingRecord = createRecord(4, Operation.INSERT, "00001.111"); assertThrows(HoodieDebeziumAvroPayloadException.class, diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestPostgresDebeziumAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestPostgresDebeziumAvroPayload.java index 945a0d7640666..07e83ccea5771 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestPostgresDebeziumAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestPostgresDebeziumAvroPayload.java @@ -43,6 +43,7 @@ import java.util.Objects; import java.util.Properties; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -72,20 +73,20 @@ void setUp() { @Test public void testInsert() throws IOException { GenericRecord insertRecord = createRecord(0, Operation.INSERT, 100L); - PostgresDebeziumAvroPayload payload = new PostgresDebeziumAvroPayload(insertRecord, 100L); + PostgresDebeziumAvroPayload payload = new PostgresDebeziumAvroPayload(insertRecord, 100L, EMPTY_PROPS); validateRecord(payload.getInsertValue(avroSchema), 0, Operation.INSERT, 100L); } @Test public void testPreCombine() { GenericRecord insertRecord = createRecord(0, Operation.INSERT, 120L); - PostgresDebeziumAvroPayload insertPayload = new PostgresDebeziumAvroPayload(insertRecord, 120L); + PostgresDebeziumAvroPayload insertPayload = new PostgresDebeziumAvroPayload(insertRecord, 120L, EMPTY_PROPS); GenericRecord updateRecord = createRecord(0, Operation.UPDATE, 99L); - PostgresDebeziumAvroPayload updatePayload = new PostgresDebeziumAvroPayload(updateRecord, 99L); + PostgresDebeziumAvroPayload updatePayload = new PostgresDebeziumAvroPayload(updateRecord, 99L, EMPTY_PROPS); GenericRecord deleteRecord = createRecord(0, Operation.DELETE, 111L); - PostgresDebeziumAvroPayload deletePayload = new PostgresDebeziumAvroPayload(deleteRecord, 111L); + PostgresDebeziumAvroPayload deletePayload = new PostgresDebeziumAvroPayload(deleteRecord, 111L, EMPTY_PROPS); assertEquals(insertPayload, insertPayload.preCombine(updatePayload)); assertEquals(deletePayload, deletePayload.preCombine(updatePayload)); @@ -95,14 +96,14 @@ public void testPreCombine() { @Test public void testMergeWithUpdate() throws IOException { GenericRecord updateRecord = createRecord(1, Operation.UPDATE, 100L); - PostgresDebeziumAvroPayload payload = new PostgresDebeziumAvroPayload(updateRecord, 100L); + PostgresDebeziumAvroPayload payload = new PostgresDebeziumAvroPayload(updateRecord, 100L, EMPTY_PROPS); GenericRecord existingRecord = createRecord(1, Operation.INSERT, 99L); Option mergedRecord = payload.combineAndGetUpdateValue(existingRecord, avroSchema); validateRecord(mergedRecord, 1, Operation.UPDATE, 100L); GenericRecord lateRecord = createRecord(1, Operation.UPDATE, 98L); - payload = new PostgresDebeziumAvroPayload(lateRecord, 98L); + payload = new PostgresDebeziumAvroPayload(lateRecord, 98L, EMPTY_PROPS); mergedRecord = payload.combineAndGetUpdateValue(existingRecord, avroSchema); validateRecord(mergedRecord, 1, Operation.INSERT, 99L); } @@ -110,7 +111,7 @@ public void testMergeWithUpdate() throws IOException { @Test public void testMergeWithDelete() throws IOException { GenericRecord deleteRecord = createRecord(2, Operation.DELETE, 100L); - PostgresDebeziumAvroPayload payload = new PostgresDebeziumAvroPayload(deleteRecord, 100L); + PostgresDebeziumAvroPayload payload = new PostgresDebeziumAvroPayload(deleteRecord, 100L, EMPTY_PROPS); assertTrue(payload.isDeleted(avroSchema, new Properties())); GenericRecord existingRecord = createRecord(2, Operation.UPDATE, 99L); @@ -119,7 +120,7 @@ public void testMergeWithDelete() throws IOException { assertFalse(mergedRecord.isPresent()); GenericRecord lateRecord = createRecord(2, Operation.DELETE, 98L); - payload = new PostgresDebeziumAvroPayload(lateRecord, 98L); + payload = new PostgresDebeziumAvroPayload(lateRecord, 98L, EMPTY_PROPS); mergedRecord = payload.combineAndGetUpdateValue(existingRecord, avroSchema); validateRecord(mergedRecord, 2, Operation.UPDATE, 99L); } @@ -127,14 +128,15 @@ public void testMergeWithDelete() throws IOException { @Test public void testMergeWithDeleteUsingEmptyRecord() throws IOException { // empty record being merged with current record. - HoodieRecord emptyRecord = new HoodieAvroRecord(new HoodieKey(), new PostgresDebeziumAvroPayload(Option.empty())); + HoodieRecord emptyRecord = + new HoodieAvroRecord(new HoodieKey(), new PostgresDebeziumAvroPayload(Option.empty(), EMPTY_PROPS)); GenericRecord existingRecord = createRecord(2, Operation.UPDATE, 99L); Option mergedRecord = emptyRecord.getData().combineAndGetUpdateValue(existingRecord, avroSchema, new TypedProperties()); // expect nothing to be committed to table assertFalse(mergedRecord.isPresent()); // Insert record being merged with empty record. GenericRecord insertedRecord = createRecord(1, Operation.INSERT, 100L); - PostgresDebeziumAvroPayload insertPayload = new PostgresDebeziumAvroPayload(insertedRecord, 100L); + PostgresDebeziumAvroPayload insertPayload = new PostgresDebeziumAvroPayload(insertedRecord, 100L, EMPTY_PROPS); PostgresDebeziumAvroPayload combinedPayload = (PostgresDebeziumAvroPayload) insertPayload.preCombine(emptyRecord.getData(), avroSchema, new TypedProperties()); assertEquals(insertPayload, combinedPayload); } @@ -142,7 +144,7 @@ public void testMergeWithDeleteUsingEmptyRecord() throws IOException { @Test public void testMergeWithBootstrappedExistingRecords() throws IOException { GenericRecord incomingRecord = createRecord(3, Operation.UPDATE, 100L); - PostgresDebeziumAvroPayload payload = new PostgresDebeziumAvroPayload(incomingRecord, 100L); + PostgresDebeziumAvroPayload payload = new PostgresDebeziumAvroPayload(incomingRecord, 100L, EMPTY_PROPS); GenericRecord existingRecord = createRecord(3, null, null); Option mergedRecord = payload.combineAndGetUpdateValue(existingRecord, avroSchema); @@ -152,7 +154,7 @@ public void testMergeWithBootstrappedExistingRecords() throws IOException { @Test public void testInvalidIncomingRecord() { GenericRecord incomingRecord = createRecord(4, null, null); - PostgresDebeziumAvroPayload payload = new PostgresDebeziumAvroPayload(incomingRecord, 100L); + PostgresDebeziumAvroPayload payload = new PostgresDebeziumAvroPayload(incomingRecord, 100L, EMPTY_PROPS); GenericRecord existingRecord = createRecord(4, Operation.INSERT, 99L); assertThrows(HoodieDebeziumAvroPayloadException.class, @@ -193,7 +195,7 @@ public void testMergeWithToastedValues() throws IOException { newVal.put("string_null_col_2", "valid string value"); newVal.put("byte_null_col_2", ByteBuffer.wrap(getUTF8Bytes("valid byte value"))); - PostgresDebeziumAvroPayload payload = new PostgresDebeziumAvroPayload(Option.of(newVal)); + PostgresDebeziumAvroPayload payload = new PostgresDebeziumAvroPayload(Option.of(newVal), EMPTY_PROPS); GenericRecord outputRecord = (GenericRecord) payload .combineAndGetUpdateValue(oldVal, avroSchema).get(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java index de262ce0d6486..1310b2f9924a2 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java @@ -52,6 +52,7 @@ import static org.apache.hudi.avro.HoodieAvroUtils.createHoodieRecordFromAvro; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; /** @@ -177,7 +178,7 @@ public static List asDefaultPayloadRecords(List reco convertedRecords.add( createHoodieRecordFromAvro(avroData, DefaultHoodieRecordPayload.class.getName(), "timestamp", Option.of(Pair.of("_row_key", "partition_path")), - false, Option.empty(), false, Option.of(AVRO_SCHEMA))); + false, Option.empty(), false, Option.of(AVRO_SCHEMA), EMPTY_PROPS)); } return convertedRecords; } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java index f06670cc76d27..b7c436b1f594f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java @@ -18,20 +18,29 @@ package org.apache.hudi.common.util; -import org.apache.avro.generic.GenericRecord; - import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieAvroRecordMerger; import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.exception.HoodieException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -class TestHoodieRecordUtils { +public class TestHoodieRecordUtils { @Test void loadHoodieMerge() { @@ -48,10 +57,38 @@ void loadHoodieMergeWithWrongMerger() { assertThrows(HoodieException.class, () -> HoodieRecordUtils.loadRecordMerger(mergeClassName)); } - @Test - void loadPayload() { - String payloadClassName = DefaultHoodieRecordPayload.class.getName(); - HoodieRecordPayload payload = HoodieRecordUtils.loadPayload(payloadClassName, new Object[] {null, 0}, GenericRecord.class, Comparable.class); + private static Iterable payloadClassNames() { + List opts = new ArrayList<>(); + opts.add(new Object[] {DefaultHoodieRecordPayload.class.getName()}); + opts.add(new Object[] {DummyAvroPayload.class.getName()}); + return opts; + } + + @ParameterizedTest + @MethodSource("payloadClassNames") + void testCreatePayload(String payloadClassName) throws IOException { + HoodieRecordPayload payload = HoodieRecordUtils.createPayload( + payloadClassName, null, 0, EMPTY_PROPS); assertEquals(payload.getClass().getName(), payloadClassName); + + GenericRecord record = new GenericData.Record(new Schema.Parser().parse( + "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ " + + "{\"name\": \"timestamp\",\"type\": \"long\"} ]}" + )); + record.put("timestamp", 1L); + payload = HoodieRecordUtils.createPayload( + payloadClassName, record, EMPTY_PROPS); + assertEquals(payload.getClass().getName(), payloadClassName); + } + + public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload { + + public DummyAvroPayload(GenericRecord gr, Comparable orderingVal) { + super(gr, orderingVal); + } + + public DummyAvroPayload(Option gr) { + super(gr); + } } } \ No newline at end of file diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index fa31e0cb8bc45..bf3043dcaaaad 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -51,6 +51,7 @@ import org.apache.flink.util.Collector; import java.util.Objects; +import java.util.Properties; /** * The function to build the write profile incrementally for records within a checkpoint, @@ -91,6 +92,8 @@ public class BucketAssignFunction> private final Configuration conf; + private final Properties props; + private final boolean isChangingRecords; /** @@ -106,6 +109,7 @@ public class BucketAssignFunction> public BucketAssignFunction(Configuration conf) { this.conf = conf; + this.props = PayloadCreation.extractPropsFromConfiguration(conf); this.isChangingRecords = WriteOperationType.isChangingRecords( WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))); this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED) @@ -184,7 +188,7 @@ private void processRecord(HoodieRecord record, Collector out) throws Exce // if partition path changes, emit a delete record for old partition path, // then update the index state using location with new partition path. HoodieRecord deleteRecord = new HoodieAvroRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()), - payloadCreation.createDeletePayload((BaseAvroPayload) record.getData())); + payloadCreation.createDeletePayload((BaseAvroPayload) record.getData(), props)); deleteRecord.unseal(); deleteRecord.setCurrentLocation(oldLoc.toLocal("U")); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java index bfc7d7d62ad45..03b2bd040d94d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java @@ -37,6 +37,7 @@ import org.apache.flink.table.types.logical.RowType; import java.io.IOException; +import java.util.Properties; import static org.apache.hudi.util.StreamerUtil.flinkConf2TypedProperties; @@ -74,10 +75,12 @@ public class RowDataToHoodieFunction * Config options. */ private final Configuration config; + private final Properties props; public RowDataToHoodieFunction(RowType rowType, Configuration config) { this.rowType = rowType; this.config = config; + this.props = PayloadCreation.extractPropsFromConfiguration(config); } @Override @@ -94,7 +97,7 @@ public void open(Configuration parameters) throws Exception { @SuppressWarnings("unchecked") @Override public O map(I i) throws Exception { - return (O) toHoodieRecord(i); + return (O) toHoodieRecord(i, props); } /** @@ -105,11 +108,11 @@ public O map(I i) throws Exception { * @throws IOException if error occurs */ @SuppressWarnings("rawtypes") - private HoodieRecord toHoodieRecord(I record) throws Exception { + private HoodieRecord toHoodieRecord(I record, Properties props) throws Exception { GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record); final HoodieKey hoodieKey = keyGenerator.getKey(gr); - HoodieRecordPayload payload = payloadCreation.createPayload(gr); + HoodieRecordPayload payload = payloadCreation.createPayload(gr, props); HoodieOperation operation = HoodieOperation.fromValue(record.getRowKind().toByteValue()); return new HoodieAvroRecord<>(hoodieKey, payload, operation); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java index b7756a490bf31..d80cd00e712bc 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java @@ -35,6 +35,7 @@ import java.io.Serializable; import java.lang.reflect.Constructor; +import java.util.Properties; /** * Util to create hoodie pay load instance. @@ -43,14 +44,17 @@ public class PayloadCreation implements Serializable { private static final long serialVersionUID = 1L; private final boolean shouldCombine; + private final boolean shouldUsePropsForPayload; private final Constructor constructor; private final String preCombineField; private PayloadCreation( boolean shouldCombine, + boolean shouldUsePropsForPayload, Constructor constructor, @Nullable String preCombineField) { this.shouldCombine = shouldCombine; + this.shouldUsePropsForPayload = shouldUsePropsForPayload; this.constructor = constructor; this.preCombineField = preCombineField; } @@ -60,34 +64,63 @@ public static PayloadCreation instance(Configuration conf) throws Exception { boolean needCombine = conf.getBoolean(FlinkOptions.PRE_COMBINE) || WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT; boolean shouldCombine = needCombine && preCombineField != null; + boolean shouldUsePropsForPayload = true; - final Class[] argTypes; - final Constructor constructor; + Class[] argTypes; + Constructor constructor; if (shouldCombine) { - argTypes = new Class[] {GenericRecord.class, Comparable.class}; + argTypes = new Class[] {GenericRecord.class, Comparable.class, Properties.class}; } else { - argTypes = new Class[] {Option.class}; + argTypes = new Class[] {Option.class, Properties.class}; } final String clazz = conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME); - constructor = ReflectionUtils.getClass(clazz).getConstructor(argTypes); - return new PayloadCreation(shouldCombine, constructor, preCombineField); + try { + constructor = ReflectionUtils.getClass(clazz).getConstructor(argTypes); + } catch (NoSuchMethodException e) { + shouldUsePropsForPayload = false; + if (shouldCombine) { + argTypes = new Class[] {GenericRecord.class, Comparable.class}; + } else { + argTypes = new Class[] {Option.class}; + } + constructor = ReflectionUtils.getClass(clazz).getConstructor(argTypes); + } + return new PayloadCreation(shouldCombine, shouldUsePropsForPayload, constructor, preCombineField); + } + + public static Properties extractPropsFromConfiguration(Configuration config) { + Properties props = new Properties(); + config.addAllToProperties(props); + return props; } - public HoodieRecordPayload createPayload(GenericRecord record) throws Exception { + public HoodieRecordPayload createPayload(GenericRecord record, Properties props) throws Exception { if (shouldCombine) { ValidationUtils.checkState(preCombineField != null); Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(record, preCombineField, false, false); + if (shouldUsePropsForPayload) { + return (HoodieRecordPayload) constructor.newInstance(record, orderingVal, props); + } return (HoodieRecordPayload) constructor.newInstance(record, orderingVal); } else { + if (shouldUsePropsForPayload) { + return (HoodieRecordPayload) this.constructor.newInstance(Option.of(record), props); + } return (HoodieRecordPayload) this.constructor.newInstance(Option.of(record)); } } - public HoodieRecordPayload createDeletePayload(BaseAvroPayload payload) throws Exception { + public HoodieRecordPayload createDeletePayload(BaseAvroPayload payload, Properties props) throws Exception { if (shouldCombine) { + if (shouldUsePropsForPayload) { + return (HoodieRecordPayload) constructor.newInstance(null, payload.getOrderingVal(), props); + } return (HoodieRecordPayload) constructor.newInstance(null, payload.getOrderingVal()); } else { + if (shouldUsePropsForPayload) { + return (HoodieRecordPayload) this.constructor.newInstance(Option.empty(), props); + } return (HoodieRecordPayload) this.constructor.newInstance(Option.empty()); } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCustomSerDe.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCustomSerDe.java index 89962bf834785..8f68aa1f4a545 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCustomSerDe.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCustomSerDe.java @@ -18,12 +18,6 @@ package org.apache.hudi.sink.compact; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; - -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericFixed; import org.apache.hudi.common.model.EventTimeAvroPayload; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; @@ -31,13 +25,21 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.util.collection.BitCaskDiskMap; import org.apache.hudi.common.util.collection.RocksDbDiskMap; + +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericFixed; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; - -import java.io.IOException; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.io.IOException; + +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + /** * Tests for custom SerDe of non-primitive avro types when using Avro versions > 1.10.0. * The avro version used by hudi-flink module is 1.10.0, these tests are placed here so that avro 1.10.0 is used, @@ -75,7 +77,7 @@ private static HoodieRecord createAvroRecordWithDecimalOrderingField() { // nullifying the record attribute in EventTimeAvroPayload here as it is not required in the test return new HoodieAvroRecord<>(new HoodieKey("recordKey", "partitionPath"), - new EventTimeAvroPayload(null, (Comparable) genericFixed)); + new EventTimeAvroPayload(null, (Comparable) genericFixed, EMPTY_PROPS)); } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index a088982138b34..e965d17cc2502 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieSparkRecord; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; @@ -59,6 +60,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.stream.Collectors; import scala.Tuple2; @@ -126,19 +128,6 @@ public static Option>> createUserDefinedBulkI } } - /** - * Create a payload class via reflection, passing in an ordering/precombine value. - */ - public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal) - throws IOException { - try { - return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass, - new Class[] {GenericRecord.class, Comparable.class}, record, orderingVal); - } catch (Throwable e) { - throw new IOException("Could not create payload for class: " + payloadClass, e); - } - } - public static Map getExtraMetadata(Map properties) { Map extraMetadataMap = new HashMap<>(); if (properties.containsKey(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key())) { @@ -157,19 +146,6 @@ public static Map getExtraMetadata(Map propertie return extraMetadataMap; } - /** - * Create a payload class via reflection, do not ordering/precombine value. - */ - public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record) - throws IOException { - try { - return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass, - new Class[] {Option.class}, Option.of(record)); - } catch (Throwable e) { - throw new IOException("Could not create payload for class: " + payloadClass, e); - } - } - public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath, String tblName, Map parameters) { boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key())); @@ -255,8 +231,8 @@ public static HoodieWriteResult doDeletePartitionsOperation(SparkRDDWriteClient } public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey, - String payloadClass, scala.Option recordLocation) throws IOException { - HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal); + String payloadClass, scala.Option recordLocation, Properties props) throws IOException { + HoodieRecordPayload payload = HoodieRecordUtils.createPayload(payloadClass, gr, orderingVal, props); HoodieAvroRecord record = new HoodieAvroRecord<>(hKey, payload); if (recordLocation.isDefined()) { record.setCurrentLocation(recordLocation.get()); @@ -265,8 +241,9 @@ public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable order } public static HoodieRecord createHoodieRecord(GenericRecord gr, HoodieKey hKey, - String payloadClass, scala.Option recordLocation) throws IOException { - HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr); + String payloadClass, scala.Option recordLocation, + Properties props) throws IOException { + HoodieRecordPayload payload = HoodieRecordUtils.createPayload(payloadClass, gr, props); HoodieAvroRecord record = new HoodieAvroRecord<>(hKey, payload); if (recordLocation.isDefined()) { record.setCurrentLocation(recordLocation.get()); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala index e9201cc66cc46..0bff0f26eb71d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala @@ -143,10 +143,10 @@ object HoodieCreateRecordUtils { val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, config.getString(PRECOMBINE_FIELD), false, consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]] DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, hoodieKey, - config.getString(PAYLOAD_CLASS_NAME), recordLocation) + config.getString(PAYLOAD_CLASS_NAME), recordLocation, config.getProps) } else { DataSourceUtils.createHoodieRecord(processedRecord, hoodieKey, - config.getString(PAYLOAD_CLASS_NAME), recordLocation) + config.getString(PAYLOAD_CLASS_NAME), recordLocation, config.getProps) } hoodieRecord } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ValidateDuplicateKeyPayload.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ValidateDuplicateKeyPayload.scala index 2619d1d9fe151..f998ab6635274 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ValidateDuplicateKeyPayload.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ValidateDuplicateKeyPayload.scala @@ -30,11 +30,11 @@ import java.util.Properties * Validate the duplicate key for insert statement without enable the INSERT_DROP_DUPS_OPT * config. */ -class ValidateDuplicateKeyPayload(record: GenericRecord, orderingVal: Comparable[_]) - extends DefaultHoodieRecordPayload(record, orderingVal) { +class ValidateDuplicateKeyPayload(record: GenericRecord, orderingVal: Comparable[_], props: Properties) + extends DefaultHoodieRecordPayload(record, orderingVal, props) { - def this(record: HOption[GenericRecord]) { - this(if (record.isPresent) record.get else null, 0) + def this(record: HOption[GenericRecord], props: Properties) { + this(if (record.isPresent) record.get else null, 0, props) } override def combineAndGetUpdateValue(currentValue: IndexedRecord, diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java index 59674b928fdfa..d96dff823f9dc 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java @@ -18,9 +18,6 @@ package org.apache.hudi; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -28,6 +25,10 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.apache.spark.sql.Row; import java.io.IOException; @@ -43,6 +44,8 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; + /** * Class to be used in quickstart guide for generating inserts and updates against a corpus. Test data uses a toy Uber * trips, data model. @@ -127,7 +130,7 @@ public static OverwriteWithLatestAvroPayload generateRandomValue(HoodieKey key, GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + riderDriverSuffix, "driver-" + riderDriverSuffix, generateRangeRandomTimestamp(7)); - return new OverwriteWithLatestAvroPayload(Option.of(rec)); + return new OverwriteWithLatestAvroPayload(Option.of(rec), EMPTY_PROPS); } /** diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java index 6117cdcae1edc..190d45ea2c534 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java @@ -84,7 +84,7 @@ public JavaRDD generateInputRecords(String tableName, String sourc Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))); try { return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr), - props.getString("hoodie.datasource.write.payload.class"), scala.Option.apply(null)); + props.getString("hoodie.datasource.write.payload.class"), scala.Option.apply(null), props); } catch (IOException ioe) { throw new HoodieIOException(ioe.getMessage(), ioe); } diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java index 1411d4f4796c0..12b7402aa1416 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java @@ -18,9 +18,14 @@ package org.apache.hudi.payload; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.util.Option; +import org.apache.avro.generic.GenericRecord; + +import java.util.Properties; + +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; + /** * Provides support for seamlessly applying changes captured via Amazon Database Migration Service onto S3. * @@ -38,11 +43,21 @@ @Deprecated public class AWSDmsAvroPayload extends org.apache.hudi.common.model.AWSDmsAvroPayload { + @Deprecated public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) { - super(record, orderingVal); + this(record, orderingVal, EMPTY_PROPS); } + @Deprecated public AWSDmsAvroPayload(Option record) { - super(record); + this(record, EMPTY_PROPS); + } + + public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) { + super(record, orderingVal, props); + } + + public AWSDmsAvroPayload(Option record, Properties props) { + super(record, props); } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala index 0989b8b09aee4..24bb7eda99d9a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala @@ -54,16 +54,18 @@ import scala.collection.JavaConverters._ * a HoodieWriteHandle.IGNORE_RECORD, and the write handles will ignore this record. * * NOTE: Please note that, ctor parameter SHOULD NOT be used w/in the class body as - * otherwise Scala will instantiate them as fields making whole [[ExpressionPayload]] - * non-serializable. As an additional hedge, these are annotated as [[transient]] to - * prevent this from happening. + * otherwise Scala will instantiate them as fields making whole [[ExpressionPayload]] + * non-serializable. As an additional hedge, these are annotated as [[transient]] to + * prevent this from happening. */ class ExpressionPayload(@transient record: GenericRecord, - @transient orderingVal: Comparable[_]) - extends DefaultHoodieRecordPayload(record, orderingVal) with Logging { + @transient orderingVal: Comparable[_], + @transient props: Properties) + extends DefaultHoodieRecordPayload(record, orderingVal, props) + with Logging { - def this(recordOpt: HOption[GenericRecord]) { - this(recordOpt.orElse(null), 0) + def this(recordOpt: HOption[GenericRecord], props: Properties) { + this(recordOpt.orElse(null), 0, props) } override def combineAndGetUpdateValue(currentValue: IndexedRecord, @@ -236,7 +238,7 @@ class ExpressionPayload(@transient record: GenericRecord, val recordSchema = getRecordSchema(properties) val incomingRecord = ConvertibleRecord(bytesToAvro(recordBytes, recordSchema)) - if (super.isDeleteRecord(incomingRecord.asAvro)) { + if (super.isDeleteRecord(incomingRecord.asAvro, properties)) { HOption.empty[IndexedRecord]() } else if (isMORTable(properties)) { // For the MOR table, both the matched and not-matched record will step into the getInsertValue() method. diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java index ed3976d4f78df..b34d5a381673e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java @@ -74,6 +74,7 @@ import java.util.stream.Stream; import static org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; @@ -346,7 +347,8 @@ public void testSerHoodieMetadataPayload() throws IOException { byte[] recordToBytes = HoodieAvroUtils.indexedRecordToBytes(record); GenericRecord genericRecord = HoodieAvroUtils.bytesToAvro(recordToBytes, record.getSchema()); - HoodieMetadataPayload genericRecordHoodieMetadataPayload = new HoodieMetadataPayload(Option.of(genericRecord)); + HoodieMetadataPayload genericRecordHoodieMetadataPayload = + new HoodieMetadataPayload(Option.of(genericRecord), EMPTY_PROPS); byte[] bytes = SerializationUtils.serialize(genericRecordHoodieMetadataPayload); HoodieMetadataPayload deserGenericRecordHoodieMetadataPayload = SerializationUtils.deserialize(bytes); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala index 61a7a04823abf..7c19d37511cae 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala @@ -22,7 +22,8 @@ import org.apache.avro.generic.GenericRecord import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.model._ -import org.apache.hudi.common.testutils.{SchemaTestUtil, PreCombineTestUtils} +import org.apache.hudi.common.testutils.{PreCombineTestUtils, SchemaTestUtil} +import org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS import org.apache.hudi.common.util.Option import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH import org.apache.hudi.config.HoodiePayloadConfig @@ -538,10 +539,10 @@ class TestDataSourceDefaults extends ScalaAssertionSupport { } @Test def testOverwriteWithLatestAvroPayload(): Unit = { - val overWritePayload1 = new OverwriteWithLatestAvroPayload(baseRecord, 1) + val overWritePayload1 = new OverwriteWithLatestAvroPayload(baseRecord, 1, EMPTY_PROPS) val laterRecord = SchemaTestUtil .generateAvroRecordFromJson(schema, 2, "001", "f1") - val overWritePayload2 = new OverwriteWithLatestAvroPayload(laterRecord, 2) + val overWritePayload2 = new OverwriteWithLatestAvroPayload(laterRecord, 2, EMPTY_PROPS) // it will provide the record with greatest combine value val combinedPayload12 = overWritePayload1.preCombine(overWritePayload2) @@ -562,12 +563,20 @@ class TestDataSourceDefaults extends ScalaAssertionSupport { val baseOrderingVal: Object = baseRecord.get("favoriteIntNumber") val fieldSchema: Schema = baseRecord.getSchema().getField("favoriteIntNumber").schema() - val basePayload = new OverwriteWithLatestAvroPayload(baseRecord, HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, baseOrderingVal, false).asInstanceOf[Comparable[_]]) + val basePayload = new OverwriteWithLatestAvroPayload( + baseRecord, + HoodieAvroUtils.convertValueForSpecificDataTypes( + fieldSchema, baseOrderingVal, false).asInstanceOf[Comparable[_]], + EMPTY_PROPS) val laterRecord = SchemaTestUtil .generateAvroRecordFromJson(schema, 2, "001", "f1") val laterOrderingVal: Object = laterRecord.get("favoriteIntNumber") - val newerPayload = new OverwriteWithLatestAvroPayload(laterRecord, HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, laterOrderingVal, false).asInstanceOf[Comparable[_]]) + val newerPayload = new OverwriteWithLatestAvroPayload( + laterRecord, + HoodieAvroUtils.convertValueForSpecificDataTypes( + fieldSchema, laterOrderingVal, false).asInstanceOf[Comparable[_]], + EMPTY_PROPS) // it will provide the record with greatest combine value val preCombinedPayload = basePayload.preCombine(newerPayload) @@ -589,10 +598,14 @@ class TestDataSourceDefaults extends ScalaAssertionSupport { val earlierOrderingVal: Object = earlierRecord.get("favoriteIntNumber") val laterPayload = new DefaultHoodieRecordPayload(laterRecord, - HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, laterOrderingVal, false).asInstanceOf[Comparable[_]]) + HoodieAvroUtils.convertValueForSpecificDataTypes( + fieldSchema, laterOrderingVal, false).asInstanceOf[Comparable[_]], + EMPTY_PROPS) val earlierPayload = new DefaultHoodieRecordPayload(earlierRecord, - HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, earlierOrderingVal, false).asInstanceOf[Comparable[_]]) + HoodieAvroUtils.convertValueForSpecificDataTypes( + fieldSchema, earlierOrderingVal, false).asInstanceOf[Comparable[_]], + EMPTY_PROPS) // it will provide the record with greatest combine value val preCombinedPayload = laterPayload.preCombine(earlierPayload) @@ -612,10 +625,10 @@ class TestDataSourceDefaults extends ScalaAssertionSupport { } @Test def testEmptyHoodieRecordPayload(): Unit = { - val emptyPayload1 = new EmptyHoodieRecordPayload(baseRecord, 1) + val emptyPayload1 = new EmptyHoodieRecordPayload(baseRecord, 1, EMPTY_PROPS) val laterRecord = SchemaTestUtil .generateAvroRecordFromJson(schema, 2, "001", "f1") - val emptyPayload2 = new EmptyHoodieRecordPayload(laterRecord, 2) + val emptyPayload2 = new EmptyHoodieRecordPayload(laterRecord, 2, EMPTY_PROPS) // it will provide an empty record val combinedPayload12 = emptyPayload1.preCombine(emptyPayload2) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index ece1deacd7a25..7c34f44941b90 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -27,9 +27,9 @@ import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPU import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig} import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType -import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType} +import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, WriteOperationType} import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, TimelineUtils} -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings} import org.apache.hudi.common.util @@ -1775,6 +1775,41 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup }) assertEquals(3, clusterInstants.size) } + + @Test + def testDefaultHoodieRecordPayloadWithCustomDeleteFields(): Unit = { + val options = Map( + HoodieTableConfig.NAME.key -> "test_table", + HoodieWriteConfig.BASE_PATH.key -> basePath, + DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator", + DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "", + DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key -> classOf[DefaultHoodieRecordPayload].getName, + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", + DefaultHoodieRecordPayload.DELETE_KEY -> "_change_operation_type", + DefaultHoodieRecordPayload.DELETE_MARKER -> "d" + ) + + val structType = new StructType() + .add("id", "string", false) + .add("ts", "long", false) + .add("value", "long", true) + .add("_change_operation_type", "string", false) + val rows = Seq(Row("key1", 0L, 1000L, "c"), Row("key1", 2L, 2000L, "d"), + Row("key2", 3L, 3000L, "c"), Row("key3", 4L, 4000L, "c")) + val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), structType) + + df.write.format("hudi").options(options).mode("overwrite").save(basePath) + val actualDf = spark.read.format("hudi").load(basePath) + .select("id", "ts", "value", "_change_operation_type") + val expectedDf = spark.createDataFrame(spark.sparkContext.parallelize( + Seq(Row("key2", 3L, 3000L, "c"), Row("key3", 4L, 4000L, "c")) + ), structType) + + assertEquals(2, actualDf.count()) + assertEquals(2, actualDf.intersect(expectedDf).count()) + } } object TestCOWDataSource { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index 936eef67a9b01..355ccf3320d4e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -54,6 +54,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; @@ -630,11 +631,12 @@ private Pair>> fetchFromSourc GenericRecord genRec = genericRecordIterator.next(); HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec)); GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec; - HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr, + HoodieRecordPayload payload = shouldCombine ? HoodieRecordUtils.createPayload(cfg.payloadClassName, gr, (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean( KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), - Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())))) - : DataSourceUtils.createPayload(cfg.payloadClassName, gr); + Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))), + props) + : HoodieRecordUtils.createPayload(cfg.payloadClassName, gr, props); avroRecords.add(new HoodieAvroRecord<>(hoodieKey, payload)); } return avroRecords.iterator(); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 5b86f3fc90ac0..9ec3451455855 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -38,7 +38,6 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.PartialUpdateAvroPayload; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.model.WriteOperationType; @@ -52,6 +51,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.TestHoodieRecordUtils.DummyAvroPayload; import org.apache.hudi.config.HoodieArchivalConfig; import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.config.HoodieCompactionConfig; @@ -2582,13 +2582,6 @@ public TestGenerator(TypedProperties props) { } } - public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload { - - public DummyAvroPayload(GenericRecord gr, Comparable orderingVal) { - super(gr, orderingVal); - } - } - /** * Return empty table. */ diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestAWSDatabaseMigrationServiceSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestAWSDatabaseMigrationServiceSource.java index 3b1da6d00b51a..ed71a1711480f 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestAWSDatabaseMigrationServiceSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestAWSDatabaseMigrationServiceSource.java @@ -34,6 +34,7 @@ import java.io.Serializable; import java.util.Arrays; +import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -52,15 +53,15 @@ public void testPayload() throws IOException { record.put("id", "1"); record.put("Op", ""); record.put("ts", 0L); - AWSDmsAvroPayload payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts")); + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts"), EMPTY_PROPS); assertTrue(payload.combineAndGetUpdateValue(null, schema).isPresent()); record.put("Op", "I"); - payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts")); + payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts"), EMPTY_PROPS); assertTrue(payload.combineAndGetUpdateValue(null, schema).isPresent()); record.put("Op", "D"); - payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts")); + payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts"), EMPTY_PROPS); assertFalse(payload.combineAndGetUpdateValue(null, schema).isPresent()); }