Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;

import javax.annotation.Nullable;

import java.math.BigInteger;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -208,6 +210,18 @@ public static ColumnIO lookupColumnByName(GroupColumnIO groupColumnIO, String co
return null;
}

@Nullable
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not pretty, but consistent with other methods in this file. A change to Optional can be a follow up, if needed.

public static ColumnIO lookupColumnById(GroupColumnIO groupColumnIO, int columnId)
{
for (int i = 0; i < groupColumnIO.getChildrenCount(); i++) {
ColumnIO child = groupColumnIO.getChild(i);
if (child.getType().getId().intValue() == columnId) {
return child;
}
}
return null;
}

public static Optional<DecimalType> createDecimalType(RichColumnDescriptor descriptor)
{
if (descriptor.getPrimitiveType().getOriginalType() != DECIMAL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import io.trino.parquet.RichColumnDescriptor;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.NamedTypeSignature;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignatureParameter;
import org.apache.parquet.io.ColumnIO;
import org.apache.parquet.io.GroupColumnIO;
import org.apache.parquet.io.PrimitiveColumnIO;
Expand All @@ -39,9 +37,9 @@
import static org.apache.parquet.io.ColumnIOUtil.columnRepetitionLevel;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;

public final class ParquetColumnIOConverter
public final class HiveParquetColumnIOConverter
{
private ParquetColumnIOConverter() {}
private HiveParquetColumnIOConverter() {}

public static Optional<Field> constructField(Type type, ColumnIO columnIO)
{
Expand All @@ -52,15 +50,15 @@ public static Optional<Field> constructField(Type type, ColumnIO columnIO)
int repetitionLevel = columnRepetitionLevel(columnIO);
int definitionLevel = columnDefinitionLevel(columnIO);
if (type instanceof RowType) {
RowType rowType = (RowType) type;
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
List<Type> parameters = type.getTypeParameters();
ImmutableList.Builder<Optional<Field>> fieldsBuilder = ImmutableList.builder();
List<TypeSignatureParameter> fields = type.getTypeSignature().getParameters();
List<RowType.Field> fields = rowType.getFields();
boolean structHasParameters = false;
for (int i = 0; i < fields.size(); i++) {
NamedTypeSignature namedTypeSignature = fields.get(i).getNamedTypeSignature();
String name = namedTypeSignature.getName().get().toLowerCase(Locale.ENGLISH);
Optional<Field> field = constructField(parameters.get(i), lookupColumnByName(groupColumnIO, name));
RowType.Field rowField = fields.get(i);
String name = rowField.getName().orElseThrow().toLowerCase(Locale.ENGLISH);
Optional<Field> field = constructField(rowField.getType(), lookupColumnByName(groupColumnIO, name));
structHasParameters |= field.isPresent();
fieldsBuilder.add(field);
}
Expand All @@ -70,8 +68,8 @@ public static Optional<Field> constructField(Type type, ColumnIO columnIO)
return Optional.empty();
}
if (type instanceof MapType) {
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
MapType mapType = (MapType) type;
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO);
if (keyValueColumnIO.getChildrenCount() != 2) {
return Optional.empty();
Expand All @@ -81,12 +79,12 @@ public static Optional<Field> constructField(Type type, ColumnIO columnIO)
return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, ImmutableList.of(keyField, valueField)));
}
if (type instanceof ArrayType) {
ArrayType arrayType = (ArrayType) type;
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
List<Type> types = type.getTypeParameters();
if (groupColumnIO.getChildrenCount() != 1) {
return Optional.empty();
}
Optional<Field> field = constructField(types.get(0), getArrayElementColumn(groupColumnIO.getChild(0)));
Optional<Field> field = constructField(arrayType.getElementType(), getArrayElementColumn(groupColumnIO.getChild(0)));
return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, ImmutableList.of(field)));
}
PrimitiveColumnIO primitiveColumnIO = (PrimitiveColumnIO) columnIO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
import static io.trino.plugin.hive.HiveSessionProperties.isParquetIgnoreStatistics;
import static io.trino.plugin.hive.HiveSessionProperties.isParquetUseColumnIndex;
import static io.trino.plugin.hive.HiveSessionProperties.isUseParquetColumnNames;
import static io.trino.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
import static io.trino.plugin.hive.parquet.HiveParquetColumnIOConverter.constructField;
import static io.trino.plugin.hive.util.HiveUtil.getDeserializerClassName;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.String.format;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public ColumnIdentity(
this.id = id;
this.name = requireNonNull(name, "name is null");
this.typeCategory = requireNonNull(typeCategory, "typeCategory is null");
this.children = requireNonNull(children, "children is null");
this.children = ImmutableList.copyOf(requireNonNull(children, "children is null"));
checkArgument(
children.isEmpty() == (typeCategory == PRIMITIVE),
"Children should be empty if and only if column type is primitive");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@
import io.trino.plugin.hive.orc.OrcPageSource.ColumnAdaptation;
import io.trino.plugin.hive.orc.OrcReaderConfig;
import io.trino.plugin.hive.parquet.HdfsParquetDataSource;
import io.trino.plugin.hive.parquet.HiveParquetColumnIOConverter;
import io.trino.plugin.hive.parquet.ParquetPageSource;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.iceberg.IcebergParquetColumnIOConverter.FieldContext;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
Expand All @@ -71,6 +73,7 @@
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.ColumnIO;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.schema.MessageType;

Expand All @@ -96,7 +99,6 @@
import static io.trino.parquet.ParquetTypeUtils.getParquetTypeByName;
import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
import static io.trino.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CURSOR_ERROR;
Expand Down Expand Up @@ -455,10 +457,12 @@ private static ConnectorPageSource createParquetPageSource(
.filter(field -> field.getId() != null)
.collect(toImmutableMap(field -> field.getId().intValue(), Function.identity()));

// Map by name for a migrated table
boolean mapByName = parquetIdToField.isEmpty();

List<org.apache.parquet.schema.Type> parquetFields = regularColumns.stream()
.map(column -> {
if (parquetIdToField.isEmpty()) {
// This is a migrated table
if (mapByName) {
return getParquetTypeByName(column.getName(), fileSchema);
}
return parquetIdToField.get(column.getId());
Expand Down Expand Up @@ -504,7 +508,11 @@ private static ConnectorPageSource createParquetPageSource(
internalFields.add(Optional.empty());
}
else {
internalFields.add(constructField(column.getType(), messageColumnIO.getChild(parquetField.getName())));
// The top level columns are already mapped by name/id appropriately.
ColumnIO columnIO = messageColumnIO.getChild(parquetField.getName());
internalFields.add(mapByName
? HiveParquetColumnIOConverter.constructField(trinoType, columnIO)
: IcebergParquetColumnIOConverter.constructField(new FieldContext(trinoType, column.getColumnIdentity()), columnIO));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.trino.parquet.Field;
import io.trino.parquet.GroupField;
import io.trino.parquet.PrimitiveField;
import io.trino.parquet.RichColumnDescriptor;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import org.apache.parquet.io.ColumnIO;
import org.apache.parquet.io.GroupColumnIO;
import org.apache.parquet.io.PrimitiveColumnIO;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.parquet.ParquetTypeUtils.getArrayElementColumn;
import static io.trino.parquet.ParquetTypeUtils.getMapKeyValueColumn;
import static io.trino.parquet.ParquetTypeUtils.lookupColumnById;
import static java.util.Objects.requireNonNull;
import static org.apache.parquet.io.ColumnIOUtil.columnDefinitionLevel;
import static org.apache.parquet.io.ColumnIOUtil.columnRepetitionLevel;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;

public final class IcebergParquetColumnIOConverter
{
private IcebergParquetColumnIOConverter() {}

public static Optional<Field> constructField(FieldContext context, ColumnIO columnIO)
{
requireNonNull(context, "context is null");
if (columnIO == null) {
return Optional.empty();
}
boolean required = columnIO.getType().getRepetition() != OPTIONAL;
int repetitionLevel = columnRepetitionLevel(columnIO);
int definitionLevel = columnDefinitionLevel(columnIO);
Type type = context.getType();
if (type instanceof RowType) {
RowType rowType = (RowType) type;
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
ImmutableList.Builder<Optional<Field>> fieldsBuilder = ImmutableList.builder();
List<RowType.Field> fields = rowType.getFields();
boolean structHasParameters = false;
for (int i = 0; i < fields.size(); i++) {
RowType.Field rowField = fields.get(i);
ColumnIdentity fieldIdentity = context.getColumnIdentity().getChildren().get(i);
Optional<Field> field = constructField(new FieldContext(rowField.getType(), fieldIdentity), lookupColumnById(groupColumnIO, fieldIdentity.getId()));
structHasParameters |= field.isPresent();
fieldsBuilder.add(field);
}
if (structHasParameters) {
return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, fieldsBuilder.build()));
}
return Optional.empty();
}
if (type instanceof MapType) {
MapType mapType = (MapType) type;
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO);
if (keyValueColumnIO.getChildrenCount() != 2) {
return Optional.empty();
}
checkArgument(context.getColumnIdentity().getChildren().size() == 2, "Not a map: %s", context);
ColumnIdentity keyIdentity = context.getColumnIdentity().getChildren().get(0);
ColumnIdentity valueIdentity = context.getColumnIdentity().getChildren().get(1);
// TODO validate column ID
Optional<Field> keyField = constructField(new FieldContext(mapType.getKeyType(), keyIdentity), keyValueColumnIO.getChild(0));
// TODO validate column ID
Optional<Field> valueField = constructField(new FieldContext(mapType.getValueType(), valueIdentity), keyValueColumnIO.getChild(1));
return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, ImmutableList.of(keyField, valueField)));
}
if (type instanceof ArrayType) {
ArrayType arrayType = (ArrayType) type;
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
if (groupColumnIO.getChildrenCount() != 1) {
return Optional.empty();
}
checkArgument(context.getColumnIdentity().getChildren().size() == 1, "Not an array: %s", context);
ColumnIdentity elementIdentity = getOnlyElement(context.getColumnIdentity().getChildren());
// TODO validate column ID
Optional<Field> field = constructField(new FieldContext(arrayType.getElementType(), elementIdentity), getArrayElementColumn(groupColumnIO.getChild(0)));
return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, ImmutableList.of(field)));
}
PrimitiveColumnIO primitiveColumnIO = (PrimitiveColumnIO) columnIO;
RichColumnDescriptor column = new RichColumnDescriptor(primitiveColumnIO.getColumnDescriptor(), columnIO.getType().asPrimitiveType());
return Optional.of(new PrimitiveField(type, repetitionLevel, definitionLevel, required, column, primitiveColumnIO.getId()));
}

public static class FieldContext
{
private final Type type;
private final ColumnIdentity columnIdentity;

public FieldContext(Type type, ColumnIdentity columnIdentity)
{
this.type = requireNonNull(type, "type is null");
this.columnIdentity = requireNonNull(columnIdentity, "columnIdentity is null");
}

public Type getType()
{
return type;
}

public ColumnIdentity getColumnIdentity()
{
return columnIdentity;
}

@Override
public String toString()
{
return toStringHelper(this)
.add("type", type)
.add("columnIdentity", columnIdentity)
.toString();
}
}
}
Loading