Merged
@@ -21,9 +21,15 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.DataTest;
import org.apache.iceberg.data.DataTestHelpers;
import org.apache.iceberg.data.GenericRecord;
@@ -34,7 +40,13 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.junit.Assert;
import org.junit.Test;

import static org.apache.iceberg.types.Types.NestedField.optional;

public class TestGenericData extends DataTest {
@Override
@@ -78,4 +90,52 @@ protected void writeAndValidate(Schema schema) throws IOException {
}
}
}

@Test
public void testTwoLevelList() throws IOException {
Schema schema = new Schema(
optional(1, "arraybytes", Types.ListType.ofRequired(3, Types.BinaryType.get())),
optional(2, "topbytes", Types.BinaryType.get())
);
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());

File testFile = temp.newFile();
Assert.assertTrue(testFile.delete());

ParquetWriter<org.apache.avro.generic.GenericRecord> writer =
    AvroParquetWriter.<org.apache.avro.generic.GenericRecord>builder(new Path(testFile.toURI()))
.withDataModel(GenericData.get())
.withSchema(avroSchema)
.config("parquet.avro.add-list-element-records", "true")
.config("parquet.avro.write-old-list-structure", "true")
.build();

GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
List<ByteBuffer> expectedByteList = new ArrayList<>();
byte[] expectedByte = {0x00, 0x01};
ByteBuffer expectedBinary = ByteBuffer.wrap(expectedByte);
expectedByteList.add(expectedBinary);
recordBuilder.set("arraybytes", expectedByteList);
recordBuilder.set("topbytes", expectedBinary);
GenericData.Record expectedRecord = recordBuilder.build();

writer.write(expectedRecord);
writer.close();

// test reuseContainers
try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
.project(schema)
.reuseContainers()
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
.build()) {
CloseableIterator<Record> it = reader.iterator();
Assert.assertTrue("Should have at least one row", it.hasNext());
while (it.hasNext()) {
GenericRecord actualRecord = (GenericRecord) it.next();
Assert.assertEquals(expectedBinary, actualRecord.get(0, ArrayList.class).get(0));
Assert.assertEquals(expectedBinary, actualRecord.get(1, ByteBuffer.class));
Assert.assertFalse("Should not have more than one row", it.hasNext());
}
}
}
}
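For reference, with parquet.avro.write-old-list-structure set to true the writer above should lay arraybytes out in the legacy two-level encoding, roughly the following file schema (a sketch based on parquet-avro's old list layout, not output captured from this test):

optional group arraybytes (LIST) {
  repeated binary array;
}
optional binary topbytes;

A modern three-level writer would instead nest the element inside a repeated group (e.g. repeated group list { required binary element; }), which is exactly the difference the new read path has to detect.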
@@ -36,6 +36,7 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
@@ -143,13 +144,12 @@ public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
return null;
}

GroupType repeated = array.getFields().get(0).asGroupType();
String[] repeatedPath = currentPath();

int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;

Type elementType = repeated.getType(0);
Type elementType = ParquetSchemaUtil.determineListElementType(array);
int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;

return new ArrayReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
@@ -21,11 +21,19 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.DataTest;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
@@ -35,11 +43,60 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.junit.Assert;
import org.junit.Test;

import static org.apache.iceberg.types.Types.NestedField.optional;

public class TestFlinkParquetReader extends DataTest {
private static final int NUM_RECORDS = 100;

@Test
public void testTwoLevelList() throws IOException {
Schema schema = new Schema(
optional(1, "arraybytes", Types.ListType.ofRequired(3, Types.BinaryType.get())),
optional(2, "topbytes", Types.BinaryType.get())
);
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());

File testFile = temp.newFile();
Assert.assertTrue(testFile.delete());

ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
.withDataModel(GenericData.get())
.withSchema(avroSchema)
.config("parquet.avro.add-list-element-records", "true")
.config("parquet.avro.write-old-list-structure", "true")
.build();

GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
List<ByteBuffer> expectedByteList = new ArrayList<>();
byte[] expectedByte = {0x00, 0x01};
ByteBuffer expectedBinary = ByteBuffer.wrap(expectedByte);
expectedByteList.add(expectedBinary);
recordBuilder.set("arraybytes", expectedByteList);
recordBuilder.set("topbytes", expectedBinary);
GenericData.Record expectedRecord = recordBuilder.build();

writer.write(expectedRecord);
writer.close();

try (CloseableIterable<RowData> reader = Parquet.read(Files.localInput(testFile))
.project(schema)
.createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
.build()) {
Iterator<RowData> rows = reader.iterator();
Assert.assertTrue("Should have at least one row", rows.hasNext());
RowData rowData = rows.next();
Assert.assertArrayEquals(expectedByte, rowData.getArray(0).getBinary(0));
Assert.assertArrayEquals(expectedByte, rowData.getBinary(1));
Assert.assertFalse("Should not have more than one row", rows.hasNext());
}
}

private void writeAndValidate(Iterable<Record> iterable, Schema schema) throws IOException {
File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());
@@ -183,13 +183,12 @@ public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
return null;
}

GroupType repeated = array.getFields().get(0).asGroupType();
String[] repeatedPath = currentPath();

int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;

Type elementType = repeated.getType(0);
Type elementType = ParquetSchemaUtil.determineListElementType(array);
int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;

return new ParquetValueReaders.ListReader<>(repeatedD, repeatedR,
@@ -67,11 +67,17 @@ public Type list(GroupType list, Type elementType) {
Preconditions.checkArgument(elementType != null,
"List type must have element field");

Type listElement = ParquetSchemaUtil.determineListElementType(list);
MappedField field = nameMapping.find(currentPath());
Type listType = Types.buildGroup(list.getRepetition())
.as(LogicalTypeAnnotation.listType())
.repeatedGroup().addFields(elementType).named(list.getFieldName(0))
.named(list.getName());

Types.GroupBuilder<GroupType> listBuilder = Types.buildGroup(list.getRepetition())
.as(LogicalTypeAnnotation.listType());
if (listElement.isRepetition(Type.Repetition.REPEATED)) {
listBuilder.addFields(elementType);
} else {
listBuilder.repeatedGroup().addFields(elementType).named(list.getFieldName(0));
}
Type listType = listBuilder.named(list.getName());

return field == null ? listType : listType.withId(field.id());
}
@@ -96,12 +96,7 @@ public Type struct(GroupType struct, List<Type> fieldTypes) {

@Override
public Type list(GroupType array, Type elementType) {
GroupType repeated = array.getType(0).asGroupType();
org.apache.parquet.schema.Type element = repeated.getType(0);

Preconditions.checkArgument(
!element.isRepetition(Repetition.REPEATED),
"Elements cannot have repetition REPEATED: %s", element);
org.apache.parquet.schema.Type element = ParquetSchemaUtil.determineListElementType(array);

Integer elementFieldId = getId(element);

@@ -121,13 +121,12 @@ public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
@Override
public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
ParquetValueReader<?> elementReader) {
GroupType repeated = array.getFields().get(0).asGroupType();
String[] repeatedPath = currentPath();

int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;

Type elementType = repeated.getType(0);
Type elementType = ParquetSchemaUtil.determineListElementType(array);
int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;

return new ListReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
@@ -164,4 +164,58 @@ public Boolean primitive(PrimitiveType primitive) {
}
}

public static Type determineListElementType(GroupType array) {
Type repeated = array.getFields().get(0);
boolean isOldListElementType = isOldListElementType(array);

return isOldListElementType ? repeated : repeated.asGroupType().getType(0);
}

// Parquet LIST backwards-compatibility rules.
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
static boolean isOldListElementType(GroupType list) {
Type repeatedType = list.getFields().get(0);
String parentName = list.getName();

return
// For legacy 2-level list types with primitive element type, e.g.:
//
// // ARRAY<INT> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated int32 element;
// }
//
repeatedType.isPrimitive() ||
// For legacy 2-level list types whose element type is a group type with 2 or more fields,
// e.g.:
//
// // ARRAY<STRUCT<str: STRING, num: INT>> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated group element {
// required binary str (UTF8);
// required int32 num;
// };
// }
//
repeatedType.asGroupType().getFieldCount() > 1 ||
// For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.:
//
// // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated group array {
// required binary str (UTF8);
// };
// }
repeatedType.getName().equals("array") ||
// For Parquet data generated by parquet-thrift, e.g.:
//
// // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated group my_list_tuple {
// required binary str (UTF8);
// };
// }
//
repeatedType.getName().equals(parentName + "_tuple");
}
}
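As a quick illustration of the new helper (a minimal sketch, not part of the PR; the class and field names are illustrative), determineListElementType returns the repeated field itself for a legacy two-level list and the nested element for a standard three-level list. The schemas are built with the same parquet-mr Types builders used elsewhere in this change:

import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;

public class DetermineListElementDemo {
  public static void main(String[] args) {
    // Legacy two-level encoding: the repeated field is the element itself.
    GroupType twoLevel = Types.optionalGroup()
        .as(LogicalTypeAnnotation.listType())
        .repeated(INT32).named("element")
        .named("my_list");

    // Standard three-level encoding: the element is nested in a repeated group.
    GroupType threeLevel = Types.optionalGroup()
        .as(LogicalTypeAnnotation.listType())
        .repeatedGroup()
            .optional(INT32).named("element")
        .named("list")
        .named("my_list");

    // Two-level: isOldListElementType is true, so the repeated int32 comes back.
    Type a = ParquetSchemaUtil.determineListElementType(twoLevel);
    System.out.println(a.getRepetition() + " " + a.getName()); // REPEATED element

    // Three-level: the optional int32 inside the repeated group comes back.
    Type b = ParquetSchemaUtil.determineListElementType(threeLevel);
    System.out.println(b.getRepetition() + " " + b.getName()); // OPTIONAL element
  }
}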
@@ -66,30 +66,41 @@ private static <T> T visitList(GroupType list, ParquetTypeVisitor<T> visitor) {
Preconditions.checkArgument(list.getFieldCount() == 1,
"Invalid list: does not contain single repeated field: %s", list);

GroupType repeatedElement = list.getFields().get(0).asGroupType();
Type repeatedElement = list.getFields().get(0);
Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED),
"Invalid list: inner group is not repeated");
Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1,
"Invalid list: repeated group is not a single field: %s", list);

visitor.beforeRepeatedElement(repeatedElement);
try {
T elementResult = null;
if (repeatedElement.getFieldCount() > 0) {
Type elementField = repeatedElement.getType(0);
visitor.beforeElementField(elementField);
try {
elementResult = visit(elementField, visitor);
} finally {
visitor.afterElementField(elementField);
}
}
Type listElement = ParquetSchemaUtil.determineListElementType(list);
if (listElement.isRepetition(Type.Repetition.REPEATED)) {
T elementResult = visitListElement(listElement, visitor);
return visitor.list(list, elementResult);
} else {
return visitThreeLevelList(list, repeatedElement, listElement, visitor);
}
}

private static <T> T visitThreeLevelList(
GroupType list, Type repeated, Type listElement, ParquetTypeVisitor<T> visitor) {
visitor.beforeRepeatedElement(repeated);
try {
T elementResult = visitListElement(listElement, visitor);
return visitor.list(list, elementResult);
} finally {
visitor.afterRepeatedElement(repeated);
}
}

private static <T> T visitListElement(Type listElement, ParquetTypeVisitor<T> visitor) {
T elementResult = null;

visitor.beforeElementField(listElement);
try {
elementResult = visit(listElement, visitor);
} finally {
visitor.afterRepeatedElement(repeatedElement);
visitor.afterElementField(listElement);
}

return elementResult;
}

private static <T> T visitMap(GroupType map, ParquetTypeVisitor<T> visitor) {
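A minimal sketch of the reworked dispatch (not part of the PR; the tracing visitor and schema names are illustrative, and it assumes the visitor's hooks are overridable as they are invoked above): a legacy two-level list skips the repeated-element hooks entirely, while a standard three-level list still fires them around the element visit.

import org.apache.iceberg.parquet.ParquetTypeVisitor;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;

public class ListVisitTraceDemo {
  public static void main(String[] args) {
    GroupType twoLevel = Types.optionalGroup()
        .as(LogicalTypeAnnotation.listType())
        .repeated(INT32).named("element")
        .named("two_level");

    GroupType threeLevel = Types.optionalGroup()
        .as(LogicalTypeAnnotation.listType())
        .repeatedGroup()
            .optional(INT32).named("element")
        .named("list")
        .named("three_level");

    ParquetTypeVisitor<Void> tracer = new ParquetTypeVisitor<Void>() {
      @Override
      public void beforeRepeatedElement(Type element) {
        System.out.println("repeated-element hook: " + element.getName());
        super.beforeRepeatedElement(element);
      }

      @Override
      public void beforeElementField(Type element) {
        System.out.println("element hook: " + element.getName());
        super.beforeElementField(element);
      }
    };

    ParquetTypeVisitor.visit(twoLevel, tracer);   // prints only: element hook: element
    ParquetTypeVisitor.visit(threeLevel, tracer); // prints: repeated-element hook: list
                                                  //         element hook: element
  }
}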
@@ -108,15 +108,19 @@ public Type struct(GroupType struct, List<Type> fields) {

@Override
public Type list(GroupType list, Type element) {
GroupType repeated = list.getType(0).asGroupType();
Type originalElement = repeated.getType(0);
Type repeated = list.getType(0);
Type originalElement = ParquetSchemaUtil.determineListElementType(list);
Integer elementId = getId(originalElement);

if (elementId != null && selectedIds.contains(elementId)) {
return list;
} else if (element != null) {
if (!Objects.equal(element, originalElement)) {
return list.withNewFields(repeated.withNewFields(element));
if (originalElement.isRepetition(Type.Repetition.REPEATED)) {
return list.withNewFields(element);
} else {
return list.withNewFields(repeated.asGroupType().withNewFields(element));
}
}
return list;
}
Expand Down
Loading