ParquetColumnIOConverter.java
@@ -20,47 +20,52 @@
import io.trino.parquet.RichColumnDescriptor;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.NamedTypeSignature;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignatureParameter;
import org.apache.parquet.io.ColumnIO;
import org.apache.parquet.io.GroupColumnIO;
import org.apache.parquet.io.PrimitiveColumnIO;

import java.util.List;
import java.util.Locale;
import java.util.Optional;

import static io.trino.parquet.ParquetTypeUtils.getArrayElementColumn;
import static io.trino.parquet.ParquetTypeUtils.getMapKeyValueColumn;
import static io.trino.parquet.ParquetTypeUtils.lookupColumnByName;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static org.apache.parquet.io.ColumnIOUtil.columnDefinitionLevel;
import static org.apache.parquet.io.ColumnIOUtil.columnRepetitionLevel;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;

public final class ParquetColumnIOConverter
public abstract class ParquetColumnIOConverter<Context>
{
private ParquetColumnIOConverter() {}
public static ParquetColumnIOConverter<Type> withLookupColumnByName()
{
return new ByNameParquetColumnIOConverter();
}

public static Optional<Field> constructField(Type type, ColumnIO columnIO)
public Optional<Field> constructField(Context context, Optional<ColumnIO> columnIO)
{
if (columnIO == null) {
if (columnIO.isEmpty()) {
return Optional.empty();
}
return constructField(context, columnIO.get());
}

protected Optional<Field> constructField(Context context, ColumnIO columnIO)
{
requireNonNull(columnIO, "columnIO is null");

boolean required = columnIO.getType().getRepetition() != OPTIONAL;
int repetitionLevel = columnRepetitionLevel(columnIO);
int definitionLevel = columnDefinitionLevel(columnIO);
Type type = getType(context);
if (type instanceof RowType) {
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
List<Type> parameters = type.getTypeParameters();
ImmutableList.Builder<Optional<Field>> fieldsBuilder = ImmutableList.builder();
List<TypeSignatureParameter> fields = type.getTypeSignature().getParameters();
boolean structHasParameters = false;
for (int i = 0; i < fields.size(); i++) {
NamedTypeSignature namedTypeSignature = fields.get(i).getNamedTypeSignature();
String name = namedTypeSignature.getName().get().toLowerCase(Locale.ENGLISH);
Optional<Field> field = constructField(parameters.get(i), lookupColumnByName(groupColumnIO, name));
for (int fieldIndex = 0; fieldIndex < type.getTypeParameters().size(); fieldIndex++) {
Optional<Field> field = getRowFieldField(context, groupColumnIO, fieldIndex);
structHasParameters |= field.isPresent();
fieldsBuilder.add(field);
}
@@ -71,26 +76,87 @@ public static Optional<Field> constructField(Type type, ColumnIO columnIO)
}
if (type instanceof MapType) {
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
MapType mapType = (MapType) type;
GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO);
if (keyValueColumnIO.getChildrenCount() != 2) {
Optional<Field> keyField = getMapKeyField(context, groupColumnIO);
Optional<Field> valueField = getMapValueField(context, groupColumnIO);
if (keyField.isEmpty() || valueField.isEmpty()) {
return Optional.empty();
}
Optional<Field> keyField = constructField(mapType.getKeyType(), keyValueColumnIO.getChild(0));
Optional<Field> valueField = constructField(mapType.getValueType(), keyValueColumnIO.getChild(1));
return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, ImmutableList.of(keyField, valueField)));
}
if (type instanceof ArrayType) {
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
List<Type> types = type.getTypeParameters();
if (groupColumnIO.getChildrenCount() != 1) {
Optional<Field> field = getArrayElementField(context, groupColumnIO);
if (field.isEmpty()) {
Review comment (Member Author):
Note that this refactor isn't entirely side-effect free.
Previously, we would bail out when groupColumnIO had a child count != 1, and if constructField then returned empty, we would return a new GroupField with a single empty element.

After the change, we return empty in both cases.

The first attempt at the refactor (the one with more abstract methods, see #9104 (comment)) avoided this situation, but wasn't pretty. Alternatively, #9124 avoids this too, by splitting ParquetColumnIOConverter instead of reusing it.
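To make the difference concrete, here is a before/after sketch of the array branch, abridged from the hunks above (type, types, context, repetitionLevel, definitionLevel, and required all come from the surrounding method):

    // before: a child count != 1 bailed out, but an empty element field
    // was still wrapped, yielding a GroupField with one empty child
    if (groupColumnIO.getChildrenCount() != 1) {
        return Optional.empty();
    }
    Optional<Field> field = constructField(types.get(0), getArrayElementColumn(groupColumnIO.getChild(0)));
    return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, ImmutableList.of(field)));

    // after: both the child-count mismatch and an empty element field
    // collapse the whole array field to Optional.empty()
    Optional<Field> field = getArrayElementField(context, groupColumnIO);
    if (field.isEmpty()) {
        return Optional.empty();
    }
    return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, ImmutableList.of(field)));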

return Optional.empty();
}
Optional<Field> field = constructField(types.get(0), getArrayElementColumn(groupColumnIO.getChild(0)));
return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, ImmutableList.of(field)));
}
PrimitiveColumnIO primitiveColumnIO = (PrimitiveColumnIO) columnIO;
RichColumnDescriptor column = new RichColumnDescriptor(primitiveColumnIO.getColumnDescriptor(), columnIO.getType().asPrimitiveType());
return Optional.of(new PrimitiveField(type, repetitionLevel, definitionLevel, required, column, primitiveColumnIO.getId()));
}

protected abstract Type getType(Context context);

protected abstract Optional<Field> getArrayElementField(Context arrayContext, GroupColumnIO groupColumnIO);

protected abstract Optional<Field> getMapKeyField(Context mapContext, GroupColumnIO groupColumnIO);

protected abstract Optional<Field> getMapValueField(Context mapContext, GroupColumnIO groupColumnIO);

protected abstract Optional<Field> getRowFieldField(Context rowContext, GroupColumnIO groupColumnIO, int rowFieldIndex);

private static class ByNameParquetColumnIOConverter
extends ParquetColumnIOConverter<Type>
{
@Override
protected Type getType(Type type)
{
return requireNonNull(type, "type is null");
}

@Override
protected Optional<Field> getArrayElementField(Type arrayType, GroupColumnIO groupColumnIO)
{
if (groupColumnIO.getChildrenCount() != 1) {
return Optional.empty();
}
return constructField(
((ArrayType) arrayType).getElementType(),
getArrayElementColumn(groupColumnIO.getChild(0)));
}

@Override
protected Optional<Field> getMapKeyField(Type mapType, GroupColumnIO groupColumnIO)
{
GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO);
if (keyValueColumnIO.getChildrenCount() != 2) {
return Optional.empty();
}
return constructField(
((MapType) mapType).getKeyType(),
keyValueColumnIO.getChild(0));
}

@Override
protected Optional<Field> getMapValueField(Type mapType, GroupColumnIO groupColumnIO)
{
GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO);
if (keyValueColumnIO.getChildrenCount() != 2) {
return Optional.empty();
}
return constructField(
((MapType) mapType).getValueType(),
keyValueColumnIO.getChild(1));
}

@Override
protected Optional<Field> getRowFieldField(Type rowType, GroupColumnIO groupColumnIO, int rowFieldIndex)
{
RowType.Field rowField = ((RowType) rowType).getFields().get(rowFieldIndex);
String name = rowField.getName().orElseThrow();
return Optional.ofNullable(lookupColumnByName(groupColumnIO, name.toLowerCase(ENGLISH)))
.flatMap(columnIO -> constructField(rowField.getType(), columnIO));
}
}
}
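For orientation, a minimal sketch of how a caller drives the by-name entry point after this refactor (messageColumnIO and columnType are placeholders; the Hive and Iceberg hunks below show the real call sites):

    Optional<ColumnIO> columnIO = Optional.ofNullable(lookupColumnByName(messageColumnIO, "my_column"));
    Optional<Field> field = ParquetColumnIOConverter.withLookupColumnByName()
            .constructField(columnType, columnIO);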
ParquetPageSourceFactory.java
@@ -92,7 +92,6 @@
import static io.trino.plugin.hive.HiveSessionProperties.isParquetIgnoreStatistics;
import static io.trino.plugin.hive.HiveSessionProperties.isParquetUseColumnIndex;
import static io.trino.plugin.hive.HiveSessionProperties.isUseParquetColumnNames;
import static io.trino.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
import static io.trino.plugin.hive.util.HiveUtil.getDeserializerClassName;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.String.format;
@@ -311,7 +310,9 @@ && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parq
internalFields.add(Optional.ofNullable(getParquetType(column, fileSchema, useColumnNames))
.flatMap(field -> {
String columnName = useColumnNames ? column.getBaseColumnName() : fileSchema.getFields().get(column.getBaseHiveColumnIndex()).getName();
return constructField(column.getBaseType(), lookupColumnByName(messageColumn, columnName));
return ParquetColumnIOConverter.withLookupColumnByName().constructField(
column.getBaseType(),
Optional.ofNullable(lookupColumnByName(messageColumn, columnName)));
}));
}
}
ColumnIdentity.java
@@ -48,7 +48,7 @@ public ColumnIdentity(
this.id = id;
this.name = requireNonNull(name, "name is null");
this.typeCategory = requireNonNull(typeCategory, "typeCategory is null");
this.children = requireNonNull(children, "children is null");
this.children = ImmutableList.copyOf(requireNonNull(children, "children is null"));
checkArgument(
children.isEmpty() == (typeCategory == PRIMITIVE),
"Children should be empty if and only if column type is primitive");
IcebergPageSourceProvider.java
@@ -44,6 +44,7 @@
import io.trino.plugin.hive.orc.OrcPageSource.ColumnAdaptation;
import io.trino.plugin.hive.orc.OrcReaderConfig;
import io.trino.plugin.hive.parquet.HdfsParquetDataSource;
import io.trino.plugin.hive.parquet.ParquetColumnIOConverter;
import io.trino.plugin.hive.parquet.ParquetPageSource;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.spi.TrinoException;
@@ -58,6 +59,9 @@
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Type;
import org.apache.hadoop.conf.Configuration;
@@ -71,6 +75,8 @@
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.ColumnIO;
import org.apache.parquet.io.GroupColumnIO;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.schema.MessageType;

@@ -85,18 +91,22 @@
import java.util.Optional;
import java.util.function.Function;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Maps.uniqueIndex;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.orc.OrcReader.INITIAL_BATCH_SIZE;
import static io.trino.orc.OrcReader.ProjectedLayout.fullyProjectedLayout;
import static io.trino.parquet.ParquetTypeUtils.getArrayElementColumn;
import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
import static io.trino.parquet.ParquetTypeUtils.getMapKeyValueColumn;
import static io.trino.parquet.ParquetTypeUtils.getParquetTypeByName;
import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
import static io.trino.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CURSOR_ERROR;
@@ -455,10 +465,12 @@ private static ConnectorPageSource createParquetPageSource(
.filter(field -> field.getId() != null)
.collect(toImmutableMap(field -> field.getId().intValue(), Function.identity()));

// Map by name for a migrated table
boolean mapByName = parquetIdToField.isEmpty();

List<org.apache.parquet.schema.Type> parquetFields = regularColumns.stream()
.map(column -> {
if (parquetIdToField.isEmpty()) {
// This is a migrated table
if (mapByName) {
return getParquetTypeByName(column.getName(), fileSchema);
}
return parquetIdToField.get(column.getId());
@@ -504,7 +516,11 @@ private static ConnectorPageSource createParquetPageSource(
internalFields.add(Optional.empty());
}
else {
internalFields.add(constructField(column.getType(), messageColumnIO.getChild(parquetField.getName())));
// The top level columns are already mapped by name/id appropriately.
Optional<ColumnIO> columnIO = Optional.ofNullable(messageColumnIO.getChild(parquetField.getName()));
internalFields.add(mapByName
? ParquetColumnIOConverter.withLookupColumnByName().constructField(trinoType, columnIO)
: IcebergParquetColumnIOConverter.create().constructField(new FieldContext(trinoType, column.getColumnIdentity()), columnIO));
}
}

@@ -564,4 +580,119 @@ private static TrinoException handleException(OrcDataSourceId dataSourceId, Exce
}
return new TrinoException(ICEBERG_CURSOR_ERROR, format("Failed to read ORC file: %s", dataSourceId), exception);
}

private static class IcebergParquetColumnIOConverter
extends ParquetColumnIOConverter<FieldContext>
{
static IcebergParquetColumnIOConverter create()
{
return new IcebergParquetColumnIOConverter();
}

@Override
protected Type getType(FieldContext context)
{
return context.getType();
}

@Override
protected Optional<Field> getArrayElementField(FieldContext arrayContext, GroupColumnIO groupColumnIO)
{
checkArgument(arrayContext.getColumnIdentity().getChildren().size() == 1, "Not an array: %s", arrayContext);

if (groupColumnIO.getChildrenCount() != 1) {
return Optional.empty();
}
return constructField(
new FieldContext(
((ArrayType) arrayContext.getType()).getElementType(),
getOnlyElement(arrayContext.getColumnIdentity().getChildren())),
// TODO validate column ID
getArrayElementColumn(groupColumnIO.getChild(0)));
}

@Override
protected Optional<Field> getMapKeyField(FieldContext mapContext, GroupColumnIO groupColumnIO)
{
checkArgument(mapContext.getColumnIdentity().getChildren().size() == 2, "Not a map: %s", mapContext);

GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO);
if (keyValueColumnIO.getChildrenCount() != 2) {
return Optional.empty();
}
return constructField(
new FieldContext(
((MapType) mapContext.getType()).getKeyType(),
mapContext.getColumnIdentity().getChildren().get(0)),
// TODO validate column ID
keyValueColumnIO.getChild(0));
}

@Override
protected Optional<Field> getMapValueField(FieldContext mapContext, GroupColumnIO groupColumnIO)
{
checkArgument(mapContext.getColumnIdentity().getChildren().size() == 2, "Not a map: %s", mapContext);

GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO);
if (keyValueColumnIO.getChildrenCount() != 2) {
return Optional.empty();
}
return constructField(
new FieldContext(
((MapType) mapContext.getType()).getValueType(),
mapContext.getColumnIdentity().getChildren().get(1)),
// TODO validate column ID
keyValueColumnIO.getChild(1));
}

@Override
protected Optional<Field> getRowFieldField(FieldContext rowContext, GroupColumnIO groupColumnIO, int rowFieldIndex)
{
checkArgument(rowFieldIndex < rowContext.getColumnIdentity().getChildren().size(), "Row field out of bounds, or not a row: %s, %s", rowFieldIndex, rowContext);

FieldContext rowFieldContext = new FieldContext(
((RowType) rowContext.getType()).getFields().get(rowFieldIndex).getType(),
rowContext.getColumnIdentity().getChildren().get(rowFieldIndex));

int fieldId = rowFieldContext.getColumnIdentity().getId();
for (int i = 0; i < groupColumnIO.getChildrenCount(); i++) {
ColumnIO child = groupColumnIO.getChild(i);
if (child.getType().getId().intValue() == fieldId) {
return constructField(rowFieldContext, child);
}
}
return Optional.empty();
}
}

private static class FieldContext
{
private final Type type;
private final ColumnIdentity columnIdentity;

public FieldContext(Type type, ColumnIdentity columnIdentity)
{
this.type = requireNonNull(type, "type is null");
this.columnIdentity = requireNonNull(columnIdentity, "columnIdentity is null");
}

public Type getType()
{
return type;
}

public ColumnIdentity getColumnIdentity()
{
return columnIdentity;
}

@Override
public String toString()
{
return toStringHelper(this)
.add("type", type)
.add("columnIdentity", columnIdentity)
.toString();
}
}
}