@@ -73,7 +73,7 @@ public Type struct(GroupType struct, List<Type> fieldTypes) {
       org.apache.parquet.schema.Type field = parquetFields.get(i);

       Preconditions.checkArgument(
-          !field.isRepetition(Repetition.REPEATED),
+          field.isPrimitive() || !field.isRepetition(Repetition.REPEATED),
           "Fields cannot have repetition REPEATED: %s", field);

       Integer fieldId = getId(field);
@@ -96,11 +96,11 @@ public Type struct(GroupType struct, List<Type> fieldTypes) {

   @Override
   public Type list(GroupType array, Type elementType) {
-    GroupType repeated = array.getType(0).asGroupType();
-    org.apache.parquet.schema.Type element = repeated.getType(0);
+    org.apache.parquet.schema.Type repeated = array.getType(0);
+    org.apache.parquet.schema.Type element = repeated.isPrimitive() ? repeated : repeated.asGroupType().getType(0);

     Preconditions.checkArgument(
-        !element.isRepetition(Repetition.REPEATED),
+        element.isPrimitive() || !element.isRepetition(Repetition.REPEATED),
         "Elements cannot have repetition REPEATED: %s", element);

     Integer elementFieldId = getId(element);
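Note: the `isPrimitive()` branches above exist because Parquet has two ways to encode a list. In the standard three-level form, the LIST group contains a repeated group that wraps the element field; in the legacy single-level form (what parquet-avro's old list structure writes), the repeated node is the element itself. The struct check likewise now tolerates a bare repeated primitive, which the second new test below maps to a required field. A minimal sketch of the two list shapes, built with Parquet's schema builders (the field names `my_bytes`, `array`, and the class name are illustrative, chosen to match the new tests below):

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class ListShapes {
  public static void main(String[] args) {
    // Standard three-level list: optional group (LIST) -> repeated group -> element field.
    MessageType threeLevel = Types.buildMessage()
        .optionalGroup().as(OriginalType.LIST)
          .repeatedGroup()
            .required(PrimitiveTypeName.BINARY).named("element")
          .named("list")
        .named("my_bytes")
        .named("test");

    // Legacy single-level list: the repeated primitive *is* the element, no wrapper group.
    MessageType singleLevel = Types.buildMessage()
        .optionalGroup().as(OriginalType.LIST)
          .repeated(PrimitiveTypeName.BINARY).named("array")
        .named("my_bytes")
        .named("test");

    System.out.println(threeLevel);
    System.out.println(singleLevel);
  }
}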
@@ -66,17 +66,20 @@ private static <T> T visitList(GroupType list, ParquetTypeVisitor<T> visitor) {
     Preconditions.checkArgument(list.getFieldCount() == 1,
         "Invalid list: does not contain single repeated field: %s", list);

-    GroupType repeatedElement = list.getFields().get(0).asGroupType();
+    Type repeatedElement = list.getFields().get(0);

     Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED),
         "Invalid list: inner group is not repeated");
-    Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1,
-        "Invalid list: repeated group is not a single field: %s", list);
+
+    Preconditions.checkArgument(
+        repeatedElement.isPrimitive() || repeatedElement.asGroupType().getFieldCount() <= 1,
+        "Invalid list: repeated group is not a single field or primitive: %s", list);

     visitor.beforeRepeatedElement(repeatedElement);
     try {
       T elementResult = null;
-      if (repeatedElement.getFieldCount() > 0) {
-        Type elementField = repeatedElement.getType(0);
+      if (repeatedElement.isPrimitive() || repeatedElement.asGroupType().getFieldCount() > 0) {
+        Type elementField = repeatedElement.isPrimitive() ? repeatedElement : repeatedElement.asGroupType().getType(0);
         visitor.beforeElementField(elementField);
         try {
           elementResult = visit(elementField, visitor);
@@ -63,10 +63,12 @@ public static <T> T visit(org.apache.iceberg.types.Type iType, Type type, TypeWi
             Preconditions.checkArgument(group.getFieldCount() == 1,
                 "Invalid list: does not contain single repeated field: %s", group);

-            GroupType repeatedElement = group.getFields().get(0).asGroupType();
+            Type repeatedElement = group.getFields().get(0);

             Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED),
                 "Invalid list: inner group is not repeated");
-            Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1,
+            Preconditions.checkArgument(
+                repeatedElement.isPrimitive() || repeatedElement.asGroupType().getFieldCount() <= 1,
                 "Invalid list: repeated group is not a single field: %s", group);

             Types.ListType list = null;
@@ -79,8 +81,10 @@ public static <T> T visit(org.apache.iceberg.types.Type iType, Type type, TypeWi
             visitor.fieldNames.push(repeatedElement.getName());
             try {
               T elementResult = null;
-              if (repeatedElement.getFieldCount() > 0) {
-                elementResult = visitField(element, repeatedElement.getType(0), visitor);
+              if (repeatedElement.isPrimitive()) {
+                elementResult = visit(element != null ? element.type() : null, repeatedElement, visitor);
+              } else if (repeatedElement.asGroupType().getFieldCount() > 0) {
+                elementResult = visitField(element, repeatedElement.asGroupType().getType(0), visitor);
               }

               return visitor.list(list, group, elementResult);
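Both visitor hunks above apply the same dispatch rule when resolving a list's element: a primitive repeated node is its own element, otherwise the element is the repeated group's single field. Distilled into a standalone helper for reference (the class and method names are illustrative, not part of the PR):

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.Type;

final class ListElements {
  private ListElements() {
  }

  // Works for both encodings: single-level (repeated primitive) and
  // standard three-level (repeated group wrapping one element field).
  static Type element(GroupType list) {
    Type repeated = list.getFields().get(0);
    return repeated.isPrimitive() ? repeated : repeated.asGroupType().getType(0);
  }
}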
@@ -181,6 +181,34 @@ public void testSchemaConversionWithoutAssigningIds() {
     Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct());
   }

+  @Test
+  public void testSchemaConversionListByteArray() {
+    MessageType messageType = new MessageType("test",
+        list(1, "my_bytes", Repetition.OPTIONAL,
+            primitive(2, "array", PrimitiveTypeName.BINARY, Repetition.REPEATED)));
+
+    Schema expectedSchema = new Schema(
+        optional(1, "my_bytes", Types.ListType.ofRequired(2, Types.BinaryType.get())));
+
+    Schema actualSchema = ParquetSchemaUtil.convertAndPrune(messageType);
+    Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct());
+  }
+
+  @Test
+  public void testSchemaConversionByteArray() {
+    MessageType messageType = new MessageType("test",
+        primitive(1, "array", PrimitiveTypeName.BINARY, Repetition.REPEATED));
+
+    Schema expectedSchema = new Schema(
+        required(1, "array", Types.BinaryType.get()));
+
+    Schema actualSchema = ParquetSchemaUtil.convertAndPrune(messageType);
+    Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct());
+  }
+
   private Type primitive(Integer id, String name, PrimitiveTypeName typeName, Repetition repetition) {
     PrimitiveBuilder<PrimitiveType> builder = org.apache.parquet.schema.Types.primitive(typeName, repetition);
     if (id != null) {
@@ -184,14 +184,14 @@ public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
     @Override
     public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
                                       ParquetValueReader<?> elementReader) {
-      GroupType repeated = array.getFields().get(0).asGroupType();
+      Type repeated = array.getFields().get(0);
       String[] repeatedPath = currentPath();

       int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
       int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;

-      Type elementType = repeated.getType(0);
-      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
+      Type elementType = repeated.isPrimitive() ? repeated : repeated.asGroupType().getType(0);
+      int elementD = repeated.isPrimitive() ? repeatedD : type.getMaxDefinitionLevel(path(elementType.getName())) - 1;

       return new ArrayReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
     }
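For a single-level list there is no element field below the repeated node, so the element's definition level collapses to the repeated level; that is why `elementD` falls back to `repeatedD` above. A quick standalone check of the levels (a sketch; the schema and names mirror the new tests, and the printed values follow Parquet's level rules, where optional and repeated fields each add a definition level):

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class LevelsCheck {
  public static void main(String[] args) {
    // message test { optional group my_bytes (LIST) { repeated binary array; } }
    MessageType singleLevel = Types.buildMessage()
        .optionalGroup().as(OriginalType.LIST)
          .repeated(PrimitiveTypeName.BINARY).named("array")
        .named("my_bytes")
        .named("test");

    // optional (+1) and repeated (+1) each add a definition level: maxD = 2.
    System.out.println(singleLevel.getMaxDefinitionLevel("my_bytes", "array")); // 2
    // One repeated node on the path: maxR = 1.
    System.out.println(singleLevel.getMaxRepetitionLevel("my_bytes", "array")); // 1
    // So repeatedD = 2 - 1 = 1, repeatedR = 1 - 1 = 0, and elementD == repeatedD == 1.
  }
}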
@@ -0,0 +1,105 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.spark.data;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestMalformedParquetFromAvro {

  @Rule
  public TemporaryFolder temp = new TemporaryFolder();

  @Test
  public void testWriteReadAvroBinary() throws IOException {
    String schema = "{" +
        "\"type\":\"record\"," +
        "\"name\":\"DbRecord\"," +
        "\"namespace\":\"com.iceberg\"," +
        "\"fields\":[" +
        "{\"name\":\"arraybytes\", " +
        "\"type\":[ \"null\", { \"type\":\"array\", \"items\":\"bytes\"}], \"default\":null}," +
        "{\"name\":\"topbytes\", \"type\":\"bytes\"}" +
        "]" +
        "}";

    org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
    org.apache.avro.Schema avroSchema = parser.parse(schema);
    AvroSchemaConverter converter = new AvroSchemaConverter();
    MessageType parquetSchema = converter.convert(avroSchema);
    Schema icebergSchema = ParquetSchemaUtil.convert(parquetSchema);

    File testFile = temp.newFile();
    Assert.assertTrue(testFile.delete());

    ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
        .withDataModel(GenericData.get())
        .withSchema(avroSchema)
        .build();

    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
    List<ByteBuffer> expectedByteList = new ArrayList<>();
    byte[] expectedByte = {0x00, 0x01};
    expectedByteList.add(ByteBuffer.wrap(expectedByte));

    recordBuilder.set("arraybytes", expectedByteList);
    recordBuilder.set("topbytes", ByteBuffer.wrap(expectedByte));
    GenericData.Record record = recordBuilder.build();
    writer.write(record);
    writer.close();

    List<InternalRow> rows;
    try (CloseableIterable<InternalRow> reader =
        Parquet.read(Files.localInput(testFile))
            .project(icebergSchema)
            .createReaderFunc(type -> SparkParquetReaders.buildReader(icebergSchema, type))
            .build()) {
      rows = Lists.newArrayList(reader);
    }

    InternalRow row = rows.get(0);
    Assert.assertArrayEquals(expectedByte, row.getArray(0).getBinary(0));
    Assert.assertArrayEquals(expectedByte, row.getBinary(1));
  }

}
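One detail worth calling out in this test: parquet-avro's writer defaults to `parquet.avro.write-old-list-structure=true`, which is what produces the single-level `repeated binary array` layout that the reader changes above handle. A sketch of opting into the modern three-level layout instead, reusing `testFile` and `avroSchema` from the test above (assumption: the property name comes from parquet-avro's AvroWriteSupport, and `org.apache.hadoop.conf.Configuration` would need to be imported):

    Configuration conf = new Configuration();
    // Default is true; false makes parquet-avro write standard three-level lists.
    conf.setBoolean("parquet.avro.write-old-list-structure", false);

    ParquetWriter<GenericRecord> threeLevelWriter =
        AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
            .withDataModel(GenericData.get())
            .withSchema(avroSchema)
            .withConf(conf)
            .build();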