[HUDI-7486] Classify schema exceptions when converting from avro to spark row representation #10778
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
...spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
  def maybeWrapDataFrameWithException(df: DataFrame, exceptionClass: String, msg: String, shouldWrap: Boolean): DataFrame = {
    if (shouldWrap) {
      HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, injectSQLConf(df.queryExecution.toRdd.mapPartitions {
Is SQLConf injection necessary here?
In #6352, Alexey says: "Yes, it will propagate to all RDDs in the execution chain (up to a shuffling point)". So I guess the question is whether there is a shuffling point.
My question is whether wrapping the DataFrame with the exception needs the SQLConf injection. So you're saying the injection is needed so that the configs can still be propagated to the executors?
Let's keep an eye on the DAG and Spark execution when this is rolled out. I think this is OK for now.
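For readers following this thread, here is a minimal sketch of the exception-wrapping idea behind `maybeWrapDataFrameWithException`: an iterator that delegates to the underlying one and rethrows any failure as an exception class named by a string, as the `exceptionClass: String, msg: String` parameters suggest. This is not the PR's `ExceptionWrappingIterator` (which is Scala and imports Hudi's `ReflectionUtils`). It is a plain-Java rendering using `java.lang.reflect`, and `WrappingIteratorSketch` and `SketchSchemaCompatibilityException` are hypothetical names. The Spark side (`mapPartitions`, `injectSQLConf`) is left out entirely.

```java
import java.util.Iterator;

// Hypothetical target exception, standing in for a Hudi schema exception.
class SketchSchemaCompatibilityException extends RuntimeException {
    public SketchSchemaCompatibilityException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical iterator that rethrows failures of the delegate as the
// exception class named by exceptionClass.
final class WrappingIteratorSketch<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private final String exceptionClass;
    private final String msg;

    WrappingIteratorSketch(Iterator<T> delegate, String exceptionClass, String msg) {
        this.delegate = delegate;
        this.exceptionClass = exceptionClass;
        this.msg = msg;
    }

    @Override
    public boolean hasNext() {
        try {
            return delegate.hasNext();
        } catch (RuntimeException e) {
            throw wrap(e);
        }
    }

    @Override
    public T next() {
        try {
            return delegate.next();
        } catch (RuntimeException e) {
            throw wrap(e);
        }
    }

    // Instantiates the configured exception through a (String, Throwable)
    // constructor. Assumption: the target class declares such a constructor
    // and extends RuntimeException; otherwise a plain RuntimeException is used.
    private RuntimeException wrap(Throwable cause) {
        try {
            return (RuntimeException) Class.forName(exceptionClass)
                .getDeclaredConstructor(String.class, Throwable.class)
                .newInstance(msg, cause);
        } catch (ReflectiveOperationException | ClassCastException reflectionFailure) {
            return new RuntimeException(msg, cause);
        }
    }
}
```

Because the wrapping happens lazily inside `hasNext` and `next`, the original failure surfaces only when the partition is actually consumed, and it is kept as the cause of the classified exception.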
...client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
yihua left a comment
Blocking the merge until a couple of comments on docs are addressed.
  package org.apache.hudi.util

  import org.apache.hudi.common.util.ReflectionUtils
Docs here?
hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java
hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java
…rdCreationException.java Co-authored-by: Y Ethan Guo <[email protected]>
…SchemaException.java Co-authored-by: Y Ethan Guo <[email protected]>
yihua left a comment
LGTM
…park row representation (#10778)
* make exceptions more specific
* use hudi avro exception
* Address review comments
* fix unnecessary changes
* add exception wrapping
* style
* address review comments
* remove . from config
* address review comments
* fix merge
* fix checkstyle
* Update hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java Co-authored-by: Y Ethan Guo <[email protected]>
* Update hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java Co-authored-by: Y Ethan Guo <[email protected]>
* add javadoc to exception wrapper
---------
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: Y Ethan Guo <[email protected]>
Change Logs
All schema-related issues should throw exceptions of type HoodieSchemaException. Exceptions raised when converting from Avro to Spark row format are now classified as schema compatibility exceptions, because they are caused either by an illegal schema or by records that are incompatible with the provided schema.
Additionally, introduce a new exception type, HoodieRecordCreationException, which is thrown when turning an engine-specific record into a HoodieRecord.
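The classification described above can be illustrated with a small, self-contained sketch. It assumes a simple hierarchy in which both the schema exception and the record creation exception extend a common base. The `Sketch*` classes, `convertOrClassify`, `createRecordOrClassify`, and `classify` are hypothetical names for illustration only. They are not the Hudi classes, whose parents and constructors may differ.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins for the exception types named above. The real Hudi
// classes may have different parents and constructors.
class SketchHoodieException extends RuntimeException {
    SketchHoodieException(String message, Throwable cause) {
        super(message, cause);
    }
}

class SketchSchemaException extends SketchHoodieException {
    SketchSchemaException(String message, Throwable cause) {
        super(message, cause);
    }
}

class SketchRecordCreationException extends SketchHoodieException {
    SketchRecordCreationException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class ExceptionClassificationSketch {
    private ExceptionClassificationSketch() {
    }

    // Runs a schema-dependent conversion and reclassifies unclassified
    // failures as schema exceptions, keeping the original as the cause.
    static <T> T convertOrClassify(Supplier<T> conversion) {
        try {
            return conversion.get();
        } catch (SketchHoodieException alreadyClassified) {
            throw alreadyClassified;
        } catch (RuntimeException e) {
            throw new SketchSchemaException("Record is incompatible with the provided schema", e);
        }
    }

    // Same idea for the step that turns an engine-specific record into a
    // Hudi record.
    static <T> T createRecordOrClassify(Supplier<T> creation) {
        try {
            return creation.get();
        } catch (SketchHoodieException alreadyClassified) {
            throw alreadyClassified;
        } catch (RuntimeException e) {
            throw new SketchRecordCreationException("Failed to create record", e);
        }
    }

    // Callers can branch on the exception type instead of parsing messages.
    static String classify(Throwable t) {
        if (t instanceof SketchSchemaException) {
            return "SCHEMA";
        }
        if (t instanceof SketchRecordCreationException) {
            return "RECORD_CREATION";
        }
        return "UNKNOWN";
    }
}
```

With types like these, a caller can tell a schema problem apart from a record creation problem by type, without inspecting the underlying cause.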
Impact
Exceptions can be classified more easily.
Risk level (write none, low, medium or high below)
low
Documentation Update
N/A
Contributor's checklist