[HUDI-9705] Fix bugs in spark and avro reader contexts for type promotion and field renaming #13714
Conversation
...scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala (resolved)
    fileOutputSchema = dataSchema;
    renamedColumns = Collections.emptyMap();
Note to self: the FileGroupRecordBuffer handles schema-on-read evolution with composeEvolvedSchemaTransformer for log blocks. Only parquet log blocks require calling readerContext.getFileRecordIterator before schema-on-read evolution is applied in FileGroupRecordBuffer, so there is no need to handle schema-on-read in this case.
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java (outdated, resolved)
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java (outdated, resolved)
hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java (resolved)
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java (resolved)
yihua left a comment:
LGTM overall
    case DOUBLE:
      // To maintain precision, you need to convert Float -> String -> Double
    - return writerSchema.getType().equals(Schema.Type.FLOAT);
    + return writerSchema.getType().equals(Schema.Type.FLOAT) && !writerSchema.getType().equals(Schema.Type.STRING);
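The "Float -> String -> Double" comment in the snippet above refers to a well-known pitfall: casting a float directly to double exposes its binary approximation, while routing through the shortest decimal string preserves the value a reader expects. A minimal standalone sketch (not Hudi code) illustrating the difference:

```java
// Sketch: why float -> double promotion goes through a String.
public class FloatPromotionDemo {
    public static void main(String[] args) {
        float f = 1.1f;
        double direct = (double) f;                               // binary approximation, not 1.1
        double viaString = Double.parseDouble(Float.toString(f)); // shortest decimal: 1.1
        System.out.println(direct == 1.1d);     // false
        System.out.println(viaString == 1.1d);  // true
    }
}
```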
How could a type equal FLOAT and also STRING?
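The reviewer's point is that the extra clause is redundant: getType() returns a single enum constant, so equality with FLOAT already rules out STRING. A small sketch with a stand-in enum (not Avro's actual Schema.Type, same behavior):

```java
// Sketch: the `&& != STRING` clause can never change the result.
public class TypeCheckDemo {
    enum Type { FLOAT, STRING, DOUBLE }  // stand-in for Avro's Schema.Type

    static boolean original(Type t)      { return t == Type.FLOAT; }
    static boolean withRedundant(Type t) { return t == Type.FLOAT && t != Type.STRING; }

    public static void main(String[] args) {
        for (Type t : Type.values()) {
            // The two predicates agree for every possible type.
            System.out.println(original(t) == withRedundant(t)); // always true
        }
    }
}
```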
If we can get this PR in, I think areSchemasProjectionEquivalent is going to fit the needs here and has better testing. I will wait to see if that can be brought into a mergeable shape.
yeah, that PR is landed.
    return Pair.of(requiredSchema, Collections.emptyMap());
    }
    long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(path.getName()));
    InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, metaClient);
This seems not right: the search happens at the file-split level, so it would trigger the metaClient metadata file listing for every file-slice read. Can we reuse the cache somewhere so it is shared by all the readers?
Is there any example of how to do this? I noticed that this is how it is currently done in the merge path. This path will at least cache per JVM. There are some other cases where I see calls to InternalSchemaCache.getInternalSchemaByVersionId but that skips the cache entirely so the commit metadata is parsed per file.
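The per-JVM caching mentioned above can be sketched generically: a process-wide map keyed by table path and commit time means repeated file-slice reads pay for one schema lookup instead of one metadata listing per file. This is a hypothetical illustration of the pattern, not Hudi's InternalSchemaCache API (names and types are simplified):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch: JVM-wide memoization of schema lookups so that every
// file-slice read of the same commit reuses one expensive metadata listing.
public class SchemaCacheSketch {
    private static final ConcurrentHashMap<String, String> CACHE = new ConcurrentHashMap<>();

    static String searchSchemaAndCache(String tablePath, long commitTime, Supplier<String> loader) {
        // computeIfAbsent runs the loader at most once per key within this JVM.
        return CACHE.computeIfAbsent(tablePath + "@" + commitTime, k -> loader.get());
    }

    public static void main(String[] args) {
        AtomicInteger listings = new AtomicInteger();
        Supplier<String> expensiveLookup = () -> { listings.incrementAndGet(); return "schema-v3"; };
        for (int i = 0; i < 100; i++) {                      // 100 file slices of one commit
            searchSchemaAndCache("/tbl", 20240101L, expensiveLookup);
        }
        System.out.println(listings.get());                  // 1: listed once per JVM
    }
}
```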
Looks like we already did this in FileGroupRecordBuffer.composeEvolvedSchemaTransformer, and we optimized the logic in #13525 to get rid of the timeline listing, so this should be good now.
Yes, the existing logic of schema evolution on read in other places follows the same code logic, so this is OK in the sense that it brings feature parity and does not introduce regression.
I think what makes more sense is to have a schema history (schemas for ranges of completion/instant time, e.g., schema1: ts1-ts100, schema2: ts101-ts1000, etc.) constructed on the driver and distributed to executors. This schema history can be stored under .hoodie so that one file read retrieves the whole history, and executors do not pay the cost of scanning commit metadata or reading the schema from each file (assuming the file schema is based on the writer/table schema of the commit). This essentially needs a new schema system/abstraction, which is in the scope of RFC-88 @danny0405
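The ranged schema-history lookup described above maps naturally onto a sorted map keyed by the first instant each schema applies to. A minimal sketch under those assumptions (plain strings stand in for real schema objects; this is an illustration of the proposal, not existing Hudi code):

```java
import java.util.TreeMap;

// Hypothetical sketch: resolve the schema in effect at a given instant time
// from a driver-built history, without scanning commit metadata on executors.
public class SchemaHistorySketch {
    public static void main(String[] args) {
        TreeMap<Long, String> history = new TreeMap<>();
        history.put(1L, "schema1");    // valid for ts1..ts100
        history.put(101L, "schema2");  // valid from ts101 onward
        // floorEntry finds the latest schema whose start is <= the instant.
        System.out.println(history.floorEntry(55L).getValue());  // schema1
        System.out.println(history.floorEntry(500L).getValue()); // schema2
    }
}
```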
Yes, we have a plan to re-implement schema evolution based on the new schema abstraction in the 1.2 release.
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java (resolved)
yihua left a comment:
LGTM
@the-other-tim-brown you can decide whether the schema utils newly available on master can be reused before merging this PR.
Force-pushed from 1a11deb to 145cb8e.
yihua left a comment:
LGTM
…tion and field renaming (apache#13714)
Change Logs
This PR fixes a few issues discovered while trying to move the Copy-on-Write path to use the FileGroupReader for reading base files and merging with incoming records. The issues mainly stem from schema evolution cases.
Cases fixed:
Impact
Unblocks moving the writer path to reuse the same reader paths we use elsewhere in the code
Risk level (write none, low medium or high below)
Low
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
Contributor's checklist