Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -1139,18 +1139,19 @@ public static HoodieRecord createHoodieRecordFromAvro(
Boolean withOperation,
Option<String> partitionNameOp,
Boolean populateMetaFields,
Option<Schema> schemaWithoutMetaFields) {
Option<Schema> schemaWithoutMetaFields,
Properties props) {
if (populateMetaFields) {
return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) data,
payloadClass, preCombineField, withOperation);
payloadClass, preCombineField, withOperation, props);
// Support HoodieFileSliceReader
} else if (simpleKeyGenFieldsOpt.isPresent()) {
// TODO in HoodieFileSliceReader may partitionName=option#empty
return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) data,
payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), withOperation, partitionNameOp, schemaWithoutMetaFields);
payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), withOperation, partitionNameOp, schemaWithoutMetaFields, props);
} else {
return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) data,
payloadClass, preCombineField, withOperation, partitionNameOp, schemaWithoutMetaFields);
payloadClass, preCombineField, withOperation, partitionNameOp, schemaWithoutMetaFields, props);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.io.IOException;
import java.util.Properties;

import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS;

/**
* Provides support for seamlessly applying changes captured via Amazon Database Migration Service onto S3.
*
Expand All @@ -45,12 +47,22 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {

public static final String OP_FIELD = "Op";

@Deprecated
public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
this(record, orderingVal, EMPTY_PROPS);
}

@Deprecated
public AWSDmsAvroPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, 0); // natural order
this(record, EMPTY_PROPS);
}

/**
* Creates an {@code AWSDmsAvroPayload} by forwarding all three arguments unchanged
* to the superclass constructor.
*
* @param record Avro record for the payload.
* @param orderingVal ordering value passed to the superclass.
* @param props configuration passed to the superclass.
*/
public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) {
super(record, orderingVal, props);
}

/**
* Creates an {@code AWSDmsAvroPayload} from an optional record, delegating to the
* three-argument constructor with an ordering value of {@code 0}.
*
* @param record optional Avro record; when it is not present, {@code null} is passed on as the record.
* @param props configuration forwarded to the three-argument constructor.
*/
public AWSDmsAvroPayload(Option<GenericRecord> record, Properties props) {
this(record.isPresent() ? record.get() : null, 0, props); // natural order
}

/**
Expand Down Expand Up @@ -96,8 +108,8 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
}

@Override
protected boolean isDeleteRecord(GenericRecord record) {
return isDMSDeleteRecord(record) || super.isDeleteRecord(record);
protected boolean isDeleteRecord(GenericRecord record, Properties props) {
return isDMSDeleteRecord(record) || super.isDeleteRecord(record, props);
}

private static boolean isDMSDeleteRecord(GenericRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.io.Serializable;
import java.util.Properties;

import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS;

/**
* Base class for all AVRO record based payloads, that can be ordered based on a field.
*/
Expand All @@ -43,16 +45,22 @@ public abstract class BaseAvroPayload implements Serializable {

protected final boolean isDeletedRecord;

/**
* Instantiate {@link BaseAvroPayload} without explicit configuration; delegates to the
* three-argument constructor, passing {@code EMPTY_PROPS} as the props.
*
* @param record Generic record for the payload.
* @param orderingVal {@link Comparable} forwarded as the ordering value.
* @deprecated use the constructor that also takes {@link Properties}.
*/
@Deprecated
public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
this(record, orderingVal, EMPTY_PROPS);
}

/**
* Instantiate {@link BaseAvroPayload}.
*
* @param record Generic record for the payload.
* @param orderingVal {@link Comparable} to be used in pre combine.
* @param props Configuration in {@link Properties}.
*/
public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
public BaseAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) {
this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) : new byte[0];
this.orderingVal = orderingVal;
this.isDeletedRecord = record == null || isDeleteRecord(record);
this.isDeletedRecord = record == null || isDeleteRecord(record, props);

