Conversation

@voonhous (Member) commented on Oct 11, 2022:

Change Logs

Added a more comprehensive validation routine for checking the primary key and preCombineField, allowing nested fields.

Ensures that the primaryKey and preCombineField definitions refer to fields that actually exist in the table schema.

Note: Flink's primaryKey and preCombineField validation is stricter than Spark's. Spark's validation only checks whether the uppermost parent field exists, without validating the nested path down to its lowest level.
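
To make the difference concrete, here is a minimal sketch (not code from this patch) of a root-level-only check of the kind described above; getRootLevelFieldName mirrors the helper referenced in the diff below, and the rest is hypothetical:

import java.util.Arrays;
import java.util.List;

public class RootLevelCheckSketch {

  // "a.b.c" -> "a": keep only the uppermost parent of a nested field path.
  static String getRootLevelFieldName(String field) {
    int dot = field.indexOf('.');
    return dot < 0 ? field : field.substring(0, dot);
  }

  // Root-level-only check: passes as long as the uppermost parent is a known column,
  // even if the nested part of the path does not actually exist.
  static boolean rootLevelExists(List<String> columnNames, String field) {
    return columnNames.contains(field)
        || columnNames.contains(getRootLevelFieldName(field));
  }

  public static void main(String[] args) {
    List<String> columns = Arrays.asList("f0", "ts");
    System.out.println(rootLevelExists(columns, "f0.id"));         // true, even if f0 has no "id"
    System.out.println(rootLevelExists(columns, "missing.field")); // false
  }
}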

Impact

No public API changed

Risk level (write none, low, medium or high below)

low

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change

  • The config description must be updated if new configs are added or the default value of the configs is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here and follow the instructions to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

// From the patch: find any record key field that neither matches a top-level column
// nor has its root-level parent among the columns.
Arrays.stream(recordKeys)
    .filter(field -> !fields.contains(field))
    .filter(field -> !fields.contains(getRootLevelFieldName(field)))
    .findAny()
Contributor:

Do we also support nested primary keys in this patch?

@voonhous (Member Author) replied on Oct 11, 2022:

Uhm, from what I understand, Spark SQL supports nested primaryKey and preCombineField.

The changes I made here are to standardize the validations between Spark and Flink.
https://issues.apache.org/jira/browse/HUDI-4051
https://github.com/apache/hudi/pull/5517/files

// From the patch: fall back to the root-level field name before treating the preCombine
// field as missing; the innermost branch applies only when the default HoodieRecordPayload is configured.
String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD);
if (!fields.contains(preCombineField)) {
  if (!fields.contains(getRootLevelFieldName(preCombineField))) {
    if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
Contributor:

We may also need to validate the nested field names.

@voonhous (Member Author):

I agree. This change is to standardize the validations between Flink and Spark. As such, no checks on the nested field names were made.

@voonhous (Member Author):

@danny0405 I have added the feature to validate nested fields for Flink.

Can you please help to review this PR again?

Thank you.
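
For context, a minimal sketch of what validating a dotted field path against an Avro schema could look like; the helper names here are hypothetical and are not the actual code in this PR:

import org.apache.avro.Schema;

public final class NestedFieldValidationSketch {

  // Returns true if the dotted path (e.g. "f0.id") resolves to a field in the schema.
  static boolean nestedFieldExists(Schema schema, String dottedPath) {
    Schema current = schema;
    for (String part : dottedPath.split("\\.")) {
      current = unwrapNullableUnion(current);
      if (current.getType() != Schema.Type.RECORD || current.getField(part) == null) {
        return false;
      }
      current = current.getField(part).schema();
    }
    return true;
  }

  // Avro encodes nullable fields as UNION(null, actualType); unwrap to the non-null branch.
  static Schema unwrapNullableUnion(Schema schema) {
    if (schema.getType() == Schema.Type.UNION) {
      for (Schema branch : schema.getTypes()) {
        if (branch.getType() != Schema.Type.NULL) {
          return branch;
        }
      }
    }
    return schema;
  }
}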

// nested pk field is allowed
ResolvedSchema schema6 = SchemaBuilder.instance()
    .field("f0",
        DataTypes.ROW(DataTypes.FIELD("id", DataTypes.INT()), DataTypes.FIELD("date", DataTypes.VARCHAR(20))))
Contributor:

Can we write an IT test in ITTestHoodieDataSource?

@voonhous (Member Author) replied on Oct 11, 2022:

Hmmm, do you mean add IT tests?

Or remove the UTs I included here and rewrite them as IT tests?

@yihua added the writer-core and engine:flink (Flink integration) labels on Oct 15, 2022
@voonhous force-pushed the HUDI-5004 branch 5 times, most recently from f4336b7 to a0baf96 on November 29, 2022, 09:31
@hudi-bot (Collaborator) commented:

CI report:

@hudi-bot supports the following bot commands:
  • @hudi-bot run azure: re-run the last Azure build

private void sanityCheck(Configuration conf, ResolvedSchema schema) {
  List<String> fields = schema.getColumnNames();
  Schema inferredSchema = AvroSchemaConverter.convertToSchema(schema.toPhysicalRowDataType().notNull().getLogicalType());

Contributor:

There is no need to convert the ResolvedSchema to an Avro schema for validation; ResolvedSchema#getColumnDataTypes can fetch the data type of each field.

We also need to fix RowDataKeyGen#getRecordKey for nested primary keys.
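
As a rough sketch of this suggestion (assumed helper names, not the actual change), a nested field path could be validated directly against the ResolvedSchema by walking RowType instead of converting to an Avro schema:

import java.util.List;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public final class ResolvedSchemaCheckSketch {

  // Returns true if a dotted path such as "f0.id" resolves to a field of the schema.
  static boolean nestedFieldExists(ResolvedSchema schema, String dottedPath) {
    String[] parts = dottedPath.split("\\.");
    List<String> names = schema.getColumnNames();
    int idx = names.indexOf(parts[0]);
    if (idx < 0) {
      return false;
    }
    LogicalType current = schema.getColumnDataTypes().get(idx).getLogicalType();
    for (int i = 1; i < parts.length; i++) {
      if (!(current instanceof RowType)) {
        return false;
      }
      RowType rowType = (RowType) current;
      int fieldIdx = rowType.getFieldNames().indexOf(parts[i]);
      if (fieldIdx < 0) {
        return false;
      }
      current = rowType.getTypeAt(fieldIdx);
    }
    return true;
  }
}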

@voonhous (Member Author) replied on Dec 2, 2022:

Sure, I will take a look.

The main reasons for doing this are:

  1. AvroSchemaConverter.convertToSchema was already imported and used elsewhere in the code, so it was simply reused.
  2. Converting to an Avro schema means the helper functions can be written in HoodieAvroUtils, where the validation used when creating tables with Spark as the entrypoint can be reused.

Let me try to see if we can use the ResolvedSchema instead, will get back to you.

> Also we need to fix the RowDataKeyGen#getRecordKey for nested primary keys.

Got it!
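
For illustration only, a minimal sketch of what extracting a nested record key from Flink's RowData might involve for the RowDataKeyGen#getRecordKey point above; this is not the actual Hudi implementation, and the traversal is purely illustrative:

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public final class NestedKeyExtractionSketch {

  // Resolves a dotted path such as "f0.id" against a row and returns the leaf value.
  static Object getNestedFieldValue(RowData row, RowType rowType, String dottedPath) {
    String[] parts = dottedPath.split("\\.");
    RowData currentRow = row;
    RowType currentType = rowType;
    for (int i = 0; i < parts.length; i++) {
      int pos = currentType.getFieldNames().indexOf(parts[i]);
      if (pos < 0 || currentRow == null || currentRow.isNullAt(pos)) {
        return null;
      }
      LogicalType fieldType = currentType.getTypeAt(pos);
      Object value = RowData.createFieldGetter(fieldType, pos).getFieldOrNull(currentRow);
      if (i == parts.length - 1) {
        return value; // reached the leaf field
      }
      if (!(fieldType instanceof RowType)) {
        return null; // path goes deeper than the schema allows
      }
      currentRow = (RowData) value;
      currentType = (RowType) fieldType;
    }
    return null;
  }
}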

@voonhous (Member Author) commented:

Closing this PR as @hbgstc123 has already fixed this issue by disabling schema sanity checks prior to creating the source in this PR


Labels

engine:flink Flink integration
