Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {

public static final String OP_FIELD = "Op";

public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) {
super(record, orderingVal, props);
}

public AWSDmsAvroPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, 0); // natural order
public AWSDmsAvroPayload(Option<GenericRecord> record, Properties props) {
this(record.isPresent() ? record.get() : null, 0, props); // natural order
}

/**
Expand Down Expand Up @@ -96,8 +96,8 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
}

@Override
protected boolean isDeleteRecord(GenericRecord record) {
return isDMSDeleteRecord(record) || super.isDeleteRecord(record);
protected boolean isDeleteRecord(GenericRecord record, Properties props) {
return isDMSDeleteRecord(record) || super.isDeleteRecord(record, props);
}

private static boolean isDMSDeleteRecord(GenericRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.io.Serializable;
import java.util.Properties;

import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS;

/**
* Base class for all AVRO record based payloads, that can be ordered based on a field.
*/
Expand All @@ -43,16 +45,20 @@ public abstract class BaseAvroPayload implements Serializable {

protected final boolean isDeletedRecord;

public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
this(record, orderingVal, EMPTY_PROPS);
}

/**
* Instantiate {@link BaseAvroPayload}.
*
* @param record Generic record for the payload.
* @param orderingVal {@link Comparable} to be used in pre combine.
*/
public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
public BaseAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) {
this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) : new byte[0];
this.orderingVal = orderingVal;
this.isDeletedRecord = record == null || isDeleteRecord(record);
this.isDeletedRecord = record == null || isDeleteRecord(record, props);

if (orderingVal == null) {
throw new HoodieException("Ordering value is null for record: " + record);
Expand Down Expand Up @@ -83,7 +89,7 @@ public boolean canProduceSentinel() {
* @param genericRecord instance of {@link GenericRecord} of interest.
* @returns {@code true} if record represents a delete record. {@code false} otherwise.
*/
protected boolean isDeleteRecord(GenericRecord genericRecord) {
protected boolean isDeleteRecord(GenericRecord genericRecord, Properties props) {
final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED_FIELD;
// Modify to be compatible with new version Avro.
// The new version Avro throws for GenericRecord.get if the field name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
public static final String DELETE_MARKER = "hoodie.payload.delete.marker";
private Option<Object> eventTime = Option.empty();

public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal, Properties props) {
super(record, orderingVal, props);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The source of the props seems a chaos, I already saw several ways how it was produced:

  1. config.getPayloadConfig().getProps() in HoodieMergeHandle;
  2. payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, preCombineField); in HoodieFileSliceReader;
  3. config.getProps() in HoodieIndexUtils.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this should be cleaned up separately. Regardless, the payload-related configs should always be included so there's no correctness issue. And we're not storing any props with the record so record processing is consistent.


public DefaultHoodieRecordPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, 0); // natural order
public DefaultHoodieRecordPayload(Option<GenericRecord> record, Properties props) {
this(record.isPresent() ? record.get() : null, 0, props); // natural order
}

@Override
Expand Down Expand Up @@ -86,30 +86,27 @@ public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties
GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
eventTime = updateEventTime(incomingRecord, properties);

return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord);
return isDeleted(schema, properties) ? Option.empty() : Option.of(incomingRecord);
}

