Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-7966] Handle NPE from AvroSchemaUtils.createNewSchemaFromFieldsWithReference #11585

Merged
merged 2 commits into from
Jul 8, 2024

Conversation

codope
Copy link
Member

@codope codope commented Jul 7, 2024

Change Logs

Running long-running deltastreamer with following properties: https://github.com/apache/hudi/blob/dbfe8b23c0b4f160b26379053873cfc2a46acef4/docker/demo/config/test-suite/test-nonpartitioned.properties

The job throws NPE during validation phase:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 69.0 failed 4 times, most recent failure: Lost task 0.3 in stage 69.0 (TID 345) (10.0.103.207 executor 1): java.lang.NullPointerException  at org.apache.avro.JsonProperties$2$1$1.<init>(JsonProperties.java:175)  at org.apache.avro.JsonProperties$2$1.iterator(JsonProperties.java:174)  at org.apache.avro.JsonProperties.getObjectProps(JsonProperties.java:305)  at org.apache.hudi.avro.AvroSchemaUtils.createNewSchemaFromFieldsWithReference(AvroSchemaUtils.java:306)  at org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaBase(AvroSchemaUtils.java:293)  at org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaDedupNested(AvroSchemaUtils.java:245)  at org.apache.hudi.common.table.read.HoodieFileGroupReaderSchemaHandler.generateRequiredSchema(HoodieFileGroupReaderSchemaHandler.java:146)  at org.apache.hudi.common.table.read.HoodieFileGroupReaderSchemaHandler.prepareRequiredSchema(HoodieFileGroupReaderSchemaHandler.java:150)  at org.apache.hudi.common.table.read.HoodieFileGroupReaderSchemaHandler.<init>(HoodieFileGroupReaderSchemaHandler.java:84)  at org.apache.hudi.common.table.read.HoodieFileGroupReader.<init>(HoodieFileGroupReader.java:113)  at org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat.$anonfun$buildReaderWithPartitionValues$3(HoodieFileGroupReaderBasedParquetFileFormat.scala:170)  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:209)  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)  at org.apache.spark.scheduler.Task.run(Task.scala:136)  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)  at java.lang.Thread.run(Thread.java:750)

The code assumes that all schema must have properties, which may not necessaily be true. This PR handles NPE by simply logging a warning if props are not present. Fields are still added (no behavior changes) and if props are present then those are added too.

Impact

Bug fix, avoid blocking ingestion.

Risk level (write none, low medium or high below)

low

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:S PR with lines of changes in (10, 100] label Jul 7, 2024
newSchema.addProp(prop.getKey(), prop.getValue());
}
} else {
LOG.warn("Schema.getObjectProps() returned null for schema: {}", schema);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we even need to log a warn ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we throw here, what is the schema without any fields there?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Schema has fields but no props. Throwing error would result in the same failure. The props were copied in eb20273. We need to understand what's the side-effect of not having props. I think schema fields should be sufficient. @jonvex can help clarify this point.

do we even need to log a warn ?

Logging a warn for debugging purpose.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I found the doc mentioned the attributes: https://avro.apache.org/docs/1.11.1/specification/#schema-declaration, it should not affect the serialization/deserialization, but it doesn't say how it is affected.

@hudi-bot
Copy link

hudi-bot commented Jul 8, 2024

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@nsivabalan nsivabalan merged commit 6d01bcf into apache:master Jul 8, 2024
47 of 48 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
size:S PR with lines of changes in (10, 100]
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants