Conversation

@yihua
Contributor

@yihua yihua commented Oct 19, 2023

Change Logs

This PR fixes DefaultHoodieRecordPayload so that records with a custom delete key (hoodie.payload.delete.field) and delete marker (hoodie.payload.delete.marker) are ingested properly. Before this fix, the write failed with the following exception:

Error for key:HoodieKey { recordKey=0 partitionPath=} is 
java.util.NoSuchElementException: No value present in Option
	at org.apache.hudi.common.util.Option.get(Option.java:89)
	at org.apache.hudi.common.model.HoodieAvroRecord.prependMetaFields(HoodieAvroRecord.java:132)
	at org.apache.hudi.io.HoodieCreateHandle.doWrite(HoodieCreateHandle.java:144)
	at org.apache.hudi.io.HoodieWriteHandle.write(HoodieWriteHandle.java:180)
	at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:98)
	at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:42)
	at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:69)

The bug was introduced by the RFC-46 implementation and the refactoring of payload merging.

To fix the issue, the following changes are made:

  • After RFC-46, whether a record is a delete is determined when the record payload is instantiated. For DefaultHoodieRecordPayload, this means that the delete key (hoodie.payload.delete.field) and delete marker (hoodie.payload.delete.marker) configurations must be passed to the constructor. Because record payloads (HoodieRecordPayload) are created through Java reflection, the constructor signatures of all existing user-facing record payload implementations have to change.
    • For payload classes that are not affected by the bug, both the old constructors (without props) and the new constructors (with props) are kept for backwards compatibility. For payload classes affected by the bug, i.e., those extending DefaultHoodieRecordPayload, only the constructors with props are kept. All payload classes are changed so that reflection calls the new constructor once, instead of trying twice with the first attempt failing when the constructor with properties is missing. This avoids unnecessary fallbacks and a performance hit on each record.
  @Deprecated
  public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
    this(record, orderingVal, EMPTY_PROPS);
  }

  @Deprecated
  public OverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
    this(record, EMPTY_PROPS);
  }

  public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal, Properties props) {
    super(record, orderingVal, props);
  }

  public OverwriteWithLatestAvroPayload(Option<GenericRecord> record, Properties props) {
    this(record.isPresent() ? record.get() : null, 0, props); // natural order
  }
  • Moves general payload creation methods from DataSourceUtils to HoodieRecordUtils and modifies them according to the constructor changes.
  • Fixes all places around instantiating HoodieRecordPayload to properly pass the configurations in Properties.
  • To fix the logic for determining deletes, BaseAvroPayload#isDeleteRecord now takes the configuration in Properties and is called by the new constructors of the payload classes. The old method is kept for backwards compatibility.
protected boolean isDeleteRecord(GenericRecord genericRecord, Properties props)
  • DefaultHoodieRecordPayload overrides the implementation of isDeleteRecord to use custom delete key and marker to determine deletes, which is the behavior before RFC-46.
  • Adds new tests to validate that DefaultHoodieRecordPayload and deletes with custom delete key and marker work as expected.
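
The delete check described in the list above can be sketched roughly as follows. This is a minimal illustration, not the Hudi implementation: a plain Map stands in for an Avro GenericRecord, and the names DeleteCheckSketch, isDelete and isDefaultDelete are hypothetical. Only the two config keys and the _hoodie_is_deleted field come from the description above; rejecting a delete key that has no marker is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch: a Map plays the role of an Avro GenericRecord.
final class DeleteCheckSketch {
  static final String DELETE_KEY = "hoodie.payload.delete.field";
  static final String DELETE_MARKER = "hoodie.payload.delete.marker";
  static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";

  // Default rule: the record is a delete if _hoodie_is_deleted is true.
  static boolean isDefaultDelete(Map<String, Object> record) {
    return Boolean.TRUE.equals(record.get(DEFAULT_DELETE_FIELD));
  }

  // Custom rule: if a delete key is configured, compare its value to the marker;
  // otherwise fall back to the default rule.
  static boolean isDelete(Map<String, Object> record, Properties props) {
    String deleteKey = props.getProperty(DELETE_KEY);
    if (deleteKey == null || deleteKey.isEmpty()) {
      return isDefaultDelete(record);
    }
    String marker = props.getProperty(DELETE_MARKER);
    if (marker == null || marker.isEmpty()) {
      // Assumption: a delete key without a marker is treated as a config error.
      throw new IllegalArgumentException(DELETE_MARKER + " must be set when " + DELETE_KEY + " is set");
    }
    Object value = record.get(deleteKey);
    return value != null && marker.equals(value.toString());
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(DELETE_KEY, "op");      // "op" is an illustrative field name
    props.setProperty(DELETE_MARKER, "D");    // "D" is an illustrative marker value

    Map<String, Object> record = new HashMap<>();
    record.put("id", 0);
    record.put("op", "D");
    System.out.println("custom delete detected: " + isDelete(record, props));
  }
}
```

With no delete key configured, the same call falls back to the _hoodie_is_deleted check, which matches the statement that default deletes are unaffected.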

Impact

Fixes DefaultHoodieRecordPayload to properly handle the custom delete key (hoodie.payload.delete.field) and delete marker (hoodie.payload.delete.marker). Deletes using the default _hoodie_is_deleted field are not affected, i.e., they already worked before this fix.

Risk level

medium

Documentation Update

HUDI-6966 for updating docs on custom payload implementation.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@nsivabalan nsivabalan added the priority:critical Production degraded; pipelines stalled label Oct 19, 2023
@yihua yihua marked this pull request as ready for review October 21, 2023 22:50
@apache apache deleted a comment from hudi-bot Oct 22, 2023
@hudi-bot
Collaborator

CI report:

@yihua
Contributor Author

yihua commented Oct 23, 2023

@danny0405 I also changed the payload creation logic for Flink. Could you also review the relevant changes?

@yihua yihua requested a review from danny0405 October 23, 2023 21:58
Contributor

@nsivabalan nsivabalan left a comment

LGTM. just 1 minor comment

private final Comparable<?> orderingVal;

public HoodieAvroPayload(GenericRecord record, Comparable<?> orderingVal) {
this(record, orderingVal, EMPTY_PROPS);
Contributor

shouldn't we mark these as deprecated ?

Contributor Author

Based on my understanding this is only used internally and props are not used, so I don't mark it as deprecated.

Constructor<?> constructor,
@Nullable String preCombineField) {
this.shouldCombine = shouldCombine;
this.shouldUsePropsForPayload = shouldUsePropsForPayload;
Contributor

shouldUsePropsForPayload should be always true?

Contributor Author

No. A record payload class implemented by a user outside the Hudi repo may only have a constructor with the old signature, i.e., {GenericRecord.class, Comparable.class} or {Option.class}. To stay compatible with that, there's a fallback mechanism to create the payload without properties (which is OK if the payload does not leverage any props). See PayloadCreation#instance.
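
A rough sketch of such a fallback, resolved once per payload class rather than once per record, might look like the following. This is not the PayloadCreation code: the classes NewStylePayload, OldStylePayload and Creator are hypothetical, a String stands in for GenericRecord, and the usesProps flag only mirrors the role of shouldUsePropsForPayload.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

final class PayloadCreationSketch {

  // Hypothetical payload exposing the new, props-aware constructor.
  static class NewStylePayload {
    final Properties props;

    public NewStylePayload(String record, Comparable<?> orderingVal, Properties props) {
      this.props = props;
    }
  }

  // Hypothetical user payload that only has the old signature.
  static class OldStylePayload {
    public OldStylePayload(String record, Comparable<?> orderingVal) {
    }
  }

  // Resolves the constructor once, then reuses it for every record.
  static final class Creator {
    final Constructor<?> constructor;
    final boolean usesProps;

    private Creator(Constructor<?> constructor, boolean usesProps) {
      this.constructor = constructor;
      this.usesProps = usesProps;
    }

    static Creator forClass(Class<?> payloadClass) {
      try {
        // Preferred: the constructor that accepts Properties.
        return new Creator(
            payloadClass.getConstructor(String.class, Comparable.class, Properties.class), true);
      } catch (NoSuchMethodException noPropsConstructor) {
        try {
          // Fallback: the old signature without Properties.
          return new Creator(payloadClass.getConstructor(String.class, Comparable.class), false);
        } catch (NoSuchMethodException e) {
          throw new IllegalArgumentException("No supported constructor on " + payloadClass, e);
        }
      }
    }

    Object create(String record, Comparable<?> orderingVal, Properties props) {
      try {
        return usesProps
            ? constructor.newInstance(record, orderingVal, props)
            : constructor.newInstance(record, orderingVal);
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException("Could not create payload", e);
      }
    }
  }

  public static void main(String[] args) {
    Creator creator = Creator.forClass(OldStylePayload.class);
    Object payload = creator.create("record", 1, new Properties());
    System.out.println(payload.getClass().getSimpleName() + " usesProps=" + creator.usesProps);
  }
}
```

Because the lookup happens in forClass, the failed attempt on an old-style class is paid once, not on every record.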

}

public static Properties extractPropsFromConfiguration(Configuration config) {
Properties props = new Properties();
Contributor

If all we want is payload properties, you can use StreamerUtil.getPayloadConfig.

Contributor Author

@yihua yihua Oct 25, 2023

We need hoodie.payload.delete.field and hoodie.payload.delete.marker here, which are not included in StreamerUtil.getPayloadConfig

Contributor

@danny0405 danny0405 Oct 25, 2023

Just set it up correctly in the code. BTW, Flink never supports these 2 options I think.

super(record, orderingVal);
public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal, Properties props) {
super(record, orderingVal, props);
}
Contributor

The source of the props seems chaotic. I have already seen several ways in which they are produced:

  1. config.getPayloadConfig().getProps() in HoodieMergeHandle;
  2. payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, preCombineField); in HoodieFileSliceReader;
  3. config.getProps() in HoodieIndexUtils.

Contributor Author

Yeah, this should be cleaned up separately. Regardless, the payload-related configs should always be included so there's no correctness issue. And we're not storing any props with the record so record processing is consistent.

final String deleteKey = props.getProperty(DELETE_KEY);
if (StringUtils.isNullOrEmpty(deleteKey)) {
- return isDeleteRecord(genericRecord);
+ return super.isDeleteRecord(record, props);
Contributor

@danny0405 danny0405 Oct 25, 2023

Is this line the actual fix? I don't see the props being used by the super method, so do we still need to pass all the props around here?

Contributor Author

The implementation of this method is kept the same. However, before my fix, this method isDeleteRecord(GenericRecord genericRecord, Properties properties) was a protected method not overriding any method in super class, and it was not called in the constructor to determine whether a record is a delete.

After the fix, the method isDeleteRecord(GenericRecord genericRecord, Properties properties) overrides the same method in the super class and is now called in the constructor (see the super class BaseAvroPayload constructor, where the following is called):

this.isDeletedRecord = record == null || isDeleteRecord(record, props);

Contributor

But BaseAvroPayload.isDeleteRecord does not really use the passed-in props, so do we still need to change all the constructors just to pass the props? It seems that just an additional invocation of BaseAvroPayload.isDeleteRecord could fix the problem, and this method does not actually need any props.

Maybe we can address the props issue in a separate task.

Contributor

BaseAvroPayload.isDeleted(Schema schema, Properties props) already passes the properties around, so can we just override it for DefaultHoodieRecordPayload, so that there is no need to change all the constructors?

Contributor Author

BaseAvroPayload.isDeleteRecord(GenericRecord genericRecord, Properties props) uses the props, and it is called by this.isDeletedRecord = record == null || isDeleteRecord(record, props); in the constructor. BaseAvroPayload.isDeleteRecord(GenericRecord genericRecord) is only for backwards compatibility, called from user-implemented record payload implementation.

Contributor

@danny0405 danny0405 Oct 25, 2023

I'm talking about the master code. In master, BaseAvroPayload.isDeleteRecord only handles the _hoodie_is_deleted field, which looks reasonable. If BaseAvroPayload.isDeleted(Schema schema, Properties props) is the culprit that causes the issue, then just fix it: we can override it in DefaultHoodieRecordPayload and use those specific options there.

Contributor Author

The intention is that isDeleted(Schema schema, Properties props) should not deserialize the bytes, for performance reasons, so we cannot implement the custom delete marker there for DefaultHoodieRecordPayload. The approach in this fix is to check the delete key and marker in the constructor before the Avro record is serialized; that's why props need to be passed in to the constructor.
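
The ordering argument above can be illustrated with a small sketch. It is not the BaseAvroPayload code: a Map stands in for GenericRecord, a string encoding stands in for Avro serialization, and the names EagerDeleteFlagSketch and Payload are hypothetical. The point shown is only that the delete flag is computed while the record is still an object, so the later isDeleted() call never touches the bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

final class EagerDeleteFlagSketch {

  static class Payload {
    private final byte[] recordBytes;  // serialized form kept by the payload
    private final boolean isDeleted;   // decided once, before serialization

    Payload(Map<String, Object> record, Properties props) {
      // Step 1: inspect the in-memory record while its fields are still accessible.
      this.isDeleted = record == null || isDeleteRecord(record, props);
      // Step 2: serialize (placeholder encoding; the real payload holds Avro bytes).
      this.recordBytes = record == null
          ? new byte[0]
          : new TreeMap<>(record).toString().getBytes(StandardCharsets.UTF_8);
    }

    // Simplified custom-marker check; a missing key or marker means "not a delete" here.
    private static boolean isDeleteRecord(Map<String, Object> record, Properties props) {
      String deleteKey = props.getProperty("hoodie.payload.delete.field");
      String marker = props.getProperty("hoodie.payload.delete.marker");
      if (deleteKey == null || marker == null) {
        return false;
      }
      Object value = record.get(deleteKey);
      return value != null && marker.equals(value.toString());
    }

    // Cheap: returns the precomputed flag and never deserializes recordBytes.
    boolean isDeleted() {
      return isDeleted;
    }

    int serializedSize() {
      return recordBytes.length;
    }
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.payload.delete.field", "op");   // illustrative field name
    props.setProperty("hoodie.payload.delete.marker", "D");   // illustrative marker value

    Map<String, Object> record = new HashMap<>();
    record.put("op", "D");
    Payload payload = new Payload(record, props);
    System.out.println("deleted=" + payload.isDeleted() + ", bytes=" + payload.serializedSize());
  }
}
```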

@nsivabalan nsivabalan self-assigned this Nov 15, 2023
@nsivabalan
Contributor

Hey @yihua @danny0405: can you folks sync up and resolve this soon? We want to get this landed for 0.14.1.

@nsivabalan
Contributor

Attempting a different approach here: #10150

Labels

priority:critical Production degraded; pipelines stalled release-0.14.1

Projects

Status: ✅ Done

4 participants