/**
* @param genericRecord instance of {@link GenericRecord} of interest.
* @param properties payload related properties
* @returns {@code true} if record represents a delete record. {@code false} otherwise.
*/
protected boolean isDeleteRecord(GenericRecord genericRecord, Properties properties) {
final String deleteKey = properties.getProperty(DELETE_KEY);
@Override
protected boolean isDeleteRecord(GenericRecord record, Properties props) {
final String deleteKey = props.getProperty(DELETE_KEY);
if (StringUtils.isNullOrEmpty(deleteKey)) {
return isDeleteRecord(genericRecord);
return super.isDeleteRecord(record, props);
Copy link
Contributor

@danny0405 danny0405 Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this line the actual fix, I didn't see the props got used by the super method, so do we still need to pass around all the props here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation of this method is kept the same. However, before my fix, this method isDeleteRecord(GenericRecord genericRecord, Properties properties) was a protected method not overriding any method in super class, and it was not called in the constructor to determine whether a record is a delete.

After the fix, the method isDeleteRecord(GenericRecord genericRecord, Properties properties) overrides the same in the super class, and is now called in the constructor (see super class BaseAvroPayload constructor, where the following is called)

this.isDeletedRecord = record == null || isDeleteRecord(record, props);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the BaseAvroPayload.isDeleteRecord does not really use the passed in props, so do we still to change all the constructors just for passing the props, it seems just an additional invocation of BaseAvroPayload.isDeleteRecord could fix the problem and this method does not need any props actually.

We can address the props issue is separate task maybe.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The BaseAvroPayload.isDeleted(Schema schema, Properties props) already passes around the properties, so can we just overwrite it for DefaultHoodieRecordPayload and there is no need to change all the constructors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BaseAvroPayload.isDeleteRecord(GenericRecord genericRecord, Properties props) uses the props, and it is called by this.isDeletedRecord = record == null || isDeleteRecord(record, props); in the constructor. BaseAvroPayload.isDeleteRecord(GenericRecord genericRecord) is only for backwards compatibility, called from user-implemented record payload implementation.
Screenshot 2023-10-25 at 01 20 47

Copy link
Contributor

@danny0405 danny0405 Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm talking about the maste code, in master, BaseAvroPayload.isDeleteRecord only handles _hoodie_is_deleted field which looks reasonable. If BaseAvroPayload.public boolean isDeleted(Schema schema, Properties props) is the culprit that incurs the issue then just fix it, we can override it in DefaultHoodieRecordPayload and utilize those specific options there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intention is that isDeleted(Schema schema, Properties props) should not deserialize the bytes for perf reason so we cannot implement the custom delete marker there for DefaultHoodieRecordPayload. The way now in this fix is to check the delete key and marker in the constructor before the Avro record is serialized; that’s why props needs to be passed in to the constructor.

}

ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(properties.getProperty(DELETE_MARKER)),
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(props.getProperty(DELETE_MARKER)),
() -> DELETE_MARKER + " should be configured with " + DELETE_KEY);
// Modify to be compatible with new version Avro.
// The new version Avro throws for GenericRecord.get if the field name
// does not exist in the schema.
if (genericRecord.getSchema().getField(deleteKey) == null) {
if (record.getSchema().getField(deleteKey) == null) {
return false;
}
Object deleteMarker = genericRecord.get(deleteKey);
return deleteMarker != null && properties.getProperty(DELETE_MARKER).equals(deleteMarker.toString());
Object deleteMarker = record.get(deleteKey);
return deleteMarker != null && props.getProperty(DELETE_MARKER).equals(deleteMarker.toString());

}

private static Option<Object> updateEventTime(GenericRecord record, Properties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import java.util.Properties;

/**
* Empty payload used for deletions.
*/
Expand All @@ -32,7 +34,7 @@ public class EmptyHoodieRecordPayload implements HoodieRecordPayload<EmptyHoodie
public EmptyHoodieRecordPayload() {
}

public EmptyHoodieRecordPayload(GenericRecord record, Comparable orderingVal) {
public EmptyHoodieRecordPayload(GenericRecord record, Comparable orderingVal, Properties props) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
*/
public class EventTimeAvroPayload extends DefaultHoodieRecordPayload {

public EventTimeAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
public EventTimeAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) {
super(record, orderingVal, props);
}

public EventTimeAvroPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, 0); // natural order
public EventTimeAvroPayload(Option<GenericRecord> record, Properties props) {
this(record.isPresent() ? record.get() : null, 0, props); // natural order
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;
import java.util.Properties;

/**
* This is a payload to wrap a existing Hoodie Avro Record. Useful to create a HoodieRecord over existing GenericRecords
Expand All @@ -38,12 +39,12 @@ public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload>
private final byte[] recordBytes;
private final Comparable<?> orderingVal;

public HoodieAvroPayload(GenericRecord record, Comparable<?> orderingVal) {
public HoodieAvroPayload(GenericRecord record, Comparable<?> orderingVal, Properties props) {
this.recordBytes = record == null ? new byte[0] : HoodieAvroUtils.avroToBytes(record);
this.orderingVal = orderingVal;
}

public HoodieAvroPayload(Option<GenericRecord> record) {
public HoodieAvroPayload(Option<GenericRecord> record, Properties props) {
this.recordBytes = record.isPresent() ? HoodieAvroUtils.avroToBytes(record.get()) : new byte[0];
this.orderingVal = 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@

import java.io.IOException;
import java.util.List;
import java.util.Properties;

import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS;

/**
* subclass of OverwriteWithLatestAvroPayload.
Expand All @@ -39,12 +42,12 @@
*/
public class OverwriteNonDefaultsWithLatestAvroPayload extends OverwriteWithLatestAvroPayload {

public OverwriteNonDefaultsWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
public OverwriteNonDefaultsWithLatestAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) {
super(record, orderingVal, props);
}

public OverwriteNonDefaultsWithLatestAvroPayload(Option<GenericRecord> record) {
super(record); // natural order
public OverwriteNonDefaultsWithLatestAvroPayload(Option<GenericRecord> record, Properties props) {
super(record, props); // natural order
}

@Override
Expand Down Expand Up @@ -73,7 +76,7 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
* @return the merged record option
*/
protected Option<IndexedRecord> mergeRecords(Schema schema, GenericRecord baseRecord, GenericRecord mergedRecord) {
if (isDeleteRecord(baseRecord)) {
if (isDeleteRecord(baseRecord, EMPTY_PROPS)) {
return Option.empty();
} else {
final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.IOException;
import java.util.Objects;
import java.util.Properties;

/**
* Default payload.
Expand All @@ -40,12 +41,12 @@
public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {

public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) {
super(record, orderingVal, props);
}

public OverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, 0); // natural order
public OverwriteWithLatestAvroPayload(Option<GenericRecord> record, Properties props) {
this(record.isPresent() ? record.get() : null, 0, props); // natural order
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.List;
import java.util.Properties;

import static org.apache.hudi.common.util.ConfigUtils.EMPTY_PROPS;

/**
* Payload clazz that is used for partial update Hudi Table.
*
Expand Down Expand Up @@ -117,12 +119,12 @@
*/
public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload {

public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) {
super(record, orderingVal, props);
}

public PartialUpdateAvroPayload(Option<GenericRecord> record) {
super(record); // natural order
public PartialUpdateAvroPayload(Option<GenericRecord> record, Properties props) {
super(record, props); // natural order
}

@Override
Expand All @@ -138,7 +140,7 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal
Option<IndexedRecord> mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord, true);
if (mergedRecord.isPresent()) {
return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(),
shouldPickOldRecord ? oldValue.orderingVal : this.orderingVal);
shouldPickOldRecord ? oldValue.orderingVal : this.orderingVal, properties);
}
} catch (Exception ex) {
return this;
Expand Down Expand Up @@ -226,7 +228,7 @@ protected Option<IndexedRecord> mergeDisorderRecordsWithMetadata(
Schema schema,
GenericRecord oldRecord,
GenericRecord updatingRecord, boolean isPreCombining) {
if (isDeleteRecord(oldRecord) && !isPreCombining) {
if (isDeleteRecord(oldRecord, EMPTY_PROPS) && !isPreCombining) {
return Option.empty();
} else {
final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Properties;

/**
* Base class that provides support for seamlessly applying changes captured via Debezium.
Expand All @@ -46,12 +47,12 @@ public abstract class AbstractDebeziumAvroPayload extends OverwriteWithLatestAvr

private static final Logger LOG = LoggerFactory.getLogger(AbstractDebeziumAvroPayload.class);

public AbstractDebeziumAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
public AbstractDebeziumAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) {
super(record, orderingVal, props);
}

public AbstractDebeziumAvroPayload(Option<GenericRecord> record) {
super(record);
public AbstractDebeziumAvroPayload(Option<GenericRecord> record, Properties props) {
super(record, props);
}

@Override
Expand Down Expand Up @@ -93,8 +94,8 @@ private Option<IndexedRecord> getInsertRecord(Schema schema) throws IOException
}

@Override
protected boolean isDeleteRecord(GenericRecord record) {
return isDebeziumDeleteRecord(record) || super.isDeleteRecord(record);
protected boolean isDeleteRecord(GenericRecord record, Properties props) {
return isDebeziumDeleteRecord(record) || super.isDeleteRecord(record, props);
}

private static boolean isDebeziumDeleteRecord(GenericRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.IOException;
import java.util.Objects;
import java.util.Properties;

/**
* Provides support for seamlessly applying changes captured via Debezium for MysqlDB.
Expand All @@ -46,12 +47,12 @@ public class MySqlDebeziumAvroPayload extends AbstractDebeziumAvroPayload {

private static final Logger LOG = LoggerFactory.getLogger(MySqlDebeziumAvroPayload.class);

public MySqlDebeziumAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
public MySqlDebeziumAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) {
super(record, orderingVal, props);
}

public MySqlDebeziumAvroPayload(Option<GenericRecord> record) {
super(record);
public MySqlDebeziumAvroPayload(Option<GenericRecord> record, Properties props) {
super(record, props);
}

private Option<String> extractSeq(IndexedRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ public class PostgresDebeziumAvroPayload extends AbstractDebeziumAvroPayload {
private static final Logger LOG = LoggerFactory.getLogger(PostgresDebeziumAvroPayload.class);
public static final String DEBEZIUM_TOASTED_VALUE = "__debezium_unavailable_value";

public PostgresDebeziumAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
public PostgresDebeziumAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) {
super(record, orderingVal, props);
}

public PostgresDebeziumAvroPayload(Option<GenericRecord> record) {
super(record);
public PostgresDebeziumAvroPayload(Option<GenericRecord> record, Properties props) {
super(record, props);
}

private Option<Long> extractLSN(IndexedRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class ConfigUtils {
*/
public static final String TABLE_SERDE_PATH = "path";

public static final Properties EMPTY_PROPS = new Properties();

private static final Logger LOG = LoggerFactory.getLogger(ConfigUtils.class);

/**
Expand Down
Loading