From cbea8d283f082b719271c18016c948a9927a3c56 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Tue, 19 Mar 2019 10:21:57 +0000 Subject: [PATCH 1/2] Collect lower/upper bounds for nested struct fields in ParquetMetrics --- .../iceberg/parquet/ParquetMetrics.java | 26 +++++++++--- .../iceberg/parquet/TestParquetMetrics.java | 42 ++++++++++++++++++- 2 files changed, 62 insertions(+), 6 deletions(-) diff --git a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java index 17ec12dd85ac..f36b12ccefe3 100644 --- a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java +++ b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java @@ -27,16 +27,19 @@ import com.netflix.iceberg.expressions.Literal; import com.netflix.iceberg.io.InputFile; import com.netflix.iceberg.types.Conversions; +import com.netflix.iceberg.types.Type; import com.netflix.iceberg.types.Types; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -71,7 +74,8 @@ public static Metrics fromMetadata(ParquetMetadata metadata) { for (BlockMetaData block : blocks) { rowCount += block.getRowCount(); for (ColumnChunkMetaData column : block.getColumns()) { - int fieldId = fileSchema.aliasToId(column.getPath().toDotString()); + ColumnPath path = column.getPath(); + int fieldId = fileSchema.aliasToId(path.toDotString()); increment(columnSizes, fieldId, column.getTotalSize()); increment(valueCounts, fieldId, column.getValueCount()); @@ -81,10 +85,8 @@ public static Metrics fromMetadata(ParquetMetadata metadata) { } else if (!stats.isEmpty()) { increment(nullValueCounts, fieldId, stats.getNumNulls()); - // only add min/max stats for top-level fields - // TODO: allow struct nesting, but not maps or arrays - Types.NestedField field = fileSchema.asStruct().field(fieldId); - if (field != null && stats.hasNonNullValue()) { + Types.NestedField field = fileSchema.findField(fieldId); + if (field != null && stats.hasNonNullValue() && shouldStoreBounds(path, fileSchema)) { updateMin(lowerBounds, fieldId, fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMin())); updateMax(upperBounds, fieldId, @@ -105,6 +107,20 @@ public static Metrics fromMetadata(ParquetMetadata metadata) { toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema, upperBounds)); } + // we allow struct nesting, but not maps or arrays + private static boolean shouldStoreBounds(ColumnPath columnPath, Schema schema) { + Iterator pathIterator = columnPath.iterator(); + Type currentType = schema.asStruct(); + + while (pathIterator.hasNext()) { + if (currentType == null || !currentType.isStructType()) return false; + String fieldName = pathIterator.next(); + currentType = currentType.asStructType().fieldType(fieldName); + } + + return currentType != null && currentType.isPrimitiveType(); + } + private static void increment(Map columns, int fieldId, long amount) { if (columns != null) { if (columns.containsKey(fieldId)) { diff --git a/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java b/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java index 1e18ce12b3aa..3997a2947932 100644 --- a/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java +++ b/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java @@ -152,7 +152,7 @@ public void testMetricsForTopLevelFields() throws IOException { assertBounds(12, FixedType.ofLength(4), ByteBuffer.wrap(fixed.bytes()), ByteBuffer.wrap(fixed.bytes()), metrics); assertCounts(13, 2L, 0L, metrics); - assertBounds(13, FixedType.ofLength(4), + assertBounds(13, BinaryType.get(), ByteBuffer.wrap("S".getBytes()), ByteBuffer.wrap("W".getBytes()), metrics); } @@ -181,6 +181,46 @@ public void testMetricsForDecimals() throws IOException { assertBounds(3, DecimalType.of(22, 2), new BigDecimal("5.80"), new BigDecimal("5.80"), metrics); } + @Test + public void testMetricsForNestedStructFields() throws IOException { + StructType leafStructType = StructType.of( + optional(5, "leafLongCol", LongType.get()), + optional(6, "leafBinaryCol", BinaryType.get()) + ); + StructType nestedStructType = StructType.of( + required(3, "longCol", LongType.get()), + required(4, "leafStructCol", leafStructType) + ); + Schema schema = new Schema( + required(1, "intCol", IntegerType.get()), + required(2, "nestedStructCol", nestedStructType) + ); + + Record leafStruct = new Record(AvroSchemaUtil.convert(leafStructType)); + leafStruct.put("leafLongCol", 20L); + leafStruct.put("leafBinaryCol", "A".getBytes()); + Record nestedStruct = new Record(AvroSchemaUtil.convert(nestedStructType)); + nestedStruct.put("longCol", 100L); + nestedStruct.put("leafStructCol", leafStruct); + Record record = new Record(AvroSchemaUtil.convert(schema.asStruct())); + record.put("intCol", Integer.MAX_VALUE); + record.put("nestedStructCol", nestedStruct); + + File parquetFile = writeRecords(schema, record); + + Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile)); + Assert.assertEquals(1L, (long) metrics.recordCount()); + assertCounts(1, 1L, 0L, metrics); + assertBounds(1, IntegerType.get(), Integer.MAX_VALUE, Integer.MAX_VALUE, metrics); + assertCounts(3, 1L, 0L, metrics); + assertBounds(3, LongType.get(), 100L, 100L, metrics); + assertCounts(5, 1L, 0L, metrics); + assertBounds(5, LongType.get(), 20L, 20L, metrics); + assertCounts(6, 1L, 0L, metrics); + assertBounds(6, BinaryType.get(), + ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics); + } + @Test public void testMetricsForListAndMapElements() throws IOException { StructType structType = StructType.of( From 884160824b7b43e0a4b0fa2e09fd576990642075 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Tue, 19 Mar 2019 20:04:41 +0000 Subject: [PATCH 2/2] Fix styling --- .../main/java/com/netflix/iceberg/parquet/ParquetMetrics.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java index f36b12ccefe3..137b9abbd26b 100644 --- a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java +++ b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java @@ -113,7 +113,9 @@ private static boolean shouldStoreBounds(ColumnPath columnPath, Schema schema) { Type currentType = schema.asStruct(); while (pathIterator.hasNext()) { - if (currentType == null || !currentType.isStructType()) return false; + if (currentType == null || !currentType.isStructType()) { + return false; + } String fieldName = pathIterator.next(); currentType = currentType.asStructType().fieldType(fieldName); }