Conversation

@TengHuo
Contributor

@TengHuo TengHuo commented Nov 28, 2022

Troubleshooting details in this issue and the fix: #7284

Change Logs

  • Add a new parameter to the method HoodieBaseRelation#convertToAvroSchema for generating a qualified name for the Avro schema based on the table name
  • Use the new API AvroConversionUtils#convertStructTypeToAvroSchema in HoodieBaseRelation#convertToAvroSchema for converting a struct schema to an Avro schema
  • Add a new test case TestMorTable to verify the issue is fixed
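The qualified-name rule described above can be sketched as follows. This is an illustrative Python sketch, not Hudi's actual Scala code; the helper names are hypothetical, and the `hoodie.<table_name>` namespace pattern is taken from the discussion later in this thread.

```python
import json

# Hypothetical helper: derive an Avro record name and namespace from a
# Hudi table name, so every conversion of the same table's schema yields
# the same qualified name.
def qualified_avro_name(table_name: str) -> tuple:
    """Return (record_name, namespace) derived from the table name."""
    return f"{table_name}_record", f"hoodie.{table_name}"

def avro_record_schema(table_name: str, fields: list) -> dict:
    """Build a minimal Avro record schema dict using the qualified name."""
    name, namespace = qualified_avro_name(table_name)
    return {"type": "record", "name": name, "namespace": namespace, "fields": fields}

schema = avro_record_schema("test_mor_tab", [{"name": "id", "type": "long"}])
print(json.dumps(schema))
```

With this rule, reader and writer schemas produced for the same table carry identical full names, which is the property the fix relies on.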

Impact

No public API changed.

Risk level (write none, low medium or high below)

low

Documentation Update

No doc or configuration changed.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@hudi-bot
Collaborator

CI report:

Bot commands — @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@codope codope added the area:schema Schema evolution and data types label Nov 28, 2022
@codope codope added priority:critical Production degraded; pipelines stalled priority:high Significant impact; potential bugs status:triaged Issue has been reviewed and categorized and removed priority:critical Production degraded; pipelines stalled labels Nov 28, 2022
@TengHuo
Contributor Author

TengHuo commented Nov 28, 2022

I found another similar issue in HoodieCatalogTable#initHoodieTable:

.setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())

When creating a table from an empty path, it will set the record name in hoodie.table.create.schema to $tableName_record.

However, if the path is an existing Hudi table (hoodieTableExists is true) when creating the table, it will rewrite the record name in hoodie.table.create.schema to topLevelRecord, which is the default value of recordName in SchemaConverters#toAvroType.

May I ask if this is an issue or by-design behaviour?
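A minimal sketch of the inconsistency described above, using the names quoted in the comment ($tableName_record vs. topLevelRecord). The branching function is a simplification for illustration, not Hudi's actual code path.

```python
# Hypothetical reduction of the HoodieCatalogTable#initHoodieTable behaviour:
# the record name written into hoodie.table.create.schema depends on whether
# the path already holds a Hudi table.
def create_schema_record_name(table_name: str, hoodie_table_exists: bool) -> str:
    if hoodie_table_exists:
        # Falls through to SchemaConverters.toAvroType's default recordName.
        return "topLevelRecord"
    # Fresh path: the name is derived from the table name.
    return f"{table_name}_record"

fresh = create_schema_record_name("test_tab", hoodie_table_exists=False)
existing = create_schema_record_name("test_tab", hoodie_table_exists=True)
# The same table can end up with two different record names.
assert fresh != existing
```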

@nsivabalan nsivabalan added priority:blocker Production down; release blocker release-0.12.2 Patches targetted for 0.12.2 and removed priority:high Significant impact; potential bugs labels Dec 5, 2022
@alexeykudinkin alexeykudinkin removed the release-0.12.2 Patches targetted for 0.12.2 label Dec 15, 2022
@TengHuo
Contributor Author

TengHuo commented Dec 22, 2022

Hi, is there anyone who can help review this? Really appreciated.

@alexeykudinkin alexeykudinkin added priority:critical Production degraded; pipelines stalled and removed priority:blocker Production down; release blocker labels Jan 25, 2023
@alexeykudinkin
Contributor

@TengHuo can you please rebase to the latest master and verify whether fix is still relevant?

@TengHuo
Contributor Author

TengHuo commented Jan 25, 2023

@TengHuo can you please rebase to the latest master and verify whether fix is still relevant?

@alexeykudinkin got it, np. Let me rebase it to the latest master branch.

@TengHuo
Contributor Author

TengHuo commented Jan 25, 2023

Hi @alexeykudinkin

Before rebasing, there is one thing I want to check with you.

Last week, there was an issue about a similar exception about Avro schema namespace, #7691.

And @danny0405 mentioned in that ticket that it uses a constant namespace "record" in Flink side, #7691 (comment).

On the Spark side, I found we are using a namespace pattern "namespace": "hoodie.test_mor_tab" (test_mor_tab is the Hudi table name) in the writer schema, and a constant "name": "Record" in the reader schema. #7284 (comment)

May I ask which one we should follow? I think we need to keep it consistent between Spark and Flink.

@danny0405
Contributor

danny0405 commented Jan 25, 2023

Before rebase, I have one thing want to check with you.
Last week, there was an issue about a similar exception about Avro schema namespace, #7691.
And @danny0405 mentioned in that ticket that it uses a constant namespace "record" in Flink side, #7691 (comment).
And in Spark side, I found we are using a namespace pattern "namespace": "hoodie.test_mor_tab" (test_mor_tab is Hudi table name) in writer schema, and a
constant "name": "Record" in reader schema. #7284 (comment)
May I ask which one we should follow? Think we need to keep it consistent between Spark and Flink.

Correct. We need to unify the schema handling across Flink and Spark integrations.

If the namespace check is an Avro behavior and there is no way to work around it, I'm afraid we must unify all the Avro schema namespaces for reader/writer schemas. Does the hoodie.table_name namespace make any sense here? How about we all use the constant record as the namespace name?

@danny0405 the problem is that we'd need to have different names because names are used by Avro to look up the fields within the unions
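The point about unions can be illustrated with a small sketch. Avro disambiguates record branches inside a union by their (full) names, so a reader whose branch carries a different namespace than the writer's can no longer match it. This Python snippet mimics that named-type resolution with plain dicts; it is not the Avro library itself, and the example names are hypothetical.

```python
# Mimic Avro's union-branch resolution: a record branch is matched by its
# full name ("namespace.name"). Different namespaces => no match.
def full_name(schema: dict) -> str:
    ns = schema.get("namespace", "")
    return f"{ns}.{schema['name']}" if ns else schema["name"]

def resolve_union_branch(union: list, written_full_name: str):
    """Return the reader branch whose full name matches the writer's, or None."""
    for branch in union:
        if full_name(branch) == written_full_name:
            return branch
    return None

writer_branch = {"type": "record", "name": "Record", "namespace": "hoodie.tab", "fields": []}
reader_union = [{"type": "record", "name": "Record", "namespace": "record", "fields": []}]

# Same simple name, different namespaces -> different full names -> no branch found.
assert resolve_union_branch(reader_union, full_name(writer_branch)) is None
```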

@TengHuo
Contributor Author

TengHuo commented Feb 2, 2023

Agreed @danny0405, I think it's better we unify Avro schema handling across Spark and Flink in Hudi.

Currently, we have the Avro schema utility class org.apache.hudi.avro.AvroSchemaUtils in the hudi-common module for manipulating Avro schemas. Hudi Spark uses org.apache.spark.sql.avro.SchemaConverters to convert between Spark DataType and Avro schemas, while Hudi Flink uses org.apache.hudi.util.AvroSchemaConverter to convert between Flink DataType and Avro schemas.

I noticed that the behaviour differs when setting the name of a new Avro schema.

On the Spark side, the name and namespace of the Avro schema are exposed as method parameters.

  /**
   * Converts a Spark SQL schema to a corresponding Avro schema.
   *
   * @since 2.4.0
   */
  def toAvroType(catalystType: DataType,
                 nullable: Boolean = false,
                 recordName: String = "topLevelRecord",
                 nameSpace: String = ""): Schema

reference:

On the Flink side, a constant name "record" is used:

  /**
   * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
   *
   * <p>Use "record" as the type name.
   *
   * @param schema the schema type, usually it should be the top level record type, e.g. not a
   *               nested type
   * @return Avro's {@link Schema} matching this logical type.
   */
  public static Schema convertToSchema(LogicalType schema) {
    return convertToSchema(schema, "record");
  }

reference:

(please correct me if I'm wrong)

May I know if it is possible to unify all non-engine-specific schema handling in one place? e.g. the name conversion rule.
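The "one place" idea above could look roughly like this: a single naming helper that both the Spark and Flink converters delegate to, so record name and namespace are derived identically on every engine. This is a hypothetical sketch; none of these names are existing Hudi APIs.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class AvroRecordNaming:
    """The name/namespace pair every engine-side converter would share."""
    record_name: str
    namespace: str

def naming_for_table(table_name: Optional[str]) -> AvroRecordNaming:
    """Single source of truth for the Avro record naming rule."""
    if table_name:
        return AvroRecordNaming(f"{table_name}_record", f"hoodie.{table_name}")
    # Fallback when no table name is available, mirroring a constant default
    # like Flink's "record".
    return AvroRecordNaming("record", "")

assert naming_for_table("tab") == AvroRecordNaming("tab_record", "hoodie.tab")
```

Keeping the rule in one engine-agnostic module (e.g. somewhere under hudi-common) would let Spark's SchemaConverters wrapper and Flink's AvroSchemaConverter stay consistent by construction rather than by convention.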

@codope
Member

codope commented Feb 2, 2023

@TengHuo Thanks for the detailed comment. I do agree that we should unify the schema. It bodes well for engine interoperability as well. Does this PR already cover the unification?
@danny0405 What do you think?

@TengHuo
Contributor Author

TengHuo commented Feb 2, 2023

@TengHuo Thanks for the detailed comment. I do agree that we should unify the schema. It bodes well for engine interoperability as well. Does this PR already cover the unification? @danny0405 What do you think?

@codope
Nope, this PR doesn't include schema unification. I was planning to fix an Avro schema namespace inconsistency on the Spark side only. Then I noticed there is different Avro schema name & namespace handling code between Flink and Spark.

So I think the fix in this PR can't solve this issue in all situations.

@danny0405
Contributor

I'm +1 too on unifying the Avro record schema namespace; let's make the namespace parametric for the Flink tool AvroSchemaConverter.

@TengHuo
Contributor Author

TengHuo commented Apr 13, 2023

This issue has been fixed in the latest master; the test case passes now. Details in #7284


Labels

area:schema Schema evolution and data types priority:critical Production degraded; pipelines stalled status:triaged Issue has been reviewed and categorized

Projects

Archived in project

Development

Successfully merging this pull request may close these issues.

[SUPPORT] Inconsistent reader and writer schema in HoodieAvroDataBlock cause exception

6 participants