diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java b/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java index 6aee360fef1b..163098c49c94 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java @@ -73,7 +73,7 @@ public Type struct(GroupType struct, List fieldTypes) { org.apache.parquet.schema.Type field = parquetFields.get(i); Preconditions.checkArgument( - !field.isRepetition(Repetition.REPEATED), + field.isPrimitive() || !field.isRepetition(Repetition.REPEATED), "Fields cannot have repetition REPEATED: %s", field); Integer fieldId = getId(field); @@ -96,11 +96,11 @@ public Type struct(GroupType struct, List fieldTypes) { @Override public Type list(GroupType array, Type elementType) { - GroupType repeated = array.getType(0).asGroupType(); - org.apache.parquet.schema.Type element = repeated.getType(0); + org.apache.parquet.schema.Type repeated = array.getType(0); + org.apache.parquet.schema.Type element = repeated.isPrimitive() ? 
repeated : repeated.asGroupType().getType(0); Preconditions.checkArgument( - !element.isRepetition(Repetition.REPEATED), + element.isPrimitive() || !element.isRepetition(Repetition.REPEATED), "Elements cannot have repetition REPEATED: %s", element); Integer elementFieldId = getId(element); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeVisitor.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeVisitor.java index 43d94b516e6c..6ea9c523672b 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeVisitor.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeVisitor.java @@ -66,17 +66,20 @@ private static T visitList(GroupType list, ParquetTypeVisitor visitor) { Preconditions.checkArgument(list.getFieldCount() == 1, "Invalid list: does not contain single repeated field: %s", list); - GroupType repeatedElement = list.getFields().get(0).asGroupType(); + Type repeatedElement = list.getFields().get(0); + Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED), "Invalid list: inner group is not repeated"); - Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1, - "Invalid list: repeated group is not a single field: %s", list); + + Preconditions.checkArgument( + repeatedElement.isPrimitive() || repeatedElement.asGroupType().getFieldCount() <= 1, + "Invalid list: repeated group is not a single field or primitive: %s", list); visitor.beforeRepeatedElement(repeatedElement); try { T elementResult = null; - if (repeatedElement.getFieldCount() > 0) { - Type elementField = repeatedElement.getType(0); + if (repeatedElement.isPrimitive() || repeatedElement.asGroupType().getFieldCount() > 0) { + Type elementField = repeatedElement.isPrimitive() ? 
repeatedElement : repeatedElement.asGroupType().getType(0); visitor.beforeElementField(elementField); try { elementResult = visit(elementField, visitor); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java b/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java index 6dd76d722074..e0379b279966 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java @@ -63,10 +63,12 @@ public static T visit(org.apache.iceberg.types.Type iType, Type type, TypeWi Preconditions.checkArgument(group.getFieldCount() == 1, "Invalid list: does not contain single repeated field: %s", group); - GroupType repeatedElement = group.getFields().get(0).asGroupType(); + Type repeatedElement = group.getFields().get(0); + Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED), "Invalid list: inner group is not repeated"); - Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1, + Preconditions.checkArgument( + repeatedElement.isPrimitive() || repeatedElement.asGroupType().getFieldCount() <= 1, "Invalid list: repeated group is not a single field: %s", group); Types.ListType list = null; @@ -79,8 +81,10 @@ public static T visit(org.apache.iceberg.types.Type iType, Type type, TypeWi visitor.fieldNames.push(repeatedElement.getName()); try { T elementResult = null; - if (repeatedElement.getFieldCount() > 0) { - elementResult = visitField(element, repeatedElement.getType(0), visitor); + if (repeatedElement.isPrimitive()) { + elementResult = visit(element != null ? 
element.type() : null, repeatedElement, visitor); + } else if (repeatedElement.asGroupType().getFieldCount() > 0) { + elementResult = visitField(element, repeatedElement.asGroupType().getType(0), visitor); } return visitor.list(list, group, elementResult); diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java index 92ca00a76469..342efebd1938 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java @@ -181,6 +181,34 @@ public void testSchemaConversionWithoutAssigningIds() { Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct()); } + @Test + public void testSchemaConversionListByteArray() { + MessageType messageType = new MessageType("test", + list(1, "my_bytes", Repetition.OPTIONAL, + primitive(2, "array", PrimitiveTypeName.BINARY, Repetition.REPEATED)) + ); + + Schema expectedSchema = new Schema( + optional(1, "my_bytes", Types.ListType.ofRequired(2, Types.BinaryType.get())) + ); + + Schema actualSchema = ParquetSchemaUtil.convertAndPrune(messageType); + Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct()); + } + + @Test + public void testSchemaConversionByteArray() { + MessageType messageType = new MessageType("test", + primitive(1, "array", PrimitiveTypeName.BINARY, Repetition.REPEATED)); + + Schema expectedSchema = new Schema( + required(1, "array", Types.BinaryType.get()) + ); + + Schema actualSchema = ParquetSchemaUtil.convertAndPrune(messageType); + Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct()); + } + private Type primitive(Integer id, String name, PrimitiveTypeName typeName, Repetition repetition) { PrimitiveBuilder builder = org.apache.parquet.schema.Types.primitive(typeName, repetition); if (id != null) { diff 
--git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index ec91e32e9906..5144149f1153 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -184,14 +184,14 @@ public ParquetValueReader struct(Types.StructType expected, GroupType struct, @Override public ParquetValueReader list(Types.ListType expectedList, GroupType array, ParquetValueReader elementReader) { - GroupType repeated = array.getFields().get(0).asGroupType(); + Type repeated = array.getFields().get(0); String[] repeatedPath = currentPath(); int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1; int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1; - Type elementType = repeated.getType(0); - int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1; + Type elementType = repeated.isPrimitive() ? repeated : repeated.asGroupType().getType(0); + int elementD = repeated.isPrimitive() ? repeatedD : type.getMaxDefinitionLevel(path(elementType.getName())) - 1; return new ArrayReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader)); } diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestMalformedParquetFromAvro.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestMalformedParquetFromAvro.java new file mode 100644 index 000000000000..cc995e38141a --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestMalformedParquetFromAvro.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.data; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.parquet.ParquetSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.schema.MessageType; +import org.apache.spark.sql.catalyst.InternalRow; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestMalformedParquetFromAvro { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + + @Test + public void testWriteReadAvroBinary() throws IOException { + String schema = "{" + + "\"type\":\"record\"," + + "\"name\":\"DbRecord\"," + + "\"namespace\":\"com.iceberg\"," + + "\"fields\":[" + + "{\"name\":\"arraybytes\", " + + "\"type\":[ \"null\", { 
\"type\":\"array\", \"items\":\"bytes\"}], \"default\":null}," + + "{\"name\":\"topbytes\", \"type\":\"bytes\"}" + + "]" + + "}"; + + org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser(); + org.apache.avro.Schema avroSchema = parser.parse(schema); + AvroSchemaConverter converter = new AvroSchemaConverter(); + MessageType parquetSchema = converter.convert(avroSchema); + Schema icebergSchema = ParquetSchemaUtil.convert(parquetSchema); + + File testFile = temp.newFile(); + Assert.assertTrue(testFile.delete()); + + ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI())) + .withDataModel(GenericData.get()) + .withSchema(avroSchema) + .build(); + + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema); + List<ByteBuffer> expectedByteList = new ArrayList<>(); + byte[] expectedByte = {0x00, 0x01}; + expectedByteList.add(ByteBuffer.wrap(expectedByte)); + + recordBuilder.set("arraybytes", expectedByteList); + recordBuilder.set("topbytes", ByteBuffer.wrap(expectedByte)); + GenericData.Record record = recordBuilder.build(); + writer.write(record); + writer.close(); + + List<InternalRow> rows; + try (CloseableIterable<InternalRow> reader = + Parquet.read(Files.localInput(testFile)) + .project(icebergSchema) + .createReaderFunc(type -> SparkParquetReaders.buildReader(icebergSchema, type)) + .build()) { + rows = Lists.newArrayList(reader); + } + + InternalRow row = rows.get(0); + Assert.assertArrayEquals(expectedByte, row.getArray(0).getBinary(0)); + Assert.assertArrayEquals(expectedByte, row.getBinary(1)); + } + +}