diff --git a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java
index db23a5de57ad..73839ef2131a 100644
--- a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java
+++ b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java
@@ -21,9 +21,15 @@
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.data.DataTest;
 import org.apache.iceberg.data.DataTestHelpers;
 import org.apache.iceberg.data.GenericRecord;
@@ -34,7 +40,13 @@
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
 
 public class TestGenericData extends DataTest {
   @Override
@@ -78,4 +90,52 @@ protected void writeAndValidate(Schema schema) throws IOException {
       }
     }
   }
+
+  @Test
+  public void testTwoLevelList() throws IOException {
+    Schema schema = new Schema(
+        optional(1, "arraybytes", Types.ListType.ofRequired(3, Types.BinaryType.get())),
+        optional(2, "topbytes", Types.BinaryType.get())
+    );
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+
+    File testFile = temp.newFile();
+    Assert.assertTrue(testFile.delete());
+
+    ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(
+        new Path(testFile.toURI()))
+        .withDataModel(GenericData.get())
+        .withSchema(avroSchema)
+        .config("parquet.avro.add-list-element-records", "true")
+        .config("parquet.avro.write-old-list-structure", "true")
+        .build();
+
+    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
+    List<ByteBuffer> expectedByteList = new ArrayList<>();
+    byte[] expectedByte = {0x00, 0x01};
+    ByteBuffer expectedBinary = ByteBuffer.wrap(expectedByte);
+    expectedByteList.add(expectedBinary);
+    recordBuilder.set("arraybytes", expectedByteList);
+    recordBuilder.set("topbytes", expectedBinary);
+    GenericData.Record expectedRecord = recordBuilder.build();
+
+    writer.write(expectedRecord);
+    writer.close();
+
+    // test reuseContainers
+    try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
+        .project(schema)
+        .reuseContainers()
+        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
+        .build()) {
+      CloseableIterator<Record> it = reader.iterator();
+      Assert.assertTrue("Should have at least one row", it.hasNext());
+      while (it.hasNext()) {
+        GenericRecord actualRecord = (GenericRecord) it.next();
+        Assert.assertEquals(expectedBinary, actualRecord.get(0, ArrayList.class).get(0));
+        Assert.assertEquals(expectedBinary, actualRecord.get(1, ByteBuffer.class));
+        Assert.assertFalse("Should not have more than one row", it.hasNext());
+      }
+    }
+  }
 }
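Note: the parquet.avro.write-old-list-structure=true setting used by this test makes parquet-avro emit the legacy two-level list layout rather than the modern three-level one. Roughly, for the arraybytes field above (a sketch for orientation, not output captured from the test):

    // legacy two-level layout: a single repeated node directly under the LIST group
    optional group arraybytes (LIST) {
      repeated binary array;
    }

    // modern three-level layout: elements wrapped in an intermediate repeated group
    optional group arraybytes (LIST) {
      repeated group list {
        required binary element;
      }
    }
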
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index d7088b4700e5..b0fb3538e39b 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -36,6 +36,7 @@
 import org.apache.flink.table.data.TimestampData;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
@@ -143,13 +144,12 @@ public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
         return null;
       }
 
-      GroupType repeated = array.getFields().get(0).asGroupType();
       String[] repeatedPath = currentPath();
 
       int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
       int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
 
-      Type elementType = repeated.getType(0);
+      Type elementType = ParquetSchemaUtil.determineListElementType(array);
       int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
 
       return new ArrayReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 9af37c85d72a..88e439c0fe2b 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -21,11 +21,19 @@
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.data.DataTest;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
@@ -35,11 +43,60 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
 
 public class TestFlinkParquetReader extends DataTest {
   private static final int NUM_RECORDS = 100;
 
+  @Test
+  public void testTwoLevelList() throws IOException {
+    Schema schema = new Schema(
+        optional(1, "arraybytes", Types.ListType.ofRequired(3, Types.BinaryType.get())),
+        optional(2, "topbytes", Types.BinaryType.get())
+    );
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+
+    File testFile = temp.newFile();
+    Assert.assertTrue(testFile.delete());
+
+    ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
+        .withDataModel(GenericData.get())
+        .withSchema(avroSchema)
+        .config("parquet.avro.add-list-element-records", "true")
+        .config("parquet.avro.write-old-list-structure", "true")
+        .build();
+
+    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
+    List<ByteBuffer> expectedByteList = new ArrayList<>();
+    byte[] expectedByte = {0x00, 0x01};
+    ByteBuffer expectedBinary = ByteBuffer.wrap(expectedByte);
+    expectedByteList.add(expectedBinary);
+    recordBuilder.set("arraybytes", expectedByteList);
+    recordBuilder.set("topbytes", expectedBinary);
+    GenericData.Record expectedRecord = recordBuilder.build();
+
+    writer.write(expectedRecord);
+    writer.close();
+
+    try (CloseableIterable<RowData> reader = Parquet.read(Files.localInput(testFile))
+        .project(schema)
+        .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
+        .build()) {
+      Iterator<RowData> rows = reader.iterator();
+      Assert.assertTrue("Should have at least one row", rows.hasNext());
+      RowData rowData = rows.next();
+      Assert.assertArrayEquals(expectedByte, rowData.getArray(0).getBinary(0));
+      Assert.assertArrayEquals(expectedByte, rowData.getBinary(1));
+      Assert.assertFalse("Should not have more than one row", rows.hasNext());
+    }
+  }
+
   private void writeAndValidate(Iterable<Record> iterable, Schema schema) throws IOException {
     File testFile = temp.newFile();
     Assert.assertTrue("Delete should succeed", testFile.delete());
diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
index e0377050ba10..63401d989418 100644
--- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -183,13 +183,12 @@ public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
         return null;
       }
 
-      GroupType repeated = array.getFields().get(0).asGroupType();
       String[] repeatedPath = currentPath();
 
       int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
       int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
 
-      Type elementType = repeated.getType(0);
+      Type elementType = ParquetSchemaUtil.determineListElementType(array);
       int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
 
       return new ParquetValueReaders.ListReader<>(repeatedD, repeatedR,
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java b/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java
index b85dabc8e1f2..363631c8c04d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java
@@ -67,11 +67,17 @@ public Type list(GroupType list, Type elementType) {
     Preconditions.checkArgument(elementType != null, "List type must have element field");
 
+    Type listElement = ParquetSchemaUtil.determineListElementType(list);
     MappedField field = nameMapping.find(currentPath());
-    Type listType = Types.buildGroup(list.getRepetition())
-        .as(LogicalTypeAnnotation.listType())
-        .repeatedGroup().addFields(elementType).named(list.getFieldName(0))
-        .named(list.getName());
+
+    Types.GroupBuilder<GroupType> listBuilder = Types.buildGroup(list.getRepetition())
+        .as(LogicalTypeAnnotation.listType());
+    if (listElement.isRepetition(Type.Repetition.REPEATED)) {
+      listBuilder.addFields(elementType);
+    } else {
+      listBuilder.repeatedGroup().addFields(elementType).named(list.getFieldName(0));
+    }
+    Type listType = listBuilder.named(list.getName());
 
     return field == null ? listType : listType.withId(field.id());
   }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java b/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java
index 6aee360fef1b..e1476a06c877 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java
@@ -96,12 +96,7 @@ public Type struct(GroupType struct, List<Type> fieldTypes) {
 
   @Override
   public Type list(GroupType array, Type elementType) {
-    GroupType repeated = array.getType(0).asGroupType();
-    org.apache.parquet.schema.Type element = repeated.getType(0);
-
-    Preconditions.checkArgument(
-        !element.isRepetition(Repetition.REPEATED),
-        "Elements cannot have repetition REPEATED: %s", element);
+    org.apache.parquet.schema.Type element = ParquetSchemaUtil.determineListElementType(array);
 
     Integer elementFieldId = getId(element);
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java
index fe2c8467bd40..e71f65178f0f 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java
@@ -121,13 +121,12 @@ public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
     @Override
     public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
                                       ParquetValueReader<?> elementReader) {
-      GroupType repeated = array.getFields().get(0).asGroupType();
       String[] repeatedPath = currentPath();
 
      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
 
-      Type elementType = repeated.getType(0);
+      Type elementType = ParquetSchemaUtil.determineListElementType(array);
       int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
 
       return new ListReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
index 3605ff71e041..0370f8c02ec7 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
@@ -164,4 +164,58 @@ public Boolean primitive(PrimitiveType primitive) {
     }
   }
 
+  public static Type determineListElementType(GroupType array) {
+    Type repeated = array.getFields().get(0);
+    boolean isOldListElementType = isOldListElementType(array);
+
+    return isOldListElementType ? repeated : repeated.asGroupType().getType(0);
+  }
+
+  // Parquet LIST backwards-compatibility rules.
+  // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
+  static boolean isOldListElementType(GroupType list) {
+    Type repeatedType = list.getFields().get(0);
+    String parentName = list.getName();
+
+    return
+        // For legacy 2-level list types with primitive element type, e.g.:
+        //
+        //    // ARRAY<INT> (nullable list, non-null elements)
+        //    optional group my_list (LIST) {
+        //      repeated int32 element;
+        //    }
+        //
+        repeatedType.isPrimitive() ||
+        // For legacy 2-level list types whose element type is a group type with 2 or more fields,
+        // e.g.:
+        //
+        //    // ARRAY<STRUCT<str: STRING, num: INT>> (nullable list, non-null elements)
+        //    optional group my_list (LIST) {
+        //      repeated group element {
+        //        required binary str (UTF8);
+        //        required int32 num;
+        //      };
+        //    }
+        //
+        repeatedType.asGroupType().getFieldCount() > 1 ||
+        // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.:
+        //
+        //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
+        //    optional group my_list (LIST) {
+        //      repeated group array {
+        //        required binary str (UTF8);
+        //      };
+        //    }
+        //
+        repeatedType.getName().equals("array") ||
+        // For Parquet data generated by parquet-thrift, e.g.:
+        //
+        //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
+        //    optional group my_list (LIST) {
+        //      repeated group my_list_tuple {
+        //        required binary str (UTF8);
+        //      };
+        //    }
+        //
+        repeatedType.getName().equals(parentName + "_tuple");
+  }
 }
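A minimal usage sketch of the new helper (names are from this patch; the schema string mirrors the parquet-avro case documented above):

    // parse a legacy two-level list schema and resolve its element type
    MessageType legacy = MessageTypeParser.parseMessageType(
        "message m {" +
        "  optional group my_list (LIST) {" +
        "    repeated group array {" +
        "      required binary str (UTF8);" +
        "    }" +
        "  }" +
        "}");
    GroupType myList = legacy.getType(0).asGroupType();
    // for this legacy shape the repeated node itself is the element, so the helper
    // returns the repeated "array" group rather than a child of it
    Type element = ParquetSchemaUtil.determineListElementType(myList);
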
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeVisitor.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeVisitor.java
index 43d94b516e6c..b7668f0a8b3b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeVisitor.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeVisitor.java
@@ -66,30 +66,41 @@ private static <T> T visitList(GroupType list, ParquetTypeVisitor<T> visitor) {
     Preconditions.checkArgument(list.getFieldCount() == 1,
         "Invalid list: does not contain single repeated field: %s", list);
 
-    GroupType repeatedElement = list.getFields().get(0).asGroupType();
+    Type repeatedElement = list.getFields().get(0);
     Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED),
         "Invalid list: inner group is not repeated");
 
-    Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1,
-        "Invalid list: repeated group is not a single field: %s", list);
-
-    visitor.beforeRepeatedElement(repeatedElement);
-    try {
-      T elementResult = null;
-      if (repeatedElement.getFieldCount() > 0) {
-        Type elementField = repeatedElement.getType(0);
-        visitor.beforeElementField(elementField);
-        try {
-          elementResult = visit(elementField, visitor);
-        } finally {
-          visitor.afterElementField(elementField);
-        }
-      }
+    Type listElement = ParquetSchemaUtil.determineListElementType(list);
+    if (listElement.isRepetition(Type.Repetition.REPEATED)) {
+      T elementResult = visitListElement(listElement, visitor);
+      return visitor.list(list, elementResult);
+    } else {
+      return visitThreeLevelList(list, repeatedElement, listElement, visitor);
+    }
+  }
 
+  private static <T> T visitThreeLevelList(
+      GroupType list, Type repeated, Type listElement, ParquetTypeVisitor<T> visitor) {
+    visitor.beforeRepeatedElement(repeated);
+    try {
+      T elementResult = visitListElement(listElement, visitor);
       return visitor.list(list, elementResult);
+    } finally {
+      visitor.afterRepeatedElement(repeated);
+    }
+  }
+
+  private static <T> T visitListElement(Type listElement, ParquetTypeVisitor<T> visitor) {
+    T elementResult = null;
+
+    visitor.beforeElementField(listElement);
+    try {
+      elementResult = visit(listElement, visitor);
     } finally {
-      visitor.afterRepeatedElement(repeatedElement);
+      visitor.afterElementField(listElement);
     }
+
+    return elementResult;
   }
 
   private static <T> T visitMap(GroupType map, ParquetTypeVisitor<T> visitor) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
index aafa2902bc64..4d82fbd6f921 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
@@ -108,15 +108,19 @@ public Type struct(GroupType struct, List<Type> fields) {
 
   @Override
   public Type list(GroupType list, Type element) {
-    GroupType repeated = list.getType(0).asGroupType();
-    Type originalElement = repeated.getType(0);
+    Type repeated = list.getType(0);
+    Type originalElement = ParquetSchemaUtil.determineListElementType(list);
     Integer elementId = getId(originalElement);
 
     if (elementId != null && selectedIds.contains(elementId)) {
       return list;
     } else if (element != null) {
       if (!Objects.equal(element, originalElement)) {
-        return list.withNewFields(repeated.withNewFields(element));
+        if (originalElement.isRepetition(Type.Repetition.REPEATED)) {
+          return list.withNewFields(element);
+        } else {
+          return list.withNewFields(repeated.asGroupType().withNewFields(element));
+        }
       }
       return list;
     }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java b/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java
index 3a9b439cb300..4bc8301f305c 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java
@@ -63,12 +63,11 @@ public static <T> T visit(org.apache.iceberg.types.Type iType, Type type, TypeWithSchemaVisitor<T> visitor) {
           Preconditions.checkArgument(group.getFieldCount() == 1,
               "Invalid list: does not contain single repeated field: %s", group);
 
-          GroupType repeatedElement = group.getFields().get(0).asGroupType();
+          Type repeatedElement = group.getFields().get(0);
           Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED),
               "Invalid list: inner group is not repeated");
 
-          Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1,
-              "Invalid list: repeated group is not a single field: %s", group);
+          Type listElement = ParquetSchemaUtil.determineListElementType(group);
 
           Types.ListType list = null;
           Types.NestedField element = null;
           if (iType != null) {
@@ -76,16 +75,10 @@
             element = list.fields().get(0);
           }
 
-          visitor.fieldNames.push(repeatedElement.getName());
-          try {
-            T elementResult = null;
-            if (repeatedElement.getFieldCount() > 0) {
-              elementResult = visitField(element, repeatedElement.getType(0), visitor);
-            }
-
-            return visitor.list(list, group, elementResult);
-          } finally {
-            visitor.fieldNames.pop();
+          if (listElement.isRepetition(Type.Repetition.REPEATED)) {
+            return visitTwoLevelList(list, element, group, listElement, visitor);
+          } else {
+            return visitThreeLevelList(list, element, group, listElement, visitor);
           }
 
         case MAP:
@@ -149,6 +142,27 @@
     }
   }
 
+  private static <T> T visitTwoLevelList(
+      Types.ListType iListType, Types.NestedField iListElement, GroupType pListType, Type pListElement,
+      TypeWithSchemaVisitor<T> visitor) {
+    T elementResult = visitField(iListElement, pListElement, visitor);
+    return visitor.list(iListType, pListType, elementResult);
+  }
+
+  private static <T> T visitThreeLevelList(
+      Types.ListType iListType, Types.NestedField iListElement, GroupType pListType, Type pListElement,
+      TypeWithSchemaVisitor<T> visitor) {
+    visitor.fieldNames.push(pListType.getFieldName(0));
+
+    try {
+      T elementResult = visitField(iListElement, pListElement, visitor);
+
+      return visitor.list(iListType, pListType, elementResult);
+    } finally {
+      visitor.fieldNames.pop();
+    }
+  }
+
   private static <T> T visitField(Types.NestedField iField, Type field, TypeWithSchemaVisitor<T> visitor) {
     visitor.fieldNames.push(field.getName());
     try {
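The two-level branch above intentionally skips the fieldNames push that the three-level branch performs: in the three-level layout the element sits under an intermediate repeated group whose name is part of the column path, while in the two-level layout the repeated node is the element itself and visitField already pushes its name. Roughly:

    // three-level: my_list (LIST) -> repeated group list -> element
    //   column path: my_list.list.element
    // two-level:   my_list (LIST) -> repeated binary array
    //   column path: my_list.array
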
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
index 048a0ff42cd8..ecac1796f5e4 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -21,17 +21,27 @@
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.IntegerType;
 import org.apache.iceberg.util.Pair;
+import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.schema.MessageType;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -97,6 +107,46 @@ public void testNumberOfBytesWritten() throws IOException {
     Assert.assertEquals(expectedSize, actualSize);
   }
 
+  @Test
+  public void testTwoLevelList() throws IOException {
+    Schema schema = new Schema(
+        optional(1, "arraybytes", Types.ListType.ofRequired(3, Types.BinaryType.get())),
+        optional(2, "topbytes", Types.BinaryType.get())
+    );
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+
+    File testFile = temp.newFile();
+    Assert.assertTrue(testFile.delete());
+
+    ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
+        .withDataModel(GenericData.get())
+        .withSchema(avroSchema)
+        .config("parquet.avro.add-list-element-records", "true")
+        .config("parquet.avro.write-old-list-structure", "true")
+        .build();
+
+    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
+    List<ByteBuffer> expectedByteList = new ArrayList<>();
+    byte[] expectedByte = {0x00, 0x01};
+    ByteBuffer expectedBinary = ByteBuffer.wrap(expectedByte);
+    expectedByteList.add(expectedBinary);
+    recordBuilder.set("arraybytes", expectedByteList);
+    recordBuilder.set("topbytes", expectedBinary);
+    GenericData.Record expectedRecord = recordBuilder.build();
+
+    writer.write(expectedRecord);
+    writer.close();
+
+    GenericData.Record recordRead = Iterables.getOnlyElement(Parquet.read(Files.localInput(testFile))
+        .project(schema)
+        .callInit()
+        .build());
+
+    Assert.assertEquals(expectedByteList, recordRead.get("arraybytes"));
+    Assert.assertEquals(expectedBinary, recordRead.get("topbytes"));
+  }
+
   private Pair<File, Long> generateFileWithTwoRowGroups(Function<Schema, FileAppender<GenericData.Record>> createWriterFunc)
       throws IOException {
     Schema schema = new Schema(
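If the on-disk list layout ever needs to be confirmed directly (the Spark migration test further down does exactly this), the footer of the written file can be inspected. A sketch, assuming a Hadoop Configuration is available in the test:

    // read the Parquet footer and expose the file's message type
    MessageType fileSchema = ParquetFileReader.open(
            HadoopInputFile.fromPath(new Path(testFile.toURI()), new Configuration()))
        .getFooter().getFileMetaData().getSchema();
    System.out.println(fileSchema);
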
match", expectedSchema.asStruct(), actualSchema.asStruct()); + } + private Type primitive(Integer id, String name, PrimitiveTypeName typeName, Repetition repetition) { PrimitiveBuilder builder = org.apache.parquet.schema.Types.primitive(typeName, repetition); if (id != null) { diff --git a/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java b/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java index c90c8a910b16..9fddba474b3c 100644 --- a/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java +++ b/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java @@ -174,13 +174,12 @@ public ParquetValueReader struct( @Override public ParquetValueReader list( Types.ListType expectedList, GroupType array, ParquetValueReader elementReader) { - GroupType repeated = array.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1; int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1; - Type elementType = repeated.getType(0); + Type elementType = ParquetSchemaUtil.determineListElementType(array); int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1; return new ArrayReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader)); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 8abee4a575e1..35627ace23b0 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -187,13 +187,12 @@ public ParquetValueReader struct(Types.StructType expected, GroupType struct, @Override public ParquetValueReader list(Types.ListType expectedList, GroupType array, ParquetValueReader elementReader) { - GroupType repeated = array.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1; int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1; - Type elementType = repeated.getType(0); + Type elementType = ParquetSchemaUtil.determineListElementType(array); int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1; return new ArrayReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader)); diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java index e10c327a76bd..6cbfdc0d3d1b 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java @@ -20,15 +20,19 @@ package org.apache.iceberg.spark.actions; import java.io.File; +import java.io.FilenameFilter; import java.io.IOException; import java.net.URI; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.TrueFileFilter; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.actions.MigrateTable; @@ -43,6 +47,11 @@ import 
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java
index e10c327a76bd..6cbfdc0d3d1b 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java
@@ -20,15 +20,19 @@
 package org.apache.iceberg.spark.actions;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.MigrateTable;
@@ -43,6 +47,11 @@
 import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
@@ -54,6 +63,10 @@
 import org.apache.spark.sql.catalyst.parser.ParseException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.junit.After;
 import org.junit.Assert;
@@ -136,6 +149,7 @@ public void before() {
     spark.conf().set("hive.exec.dynamic.partition", "true");
     spark.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
+    spark.conf().set("spark.sql.parquet.writeLegacyFormat", false);
     spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));
 
     List<SimpleRecord> expected = Lists.newArrayList(
@@ -610,7 +624,73 @@ public void testStructOfThreeLevelLists() throws Exception {
     structOfThreeLevelLists(false);
   }
 
+  @Test
+  public void testTwoLevelList() throws IOException {
+    spark.conf().set("spark.sql.parquet.writeLegacyFormat", true);
+
+    String tableName = sourceName("testTwoLevelList");
+    File location = temp.newFolder();
+
+    StructType sparkSchema =
+        new StructType(
+            new StructField[]{
+                new StructField(
+                    "col1", new ArrayType(
+                        new StructType(
+                            new StructField[]{
+                                new StructField(
+                                    "col2",
+                                    DataTypes.IntegerType,
+                                    false,
+                                    Metadata.empty())
+                            }), false), true, Metadata.empty())});
+
+    // even though this list looks like a three-level list, it is actually a two-level list
+    // whose items are structs with one field
+    String expectedParquetSchema =
+        "message spark_schema {\n" +
+            "  optional group col1 (LIST) {\n" +
+            "    repeated group array {\n" +
+            "      required int32 col2;\n" +
+            "    }\n" +
+            "  }\n" +
+            "}\n";
+
+    // generate parquet file with required schema
+    List<String> testData = Collections.singletonList("{\"col1\": [{\"col2\": 1}]}");
+    spark.read().schema(sparkSchema).json(
+        JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(testData))
+        .coalesce(1).write().format("parquet").mode(SaveMode.Append).save(location.getPath());
+
+    File parquetFile = Arrays.stream(Objects.requireNonNull(location.listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return name.endsWith("parquet");
+      }
+    }))).findAny().get();
+
+    // verify generated parquet file has expected schema
+    ParquetFileReader pqReader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(parquetFile.getPath()),
+        spark.sessionState().newHadoopConf()));
+    MessageType schema = pqReader.getFooter().getFileMetaData().getSchema();
+    Assert.assertEquals(MessageTypeParser.parseMessageType(expectedParquetSchema), schema);
+
+    // create sql table on top of it
+    sql("CREATE EXTERNAL TABLE %s (col1 ARRAY<STRUCT<col2: INT>>)" +
+        " STORED AS parquet" +
+        " LOCATION '%s'", tableName, location);
+    List<Object[]> expected = sql("select array(struct(1))");
+
+    // migrate table
+    SparkActions.get().migrateTable(tableName).execute();
+
+    // check migrated table is returning expected result
+    List<Object[]> results = sql("SELECT * FROM %s", tableName);
+    Assert.assertTrue(results.size() > 0);
+    assertEquals("Output must match", expected, results);
+  }
+
+  private void threeLevelList(boolean useLegacyMode) throws Exception {
     spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);
 
     String tableName = sourceName(String.format("threeLevelList_%s", useLegacyMode));