Parquet: Fixes get null values for the nested field partition column #4627
Conversation
    @Test
    public void testReadPartitionColumn() throws Exception {
      Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);
Temporarily skip the ORC test because #4599 is working on a fix for it.
    @Test
    public void testReadPartitionColumn() throws Exception {
      Assume.assumeTrue("Temporary skip ORC", !"orc".equals(format));
Same as above: temporarily skip the ORC test because #4599 is working on a fix for it.
Resolved review threads:
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java (outdated)
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/StructRecord.java (outdated)
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/InnerRecord.java
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java (outdated)
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java (outdated)
Thanks, @kbendick, for the detailed code review. Comments have been addressed. Please take another look when you are free.
kbendick
left a comment
Hey @ConeyLiu! Thanks for the PR, and sorry for the delay in reviewing it; this seems important.
I tested this locally and you are absolutely correct: this is an issue.
Could you rebase this off of the latest master, @ConeyLiu? There have been changes to the TestFlinkInputFormat class, it seems (and we're on 1.15 now, technically, but that's not too urgent).
cc @rdblue
        expectedFields.size());
    List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
    // Inferring MaxDefinitionLevel from parent field
    int inferredMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
Nit: Now that we're putting the field depth into the map maxDefinitionLevelsById, and we have the same check for idToConstant.containsKey(id), do we still need this fallback?
@kbendick, we cannot update maxDefinitionLevelsById if fieldType.getId() is null; you can see that at L101.
Resolved review thread (outdated): spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
Hi @kbendick, is there any more feedback on this?
Nice observation and fix. It looks good to me. Maybe worth having some others look at it as well, @RussellSpitzer @aokolnychyi?
    if (idToConstant.containsKey(id)) {
      // containsKey is used because the constant may be null
      reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
      int fieldMaxDefinitionLevel = maxDefinitionLevelsById.getOrDefault(id, inferredMaxDefinitionLevel);
I'm kind of new to this code, so I'm curious: why do we take the value from the parent? Is it because the field is indeed null here, so we just take the parent's definition level?
Should we call it parentMaxDefinitionLevel?
The max definition level of the current column path is type.getMaxDefinitionLevel(currentPath()) - 1, and the children's should be type.getMaxDefinitionLevel(currentPath()). So fieldMaxDefinitionLevel is the inferred max definition level of the children; I have updated the comments. It is used only when we cannot find the value in maxDefinitionLevelsById.
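For a concrete feel for these numbers, here is a small standalone illustration using parquet-mr's MessageTypeParser (the schema is hypothetical, not taken from this PR):

    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class MaxDefinitionLevelDemo {
      public static void main(String[] args) {
        // Hypothetical schema: an optional struct with an optional leaf, the
        // shape involved when a nested field is the partition source column.
        MessageType schema = MessageTypeParser.parseMessageType(
            "message t { optional group outer { optional int32 inner; } }");

        // Each optional level adds one to the max definition level.
        System.out.println(schema.getMaxDefinitionLevel("outer"));          // 1
        System.out.println(schema.getMaxDefinitionLevel("outer", "inner")); // 2
      }
    }

With the convention above, the level stored for inner would be getMaxDefinitionLevel("outer", "inner") - 1 = 1, which equals the max definition level of the parent path, so falling back to the parent's value is a sensible default for an optional child.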
Nit: I still think the name 'inferred' is a bit confusing; it suggests to me that it's the one that will be chosen. Would it be better to call it 'defaultMaxDefinitionLevel' or 'parentMaxDefinitionLevel'?
Also, just curious: what is an example of a case where we need this default value? I tried to walk through one example and found that the expected field is always in the actual struct.
Changed to defaultMaxDefinitionLevel
Resolved review thread: parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@ConeyLiu: Thanks for finding and fixing this. Do we have the same issue for ORC and Avro too?
Force-pushed from ccd0ca7 to 43bb36b.
Thanks @szehon-ho @pvary for the review.
Updated to Spark 3.3 and Flink 1.16; the Pig part is kept.
Avro is OK, while ORC has a similar problem; see #4604.
    Type fieldType = fields.get(i);
    int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
    if (fieldType.getId() != null) {
      int id = fieldType.getId().intValue();
@szehon-ho, here fieldType.getId() could be null; I guess this is for compatibility purposes. So I added int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); in the following code, in case we get a null value from maxDefinitionLevelsById.
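To make the fallback concrete, here is a tiny runnable sketch using plain collections (all names and values are hypothetical stand-ins for the visitor state discussed above, not the PR's code):

    import java.util.HashMap;
    import java.util.Map;

    public class DefinitionLevelFallbackDemo {
      public static void main(String[] args) {
        // Levels recorded only for fields whose Parquet type carries an id.
        Map<Integer, Integer> maxDefinitionLevelsById = new HashMap<>();
        maxDefinitionLevelsById.put(1, 2); // field id 1 had a non-null id

        // Stand-in for type.getMaxDefinitionLevel(currentPath()).
        int defaultMaxDefinitionLevel = 1;

        // Recorded id: the field's own level is used.
        System.out.println(
            maxDefinitionLevelsById.getOrDefault(1, defaultMaxDefinitionLevel)); // 2
        // Id never recorded (Parquet type had no id): fall back to the parent's level.
        System.out.println(
            maxDefinitionLevelsById.getOrDefault(2, defaultMaxDefinitionLevel)); // 1
      }
    }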
Thanks! Yeah, that's OK with me; I'm just not entirely sure, in my understanding, when it would ever be null.
Sorry, would you be able to change the comment as well, to "Defaulting to parent max definition level" or something like that? Otherwise the patch looks good to me.
Updated
Thanks, looks good to me.
szehon-ho
left a comment
It's a small nit, so I'll just approve it.
Thanks, @szehon-ho, for merging this, and everyone for the review. Will submit backport PRs to the other Spark/Flink versions.
We use ConstantReader for the partition column, and the ConstantReader field column is NullReader.NULL_COLUMN. When the ConstantReader, or its parent (when the parent has only constant children), is wrapped into an OptionReader, the OptionReader will always return null values because of the following code:
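The snippet referenced above was lost in extraction; the logic in question is the read path of OptionReader in ParquetValueReaders, roughly as follows (a paraphrased sketch from my reading of the class, not the verbatim source):

    // Paraphrased sketch of ParquetValueReaders.OptionReader#read; details may differ.
    @Override
    public T read(T reuse) {
      // NullReader.NULL_COLUMN reports a constant definition level of 0, so for
      // a wrapped ConstantReader this check never passes and the constant value
      // is never produced.
      if (column.currentDefinitionLevel() > definitionLevel) {
        return reader.read(reuse);
      }

      for (TripleIterator<?> child : children) {
        child.nextNull();
      }

      return null;
    }

As I understand the fix, giving the constant reader a column that reports the field's max definition level lets the wrapping OptionReader take the non-null branch, so the partition value is returned instead of null.

Closes #4626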