ORC metrics #199
Conversation
    Literal<?> min = null;
    if (columnStats.hasIntStatistics()) {
      if (column.type().typeId() == Type.TypeID.INTEGER) {
        min = Literal.of((int) columnStats.getIntStatistics().getMinimum());
Why convert to byte buffers in a separate function? It seems that if we do it here directly, we save traversing the columns again and possibly also simplify the code a little. E.g., instead of

    Literal.of(columnStats.getDoubleStatistics().getMinimum());

we could do

    Conversions.toByteBuffer(column.type(), columnStats.getDoubleStatistics().getMinimum());
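A minimal sketch of that direct conversion (the helper and class names here are hypothetical; Conversions.toByteBuffer serializes a value into Iceberg's single-value binary representation for the given type):

    import java.nio.ByteBuffer;

    import org.apache.iceberg.types.Conversions;
    import org.apache.iceberg.types.Types;

    class DirectBoundsSketch {
      // Hypothetical helper: serialize an ORC double minimum straight into the
      // ByteBuffer bound representation, skipping the intermediate Literal.
      static ByteBuffer doubleMinAsBound(Types.NestedField column, double orcMin) {
        return Conversions.toByteBuffer(column.type(), orcMin);
      }
    }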
Yeah, I think that should work. I'll try that. Thanks.
    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();

    for (int i = 0; i < colStats.length; i++) {
      columSizes.put(i, colStats[i].getBytesOnDisk());
It seems we are not explicitly using the fieldId from the schema as the key in these stats maps. I wonder whether the assumption that the i used to traverse colStats is also the right field ID for a column would always hold true.
For example, when we add a new column in the middle, during schema evolution?
Yeah, this is a good point. I've simplified the code and removed this assumption by iterating the schema columns directly.
+1, these maps should definitely be keyed by fieldId.
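If it helps, a rough sketch of fieldId-keyed maps (illustrative names only; it assumes some fieldId -> ORC column index mapping exists, which is exactly what the column-mapping discussion further down is about):

    import java.util.Map;

    import com.google.common.collect.Maps;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;
    import org.apache.orc.ColumnStatistics;

    class StatsByFieldIdSketch {
      // Key the metrics maps by Iceberg fieldId rather than by array position.
      // fieldIdToOrcIndex is a hypothetical mapping; building it correctly is
      // the subject of the column-mapping thread below.
      static Map<Integer, Long> valueCounts(Schema schema, ColumnStatistics[] colStats,
                                            Map<Integer, Integer> fieldIdToOrcIndex) {
        Map<Integer, Long> valueCounts = Maps.newHashMap();
        for (Types.NestedField col : schema.columns()) {
          Integer orcIndex = fieldIdToOrcIndex.get(col.fieldId());
          if (orcIndex != null) {
            valueCounts.put(col.fieldId(), colStats[orcIndex].getNumberOfValues());
          }
        }
        return valueCounts;
      }
    }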
    }

    private static Optional<ByteBuffer> fromOrcMin(Types.NestedField column,
                                                   OrcProto.ColumnStatistics columnStats) {
Is OrcProto.ColumnStatistics a public API for ORC? I see that proper column statistics interfaces are also defined, e.g. see DecimalColumnStatistics.
I guess the Proto classes are used to avoid a typecast?
Found the right interfaces! Thanks for the pointers.
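For reference, a sketch of what using ORC's public typed statistics interfaces could look like; an instanceof check replaces the proto-level hasIntStatistics lookup (class and method names here are illustrative):

    import java.util.Optional;

    import org.apache.orc.ColumnStatistics;
    import org.apache.orc.IntegerColumnStatistics;

    class TypedStatsSketch {
      // The typed interfaces (IntegerColumnStatistics, DoubleColumnStatistics,
      // DecimalColumnStatistics, ...) are ORC's public API; the proto classes
      // are the internal representation of the file footer.
      static Optional<Long> intMin(ColumnStatistics stats) {
        if (stats instanceof IntegerColumnStatistics && stats.getNumberOfValues() > 0) {
          return Optional.of(((IntegerColumnStatistics) stats).getMinimum());
        }
        return Optional.empty();
      }
    }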
      return Optional.ofNullable(max);
    }

    static Map<Integer, ?> fromBufferMap(Schema schema, Map<Integer, ByteBuffer> map) {
Seems this is only used in the test. Should this then be in the test class? Or do you see this being used in other places?
I'll make it for testing only since I don't see any other use yet.
    for (Types.NestedField col : schema.columns()) {
      final int i = col.fieldId();
      columSizes.put(i, colStats[i].getBytesOnDisk());
I think this can still cause issues, e.g. the col having fieldId i may not be the same column whose stats are at colStats[i].
E.g. if we assume the initial schema is {0 a int, 1 b int} and we add a column in the middle so the new schema is {0 a int, 2 c int, 1 b int}, then col c has fieldId 2 but is at index 1. Note that I'm making an assumption about ORC here, about col c being at index 1.
I see Parquet does it by field name; see org.apache.iceberg.parquet.ParquetUtil#footerMetrics.
+1. This needs to use the mapping in the ORC file to get the correct column for each field ID.
I think this assumption of id -> index in column statistics is made directly in ORC itself. I don't think there's anything else to map a column to its stats object other than the index, which afaict is an assumption made in the Reader interface; the implementation follows this assumption.

> E.g. if we assume the initial schema is {0 a int, 1 b int} and we add a column in the middle so the new schema is {0 a int, 2 c int, 1 b int}, then col c has fieldId 2 but is at index 1. Note that I'm making an assumption about ORC here, about col c being at index 1.

In this example, if we follow the assumption that the array index (in column stats) maps to the column id (preserved in fieldId), then this is still correct. I don't think ColumnStatistics in the ProtoBuf object (in the file footer) gets rearranged to match the column order; it seems the indices are preserved.

> I see Parquet does it by field name; see org.apache.iceberg.parquet.ParquetUtil#footerMetrics.

This seems to be possible because the column object obtained from the iteration contains the related stats information; in ORC, these two pieces of information are detached.
I took a better look at this issue and I've created #213 - I think it'd be best to work on a fix for column mapping in a separate PR.
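For context, one possible shape for such a mapping (a sketch only; it assumes the ORC schema carries the iceberg.id attribute shown later in this diff, and TypeDescription.getAttributeValue requires a recent ORC release):

    import java.util.Map;

    import com.google.common.collect.Maps;
    import org.apache.orc.TypeDescription;

    class FieldIdMappingSketch {
      // Build a fieldId -> ORC column id map from the "iceberg.id" attribute on
      // each ORC field. Assumes orcStruct is the root struct; for a full fix,
      // nested fields would need the same treatment. TypeDescription.getId() is
      // the column's pre-order position in the ORC schema, which is also its
      // index into the file's ColumnStatistics array.
      static Map<Integer, Integer> icebergToOrcIds(TypeDescription orcStruct) {
        Map<Integer, Integer> mapping = Maps.newHashMap();
        for (TypeDescription child : orcStruct.getChildren()) {
          String icebergId = child.getAttributeValue("iceberg.id");
          if (icebergId != null) {
            mapping.put(Integer.parseInt(icebergId), child.getId());
          }
        }
        return mapping;
      }
    }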
@edgarRd, is it possible to separate out the listPartition support? It seems like that could go in earlier while we review the metrics support.
    for (Types.NestedField col : schema.columns()) {
      final int i = col.fieldId();
      columSizes.put(i, colStats[i].getBytesOnDisk());
      valueCounts.put(i, colStats[i].getNumberOfValues());
The metrics filters also require null value counts to determine whether any or all of the values in a column are null. Is that available from ORC?
No. According to the spec, ORC does not record counts of null values; it just stores whether a column has null values or not. From the spec:

> From Hive 1.1.0 onwards, the column statistics will also record if there are any null values within the row group by setting the hasNull flag. The hasNull flag is used by ORC's predicate pushdown to better answer 'IS NULL' queries.
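Given that, roughly the most Iceberg could derive from ORC metadata is a zero-or-unknown null count; a minimal sketch (helper name hypothetical):

    import org.apache.orc.ColumnStatistics;

    class NullCountSketch {
      // ORC exposes only a boolean hasNull flag, so the count can be pinned to
      // exactly 0 when the flag is false; otherwise it is some unknown value > 0.
      static Long approximateNullCount(ColumnStatistics stats) {
        return stats.hasNull() ? null : 0L;  // null here means "unknown"
      }
    }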
    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();

    for (Types.NestedField col : schema.columns()) {
      final int i = col.fieldId();
The name should be fieldId, not i, because i implies it is an index.
      TypeDescription type = fieldTypes.get(c);
    - fields.add(Types.NestedField.optional(columnIds.get(type), name,
    -     convertOrcToType(type, columnIds)));
    + fields.add(Types.NestedField.optional(type.getId(), name,
Did ORC add field IDs? Why wasn't getId used before?
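For reference, TypeDescription.getId() returns the column's pre-order position in the ORC schema rather than an application-assigned field ID; a small illustration:

    import org.apache.orc.TypeDescription;

    class GetIdDemo {
      public static void main(String[] args) {
        // getId() is assigned in pre-order over the schema tree: 0 is the root
        // struct, then each field and nested field in declaration order.
        TypeDescription schema =
            TypeDescription.fromString("struct<a:int,b:struct<c:string>>");
        System.out.println(schema.getId());                      // 0 (root struct)
        System.out.println(schema.getChildren().get(0).getId()); // 1 (a)
        System.out.println(schema.getChildren().get(1).getId()); // 2 (b)
        System.out.println(schema.getChildren().get(1)
            .getChildren().get(0).getId());                      // 3 (b.c)
      }
    }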
    public OrcMetricsTest() {
      orcSchema = TypeDescription.fromString("struct<w:int,x:bigint,y:int,z:double>");
      icebergSchema = TypeConversion.fromOrc(orcSchema);
I think that this test should create an Iceberg schema with non-sequential field IDs and convert that to ORC. That would validate that the problems that @rdsr pointed out -- using type ID as an index or index as the type ID -- are fixed.
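A sketch of what such a schema could look like (field IDs chosen arbitrarily to be non-sequential; the w/x/y/z column names are reused from this test):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    import static org.apache.iceberg.types.Types.NestedField.optional;

    class NonSequentialIdsSketch {
      // A schema whose field IDs are neither 0-based nor contiguous, so any
      // code that treats fieldId as an index into ColumnStatistics will fail.
      static Schema nonSequentialSchema() {
        return new Schema(
            optional(10, "w", Types.IntegerType.get()),
            optional(3, "x", Types.LongType.get()),
            optional(20, "y", Types.IntegerType.get()),
            optional(7, "z", Types.DoubleType.get()));
      }
    }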
    }

    @Test
    public void testOrcMetricsPrimitive() throws IOException {
@aokolnychyi wrote a thorough test for Parquet. Could this reuse that code instead of writing a new one?
Shall we also follow different metrics modes introduced in PR #263?
@edgarRd what's remaining in this module? Something we can help with?
Thanks for the follow-up on this, @rdsr. I'm finishing the tests for this PR with the recently added column mapping changes. I'll push as soon as I get those working.
Thanks, @edgarRd! I'm looking forward to getting this working for the 0.8.0 release!
@edgarRd, thanks for updating this! Is it ready for review?
    #
    #Wed Jan 29 07:37:53 PST 2020
    distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-all.zip
Is this change needed?
    - private static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
    - private static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";
    + static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
    + static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";
I don't see where these are used outside of this class. Why make them package-private instead of private?
    private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescription orcSchema,
                                           final ColumnStatistics[] colStats) {
      final Schema schema = ORCSchemaUtil.convert(orcSchema);
      Map<Integer, Long> columSizes = Maps.newHashMapWithExpectedSize(colStats.length);
Typo: columSizes -> columnSizes
Merged! Thanks for all the work to get this in, @edgarRd!
Thanks @rdblue, @rdsr and @shardulm94 for the review!
    Optional<ByteBuffer> orcMin = (colStat.getNumberOfValues() > 0) ?
        fromOrcMin(icebergCol, colStat) : Optional.empty();
    orcMin.ifPresent(byteBuffer -> lowerBounds.put(icebergCol.fieldId(), byteBuffer));
    Optional<ByteBuffer> orcMax = (colStat.getNumberOfValues() > 0) ?
        fromOrcMax(icebergCol, colStat) : Optional.empty();
    orcMax.ifPresent(byteBuffer -> upperBounds.put(icebergCol.fieldId(), byteBuffer));
@edgarRd @rdblue @rdsr @shardulm94 In ORC, the column stats will have min/max values even when there are null values within the same file (see here). Is this okay for Iceberg?
The only time there should not be a min/max value is when there are no non-null values.
Got it! Thanks for the clarification!
This PR adds ORC metrics support: column sizes, value counts, and lower/upper bounds collected from ORC column statistics.
Thanks.