diff --git a/lib/trino-orc/src/main/java/io/trino/orc/NameBasedFieldMapper.java b/lib/trino-orc/src/main/java/io/trino/orc/NameBasedFieldMapper.java new file mode 100644 index 000000000000..80a4179f1451 --- /dev/null +++ b/lib/trino-orc/src/main/java/io/trino/orc/NameBasedFieldMapper.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.orc; + +import com.google.common.collect.Maps; +import io.trino.orc.OrcReader.FieldMapper; + +import java.util.Locale; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class NameBasedFieldMapper + implements FieldMapper +{ + private final Map nestedColumns; + + private NameBasedFieldMapper(Map nestedColumns) + { + this.nestedColumns = requireNonNull(nestedColumns, "nestedColumns is null"); + } + + @Override + public OrcColumn get(String fieldName) + { + return nestedColumns.get(fieldName); + } + + public static FieldMapper create(OrcColumn column) + { + requireNonNull(column, "column is null"); + Map nestedColumns = Maps.uniqueIndex( + column.getNestedColumns(), + field -> field.getColumnName().toLowerCase(Locale.ENGLISH)); + + return new NameBasedFieldMapper(nestedColumns); + } +} diff --git a/lib/trino-orc/src/main/java/io/trino/orc/OrcReader.java b/lib/trino-orc/src/main/java/io/trino/orc/OrcReader.java index 690a2c36a9c0..8eacade0f581 100644 --- a/lib/trino-orc/src/main/java/io/trino/orc/OrcReader.java +++ 
b/lib/trino-orc/src/main/java/io/trino/orc/OrcReader.java @@ -261,38 +261,15 @@ public OrcRecordReader createRecordReader( return createRecordReader( readColumns, readTypes, + Collections.nCopies(readColumns.size(), ProjectedLayout.fullyProjectedLayout()), predicate, 0, orcDataSource.getEstimatedSize(), legacyFileTimeZone, systemMemoryUsage, initialBatchSize, - exceptionTransform); - } - - public OrcRecordReader createRecordReader( - List readColumns, - List readTypes, - OrcPredicate predicate, - long offset, - long length, - DateTimeZone legacyFileTimeZone, - AggregatedMemoryContext systemMemoryUsage, - int initialBatchSize, - Function exceptionTransform) - throws OrcCorruptionException - { - return createRecordReader( - readColumns, - readTypes, - Collections.nCopies(readColumns.size(), ProjectedLayout.fullyProjectedLayout()), - predicate, - offset, - length, - legacyFileTimeZone, - systemMemoryUsage, - initialBatchSize, - exceptionTransform); + exceptionTransform, + NameBasedFieldMapper::create); } public OrcRecordReader createRecordReader( @@ -305,7 +282,8 @@ public OrcRecordReader createRecordReader( DateTimeZone legacyFileTimeZone, AggregatedMemoryContext systemMemoryUsage, int initialBatchSize, - Function exceptionTransform) + Function exceptionTransform, + FieldMapperFactory fieldMapperFactory) throws OrcCorruptionException { return new OrcRecordReader( @@ -331,7 +309,8 @@ public OrcRecordReader createRecordReader( systemMemoryUsage, writeValidation, initialBatchSize, - exceptionTransform); + exceptionTransform, + fieldMapperFactory); } private static OrcDataSource wrapWithCacheIfTiny(OrcDataSource dataSource, DataSize maxCacheSize) @@ -498,4 +477,15 @@ public static ProjectedLayout createProjectedLayout(OrcColumn root, List writeValidation, int initialBatchSize, - Function exceptionTransform) + Function exceptionTransform, + FieldMapperFactory fieldMapperFactory) throws OrcCorruptionException { requireNonNull(readColumns, "readColumns is null"); @@ -239,7 
+241,14 @@ public OrcRecordReader( metadataReader, writeValidation); - columnReaders = createColumnReaders(readColumns, readTypes, readLayouts, streamReadersSystemMemoryContext, blockFactory); + columnReaders = createColumnReaders( + readColumns, + readTypes, + readLayouts, + streamReadersSystemMemoryContext, + blockFactory, + fieldMapperFactory); + currentBytesPerCell = new long[columnReaders.length]; maxBytesPerCell = new long[columnReaders.length]; nextBatchSize = initialBatchSize; @@ -558,7 +567,8 @@ private static ColumnReader[] createColumnReaders( List readTypes, List readLayouts, AggregatedMemoryContext systemMemoryContext, - OrcBlockFactory blockFactory) + OrcBlockFactory blockFactory, + FieldMapperFactory fieldMapperFactory) throws OrcCorruptionException { ColumnReader[] columnReaders = new ColumnReader[columns.size()]; @@ -566,7 +576,7 @@ private static ColumnReader[] createColumnReaders( Type readType = readTypes.get(columnIndex); OrcColumn column = columns.get(columnIndex); OrcReader.ProjectedLayout projectedLayout = readLayouts.get(columnIndex); - columnReaders[columnIndex] = createColumnReader(readType, column, projectedLayout, systemMemoryContext, blockFactory); + columnReaders[columnIndex] = createColumnReader(readType, column, projectedLayout, systemMemoryContext, blockFactory, fieldMapperFactory); } return columnReaders; } diff --git a/lib/trino-orc/src/main/java/io/trino/orc/reader/ColumnReaders.java b/lib/trino-orc/src/main/java/io/trino/orc/reader/ColumnReaders.java index 15d04c568247..2c747a79399d 100644 --- a/lib/trino-orc/src/main/java/io/trino/orc/reader/ColumnReaders.java +++ b/lib/trino-orc/src/main/java/io/trino/orc/reader/ColumnReaders.java @@ -18,6 +18,7 @@ import io.trino.orc.OrcColumn; import io.trino.orc.OrcCorruptionException; import io.trino.orc.OrcReader; +import io.trino.orc.OrcReader.FieldMapperFactory; import io.trino.spi.type.TimeType; import io.trino.spi.type.Type; @@ -34,7 +35,8 @@ public static ColumnReader 
createColumnReader( OrcColumn column, OrcReader.ProjectedLayout projectedLayout, AggregatedMemoryContext systemMemoryContext, - OrcBlockFactory blockFactory) + OrcBlockFactory blockFactory, + FieldMapperFactory fieldMapperFactory) throws OrcCorruptionException { if (type instanceof TimeType) { @@ -68,15 +70,15 @@ public static ColumnReader createColumnReader( case TIMESTAMP_INSTANT: return new TimestampColumnReader(type, column, systemMemoryContext.newLocalMemoryContext(ColumnReaders.class.getSimpleName())); case LIST: - return new ListColumnReader(type, column, systemMemoryContext, blockFactory); + return new ListColumnReader(type, column, systemMemoryContext, blockFactory, fieldMapperFactory); case STRUCT: - return new StructColumnReader(type, column, projectedLayout, systemMemoryContext, blockFactory); + return new StructColumnReader(type, column, projectedLayout, systemMemoryContext, blockFactory, fieldMapperFactory); case MAP: - return new MapColumnReader(type, column, systemMemoryContext, blockFactory); + return new MapColumnReader(type, column, systemMemoryContext, blockFactory, fieldMapperFactory); case DECIMAL: return new DecimalColumnReader(type, column, systemMemoryContext.newLocalMemoryContext(ColumnReaders.class.getSimpleName())); case UNION: - return new UnionColumnReader(type, column, systemMemoryContext, blockFactory); + return new UnionColumnReader(type, column, systemMemoryContext, blockFactory, fieldMapperFactory); default: throw new IllegalArgumentException("Unsupported type: " + column.getColumnType()); } diff --git a/lib/trino-orc/src/main/java/io/trino/orc/reader/ListColumnReader.java b/lib/trino-orc/src/main/java/io/trino/orc/reader/ListColumnReader.java index 4934d7014429..a0f8b0f9bfda 100644 --- a/lib/trino-orc/src/main/java/io/trino/orc/reader/ListColumnReader.java +++ b/lib/trino-orc/src/main/java/io/trino/orc/reader/ListColumnReader.java @@ -18,6 +18,7 @@ import io.trino.orc.OrcBlockFactory; import io.trino.orc.OrcColumn; import 
io.trino.orc.OrcCorruptionException; +import io.trino.orc.OrcReader.FieldMapperFactory; import io.trino.orc.metadata.ColumnEncoding; import io.trino.orc.metadata.ColumnMetadata; import io.trino.orc.stream.BooleanInputStream; @@ -73,7 +74,7 @@ public class ListColumnReader private boolean rowGroupOpen; - public ListColumnReader(Type type, OrcColumn column, AggregatedMemoryContext systemMemoryContext, OrcBlockFactory blockFactory) + public ListColumnReader(Type type, OrcColumn column, AggregatedMemoryContext systemMemoryContext, OrcBlockFactory blockFactory, FieldMapperFactory fieldMapperFactory) throws OrcCorruptionException { requireNonNull(type, "type is null"); @@ -82,7 +83,13 @@ public ListColumnReader(Type type, OrcColumn column, AggregatedMemoryContext sys this.column = requireNonNull(column, "column is null"); this.blockFactory = requireNonNull(blockFactory, "blockFactory is null"); - this.elementColumnReader = createColumnReader(elementType, column.getNestedColumns().get(0), fullyProjectedLayout(), systemMemoryContext, blockFactory); + this.elementColumnReader = createColumnReader( + elementType, + column.getNestedColumns().get(0), + fullyProjectedLayout(), + systemMemoryContext, + blockFactory, + fieldMapperFactory); } @Override diff --git a/lib/trino-orc/src/main/java/io/trino/orc/reader/MapColumnReader.java b/lib/trino-orc/src/main/java/io/trino/orc/reader/MapColumnReader.java index b83f45135b00..d6e0cee5595f 100644 --- a/lib/trino-orc/src/main/java/io/trino/orc/reader/MapColumnReader.java +++ b/lib/trino-orc/src/main/java/io/trino/orc/reader/MapColumnReader.java @@ -18,6 +18,7 @@ import io.trino.orc.OrcBlockFactory; import io.trino.orc.OrcColumn; import io.trino.orc.OrcCorruptionException; +import io.trino.orc.OrcReader.FieldMapperFactory; import io.trino.orc.metadata.ColumnEncoding; import io.trino.orc.metadata.ColumnMetadata; import io.trino.orc.stream.BooleanInputStream; @@ -77,7 +78,7 @@ public class MapColumnReader private boolean rowGroupOpen; - 
public MapColumnReader(Type type, OrcColumn column, AggregatedMemoryContext systemMemoryContext, OrcBlockFactory blockFactory) + public MapColumnReader(Type type, OrcColumn column, AggregatedMemoryContext systemMemoryContext, OrcBlockFactory blockFactory, FieldMapperFactory fieldMapperFactory) throws OrcCorruptionException { requireNonNull(type, "type is null"); @@ -86,8 +87,20 @@ public MapColumnReader(Type type, OrcColumn column, AggregatedMemoryContext syst this.column = requireNonNull(column, "column is null"); this.blockFactory = requireNonNull(blockFactory, "blockFactory is null"); - this.keyColumnReader = createColumnReader(this.type.getKeyType(), column.getNestedColumns().get(0), fullyProjectedLayout(), systemMemoryContext, blockFactory); - this.valueColumnReader = createColumnReader(this.type.getValueType(), column.getNestedColumns().get(1), fullyProjectedLayout(), systemMemoryContext, blockFactory); + this.keyColumnReader = createColumnReader( + this.type.getKeyType(), + column.getNestedColumns().get(0), + fullyProjectedLayout(), + systemMemoryContext, + blockFactory, + fieldMapperFactory); + this.valueColumnReader = createColumnReader( + this.type.getValueType(), + column.getNestedColumns().get(1), + fullyProjectedLayout(), + systemMemoryContext, + blockFactory, + fieldMapperFactory); } @Override diff --git a/lib/trino-orc/src/main/java/io/trino/orc/reader/StructColumnReader.java b/lib/trino-orc/src/main/java/io/trino/orc/reader/StructColumnReader.java index 686848b9805d..608ac110b7bc 100644 --- a/lib/trino-orc/src/main/java/io/trino/orc/reader/StructColumnReader.java +++ b/lib/trino-orc/src/main/java/io/trino/orc/reader/StructColumnReader.java @@ -21,6 +21,8 @@ import io.trino.orc.OrcColumn; import io.trino.orc.OrcCorruptionException; import io.trino.orc.OrcReader; +import io.trino.orc.OrcReader.FieldMapper; +import io.trino.orc.OrcReader.FieldMapperFactory; import io.trino.orc.metadata.ColumnEncoding; import io.trino.orc.metadata.ColumnMetadata; import 
io.trino.orc.stream.BooleanInputStream; @@ -47,7 +49,6 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Verify.verify; -import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.trino.orc.metadata.Stream.StreamKind.PRESENT; import static io.trino.orc.reader.ColumnReaders.createColumnReader; import static io.trino.orc.reader.ReaderUtils.verifyStreamType; @@ -75,7 +76,13 @@ public class StructColumnReader private boolean rowGroupOpen; - StructColumnReader(Type type, OrcColumn column, OrcReader.ProjectedLayout readLayout, AggregatedMemoryContext systemMemoryContext, OrcBlockFactory blockFactory) + StructColumnReader( + Type type, + OrcColumn column, + OrcReader.ProjectedLayout readLayout, + AggregatedMemoryContext systemMemoryContext, + OrcBlockFactory blockFactory, + FieldMapperFactory fieldMapperFactory) throws OrcCorruptionException { requireNonNull(type, "type is null"); @@ -85,9 +92,7 @@ public class StructColumnReader this.column = requireNonNull(column, "column is null"); this.blockFactory = requireNonNull(blockFactory, "blockFactory is null"); - Map nestedColumns = column.getNestedColumns().stream() - .collect(toImmutableMap(stream -> stream.getColumnName().toLowerCase(Locale.ENGLISH), stream -> stream)); - + FieldMapper fieldMapper = fieldMapperFactory.create(column); ImmutableList.Builder fieldNames = ImmutableList.builder(); ImmutableMap.Builder structFields = ImmutableMap.builder(); for (Field field : this.type.getFields()) { @@ -96,12 +101,20 @@ public class StructColumnReader .toLowerCase(Locale.ENGLISH); fieldNames.add(fieldName); - OrcColumn fieldStream = nestedColumns.get(fieldName); + OrcColumn fieldStream = fieldMapper.get(fieldName); if (fieldStream != null) { OrcReader.ProjectedLayout fieldLayout = readLayout.getFieldLayout(fieldName); if (fieldLayout != null) { - structFields.put(fieldName, createColumnReader(field.getType(), fieldStream, fieldLayout, 
systemMemoryContext, blockFactory)); + structFields.put( + fieldName, + createColumnReader( + field.getType(), + fieldStream, + fieldLayout, + systemMemoryContext, + blockFactory, + fieldMapperFactory)); } } } diff --git a/lib/trino-orc/src/main/java/io/trino/orc/reader/UnionColumnReader.java b/lib/trino-orc/src/main/java/io/trino/orc/reader/UnionColumnReader.java index 1427310123c3..fb6ff1694d59 100644 --- a/lib/trino-orc/src/main/java/io/trino/orc/reader/UnionColumnReader.java +++ b/lib/trino-orc/src/main/java/io/trino/orc/reader/UnionColumnReader.java @@ -19,6 +19,7 @@ import io.trino.orc.OrcBlockFactory; import io.trino.orc.OrcColumn; import io.trino.orc.OrcCorruptionException; +import io.trino.orc.OrcReader.FieldMapperFactory; import io.trino.orc.metadata.ColumnEncoding; import io.trino.orc.metadata.ColumnMetadata; import io.trino.orc.stream.BooleanInputStream; @@ -81,7 +82,7 @@ public class UnionColumnReader private boolean rowGroupOpen; - UnionColumnReader(Type type, OrcColumn column, AggregatedMemoryContext systemMemoryContext, OrcBlockFactory blockFactory) + UnionColumnReader(Type type, OrcColumn column, AggregatedMemoryContext systemMemoryContext, OrcBlockFactory blockFactory, FieldMapperFactory fieldMapperFactory) throws OrcCorruptionException { requireNonNull(type, "type is null"); @@ -94,7 +95,13 @@ public class UnionColumnReader ImmutableList.Builder fieldReadersBuilder = ImmutableList.builder(); List fields = column.getNestedColumns(); for (int i = 0; i < fields.size(); i++) { - fieldReadersBuilder.add(createColumnReader(type.getTypeParameters().get(i + 1), fields.get(i), fullyProjectedLayout(), systemMemoryContext, blockFactory)); + fieldReadersBuilder.add(createColumnReader( + type.getTypeParameters().get(i + 1), + fields.get(i), + fullyProjectedLayout(), + systemMemoryContext, + blockFactory, + fieldMapperFactory)); } fieldReaders = fieldReadersBuilder.build(); } diff --git 
a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java index cbf4e67522c6..fb68ea652c1f 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.orc.NameBasedFieldMapper; import io.trino.orc.OrcColumn; import io.trino.orc.OrcCorruptionException; import io.trino.orc.OrcDataSource; @@ -46,6 +47,7 @@ import static com.google.common.collect.Maps.uniqueIndex; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.orc.OrcReader.MAX_BATCH_SIZE; +import static io.trino.orc.OrcReader.ProjectedLayout.fullyProjectedLayout; import static io.trino.orc.OrcReader.createOrcReader; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; import static io.trino.plugin.hive.HiveErrorCode.HIVE_MISSING_DATA; @@ -144,13 +146,15 @@ private OrcDeleteDeltaPageSource( recordReader = reader.createRecordReader( rowIdColumns, ImmutableList.of(BIGINT, BIGINT), + ImmutableList.of(fullyProjectedLayout(), fullyProjectedLayout()), OrcPredicate.TRUE, 0, fileSize, UTC, systemMemoryContext, MAX_BATCH_SIZE, - exception -> handleException(orcDataSource.getId(), exception)); + exception -> handleException(orcDataSource.getId(), exception), + NameBasedFieldMapper::create); } @Override diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java index bc81866ae17d..9b69418ade81 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java +++ 
b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.orc.NameBasedFieldMapper; import io.trino.orc.OrcColumn; import io.trino.orc.OrcDataSource; import io.trino.orc.OrcDataSourceId; @@ -366,7 +367,8 @@ else if (column.getBaseHiveColumnIndex() < fileColumns.size()) { legacyFileTimeZone, systemMemoryUsage, INITIAL_BATCH_SIZE, - exception -> handleException(orcDataSource.getId(), exception)); + exception -> handleException(orcDataSource.getId(), exception), + NameBasedFieldMapper::create); Optional deletedRows = acidInfo.map(info -> new OrcDeletedRows( diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ColumnIdentity.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ColumnIdentity.java new file mode 100644 index 000000000000..9f1a35aa0da5 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ColumnIdentity.java @@ -0,0 +1,154 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import org.apache.iceberg.types.Types; + +import java.util.List; +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.ARRAY; +import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.MAP; +import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.PRIMITIVE; +import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.STRUCT; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class ColumnIdentity +{ + private final int id; + private final String name; + private final TypeCategory typeCategory; + private final List children; + + @JsonCreator + public ColumnIdentity( + @JsonProperty("id") int id, + @JsonProperty("name") String name, + @JsonProperty("typeCategory") TypeCategory typeCategory, + @JsonProperty("children") List children) + { + this.id = id; + this.name = requireNonNull(name, "name is null"); + this.typeCategory = requireNonNull(typeCategory, "typeCategory is null"); + this.children = requireNonNull(children, "children is null"); + checkArgument( + children.isEmpty() == (typeCategory == PRIMITIVE), + "Children should be empty if and only if column type is primitive"); + } + + @JsonProperty + public int getId() + { + return id; + } + + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public TypeCategory getTypeCategory() + { + return typeCategory; + } + + @JsonProperty + public List getChildren() + { + return children; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + 
if (o == null || getClass() != o.getClass()) { + return false; + } + ColumnIdentity that = (ColumnIdentity) o; + return id == that.id && + name.equals(that.name) && + typeCategory == that.typeCategory && + children.equals(that.children); + } + + @Override + public int hashCode() + { + return Objects.hash(id, name, typeCategory, children); + } + + @Override + public String toString() + { + return id + ":" + name; + } + + public enum TypeCategory + { + PRIMITIVE, + STRUCT, + ARRAY, + MAP + } + + public static ColumnIdentity primitiveColumnIdentity(int id, String name) + { + return new ColumnIdentity(id, name, PRIMITIVE, ImmutableList.of()); + } + + public static ColumnIdentity createColumnIdentity(Types.NestedField column) + { + int id = column.fieldId(); + String name = column.name(); + org.apache.iceberg.types.Type fieldType = column.type(); + + if (!fieldType.isNestedType()) { + return new ColumnIdentity(id, name, PRIMITIVE, ImmutableList.of()); + } + + if (fieldType.isListType()) { + ColumnIdentity elementColumn = createColumnIdentity(getOnlyElement(fieldType.asListType().fields())); + return new ColumnIdentity(id, name, ARRAY, ImmutableList.of(elementColumn)); + } + + if (fieldType.isStructType()) { + List fieldColumns = fieldType.asStructType().fields().stream() + .map(ColumnIdentity::createColumnIdentity) + .collect(toImmutableList()); + return new ColumnIdentity(id, name, STRUCT, fieldColumns); + } + + if (fieldType.isMapType()) { + List keyValueColumns = fieldType.asMapType().fields().stream() + .map(ColumnIdentity::createColumnIdentity) + .collect(toImmutableList()); + checkArgument(keyValueColumns.size() == 2, "Expected map type to have two fields"); + return new ColumnIdentity(id, name, MAP, keyValueColumns); + } + + throw new UnsupportedOperationException(format("Iceberg column type %s is not supported", fieldType.typeId())); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java index 7142b24842ef..4b0fd392ad77 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java @@ -14,46 +14,43 @@ package io.trino.plugin.iceberg; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.type.Type; +import io.trino.spi.type.TypeManager; +import org.apache.iceberg.types.Types; import java.util.Objects; import java.util.Optional; +import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity; +import static io.trino.plugin.iceberg.ColumnIdentity.primitiveColumnIdentity; +import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; import static java.util.Objects.requireNonNull; public class IcebergColumnHandle implements ColumnHandle { - private final int id; - private final String name; + private final ColumnIdentity columnIdentity; private final Type type; private final Optional comment; @JsonCreator public IcebergColumnHandle( - @JsonProperty("id") int id, - @JsonProperty("name") String name, + @JsonProperty("columnIdentity") ColumnIdentity columnIdentity, @JsonProperty("type") Type type, @JsonProperty("comment") Optional comment) { - this.id = id; - this.name = requireNonNull(name, "name is null"); + this.columnIdentity = requireNonNull(columnIdentity, "columnIdentity is null"); this.type = requireNonNull(type, "type is null"); this.comment = requireNonNull(comment, "comment is null"); } @JsonProperty - public int getId() - { - return id; - } - - @JsonProperty - public String getName() + public ColumnIdentity getColumnIdentity() { - return name; + return columnIdentity; } @JsonProperty @@ -68,10 +65,22 @@ public Optional getComment() return comment; } + 
@JsonIgnore + public int getId() + { + return columnIdentity.getId(); + } + + @JsonIgnore + public String getName() + { + return columnIdentity.getName(); + } + @Override public int hashCode() { - return Objects.hash(id, name, type, comment); + return Objects.hash(columnIdentity, type, comment); } @Override @@ -84,8 +93,7 @@ public boolean equals(Object obj) return false; } IcebergColumnHandle other = (IcebergColumnHandle) obj; - return this.id == other.id && - Objects.equals(this.name, other.name) && + return Objects.equals(this.columnIdentity, other.columnIdentity) && Objects.equals(this.type, other.type) && Objects.equals(this.comment, other.comment); } @@ -93,6 +101,19 @@ public boolean equals(Object obj) @Override public String toString() { - return id + ":" + name + ":" + type.getDisplayName(); + return getId() + ":" + getName() + ":" + type.getDisplayName(); + } + + public static IcebergColumnHandle primitiveIcebergColumnHandle(int id, String name, Type type, Optional comment) + { + return new IcebergColumnHandle(primitiveColumnIdentity(id, name), type, comment); + } + + public static IcebergColumnHandle create(Types.NestedField column, TypeManager typeManager) + { + return new IcebergColumnHandle( + createColumnIdentity(column), + toTrinoType(column.type(), typeManager), + Optional.ofNullable(column.doc())); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 340a1f5923cc..3a2585a3d585 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -118,6 +118,7 @@ import static io.trino.plugin.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT; import static io.trino.plugin.hive.util.HiveWriteUtils.getTableDefaultLocation; import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression; +import 
static io.trino.plugin.iceberg.IcebergColumnHandle.primitiveIcebergColumnHandle; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergSchemaProperties.getSchemaLocation; import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY; @@ -592,7 +593,7 @@ public Optional finishInsert(ConnectorSession session, @Override public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle) { - return new IcebergColumnHandle(0, "$row_id", BIGINT, Optional.empty()); + return primitiveIcebergColumnHandle(0, "$row_id", BIGINT, Optional.empty()); } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index eac170081e05..c0e97f297578 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -15,7 +15,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.orc.NameBasedFieldMapper; import io.trino.orc.OrcColumn; import io.trino.orc.OrcCorruptionException; import io.trino.orc.OrcDataSource; @@ -74,6 +76,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -85,6 +88,7 @@ import static com.google.common.collect.Maps.uniqueIndex; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.orc.OrcReader.INITIAL_BATCH_SIZE; +import static io.trino.orc.OrcReader.ProjectedLayout.fullyProjectedLayout; import static 
io.trino.parquet.ParquetTypeUtils.getColumnIO; import static io.trino.parquet.ParquetTypeUtils.getDescriptors; import static io.trino.parquet.ParquetTypeUtils.getParquetTypeByName; @@ -310,13 +314,17 @@ private static ConnectorPageSource createOrcPageSource( OrcRecordReader recordReader = reader.createRecordReader( fileReadColumns, fileReadTypes, + Collections.nCopies(fileReadColumns.size(), fullyProjectedLayout()), predicateBuilder.build(), start, length, UTC, systemMemoryUsage, INITIAL_BATCH_SIZE, - exception -> handleException(orcDataSourceId, exception)); + exception -> handleException(orcDataSourceId, exception), + fileColumnsByIcebergId.isEmpty() + ? NameBasedFieldMapper::create + : new IdBasedFieldMapperFactory(columns)); return new OrcPageSource( recordReader, @@ -346,6 +354,71 @@ private static ConnectorPageSource createOrcPageSource( } } + private static class IdBasedFieldMapperFactory + implements OrcReader.FieldMapperFactory + { + // Stores a mapping between subfield names and ids for every top-level/nested column id + private final Map> fieldNameToIdMappingForTableColumns; + + public IdBasedFieldMapperFactory(List columns) + { + requireNonNull(columns, "columns is null"); + + ImmutableMap.Builder> mapping = ImmutableMap.builder(); + for (IcebergColumnHandle column : columns) { + // Recursively compute subfield name to id mapping for every column + populateMapping(column.getColumnIdentity(), mapping); + } + + this.fieldNameToIdMappingForTableColumns = mapping.build(); + } + + @Override + public OrcReader.FieldMapper create(OrcColumn column) + { + Map nestedColumns = Maps.uniqueIndex( + column.getNestedColumns(), + field -> Integer.valueOf(field.getAttributes().get(ORC_ICEBERG_ID_KEY))); + + int icebergId = Integer.valueOf(column.getAttributes().get(ORC_ICEBERG_ID_KEY)); + return new IdBasedFieldMapper(nestedColumns, fieldNameToIdMappingForTableColumns.get(icebergId)); + } + + private static void populateMapping( + ColumnIdentity identity, + 
ImmutableMap.Builder> fieldNameToIdMappingForTableColumns) + { + fieldNameToIdMappingForTableColumns.put( + identity.getId(), + identity.getChildren().stream() + .collect(toImmutableMap(ColumnIdentity::getName, ColumnIdentity::getId))); + + for (ColumnIdentity child : identity.getChildren()) { + populateMapping(child, fieldNameToIdMappingForTableColumns); + } + } + } + + private static class IdBasedFieldMapper + implements OrcReader.FieldMapper + { + private final Map idToColumnMappingForFile; + private final Map nameToIdMappingForTableColumns; + + public IdBasedFieldMapper(Map idToColumnMappingForFile, Map nameToIdMappingForTableColumns) + { + this.idToColumnMappingForFile = requireNonNull(idToColumnMappingForFile, "idToColumnMappingForFile is null"); + this.nameToIdMappingForTableColumns = requireNonNull(nameToIdMappingForTableColumns, "nameToIdMappingForTableColumns is null"); + } + + @Override + public OrcColumn get(String fieldName) + { + int fieldId = nameToIdMappingForTableColumns.get(fieldName); + return idToColumnMappingForFile.get(fieldId); + } + } + private static ConnectorPageSource createParquetPageSource( HdfsEnvironment hdfsEnvironment, String user, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index 96f7a05ecc0c..103696c28031 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -58,7 +58,6 @@ import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID; -import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; import static io.trino.plugin.iceberg.util.Timestamps.timestampTzFromMicros; import static 
io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.type.BigintType.BIGINT; @@ -121,11 +120,7 @@ public static long resolveSnapshotId(Table table, long snapshotId) public static List getColumns(Schema schema, TypeManager typeManager) { return schema.columns().stream() - .map(column -> new IcebergColumnHandle( - column.fieldId(), - column.name(), - toTrinoType(column.type(), typeManager), - Optional.ofNullable(column.doc()))) + .map(column -> IcebergColumnHandle.create(column, typeManager)) .collect(toImmutableList()); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergColumnHandle.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergColumnHandle.java new file mode 100644 index 000000000000..5f9dfd14b0d4 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergColumnHandle.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.airlift.json.JsonCodec; +import io.airlift.json.JsonCodecFactory; +import io.airlift.json.ObjectMapperProvider; +import io.trino.spi.type.ArrayType; +import io.trino.spi.type.RowType; +import io.trino.spi.type.Type; +import io.trino.type.TypeDeserializer; +import org.testng.annotations.Test; + +import java.util.Optional; + +import static io.trino.metadata.MetadataManager.createTestMetadataManager; +import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.ARRAY; +import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.PRIMITIVE; +import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.STRUCT; +import static io.trino.plugin.iceberg.IcebergColumnHandle.primitiveIcebergColumnHandle; +import static io.trino.spi.type.BigintType.BIGINT; +import static org.testng.Assert.assertEquals; + +public class TestIcebergColumnHandle +{ + @Test + public void testRoundTrip() + { + testRoundTrip(primitiveIcebergColumnHandle(12, "blah", BIGINT, Optional.of("this is a comment"))); + + // Nested column + ColumnIdentity foo1 = new ColumnIdentity(1, "foo1", PRIMITIVE, ImmutableList.of()); + ColumnIdentity foo2 = new ColumnIdentity(2, "foo2", PRIMITIVE, ImmutableList.of()); + ColumnIdentity foo3 = new ColumnIdentity(3, "foo3", ARRAY, ImmutableList.of(foo1)); + IcebergColumnHandle nestedColumn = new IcebergColumnHandle( + new ColumnIdentity( + 5, + "foo5", + STRUCT, + ImmutableList.of(foo2, foo3)), + RowType.from(ImmutableList.of( + RowType.field("foo2", BIGINT), + RowType.field("foo3", new ArrayType(BIGINT)))), + Optional.empty()); + testRoundTrip(nestedColumn); + } + + private void testRoundTrip(IcebergColumnHandle expected) + { + ObjectMapperProvider objectMapperProvider = new ObjectMapperProvider(); + objectMapperProvider.setJsonDeserializers(ImmutableMap.of(Type.class, new 
TypeDeserializer(createTestMetadataManager()))); + JsonCodec codec = new JsonCodecFactory(objectMapperProvider).jsonCodec(IcebergColumnHandle.class); + + String json = codec.toJson(expected); + IcebergColumnHandle actual = codec.fromJson(json); + + assertEquals(actual, expected); + assertEquals(actual.getName(), expected.getName()); + assertEquals(actual.getColumnIdentity(), expected.getColumnIdentity()); + assertEquals(actual.getId(), expected.getId()); + assertEquals(actual.getType(), expected.getType()); + assertEquals(actual.getComment(), expected.getComment()); + } +} diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/iceberg/TestSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/iceberg/TestSparkCompatibility.java index c3cbd8047a8f..d5bd2f171212 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/iceberg/TestSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/iceberg/TestSparkCompatibility.java @@ -28,6 +28,7 @@ import static io.trino.tests.utils.QueryExecutors.onPresto; import static io.trino.tests.utils.QueryExecutors.onSpark; import static java.lang.String.format; +import static org.testng.Assert.assertEquals; public class TestSparkCompatibility extends ProductTest @@ -359,6 +360,38 @@ public void testSparkReadingNestedPrestoData() assertThat(sparkResult).containsOnly(row); } + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + public void testIdBasedFieldMapping() + { + String baseTableName = "test_schema_evolution_for_nested_fields"; + String prestoTableName = prestoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + + onPresto().executeQuery(format( + "CREATE TABLE %s (_struct ROW(rename BIGINT, keep BIGINT, drop_and_add BIGINT), _partition BIGINT) " + + "WITH (partitioning = ARRAY['_partition'])", + prestoTableName)); + onPresto().executeQuery(format("INSERT INTO %s VALUES (row(1, 2, 3), 1001)", prestoTableName)); + 
// Alter nested fields using Spark. Presto does not support this yet. + onSpark().executeQuery(format("ALTER TABLE %s RENAME COLUMN _struct.rename TO renamed", sparkTableName)); + onSpark().executeQuery(format("ALTER TABLE %s DROP COLUMN _struct.drop_and_add", sparkTableName)); + onSpark().executeQuery(format("ALTER TABLE %s ADD COLUMN _struct.drop_and_add BIGINT", sparkTableName)); + + Row expected = row( + rowBuilder() + // Rename does not change id + .addField("renamed", 1L) + .addField("keep", 2L) + // Dropping and re-adding changes id + .addField("drop_and_add", null) + .build(), + 1001); + + QueryResult result = onPresto().executeQuery(format("SELECT * FROM %s", prestoTableName)); + assertEquals(result.column(1).get(0), expected.getValues().get(0)); + } + private static String sparkTableName(String tableName) { return format("%s.default.%s", SPARK_CATALOG, tableName); @@ -368,4 +401,9 @@ private static String prestoTableName(String tableName) { return format("%s.default.%s", PRESTO_CATALOG, tableName); } + + private io.trino.jdbc.Row.Builder rowBuilder() + { + return io.trino.jdbc.Row.builder(); + } }