Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReade
}

@Override
public ParquetValueReader<RowData> message(Types.StructType expected, MessageType message,
public ParquetValueReader<RowData> message(org.apache.iceberg.types.Type expected, MessageType message,
List<ParquetValueReader<?>> fieldReaders) {
return struct(expected, message.asGroupType(), fieldReaders);
return struct(expected == null ? null : expected.asStructType(), message.asGroupType(), fieldReaders);
}

@Override
public ParquetValueReader<RowData> struct(Types.StructType expected, GroupType struct,
public ParquetValueReader<RowData> struct(org.apache.iceberg.types.Type expected, GroupType struct,
List<ParquetValueReader<?>> fieldReaders) {
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Expand All @@ -103,7 +103,7 @@ public ParquetValueReader<RowData> struct(Types.StructType expected, GroupType s
}

List<Types.NestedField> expectedFields = expected != null ?
expected.fields() : ImmutableList.of();
expected.asStructType().fields() : ImmutableList.of();
List<ParquetValueReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(
expectedFields.size());
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
Expand Down Expand Up @@ -132,7 +132,7 @@ public ParquetValueReader<RowData> struct(Types.StructType expected, GroupType s
}

@Override
public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
public ParquetValueReader<?> list(org.apache.iceberg.types.Type expectedList, GroupType array,
ParquetValueReader<?> elementReader) {
GroupType repeated = array.getFields().get(0).asGroupType();
String[] repeatedPath = currentPath();
Expand All @@ -147,7 +147,7 @@ public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
}

@Override
public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
public ParquetValueReader<?> map(org.apache.iceberg.types.Type expectedMap, GroupType map,
ParquetValueReader<?> keyReader,
ParquetValueReader<?> valueReader) {
GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
Expand All @@ -168,7 +168,7 @@ public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,

@Override
@SuppressWarnings("CyclomaticComplexity")
public ParquetValueReader<?> primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
public ParquetValueReader<?> primitive(org.apache.iceberg.types.Type expected,
PrimitiveType primitive) {
ColumnDescriptor desc = type.getColumnDescription(currentPath());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.RowType.RowField;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.iceberg.parquet.ParquetValueReaders;
Expand Down Expand Up @@ -68,27 +64,27 @@ private static class WriteBuilder extends ParquetWithFlinkSchemaVisitor<ParquetV
}

@Override
public ParquetValueWriter<?> message(RowType sStruct, MessageType message, List<ParquetValueWriter<?>> fields) {
public ParquetValueWriter<?> message(LogicalType sStruct, MessageType message, List<ParquetValueWriter<?>> fields) {
return struct(sStruct, message.asGroupType(), fields);
}

@Override
public ParquetValueWriter<?> struct(RowType sStruct, GroupType struct,
public ParquetValueWriter<?> struct(LogicalType fStruct, GroupType struct,
List<ParquetValueWriter<?>> fieldWriters) {
List<Type> fields = struct.getFields();
List<RowField> flinkFields = sStruct.getFields();
List<LogicalType> flinkFields = fStruct.getChildren();
List<ParquetValueWriter<?>> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size());
List<LogicalType> flinkTypes = Lists.newArrayList();
for (int i = 0; i < fields.size(); i += 1) {
writers.add(newOption(struct.getType(i), fieldWriters.get(i)));
flinkTypes.add(flinkFields.get(i).getType());
flinkTypes.add(flinkFields.get(i));
}

return new RowDataWriter(writers, flinkTypes);
}

@Override
public ParquetValueWriter<?> list(ArrayType sArray, GroupType array, ParquetValueWriter<?> elementWriter) {
public ParquetValueWriter<?> list(LogicalType fArray, GroupType array, ParquetValueWriter<?> elementWriter) {
GroupType repeated = array.getFields().get(0).asGroupType();
String[] repeatedPath = currentPath();

Expand All @@ -97,11 +93,11 @@ public ParquetValueWriter<?> list(ArrayType sArray, GroupType array, ParquetValu

return new ArrayDataWriter<>(repeatedD, repeatedR,
newOption(repeated.getType(0), elementWriter),
sArray.getElementType());
arrayElementType(fArray));
}

@Override
public ParquetValueWriter<?> map(MapType sMap, GroupType map,
public ParquetValueWriter<?> map(LogicalType fMap, GroupType map,
ParquetValueWriter<?> keyWriter, ParquetValueWriter<?> valueWriter) {
GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
String[] repeatedPath = currentPath();
Expand All @@ -112,10 +108,9 @@ public ParquetValueWriter<?> map(MapType sMap, GroupType map,
return new MapDataWriter<>(repeatedD, repeatedR,
newOption(repeatedKeyValue.getType(0), keyWriter),
newOption(repeatedKeyValue.getType(1), valueWriter),
sMap.getKeyType(), sMap.getValueType());
mapKeyType(fMap), mapValueType(fMap));
}


private ParquetValueWriter<?> newOption(org.apache.parquet.schema.Type fieldType, ParquetValueWriter<?> writer) {
int maxD = type.getMaxDefinitionLevel(path(fieldType.getName()));
return ParquetValueWriters.option(fieldType, maxD, writer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,181 +19,48 @@

package org.apache.iceberg.flink.data;

import java.util.Deque;
import java.util.List;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.RowType.RowField;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.iceberg.parquet.ParquetTypeWithPartnerVisitor;

public class ParquetWithFlinkSchemaVisitor<T> {
private final Deque<String> fieldNames = Lists.newLinkedList();
public class ParquetWithFlinkSchemaVisitor<T> extends ParquetTypeWithPartnerVisitor<LogicalType, T> {

public static <T> T visit(LogicalType sType, Type type, ParquetWithFlinkSchemaVisitor<T> visitor) {
Preconditions.checkArgument(sType != null, "Invalid DataType: null");
if (type instanceof MessageType) {
Preconditions.checkArgument(sType instanceof RowType, "Invalid struct: %s is not a struct", sType);
RowType struct = (RowType) sType;
return visitor.message(struct, (MessageType) type, visitFields(struct, type.asGroupType(), visitor));
} else if (type.isPrimitive()) {
return visitor.primitive(sType, type.asPrimitiveType());
} else {
// if not a primitive, the typeId must be a group
GroupType group = type.asGroupType();
OriginalType annotation = group.getOriginalType();
if (annotation != null) {
switch (annotation) {
case LIST:
Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
"Invalid list: top-level group is repeated: %s", group);
Preconditions.checkArgument(group.getFieldCount() == 1,
"Invalid list: does not contain single repeated field: %s", group);

GroupType repeatedElement = group.getFields().get(0).asGroupType();
Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED),
"Invalid list: inner group is not repeated");
Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1,
"Invalid list: repeated group is not a single field: %s", group);

Preconditions.checkArgument(sType instanceof ArrayType, "Invalid list: %s is not an array", sType);
ArrayType array = (ArrayType) sType;
RowType.RowField element = new RowField(
"element", array.getElementType(), "element of " + array.asSummaryString());

visitor.fieldNames.push(repeatedElement.getName());
try {
T elementResult = null;
if (repeatedElement.getFieldCount() > 0) {
elementResult = visitField(element, repeatedElement.getType(0), visitor);
}

return visitor.list(array, group, elementResult);

} finally {
visitor.fieldNames.pop();
}

case MAP:
Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
"Invalid map: top-level group is repeated: %s", group);
Preconditions.checkArgument(group.getFieldCount() == 1,
"Invalid map: does not contain single repeated field: %s", group);

GroupType repeatedKeyValue = group.getType(0).asGroupType();
Preconditions.checkArgument(repeatedKeyValue.isRepetition(Type.Repetition.REPEATED),
"Invalid map: inner group is not repeated");
Preconditions.checkArgument(repeatedKeyValue.getFieldCount() <= 2,
"Invalid map: repeated group does not have 2 fields");

Preconditions.checkArgument(sType instanceof MapType, "Invalid map: %s is not a map", sType);
MapType map = (MapType) sType;
RowField keyField = new RowField("key", map.getKeyType(), "key of " + map.asSummaryString());
RowField valueField = new RowField(
"value", map.getValueType(), "value of " + map.asSummaryString());

visitor.fieldNames.push(repeatedKeyValue.getName());
try {
T keyResult = null;
T valueResult = null;
switch (repeatedKeyValue.getFieldCount()) {
case 2:
// if there are 2 fields, both key and value are projected
keyResult = visitField(keyField, repeatedKeyValue.getType(0), visitor);
valueResult = visitField(valueField, repeatedKeyValue.getType(1), visitor);
break;
case 1:
// if there is just one, use the name to determine what it is
Type keyOrValue = repeatedKeyValue.getType(0);
if (keyOrValue.getName().equalsIgnoreCase("key")) {
keyResult = visitField(keyField, keyOrValue, visitor);
// value result remains null
} else {
valueResult = visitField(valueField, keyOrValue, visitor);
// key result remains null
}
break;
default:
// both results will remain null
}

return visitor.map(map, group, keyResult, valueResult);

} finally {
visitor.fieldNames.pop();
}

default:
}
}
Preconditions.checkArgument(sType instanceof RowType, "Invalid struct: %s is not a struct", sType);
RowType struct = (RowType) sType;
return visitor.struct(struct, group, visitFields(struct, group, visitor));
@Override
protected LogicalType arrayElementType(LogicalType arrayType) {
if (arrayType == null) {
return null;
}
}

private static <T> T visitField(RowType.RowField sField, Type field, ParquetWithFlinkSchemaVisitor<T> visitor) {
visitor.fieldNames.push(field.getName());
try {
return visit(sField.getType(), field, visitor);
} finally {
visitor.fieldNames.pop();
}
return ((ArrayType) arrayType).getElementType();
}

private static <T> List<T> visitFields(RowType struct, GroupType group,
ParquetWithFlinkSchemaVisitor<T> visitor) {
List<RowType.RowField> sFields = struct.getFields();
Preconditions.checkArgument(sFields.size() == group.getFieldCount(),
"Structs do not match: %s and %s", struct, group);
List<T> results = Lists.newArrayListWithExpectedSize(group.getFieldCount());
for (int i = 0; i < sFields.size(); i += 1) {
Type field = group.getFields().get(i);
RowType.RowField sField = sFields.get(i);
Preconditions.checkArgument(field.getName().equals(AvroSchemaUtil.makeCompatibleName(sField.getName())),
"Structs do not match: field %s != %s", field.getName(), sField.getName());
results.add(visitField(sField, field, visitor));
@Override
protected LogicalType mapKeyType(LogicalType mapType) {
if (mapType == null) {
return null;
}

return results;
}

public T message(RowType sStruct, MessageType message, List<T> fields) {
return null;
}

public T struct(RowType sStruct, GroupType struct, List<T> fields) {
return null;
return ((MapType) mapType).getKeyType();
}

public T list(ArrayType sArray, GroupType array, T element) {
return null;
}

public T map(MapType sMap, GroupType map, T key, T value) {
return null;
}
@Override
protected LogicalType mapValueType(LogicalType mapType) {
if (mapType == null) {
return null;
}

public T primitive(LogicalType sPrimitive, PrimitiveType primitive) {
return null;
return ((MapType) mapType).getValueType();
}

protected String[] currentPath() {
return Lists.newArrayList(fieldNames.descendingIterator()).toArray(new String[0]);
}
@Override
protected LogicalType fieldType(LogicalType structType, int pos, Integer fieldId) {
if (structType == null || ((RowType) structType).getFieldCount() < pos + 1) {
return null;
}

protected String[] path(String name) {
List<String> list = Lists.newArrayList(fieldNames.descendingIterator());
list.add(name);
return list.toArray(new String[0]);
return ((RowType) structType).getTypeAt(pos);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also curious whether the inner fields keep the same order as the parquet fields. If not, then we would get an incorrect data type for the pos provided by parquet.

}

}
Loading