diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java index 3c9bd5cd56c1..529fac065000 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -80,13 +80,13 @@ private static class ReadBuilder extends TypeWithSchemaVisitor message(Types.StructType expected, MessageType message, + public ParquetValueReader message(org.apache.iceberg.types.Type expected, MessageType message, List> fieldReaders) { - return struct(expected, message.asGroupType(), fieldReaders); + return struct(expected == null ? null : expected.asStructType(), message.asGroupType(), fieldReaders); } @Override - public ParquetValueReader struct(Types.StructType expected, GroupType struct, + public ParquetValueReader struct(org.apache.iceberg.types.Type expected, GroupType struct, List> fieldReaders) { // match the expected struct's order Map> readersById = Maps.newHashMap(); @@ -103,7 +103,7 @@ public ParquetValueReader struct(Types.StructType expected, GroupType s } List expectedFields = expected != null ? - expected.fields() : ImmutableList.of(); + expected.asStructType().fields() : ImmutableList.of(); List> reorderedFields = Lists.newArrayListWithExpectedSize( expectedFields.size()); List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); @@ -132,7 +132,7 @@ public ParquetValueReader struct(Types.StructType expected, GroupType s } @Override - public ParquetValueReader list(Types.ListType expectedList, GroupType array, + public ParquetValueReader list(org.apache.iceberg.types.Type expectedList, GroupType array, ParquetValueReader elementReader) { GroupType repeated = array.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -147,7 +147,7 @@ public ParquetValueReader list(Types.ListType expectedList, GroupType array, } @Override - public ParquetValueReader map(Types.MapType expectedMap, GroupType map, + public ParquetValueReader map(org.apache.iceberg.types.Type expectedMap, GroupType map, ParquetValueReader keyReader, ParquetValueReader valueReader) { GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); @@ -168,7 +168,7 @@ public ParquetValueReader map(Types.MapType expectedMap, GroupType map, @Override @SuppressWarnings("CyclomaticComplexity") - public ParquetValueReader primitive(org.apache.iceberg.types.Type.PrimitiveType expected, + public ParquetValueReader primitive(org.apache.iceberg.types.Type expected, PrimitiveType primitive) { ColumnDescriptor desc = type.getColumnDescription(currentPath()); diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java index c91b659b632c..01575564039c 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -29,11 +29,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; -import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.MapType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.RowType.RowField; import 
org.apache.flink.table.types.logical.SmallIntType; import org.apache.flink.table.types.logical.TinyIntType; import org.apache.iceberg.parquet.ParquetValueReaders; @@ -68,27 +64,27 @@ private static class WriteBuilder extends ParquetWithFlinkSchemaVisitor message(RowType sStruct, MessageType message, List> fields) { + public ParquetValueWriter message(LogicalType sStruct, MessageType message, List> fields) { return struct(sStruct, message.asGroupType(), fields); } @Override - public ParquetValueWriter struct(RowType sStruct, GroupType struct, + public ParquetValueWriter struct(LogicalType fStruct, GroupType struct, List> fieldWriters) { List fields = struct.getFields(); - List flinkFields = sStruct.getFields(); + List flinkFields = fStruct.getChildren(); List> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size()); List flinkTypes = Lists.newArrayList(); for (int i = 0; i < fields.size(); i += 1) { writers.add(newOption(struct.getType(i), fieldWriters.get(i))); - flinkTypes.add(flinkFields.get(i).getType()); + flinkTypes.add(flinkFields.get(i)); } return new RowDataWriter(writers, flinkTypes); } @Override - public ParquetValueWriter list(ArrayType sArray, GroupType array, ParquetValueWriter elementWriter) { + public ParquetValueWriter list(LogicalType fArray, GroupType array, ParquetValueWriter elementWriter) { GroupType repeated = array.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -97,11 +93,11 @@ public ParquetValueWriter list(ArrayType sArray, GroupType array, ParquetValu return new ArrayDataWriter<>(repeatedD, repeatedR, newOption(repeated.getType(0), elementWriter), - sArray.getElementType()); + arrayElementType(fArray)); } @Override - public ParquetValueWriter map(MapType sMap, GroupType map, + public ParquetValueWriter map(LogicalType fMap, GroupType map, ParquetValueWriter keyWriter, ParquetValueWriter valueWriter) { GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -112,10 +108,9 @@ public ParquetValueWriter map(MapType sMap, GroupType map, return new MapDataWriter<>(repeatedD, repeatedR, newOption(repeatedKeyValue.getType(0), keyWriter), newOption(repeatedKeyValue.getType(1), valueWriter), - sMap.getKeyType(), sMap.getValueType()); + mapKeyType(fMap), mapValueType(fMap)); } - private ParquetValueWriter newOption(org.apache.parquet.schema.Type fieldType, ParquetValueWriter writer) { int maxD = type.getMaxDefinitionLevel(path(fieldType.getName())); return ParquetValueWriters.option(fieldType, maxD, writer); diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/ParquetWithFlinkSchemaVisitor.java b/flink/src/main/java/org/apache/iceberg/flink/data/ParquetWithFlinkSchemaVisitor.java index 541986f93889..c707cf2b163d 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/ParquetWithFlinkSchemaVisitor.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/ParquetWithFlinkSchemaVisitor.java @@ -19,181 +19,48 @@ package org.apache.iceberg.flink.data; -import java.util.Deque; -import java.util.List; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.RowType.RowField; -import org.apache.iceberg.avro.AvroSchemaUtil; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; 
-import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; +import org.apache.iceberg.parquet.ParquetTypeWithPartnerVisitor; -public class ParquetWithFlinkSchemaVisitor { - private final Deque fieldNames = Lists.newLinkedList(); +public class ParquetWithFlinkSchemaVisitor extends ParquetTypeWithPartnerVisitor { - public static T visit(LogicalType sType, Type type, ParquetWithFlinkSchemaVisitor visitor) { - Preconditions.checkArgument(sType != null, "Invalid DataType: null"); - if (type instanceof MessageType) { - Preconditions.checkArgument(sType instanceof RowType, "Invalid struct: %s is not a struct", sType); - RowType struct = (RowType) sType; - return visitor.message(struct, (MessageType) type, visitFields(struct, type.asGroupType(), visitor)); - } else if (type.isPrimitive()) { - return visitor.primitive(sType, type.asPrimitiveType()); - } else { - // if not a primitive, the typeId must be a group - GroupType group = type.asGroupType(); - OriginalType annotation = group.getOriginalType(); - if (annotation != null) { - switch (annotation) { - case LIST: - Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED), - "Invalid list: top-level group is repeated: %s", group); - Preconditions.checkArgument(group.getFieldCount() == 1, - "Invalid list: does not contain single repeated field: %s", group); - - GroupType repeatedElement = group.getFields().get(0).asGroupType(); - Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED), - "Invalid list: inner group is not repeated"); - Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1, - "Invalid list: repeated group is not a single field: %s", group); - - Preconditions.checkArgument(sType instanceof ArrayType, "Invalid list: %s is not an array", sType); - ArrayType array = (ArrayType) sType; - RowType.RowField element = new RowField( - "element", array.getElementType(), "element of " + array.asSummaryString()); - - visitor.fieldNames.push(repeatedElement.getName()); - try { - T elementResult = null; - if (repeatedElement.getFieldCount() > 0) { - elementResult = visitField(element, repeatedElement.getType(0), visitor); - } - - return visitor.list(array, group, elementResult); - - } finally { - visitor.fieldNames.pop(); - } - - case MAP: - Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED), - "Invalid map: top-level group is repeated: %s", group); - Preconditions.checkArgument(group.getFieldCount() == 1, - "Invalid map: does not contain single repeated field: %s", group); - - GroupType repeatedKeyValue = group.getType(0).asGroupType(); - Preconditions.checkArgument(repeatedKeyValue.isRepetition(Type.Repetition.REPEATED), - "Invalid map: inner group is not repeated"); - Preconditions.checkArgument(repeatedKeyValue.getFieldCount() <= 2, - "Invalid map: repeated group does not have 2 fields"); - - Preconditions.checkArgument(sType instanceof MapType, "Invalid map: %s is not a map", sType); - MapType map = (MapType) sType; - RowField keyField = new RowField("key", map.getKeyType(), "key of " + map.asSummaryString()); - RowField valueField = new RowField( - "value", map.getValueType(), "value of " + map.asSummaryString()); - - visitor.fieldNames.push(repeatedKeyValue.getName()); - try { - T keyResult = null; - T valueResult = null; - switch (repeatedKeyValue.getFieldCount()) { - case 2: - // if there are 2 
fields, both key and value are projected - keyResult = visitField(keyField, repeatedKeyValue.getType(0), visitor); - valueResult = visitField(valueField, repeatedKeyValue.getType(1), visitor); - break; - case 1: - // if there is just one, use the name to determine what it is - Type keyOrValue = repeatedKeyValue.getType(0); - if (keyOrValue.getName().equalsIgnoreCase("key")) { - keyResult = visitField(keyField, keyOrValue, visitor); - // value result remains null - } else { - valueResult = visitField(valueField, keyOrValue, visitor); - // key result remains null - } - break; - default: - // both results will remain null - } - - return visitor.map(map, group, keyResult, valueResult); - - } finally { - visitor.fieldNames.pop(); - } - - default: - } - } - Preconditions.checkArgument(sType instanceof RowType, "Invalid struct: %s is not a struct", sType); - RowType struct = (RowType) sType; - return visitor.struct(struct, group, visitFields(struct, group, visitor)); + @Override + protected LogicalType arrayElementType(LogicalType arrayType) { + if (arrayType == null) { + return null; } - } - private static T visitField(RowType.RowField sField, Type field, ParquetWithFlinkSchemaVisitor visitor) { - visitor.fieldNames.push(field.getName()); - try { - return visit(sField.getType(), field, visitor); - } finally { - visitor.fieldNames.pop(); - } + return ((ArrayType) arrayType).getElementType(); } - private static List visitFields(RowType struct, GroupType group, - ParquetWithFlinkSchemaVisitor visitor) { - List sFields = struct.getFields(); - Preconditions.checkArgument(sFields.size() == group.getFieldCount(), - "Structs do not match: %s and %s", struct, group); - List results = Lists.newArrayListWithExpectedSize(group.getFieldCount()); - for (int i = 0; i < sFields.size(); i += 1) { - Type field = group.getFields().get(i); - RowType.RowField sField = sFields.get(i); - Preconditions.checkArgument(field.getName().equals(AvroSchemaUtil.makeCompatibleName(sField.getName())), - "Structs do not match: field %s != %s", field.getName(), sField.getName()); - results.add(visitField(sField, field, visitor)); + @Override + protected LogicalType mapKeyType(LogicalType mapType) { + if (mapType == null) { + return null; } - return results; - } - - public T message(RowType sStruct, MessageType message, List fields) { - return null; - } - - public T struct(RowType sStruct, GroupType struct, List fields) { - return null; + return ((MapType) mapType).getKeyType(); } - public T list(ArrayType sArray, GroupType array, T element) { - return null; - } - - public T map(MapType sMap, GroupType map, T key, T value) { - return null; - } + @Override + protected LogicalType mapValueType(LogicalType mapType) { + if (mapType == null) { + return null; + } - public T primitive(LogicalType sPrimitive, PrimitiveType primitive) { - return null; + return ((MapType) mapType).getValueType(); } - protected String[] currentPath() { - return Lists.newArrayList(fieldNames.descendingIterator()).toArray(new String[0]); - } + @Override + protected LogicalType fieldType(LogicalType structType, int pos, Integer fieldId) { + if (structType == null || ((RowType) structType).getFieldCount() < pos + 1) { + return null; + } - protected String[] path(String name) { - List list = Lists.newArrayList(fieldNames.descendingIterator()); - list.add(name); - return list.toArray(new String[0]); + return ((RowType) structType).getTypeAt(pos); } } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java 
b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 710c771036d4..58ff52cd32fe 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -83,14 +83,14 @@ private FallbackReadBuilder(MessageType type, Map idToConstant) { } @Override - public ParquetValueReader message(Types.StructType expected, MessageType message, + public ParquetValueReader message(org.apache.iceberg.types.Type expected, MessageType message, List> fieldReaders) { // the top level matches by ID, but the remaining IDs are missing return super.struct(expected, message, fieldReaders); } @Override - public ParquetValueReader struct(Types.StructType expected, GroupType struct, + public ParquetValueReader struct(org.apache.iceberg.types.Type expected, GroupType struct, List> fieldReaders) { // the expected struct is ignored because nested fields are never found when the List> newFields = Lists.newArrayListWithExpectedSize( @@ -107,7 +107,7 @@ public ParquetValueReader struct(Types.StructType expected, GroupType struct, } } - return createStructReader(types, newFields, expected); + return createStructReader(types, newFields, expected.asStructType()); } } @@ -121,13 +121,13 @@ private ReadBuilder(MessageType type, Map idToConstant) { } @Override - public ParquetValueReader message(Types.StructType expected, MessageType message, + public ParquetValueReader message(org.apache.iceberg.types.Type expected, MessageType message, List> fieldReaders) { return struct(expected, message.asGroupType(), fieldReaders); } @Override - public ParquetValueReader struct(Types.StructType expected, GroupType struct, + public ParquetValueReader struct(org.apache.iceberg.types.Type expected, GroupType struct, List> fieldReaders) { // match the expected struct's order Map> readersById = Maps.newHashMap(); @@ -145,7 +145,7 @@ public ParquetValueReader struct(Types.StructType expected, GroupType struct, } List expectedFields = expected != null ? - expected.fields() : ImmutableList.of(); + expected.asStructType().fields() : ImmutableList.of(); List> reorderedFields = Lists.newArrayListWithExpectedSize( expectedFields.size()); List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); @@ -170,11 +170,11 @@ public ParquetValueReader struct(Types.StructType expected, GroupType struct, } } - return createStructReader(types, reorderedFields, expected); + return createStructReader(types, reorderedFields, expected == null ? 
null : expected.asStructType()); } @Override - public ParquetValueReader list(Types.ListType expectedList, GroupType array, + public ParquetValueReader list(org.apache.iceberg.types.Type expectedList, GroupType array, ParquetValueReader elementReader) { if (expectedList == null) { return null; @@ -194,7 +194,7 @@ public ParquetValueReader list(Types.ListType expectedList, GroupType array, } @Override - public ParquetValueReader map(Types.MapType expectedMap, GroupType map, + public ParquetValueReader map(org.apache.iceberg.types.Type expectedMap, GroupType map, ParquetValueReader keyReader, ParquetValueReader valueReader) { if (expectedMap == null) { @@ -219,7 +219,7 @@ public ParquetValueReader map(Types.MapType expectedMap, GroupType map, @Override @SuppressWarnings("checkstyle:CyclomaticComplexity") - public ParquetValueReader primitive(org.apache.iceberg.types.Type.PrimitiveType expected, + public ParquetValueReader primitive(org.apache.iceberg.types.Type expected, PrimitiveType primitive) { if (expected == null) { return null; @@ -291,13 +291,13 @@ public ParquetValueReader primitive(org.apache.iceberg.types.Type.PrimitiveTy case BINARY: return new ParquetValueReaders.BytesReader(desc); case INT32: - if (expected != null && expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) { + if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) { return new ParquetValueReaders.IntAsLongReader(desc); } else { return new ParquetValueReaders.UnboxedReader<>(desc); } case FLOAT: - if (expected != null && expected.typeId() == org.apache.iceberg.types.Type.TypeID.DOUBLE) { + if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.DOUBLE) { return new ParquetValueReaders.FloatAsDoubleReader(desc); } else { return new ParquetValueReaders.UnboxedReader<>(desc); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java index fe2c8467bd40..3e87fb141862 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java @@ -22,6 +22,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.UUID; @@ -76,13 +77,13 @@ private static class ReadBuilder extends TypeWithSchemaVisitor message(Types.StructType expected, MessageType message, + public ParquetValueReader message(org.apache.iceberg.types.Type expected, MessageType message, List> fieldReaders) { return struct(expected, message.asGroupType(), fieldReaders); } @Override - public ParquetValueReader struct(Types.StructType expected, GroupType struct, + public ParquetValueReader struct(org.apache.iceberg.types.Type expected, GroupType struct, List> fieldReaders) { Schema avroSchema = avroSchemas.get(expected); @@ -99,7 +100,7 @@ public ParquetValueReader struct(Types.StructType expected, GroupType struct, } List expectedFields = expected != null ? 
- expected.fields() : ImmutableList.of(); + expected.asStructType().fields() : ImmutableList.of(); List> reorderedFields = Lists.newArrayListWithExpectedSize( expectedFields.size()); List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); @@ -119,7 +120,7 @@ public ParquetValueReader struct(Types.StructType expected, GroupType struct, } @Override - public ParquetValueReader list(Types.ListType expectedList, GroupType array, + public ParquetValueReader list(org.apache.iceberg.types.Type expectedList, GroupType array, ParquetValueReader elementReader) { GroupType repeated = array.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -134,7 +135,7 @@ public ParquetValueReader list(Types.ListType expectedList, GroupType array, } @Override - public ParquetValueReader map(Types.MapType expectedMap, GroupType map, + public ParquetValueReader map(org.apache.iceberg.types.Type expectedMap, GroupType map, ParquetValueReader keyReader, ParquetValueReader valueReader) { GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); @@ -154,11 +155,11 @@ public ParquetValueReader map(Types.MapType expectedMap, GroupType map, } @Override - public ParquetValueReader primitive(org.apache.iceberg.types.Type.PrimitiveType expected, + public ParquetValueReader primitive(org.apache.iceberg.types.Type expected, PrimitiveType primitive) { ColumnDescriptor desc = type.getColumnDescription(currentPath()); - boolean isMapKey = fieldNames.contains("key"); + boolean isMapKey = Arrays.asList(currentPath()).contains("key"); if (primitive.getOriginalType() != null) { switch (primitive.getOriginalType()) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeWithPartnerVisitor.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeWithPartnerVisitor.java new file mode 100644 index 000000000000..19e2a0ead7d0 --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeWithPartnerVisitor.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.Deque;
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+public abstract class ParquetTypeWithPartnerVisitor<P, T> {
+  private final Deque<String> fieldNames = Lists.newLinkedList();
+
+  public static <P, T> T visit(P partnerType, Type type, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    if (type instanceof MessageType) {
+      return visitor.message(partnerType, (MessageType) type, visitFields(partnerType, type.asGroupType(), visitor));
+    } else if (type.isPrimitive()) {
+      return visitor.primitive(partnerType, type.asPrimitiveType());
+    } else {
+      // if not a primitive, the typeId must be a group
+      GroupType group = type.asGroupType();
+      OriginalType annotation = group.getOriginalType();
+      if (annotation != null) {
+        switch (annotation) {
+          case LIST:
+            return visitList(partnerType, group, visitor);
+          case MAP:
+            return visitMap(partnerType, group, visitor);
+          default:
+        }
+      }
+      return visitor.struct(partnerType, group, visitFields(partnerType, group, visitor));
+    }
+  }
+
+  private static <P, T> T visitList(P list, GroupType group, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
+        "Invalid list: top-level group is repeated: %s", group);
+    Preconditions.checkArgument(group.getFieldCount() == 1,
+        "Invalid list: does not contain single repeated field: %s", group);
+
+    GroupType repeatedElement = group.getFields().get(0).asGroupType();
+    Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED),
+        "Invalid list: inner group is not repeated");
+    Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1,
+        "Invalid list: repeated group is not a single field: %s", group);
+
+    visitor.beforeRepeatedElement(repeatedElement);
+    try {
+      T elementResult = null;
+      if (repeatedElement.getFieldCount() > 0) {
+        Type elementField = repeatedElement.getType(0);
+        visitor.beforeElementField(elementField);
+        try {
+          elementResult = visit(visitor.arrayElementType(list), elementField, visitor);
+        } finally {
+          visitor.afterElementField(elementField);
+        }
+      }
+      return visitor.list(list, group, elementResult);
+    } finally {
+      visitor.afterRepeatedElement(repeatedElement);
+    }
+  }
+
+  private static <P, T> T visitMap(P map, GroupType group, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
+        "Invalid map: top-level group is repeated: %s", group);
+    Preconditions.checkArgument(group.getFieldCount() == 1,
+        "Invalid map: does not contain single repeated field: %s", group);
+
+    GroupType repeatedKeyValue = group.getType(0).asGroupType();
+    Preconditions.checkArgument(repeatedKeyValue.isRepetition(Type.Repetition.REPEATED),
+        "Invalid map: inner group is not repeated");
+    Preconditions.checkArgument(repeatedKeyValue.getFieldCount() <= 2,
+        "Invalid map: repeated group does not have 2 fields");
+
+    visitor.beforeRepeatedKeyValue(repeatedKeyValue);
+    try {
+      T keyResult = null;
+      T valueResult = null;
+      switch (repeatedKeyValue.getFieldCount()) {
+        case 2:
+          // if there are 2 fields, both key and value are projected
+          Type keyType = repeatedKeyValue.getType(0);
+          visitor.beforeKeyField(keyType);
+          try {
+            keyResult = visit(visitor.mapKeyType(map), keyType, visitor);
+          } finally {
+            visitor.afterKeyField(keyType);
+          }
+          Type valueType = repeatedKeyValue.getType(1);
+          visitor.beforeValueField(valueType);
+          try {
+            valueResult = visit(visitor.mapValueType(map), repeatedKeyValue.getType(1), visitor);
+          } finally {
+            visitor.afterValueField(valueType);
+          }
+          break;
+        case 1:
+          // if there is just one, use the name to determine what it is
+          Type keyOrValue = repeatedKeyValue.getType(0);
+          if (keyOrValue.getName().equalsIgnoreCase("key")) {
+            visitor.beforeKeyField(keyOrValue);
+            try {
+              keyResult = visit(visitor.mapKeyType(map), keyOrValue, visitor);
+            } finally {
+              visitor.afterKeyField(keyOrValue);
+            }
+            // value result remains null
+          } else {
+            visitor.beforeValueField(keyOrValue);
+            try {
+              valueResult = visit(visitor.mapValueType(map), keyOrValue, visitor);
+            } finally {
+              visitor.afterValueField(keyOrValue);
+            }
+            // key result remains null
+          }
+          break;
+        default:
+          // both results will remain null
+      }
+
+      return visitor.map(map, group, keyResult, valueResult);
+
+    } finally {
+      visitor.afterRepeatedKeyValue(repeatedKeyValue);
+    }
+  }
+
+  protected static <P, T> List<T> visitFields(P struct, GroupType group, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    List<T> results = Lists.newArrayListWithExpectedSize(group.getFieldCount());
+    for (int i = 0; i < group.getFieldCount(); i += 1) {
+      Type field = group.getFields().get(i);
+      visitor.beforeField(field);
+      try {
+        Integer fieldId = field.getId() == null ? null : field.getId().intValue();
+        results.add(visit(visitor.fieldType(struct, i, fieldId), field, visitor));
+      } finally {
+        visitor.afterField(field);
+      }
+    }
+
+    return results;
+  }
+
+  protected abstract P arrayElementType(P arrayType);
+  protected abstract P mapKeyType(P mapType);
+  protected abstract P mapValueType(P mapType);
+  protected abstract P fieldType(P structType, int pos, Integer parquetFieldId);
+
+  public T message(P struct, MessageType message, List<T> fields) {
+    return null;
+  }
+
+  public T struct(P pStruct, GroupType struct, List<T> fields) {
+    return null;
+  }
+
+  public T list(P pArray, GroupType array, T element) {
+    return null;
+  }
+
+  public T map(P pMap, GroupType map, T key, T value) {
+    return null;
+  }
+
+  public T primitive(P pType, PrimitiveType primitive) {
+    return null;
+  }
+
+  public void beforeField(Type type) {
+    fieldNames.push(type.getName());
+  }
+
+  public void afterField(Type type) {
+    fieldNames.pop();
+  }
+
+  public void beforeRepeatedElement(Type element) {
+    beforeField(element);
+  }
+
+  public void afterRepeatedElement(Type element) {
+    afterField(element);
+  }
+
+  public void beforeElementField(Type element) {
+    beforeField(element);
+  }
+
+  public void afterElementField(Type element) {
+    afterField(element);
+  }
+
+  public void beforeRepeatedKeyValue(Type keyValue) {
+    beforeField(keyValue);
+  }
+
+  public void afterRepeatedKeyValue(Type keyValue) {
+    afterField(keyValue);
+  }
+
+  public void beforeKeyField(Type key) {
+    beforeField(key);
+  }
+
+  public void afterKeyField(Type key) {
+    afterField(key);
+  }
+
+  public void beforeValueField(Type value) {
+    beforeField(value);
+  }
+
+  public void afterValueField(Type value) {
+    afterField(value);
+  }
+
+  protected String[] currentPath() {
+    return Lists.newArrayList(fieldNames.descendingIterator()).toArray(new String[0]);
+  }
+
+  protected String[] path(String name) {
+    List<String> list = Lists.newArrayList(fieldNames.descendingIterator());
+    list.add(name);
+    return
list.toArray(new String[0]); + } + +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java b/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java index 6dd76d722074..5cf84f24ed6b 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java @@ -19,187 +19,50 @@ package org.apache.iceberg.parquet; -import java.util.LinkedList; -import java.util.List; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; /** * Visitor for traversing a Parquet type with a companion Iceberg type. * * @param the Java class returned by the visitor */ -public class TypeWithSchemaVisitor { - @SuppressWarnings({"checkstyle:VisibilityModifier", "checkstyle:IllegalType"}) - protected LinkedList fieldNames = Lists.newLinkedList(); +public class TypeWithSchemaVisitor extends ParquetTypeWithPartnerVisitor { - @SuppressWarnings("checkstyle:CyclomaticComplexity") - public static T visit(org.apache.iceberg.types.Type iType, Type type, TypeWithSchemaVisitor visitor) { - if (type instanceof MessageType) { - Types.StructType struct = iType != null ? iType.asStructType() : null; - return visitor.message(struct, (MessageType) type, - visitFields(struct, type.asGroupType(), visitor)); - - } else if (type.isPrimitive()) { - org.apache.iceberg.types.Type.PrimitiveType iPrimitive = iType != null ? 
- iType.asPrimitiveType() : null; - return visitor.primitive(iPrimitive, type.asPrimitiveType()); - - } else { - // if not a primitive, the typeId must be a group - GroupType group = type.asGroupType(); - OriginalType annotation = group.getOriginalType(); - if (annotation != null) { - switch (annotation) { - case LIST: - Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED), - "Invalid list: top-level group is repeated: %s", group); - Preconditions.checkArgument(group.getFieldCount() == 1, - "Invalid list: does not contain single repeated field: %s", group); - - GroupType repeatedElement = group.getFields().get(0).asGroupType(); - Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED), - "Invalid list: inner group is not repeated"); - Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1, - "Invalid list: repeated group is not a single field: %s", group); - - Types.ListType list = null; - Types.NestedField element = null; - if (iType != null) { - list = iType.asListType(); - element = list.fields().get(0); - } - - visitor.fieldNames.push(repeatedElement.getName()); - try { - T elementResult = null; - if (repeatedElement.getFieldCount() > 0) { - elementResult = visitField(element, repeatedElement.getType(0), visitor); - } - - return visitor.list(list, group, elementResult); - } finally { - visitor.fieldNames.pop(); - } - - case MAP: - Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED), - "Invalid map: top-level group is repeated: %s", group); - Preconditions.checkArgument(group.getFieldCount() == 1, - "Invalid map: does not contain single repeated field: %s", group); - - GroupType repeatedKeyValue = group.getType(0).asGroupType(); - Preconditions.checkArgument(repeatedKeyValue.isRepetition(Type.Repetition.REPEATED), - "Invalid map: inner group is not repeated"); - Preconditions.checkArgument(repeatedKeyValue.getFieldCount() <= 2, - "Invalid map: repeated group does not have 2 fields"); - - Types.MapType map = null; - Types.NestedField keyField = null; - Types.NestedField valueField = null; - if (iType != null) { - map = iType.asMapType(); - keyField = map.fields().get(0); - valueField = map.fields().get(1); - } - - visitor.fieldNames.push(repeatedKeyValue.getName()); - try { - T keyResult = null; - T valueResult = null; - switch (repeatedKeyValue.getFieldCount()) { - case 2: - // if there are 2 fields, both key and value are projected - keyResult = visitField(keyField, repeatedKeyValue.getType(0), visitor); - valueResult = visitField(valueField, repeatedKeyValue.getType(1), visitor); - break; - case 1: - // if there is just one, use the name to determine what it is - Type keyOrValue = repeatedKeyValue.getType(0); - if (keyOrValue.getName().equalsIgnoreCase("key")) { - keyResult = visitField(keyField, keyOrValue, visitor); - // value result remains null - } else { - valueResult = visitField(valueField, keyOrValue, visitor); - // key result remains null - } - break; - default: - // both results will remain null - } - - return visitor.map(map, group, keyResult, valueResult); - - } finally { - visitor.fieldNames.pop(); - } - - default: - } - } - - Types.StructType struct = iType != null ? 
iType.asStructType() : null; - return visitor.struct(struct, group, visitFields(struct, group, visitor)); + @Override + protected Type arrayElementType(Type arrayType) { + if (arrayType == null) { + return null; } - } - private static T visitField(Types.NestedField iField, Type field, TypeWithSchemaVisitor visitor) { - visitor.fieldNames.push(field.getName()); - try { - return visit(iField != null ? iField.type() : null, field, visitor); - } finally { - visitor.fieldNames.pop(); - } + return arrayType.asListType().elementType(); } - private static List visitFields(Types.StructType struct, GroupType group, TypeWithSchemaVisitor visitor) { - List results = Lists.newArrayListWithExpectedSize(group.getFieldCount()); - for (Type field : group.getFields()) { - int id = -1; - if (field.getId() != null) { - id = field.getId().intValue(); - } - Types.NestedField iField = (struct != null && id >= 0) ? struct.field(id) : null; - results.add(visitField(iField, field, visitor)); + @Override + protected Type mapKeyType(Type mapType) { + if (mapType == null) { + return null; } - return results; - } - - public T message(Types.StructType iStruct, MessageType message, List fields) { - return null; - } - - public T struct(Types.StructType iStruct, GroupType struct, List fields) { - return null; + return mapType.asMapType().keyType(); } - public T list(Types.ListType iList, GroupType array, T element) { - return null; - } - - public T map(Types.MapType iMap, GroupType map, T key, T value) { - return null; - } + @Override + protected Type mapValueType(Type mapType) { + if (mapType == null) { + return null; + } - public T primitive(org.apache.iceberg.types.Type.PrimitiveType iPrimitive, - PrimitiveType primitive) { - return null; + return mapType.asMapType().valueType(); } - protected String[] currentPath() { - return Lists.newArrayList(fieldNames.descendingIterator()).toArray(new String[0]); - } + @Override + protected Type fieldType(Type structType, int pos, Integer fieldId) { + if (structType == null || fieldId == null) { + return null; + } - protected String[] path(String name) { - List list = Lists.newArrayList(fieldNames.descendingIterator()); - list.add(name); - return list.toArray(new String[0]); + Types.NestedField field = structType.asStructType().field(fieldId); + return field == null ? 
null : field.type(); } } diff --git a/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java b/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java index c90c8a910b16..638b6ba52350 100644 --- a/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java +++ b/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java @@ -87,14 +87,14 @@ private static class FallbackReadBuilder extends ReadBuilder { @Override public ParquetValueReader message( - Types.StructType expected, MessageType message, List> fieldReaders) { + org.apache.iceberg.types.Type expected, MessageType message, List> fieldReaders) { // the top level matches by ID, but the remaining IDs are missing return super.struct(expected, message, fieldReaders); } @Override public ParquetValueReader struct( - Types.StructType ignored, GroupType struct, List> fieldReaders) { + org.apache.iceberg.types.Type ignored, GroupType struct, List> fieldReaders) { // the expected struct is ignored because nested fields are never found when the List> newFields = Lists.newArrayListWithExpectedSize( fieldReaders.size()); @@ -126,13 +126,13 @@ MessageType getMessageType() { @Override public ParquetValueReader message( - Types.StructType expected, MessageType message, List> fieldReaders) { + org.apache.iceberg.types.Type expected, MessageType message, List> fieldReaders) { return struct(expected, message.asGroupType(), fieldReaders); } @Override public ParquetValueReader struct( - Types.StructType expected, GroupType struct, List> fieldReaders) { + org.apache.iceberg.types.Type expected, GroupType struct, List> fieldReaders) { // match the expected struct's order Map> readersById = Maps.newHashMap(); Map typesById = Maps.newHashMap(); @@ -146,7 +146,7 @@ public ParquetValueReader struct( } List expectedFields = expected != null ? 
- expected.fields() : ImmutableList.of(); + expected.asStructType().fields() : ImmutableList.of(); List> reorderedFields = Lists.newArrayListWithExpectedSize( expectedFields.size()); List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); @@ -173,7 +173,7 @@ public ParquetValueReader struct( @Override public ParquetValueReader list( - Types.ListType expectedList, GroupType array, ParquetValueReader elementReader) { + org.apache.iceberg.types.Type expectedList, GroupType array, ParquetValueReader elementReader) { GroupType repeated = array.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -188,7 +188,8 @@ public ParquetValueReader list( @Override public ParquetValueReader map( - Types.MapType expectedMap, GroupType map, ParquetValueReader keyReader, ParquetValueReader valueReader) { + org.apache.iceberg.types.Type expectedMap, GroupType map, ParquetValueReader keyReader, + ParquetValueReader valueReader) { GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -207,7 +208,7 @@ public ParquetValueReader map( @Override public ParquetValueReader primitive( - org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) { + org.apache.iceberg.types.Type expected, PrimitiveType primitive) { ColumnDescriptor desc = type.getColumnDescription(currentPath()); if (primitive.getOriginalType() != null) { diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/ParquetWithSparkSchemaVisitor.java b/spark/src/main/java/org/apache/iceberg/spark/data/ParquetWithSparkSchemaVisitor.java index 924cc3e2325a..6b6705035f3c 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/ParquetWithSparkSchemaVisitor.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/ParquetWithSparkSchemaVisitor.java @@ -19,22 +19,10 @@ package org.apache.iceberg.spark.data; -import java.util.Deque; -import java.util.List; -import org.apache.iceberg.avro.AvroSchemaUtil; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; -import org.apache.parquet.schema.Type.Repetition; +import org.apache.iceberg.parquet.ParquetTypeWithPartnerVisitor; import org.apache.spark.sql.types.ArrayType; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.MapType; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; /** @@ -42,167 +30,41 @@ * * @param the Java class returned by the visitor */ -public class ParquetWithSparkSchemaVisitor { - private final Deque fieldNames = Lists.newLinkedList(); +public class ParquetWithSparkSchemaVisitor extends ParquetTypeWithPartnerVisitor { - public static T visit(DataType sType, Type type, ParquetWithSparkSchemaVisitor visitor) { - Preconditions.checkArgument(sType != null, "Invalid DataType: null"); - if (type instanceof MessageType) { - Preconditions.checkArgument(sType instanceof StructType, "Invalid struct: %s is not a struct", sType); - StructType struct = (StructType) sType; - return visitor.message(struct, (MessageType) type, visitFields(struct, type.asGroupType(), visitor)); - - } else if (type.isPrimitive()) { - return visitor.primitive(sType, 
type.asPrimitiveType()); - - } else { - // if not a primitive, the typeId must be a group - GroupType group = type.asGroupType(); - OriginalType annotation = group.getOriginalType(); - if (annotation != null) { - switch (annotation) { - case LIST: - Preconditions.checkArgument(!group.isRepetition(Repetition.REPEATED), - "Invalid list: top-level group is repeated: %s", group); - Preconditions.checkArgument(group.getFieldCount() == 1, - "Invalid list: does not contain single repeated field: %s", group); - - GroupType repeatedElement = group.getFields().get(0).asGroupType(); - Preconditions.checkArgument(repeatedElement.isRepetition(Repetition.REPEATED), - "Invalid list: inner group is not repeated"); - Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1, - "Invalid list: repeated group is not a single field: %s", group); - - Preconditions.checkArgument(sType instanceof ArrayType, "Invalid list: %s is not an array", sType); - ArrayType array = (ArrayType) sType; - StructField element = new StructField( - "element", array.elementType(), array.containsNull(), Metadata.empty()); - - visitor.fieldNames.push(repeatedElement.getName()); - try { - T elementResult = null; - if (repeatedElement.getFieldCount() > 0) { - elementResult = visitField(element, repeatedElement.getType(0), visitor); - } - - return visitor.list(array, group, elementResult); - - } finally { - visitor.fieldNames.pop(); - } - - case MAP: - Preconditions.checkArgument(!group.isRepetition(Repetition.REPEATED), - "Invalid map: top-level group is repeated: %s", group); - Preconditions.checkArgument(group.getFieldCount() == 1, - "Invalid map: does not contain single repeated field: %s", group); - - GroupType repeatedKeyValue = group.getType(0).asGroupType(); - Preconditions.checkArgument(repeatedKeyValue.isRepetition(Repetition.REPEATED), - "Invalid map: inner group is not repeated"); - Preconditions.checkArgument(repeatedKeyValue.getFieldCount() <= 2, - "Invalid map: repeated group does not have 2 fields"); - - Preconditions.checkArgument(sType instanceof MapType, "Invalid map: %s is not a map", sType); - MapType map = (MapType) sType; - StructField keyField = new StructField("key", map.keyType(), false, Metadata.empty()); - StructField valueField = new StructField( - "value", map.valueType(), map.valueContainsNull(), Metadata.empty()); - - visitor.fieldNames.push(repeatedKeyValue.getName()); - try { - T keyResult = null; - T valueResult = null; - switch (repeatedKeyValue.getFieldCount()) { - case 2: - // if there are 2 fields, both key and value are projected - keyResult = visitField(keyField, repeatedKeyValue.getType(0), visitor); - valueResult = visitField(valueField, repeatedKeyValue.getType(1), visitor); - break; - case 1: - // if there is just one, use the name to determine what it is - Type keyOrValue = repeatedKeyValue.getType(0); - if (keyOrValue.getName().equalsIgnoreCase("key")) { - keyResult = visitField(keyField, keyOrValue, visitor); - // value result remains null - } else { - valueResult = visitField(valueField, keyOrValue, visitor); - // key result remains null - } - break; - default: - // both results will remain null - } - - return visitor.map(map, group, keyResult, valueResult); - - } finally { - visitor.fieldNames.pop(); - } - - default: - } - } - - Preconditions.checkArgument(sType instanceof StructType, "Invalid struct: %s is not a struct", sType); - StructType struct = (StructType) sType; - return visitor.struct(struct, group, visitFields(struct, group, visitor)); + @Override + protected DataType 
arrayElementType(DataType arrayType) { + if (arrayType == null) { + return null; } - } - private static T visitField(StructField sField, Type field, ParquetWithSparkSchemaVisitor visitor) { - visitor.fieldNames.push(field.getName()); - try { - return visit(sField.dataType(), field, visitor); - } finally { - visitor.fieldNames.pop(); - } + return ((ArrayType) arrayType).elementType(); } - private static List visitFields(StructType struct, GroupType group, - ParquetWithSparkSchemaVisitor visitor) { - StructField[] sFields = struct.fields(); - Preconditions.checkArgument(sFields.length == group.getFieldCount(), - "Structs do not match: %s and %s", struct, group); - List results = Lists.newArrayListWithExpectedSize(group.getFieldCount()); - for (int i = 0; i < sFields.length; i += 1) { - Type field = group.getFields().get(i); - StructField sField = sFields[i]; - Preconditions.checkArgument(field.getName().equals(AvroSchemaUtil.makeCompatibleName(sField.name())), - "Structs do not match: field %s != %s", field.getName(), sField.name()); - results.add(visitField(sField, field, visitor)); + @Override + protected DataType mapKeyType(DataType mapType) { + if (mapType == null) { + return null; } - return results; - } - - public T message(StructType sStruct, MessageType message, List fields) { - return null; - } - - public T struct(StructType sStruct, GroupType struct, List fields) { - return null; + return ((MapType) mapType).keyType(); } - public T list(ArrayType sArray, GroupType array, T element) { - return null; - } - - public T map(MapType sMap, GroupType map, T key, T value) { - return null; - } + @Override + protected DataType mapValueType(DataType mapType) { + if (mapType == null) { + return null; + } - public T primitive(DataType sPrimitive, PrimitiveType primitive) { - return null; + return ((MapType) mapType).valueType(); } - protected String[] currentPath() { - return Lists.newArrayList(fieldNames.descendingIterator()).toArray(new String[0]); - } + @Override + protected DataType fieldType(DataType structType, int pos, Integer fieldId) { + if (structType == null || ((StructType) structType).size() < pos + 1) { + return null; + } - protected String[] path(String name) { - List list = Lists.newArrayList(fieldNames.descendingIterator()); - list.add(name); - return list.toArray(new String[0]); + return ((StructType) structType).apply(pos).dataType(); } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index ec91e32e9906..cc6428475d8a 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -95,14 +95,14 @@ private static class FallbackReadBuilder extends ReadBuilder { } @Override - public ParquetValueReader message(Types.StructType expected, MessageType message, + public ParquetValueReader message(org.apache.iceberg.types.Type expected, MessageType message, List> fieldReaders) { // the top level matches by ID, but the remaining IDs are missing - return super.struct(expected, message, fieldReaders); + return super.struct(expected.asStructType(), message, fieldReaders); } @Override - public ParquetValueReader struct(Types.StructType ignored, GroupType struct, + public ParquetValueReader struct(org.apache.iceberg.types.Type ignored, GroupType struct, List> fieldReaders) { // the expected struct is ignored because nested fields are never found when the List> newFields 
= Lists.newArrayListWithExpectedSize( @@ -130,13 +130,13 @@ private static class ReadBuilder extends TypeWithSchemaVisitor message(Types.StructType expected, MessageType message, + public ParquetValueReader message(org.apache.iceberg.types.Type expected, MessageType message, List> fieldReaders) { return struct(expected, message.asGroupType(), fieldReaders); } @Override - public ParquetValueReader struct(Types.StructType expected, GroupType struct, + public ParquetValueReader struct(org.apache.iceberg.types.Type expected, GroupType struct, List> fieldReaders) { // match the expected struct's order Map> readersById = Maps.newHashMap(); @@ -153,7 +153,7 @@ public ParquetValueReader struct(Types.StructType expected, GroupType struct, } List expectedFields = expected != null ? - expected.fields() : ImmutableList.of(); + expected.asStructType().fields() : ImmutableList.of(); List> reorderedFields = Lists.newArrayListWithExpectedSize( expectedFields.size()); List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); @@ -182,7 +182,7 @@ public ParquetValueReader struct(Types.StructType expected, GroupType struct, } @Override - public ParquetValueReader list(Types.ListType expectedList, GroupType array, + public ParquetValueReader list(org.apache.iceberg.types.Type expectedList, GroupType array, ParquetValueReader elementReader) { GroupType repeated = array.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -197,7 +197,7 @@ public ParquetValueReader list(Types.ListType expectedList, GroupType array, } @Override - public ParquetValueReader map(Types.MapType expectedMap, GroupType map, + public ParquetValueReader map(org.apache.iceberg.types.Type expectedMap, GroupType map, ParquetValueReader keyReader, ParquetValueReader valueReader) { GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); @@ -217,7 +217,7 @@ public ParquetValueReader map(Types.MapType expectedMap, GroupType map, } @Override - public ParquetValueReader primitive(org.apache.iceberg.types.Type.PrimitiveType expected, + public ParquetValueReader primitive(org.apache.iceberg.types.Type expected, PrimitiveType primitive) { ColumnDescriptor desc = type.getColumnDescription(currentPath()); diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java index ac345566dd37..e044c69db6a2 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java @@ -43,11 +43,9 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.MapData; -import org.apache.spark.sql.types.ArrayType; import org.apache.spark.sql.types.ByteType; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; -import org.apache.spark.sql.types.MapType; import org.apache.spark.sql.types.ShortType; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -70,16 +68,16 @@ private static class WriteBuilder extends ParquetWithSparkSchemaVisitor message(StructType sStruct, MessageType message, + public ParquetValueWriter message(DataType sStruct, MessageType message, List> fieldWriters) { return struct(sStruct, message.asGroupType(), fieldWriters); } @Override - public ParquetValueWriter struct(StructType sStruct, GroupType struct, + public ParquetValueWriter struct(DataType 
sStruct, GroupType struct, List> fieldWriters) { List fields = struct.getFields(); - StructField[] sparkFields = sStruct.fields(); + StructField[] sparkFields = ((StructType) sStruct).fields(); List> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size()); List sparkTypes = Lists.newArrayList(); for (int i = 0; i < fields.size(); i += 1) { @@ -91,7 +89,7 @@ public ParquetValueWriter struct(StructType sStruct, GroupType struct, } @Override - public ParquetValueWriter list(ArrayType sArray, GroupType array, ParquetValueWriter elementWriter) { + public ParquetValueWriter list(DataType sArray, GroupType array, ParquetValueWriter elementWriter) { GroupType repeated = array.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -100,11 +98,11 @@ public ParquetValueWriter list(ArrayType sArray, GroupType array, ParquetValu return new ArrayDataWriter<>(repeatedD, repeatedR, newOption(repeated.getType(0), elementWriter), - sArray.elementType()); + arrayElementType(sArray)); } @Override - public ParquetValueWriter map(MapType sMap, GroupType map, + public ParquetValueWriter map(DataType sMap, GroupType map, ParquetValueWriter keyWriter, ParquetValueWriter valueWriter) { GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -115,7 +113,7 @@ public ParquetValueWriter map(MapType sMap, GroupType map, return new MapDataWriter<>(repeatedD, repeatedR, newOption(repeatedKeyValue.getType(0), keyWriter), newOption(repeatedKeyValue.getType(1), valueWriter), - sMap.keyType(), sMap.valueType()); + mapKeyType(sMap), mapValueType(sMap)); } private ParquetValueWriter newOption(org.apache.parquet.schema.Type fieldType, ParquetValueWriter writer) { diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java index 2834135aa3e2..63121a8cec1f 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java @@ -82,8 +82,8 @@ private static class VectorizedReaderBuilder extends TypeWithSchemaVisitor message( - Types.StructType expected, MessageType message, - List> fieldReaders) { + org.apache.iceberg.types.Type expected, MessageType message, + List> fieldReaders) { GroupType groupType = message.asGroupType(); Map> readersById = Maps.newHashMap(); List fields = groupType.getFields(); @@ -93,7 +93,7 @@ public VectorizedReader message( .forEach(pos -> readersById.put(fields.get(pos).getId().intValue(), fieldReaders.get(pos))); List icebergFields = expected != null ? - expected.fields() : ImmutableList.of(); + expected.asStructType().fields() : ImmutableList.of(); List> reorderedFields = Lists.newArrayListWithExpectedSize( icebergFields.size()); @@ -114,7 +114,7 @@ public VectorizedReader message( @Override public VectorizedReader struct( - Types.StructType expected, GroupType groupType, + org.apache.iceberg.types.Type expected, GroupType groupType, List> fieldReaders) { if (expected != null) { throw new UnsupportedOperationException("Vectorized reads are not supported yet for struct fields"); @@ -124,7 +124,7 @@ public VectorizedReader struct( @Override public VectorizedReader primitive( - org.apache.iceberg.types.Type.PrimitiveType expected, + org.apache.iceberg.types.Type expected, PrimitiveType primitive) { // Create arrow vector for this field
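For reference, the sketch below is not part of this patch; it illustrates how the new ParquetTypeWithPartnerVisitor is intended to be extended after the change. TypeWithSchemaVisitor now binds the partner type P to org.apache.iceberg.types.Type, so each callback receives the generic Iceberg type (or null when the column is pruned from the expected schema) and narrows it itself. The class and variable names used here (ProjectedPaths, icebergSchema, fileSchema) are assumptions for the example, not code from this PR.

import java.util.List;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;

// Collects the dotted Parquet paths of all leaf columns that have a partner in
// the expected Iceberg schema; pruned columns arrive with a null partner.
public class ProjectedPaths extends TypeWithSchemaVisitor<List<String>> {

  @Override
  public List<String> message(org.apache.iceberg.types.Type expected, MessageType message,
                              List<List<String>> fields) {
    return concat(fields);
  }

  @Override
  public List<String> struct(org.apache.iceberg.types.Type expected, GroupType struct,
                             List<List<String>> fields) {
    return concat(fields);
  }

  @Override
  public List<String> list(org.apache.iceberg.types.Type expected, GroupType array, List<String> element) {
    return element == null ? Lists.newArrayList() : element;
  }

  @Override
  public List<String> map(org.apache.iceberg.types.Type expected, GroupType map,
                          List<String> key, List<String> value) {
    return concat(Lists.newArrayList(key, value));
  }

  @Override
  public List<String> primitive(org.apache.iceberg.types.Type expected, PrimitiveType primitive) {
    // a null partner means this column is not in the projected Iceberg schema
    if (expected == null) {
      return Lists.newArrayList();
    }
    // currentPath() is maintained by the base visitor's before/after field hooks
    return Lists.newArrayList(String.join(".", currentPath()));
  }

  // flatten child results, skipping children that were not projected
  private static List<String> concat(List<List<String>> children) {
    List<String> result = Lists.newArrayList();
    for (List<String> child : children) {
      if (child != null) {
        result.addAll(child);
      }
    }
    return result;
  }
}

Under the same assumptions, it would be invoked as List<String> paths = TypeWithSchemaVisitor.visit(icebergSchema.asStruct(), fileSchema, new ProjectedPaths()); where fileSchema is the file's Parquet MessageType and the static visit method is inherited from ParquetTypeWithPartnerVisitor.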