if (orderingVal == null) {
throw new HoodieException("Ordering value is null for record: " + record);
Expand All @@ -79,11 +87,17 @@ public boolean canProduceSentinel() {
return false;
}

/**
* Delete check without explicit configuration; delegates to the two-argument overload,
* passing {@code EMPTY_PROPS} as the props.
*
* @param genericRecord instance of {@link GenericRecord} of interest.
* @return the result of {@code isDeleteRecord(genericRecord, EMPTY_PROPS)}.
* @deprecated use the overload that also takes {@link Properties}.
*/
@Deprecated
protected boolean isDeleteRecord(GenericRecord genericRecord) {
return isDeleteRecord(genericRecord, EMPTY_PROPS);
}

/**
* @param genericRecord instance of {@link GenericRecord} of interest.
* @param props Configuration in {@link Properties}.
* @return {@code true} if record represents a delete record. {@code false} otherwise.
*/
protected boolean isDeleteRecord(GenericRecord genericRecord) {
protected boolean isDeleteRecord(GenericRecord genericRecord, Properties props) {
final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED_FIELD;
// Modify to be compatible with new version Avro.
// The new version Avro throws for GenericRecord.get if the field name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
public static final String DELETE_MARKER = "hoodie.payload.delete.marker";
private Option<Object> eventTime = Option.empty();

public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal, Properties props) {
super(record, orderingVal, props);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The source of the props seems chaotic; I have already seen several ways in which it is produced:

  1. config.getPayloadConfig().getProps() in HoodieMergeHandle;
  2. payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, preCombineField); in HoodieFileSliceReader;
  3. config.getProps() in HoodieIndexUtils.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this should be cleaned up separately. Regardless, the payload-related configs should always be included so there's no correctness issue. And we're not storing any props with the record so record processing is consistent.


public DefaultHoodieRecordPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, 0); // natural order
public DefaultHoodieRecordPayload(Option<GenericRecord> record, Properties props) {
this(record.isPresent() ? record.get() : null, 0, props); // natural order
}

@Override
Expand All @@ -75,7 +75,7 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
/*
* Now check if the incoming record is a delete record.
*/
return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord);
return isDeleted(schema, properties) ? Option.empty() : Option.of(incomingRecord);
}

@Override
Expand All @@ -86,30 +86,26 @@ public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties
GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
eventTime = updateEventTime(incomingRecord, properties);

return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord);
return isDeleted(schema, properties) ? Option.empty() : Option.of(incomingRecord);
}

/**
* @param genericRecord instance of {@link GenericRecord} of interest.
* @param properties payload related properties
* @return {@code true} if record represents a delete record. {@code false} otherwise.
*/
protected boolean isDeleteRecord(GenericRecord genericRecord, Properties properties) {
final String deleteKey = properties.getProperty(DELETE_KEY);
@Override
protected boolean isDeleteRecord(GenericRecord record, Properties props) {
final String deleteKey = props.getProperty(DELETE_KEY);
if (StringUtils.isNullOrEmpty(deleteKey)) {
return isDeleteRecord(genericRecord);
return super.isDeleteRecord(record, props);
Copy link
Contributor

@danny0405 danny0405 Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this line the actual fix? I didn't see the props being used by the super method, so do we still need to pass all the props around here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation of this method is kept the same. However, before my fix, this method isDeleteRecord(GenericRecord genericRecord, Properties properties) was a protected method that did not override any method in the super class, and it was not called in the constructor to determine whether a record is a delete.

After the fix, the method isDeleteRecord(GenericRecord genericRecord, Properties properties) overrides the same in the super class, and is now called in the constructor (see super class BaseAvroPayload constructor, where the following is called)

this.isDeletedRecord = record == null || isDeleteRecord(record, props);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But BaseAvroPayload.isDeleteRecord does not really use the passed-in props, so do we still need to change all the constructors just to pass the props? It seems that just an additional invocation of BaseAvroPayload.isDeleteRecord could fix the problem, and this method does not actually need any props.

Maybe we can address the props issue in a separate task.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BaseAvroPayload.isDeleted(Schema schema, Properties props) already passes the properties around, so can we just override it for DefaultHoodieRecordPayload, so that there is no need to change all the constructors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BaseAvroPayload.isDeleteRecord(GenericRecord genericRecord, Properties props) uses the props, and it is called by this.isDeletedRecord = record == null || isDeleteRecord(record, props); in the constructor. BaseAvroPayload.isDeleteRecord(GenericRecord genericRecord) is only for backwards compatibility, called from user-implemented record payload implementations.
Screenshot 2023-10-25 at 01 20 47

Copy link
Contributor

@danny0405 danny0405 Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm talking about the master code: in master, BaseAvroPayload.isDeleteRecord only handles the _hoodie_is_deleted field, which looks reasonable. If the public method BaseAvroPayload.isDeleted(Schema schema, Properties props) is the culprit that causes the issue, then just fix it; we can override it in DefaultHoodieRecordPayload and utilize those specific options there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intention is that isDeleted(Schema schema, Properties props) should not deserialize the bytes, for performance reasons, so we cannot implement the custom delete marker there for DefaultHoodieRecordPayload. The approach in this fix is to check the delete key and marker in the constructor before the Avro record is serialized; that’s why props needs to be passed in to the constructor.

}

ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(properties.getProperty(DELETE_MARKER)),
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(props.getProperty(DELETE_MARKER)),
() -> DELETE_MARKER + " should be configured with " + DELETE_KEY);
// Modify to be compatible with new version Avro.
// The new version Avro throws for GenericRecord.get if the field name
// does not exist in the schema.
if (genericRecord.getSchema().getField(deleteKey) == null) {
if (record.getSchema().getField(deleteKey) == null) {
return false;
}
Object deleteMarker = genericRecord.get(deleteKey);
return deleteMarker != null && properties.getProperty(DELETE_MARKER).equals(deleteMarker.toString());
Object deleteMarker = record.get(deleteKey);
return deleteMarker != null && props.getProperty(DELETE_MARKER).equals(deleteMarker.toString());
}

private static Option<Object> updateEventTime(GenericRecord record, Properties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import java.util.Properties;

/**
* Empty payload used for deletions.
*/
Expand All @@ -32,9 +34,13 @@ public class EmptyHoodieRecordPayload implements HoodieRecordPayload<EmptyHoodie
public EmptyHoodieRecordPayload() {
}

/**
* Constructor with an empty body: neither argument is read or stored.
*
* @param record not used by this constructor.
* @param orderingVal not used by this constructor.
* @deprecated use the constructor that also takes {@link Properties}.
*/
@Deprecated
public EmptyHoodieRecordPayload(GenericRecord record, Comparable orderingVal) {
}

/**
* Constructor with an empty body: none of the arguments is read or stored.
*
* @param record not used by this constructor.
* @param orderingVal not used by this constructor.
* @param props not used by this constructor.
*/
public EmptyHoodieRecordPayload(GenericRecord record, Comparable orderingVal, Properties props) {
}

@Override
public EmptyHoodieRecordPayload preCombine(EmptyHoodieRecordPayload oldValue) {
return oldValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
*/
public class EventTimeAvroPayload extends DefaultHoodieRecordPayload {

public EventTimeAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
public EventTimeAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) {
super(record, orderingVal, props);
}

public EventTimeAvroPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, 0); // natural order
public EventTimeAvroPayload(Option<GenericRecord> record, Properties props) {
this(record.isPresent() ? record.get() : null, 0, props); // natural order
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,16 @@ public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
Option<Schema> schemaWithoutMetaFields) {
String payloadClass = ConfigUtils.getPayloadClass(props);
String preCombineField = ConfigUtils.getOrderingField(props);
return HoodieAvroUtils.createHoodieRecordFromAvro(data, payloadClass, preCombineField, simpleKeyGenFieldsOpt, withOperation, partitionNameOp, populateMetaFields, schemaWithoutMetaFields);
return HoodieAvroUtils.createHoodieRecordFromAvro(
data,
payloadClass,
preCombineField,
simpleKeyGenFieldsOpt,
withOperation,
partitionNameOp,
populateMetaFields,
schemaWithoutMetaFields,
props);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;
import java.util.Properties;

import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS;

/**
* This is a payload to wrap an existing Hoodie Avro Record. Useful to create a HoodieRecord over existing GenericRecords
Expand All @@ -39,11 +42,19 @@ public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload>
private final Comparable<?> orderingVal;

public HoodieAvroPayload(GenericRecord record, Comparable<?> orderingVal) {
this(record, orderingVal, EMPTY_PROPS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we mark these as deprecated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on my understanding, this is only used internally and the props are not used, so I didn't mark it as deprecated.

}

public HoodieAvroPayload(Option<GenericRecord> record) {
this(record, EMPTY_PROPS);
}

/**
* Builds the payload from an Avro record and an ordering value.
*
* @param record Avro record to serialize; when {@code null}, an empty byte array is stored instead.
* @param orderingVal ordering value stored as-is.
* @param props accepted for signature parity; not read by this constructor.
*/
public HoodieAvroPayload(GenericRecord record, Comparable<?> orderingVal, Properties props) {
if (record != null) {
// A real record is serialized to its Avro byte form.
this.recordBytes = HoodieAvroUtils.avroToBytes(record);
} else {
// No record: keep a zero-length byte array rather than null.
this.recordBytes = new byte[0];
}
this.orderingVal = orderingVal;
}

public HoodieAvroPayload(Option<GenericRecord> record) {
public HoodieAvroPayload(Option<GenericRecord> record, Properties props) {
this.recordBytes = record.isPresent() ? HoodieAvroUtils.avroToBytes(record.get()) : new byte[0];
this.orderingVal = 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,16 @@ public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
IndexedRecord indexedRecord = (IndexedRecord) data.getInsertValue(recordSchema, props).get();
String payloadClass = ConfigUtils.getPayloadClass(props);
String preCombineField = ConfigUtils.getOrderingField(props);
return HoodieAvroUtils.createHoodieRecordFromAvro(indexedRecord, payloadClass, preCombineField, simpleKeyGenFieldsOpt, withOperation, partitionNameOp, populateMetaFields, schemaWithoutMetaFields);
return HoodieAvroUtils.createHoodieRecordFromAvro(
indexedRecord,
payloadClass,
preCombineField,
simpleKeyGenFieldsOpt,
withOperation,
partitionNameOp,
populateMetaFields,
schemaWithoutMetaFields,
props);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@

import java.io.IOException;
import java.util.List;
import java.util.Properties;

import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS;

/**
* subclass of OverwriteWithLatestAvroPayload.
Expand All @@ -39,12 +42,22 @@
*/
public class OverwriteNonDefaultsWithLatestAvroPayload extends OverwriteWithLatestAvroPayload {

@Deprecated
public OverwriteNonDefaultsWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
this(record, orderingVal, EMPTY_PROPS);
}

@Deprecated
public OverwriteNonDefaultsWithLatestAvroPayload(Option<GenericRecord> record) {
super(record); // natural order
this(record, EMPTY_PROPS);
}

/**
* Creates the payload by forwarding all three arguments unchanged to the superclass constructor.
*
* @param record Avro record for the payload.
* @param orderingVal ordering value passed to the superclass.
* @param props configuration passed to the superclass.
*/
public OverwriteNonDefaultsWithLatestAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) {
super(record, orderingVal, props);
}

/**
* Creates the payload from an optional record by forwarding both arguments unchanged
* to the superclass constructor that takes an {@code Option} and props.
*
* @param record optional Avro record passed to the superclass.
* @param props configuration passed to the superclass.
*/
public OverwriteNonDefaultsWithLatestAvroPayload(Option<GenericRecord> record, Properties props) {
super(record, props); // natural order
}

@Override
Expand Down Expand Up @@ -73,7 +86,7 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
* @return the merged record option
*/
protected Option<IndexedRecord> mergeRecords(Schema schema, GenericRecord baseRecord, GenericRecord mergedRecord) {
if (isDeleteRecord(baseRecord)) {
if (isDeleteRecord(baseRecord, EMPTY_PROPS)) {
return Option.empty();
} else {
final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@

import java.io.IOException;
import java.util.Objects;
import java.util.Properties;

import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS;

/**
* Default payload.
Expand All @@ -40,12 +43,22 @@
public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {

@Deprecated
public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
this(record, orderingVal, EMPTY_PROPS);
}

@Deprecated
public OverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, 0); // natural order
this(record, EMPTY_PROPS);
}

/**
* Creates the payload by forwarding all three arguments unchanged to the superclass constructor.
*
* @param record Avro record for the payload.
* @param orderingVal ordering value passed to the superclass.
* @param props configuration passed to the superclass.
*/
public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) {
super(record, orderingVal, props);
}

/**
* Creates the payload from an optional record, delegating to the three-argument
* constructor with an ordering value of {@code 0}.
*
* @param record optional Avro record; when it is not present, {@code null} is passed on as the record.
* @param props configuration forwarded to the three-argument constructor.
*/
public OverwriteWithLatestAvroPayload(Option<GenericRecord> record, Properties props) {
this(record.isPresent() ? record.get() : null, 0, props); // natural order
}

@Override
Expand Down
Loading