From c77ebcfb0b65a95b0bba3c8792f03a54405d7c4b Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Mon, 12 Jul 2021 20:43:12 -0700 Subject: [PATCH] Iceberg: support Parquet read with delete filter --- plugin/trino-iceberg/pom.xml | 8 +- .../plugin/iceberg/IcebergColumnHandle.java | 5 + .../trino/plugin/iceberg/IcebergMetadata.java | 3 + .../plugin/iceberg/IcebergPageSource.java | 20 ++- .../iceberg/IcebergPageSourceProvider.java | 44 +++++- .../io/trino/plugin/iceberg/IcebergSplit.java | 63 ++++----- .../plugin/iceberg/IcebergSplitSource.java | 7 +- .../plugin/iceberg/IcebergTableHandle.java | 21 +++ .../plugin/iceberg/TrinoDeleteFilter.java | 45 ++++++ .../io/trino/plugin/iceberg/TrinoRow.java | 129 ++++++++++++++++++ 10 files changed, 301 insertions(+), 44 deletions(-) create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoDeleteFilter.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoRow.java diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 450cdcc79de2..a377b281526e 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -15,7 +15,7 @@ ${project.parent.basedir} - 0.11.0 + 0.11.1 @@ -144,6 +144,12 @@ + + org.apache.iceberg + iceberg-data + ${dep.iceberg.version} + + org.apache.iceberg iceberg-hive-metastore diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java index 4b0fd392ad77..cbeed35ff956 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java @@ -19,6 +19,7 @@ import io.trino.spi.connector.ColumnHandle; import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.types.Types; import java.util.Objects; @@ -27,11 +28,15 @@ import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity; import static io.trino.plugin.iceberg.ColumnIdentity.primitiveColumnIdentity; import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; +import static io.trino.spi.type.BigintType.BIGINT; import static java.util.Objects.requireNonNull; public class IcebergColumnHandle implements ColumnHandle { + public static final IcebergColumnHandle ROW_POSITION_COLUMN = new IcebergColumnHandle( + createColumnIdentity(MetadataColumns.ROW_POSITION), BIGINT, Optional.of(MetadataColumns.ROW_POSITION.doc())); + private final ColumnIdentity columnIdentity; private final Type type; private final Optional comment; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 903ddeaf17fe..4e7fc016593b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -166,6 +166,7 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static org.apache.iceberg.Transactions.createTableTransaction; +import static org.apache.iceberg.util.SerializationUtil.serializeToBytes; public class IcebergMetadata implements ConnectorMetadata @@ -255,6 +256,7 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa name.getTableName(), name.getTableType(), snapshotId, + serializeToBytes(table.schema()), TupleDomain.all(), TupleDomain.all()); } @@ -814,6 +816,7 @@ public Optional> applyFilter(C table.getTableName(), table.getTableType(), table.getSnapshotId(), + serializeToBytes(icebergTable.schema()), newUnenforcedConstraint, newEnforcedConstraint), newUnenforcedConstraint.transformKeys(ColumnHandle.class::cast), diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java index b8b32e5ebf3a..de9468ab331b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java @@ -21,13 +21,17 @@ import io.trino.spi.predicate.Utils; import io.trino.spi.type.TimeZoneKey; import io.trino.spi.type.Type; +import org.apache.iceberg.io.CloseableIterable; import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; import java.util.Map; +import java.util.stream.IntStream; +import java.util.stream.StreamSupport; import static com.google.common.base.Throwables.throwIfInstanceOf; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue; import static java.util.Objects.requireNonNull; @@ -38,16 +42,21 @@ public class IcebergPageSource private final Block[] prefilledBlocks; private final int[] delegateIndexes; private final ConnectorPageSource delegate; + private final TrinoDeleteFilter deleteFilter; + private final List columnTypes; public IcebergPageSource( List columns, Map partitionKeys, ConnectorPageSource delegate, + TrinoDeleteFilter deleteFilter, TimeZoneKey timeZoneKey) { int size = requireNonNull(columns, "columns is null").size(); requireNonNull(partitionKeys, "partitionKeys is null"); this.delegate = requireNonNull(delegate, "delegate is null"); + this.deleteFilter = requireNonNull(deleteFilter, "deleteFilter is null"); + this.columnTypes = columns.stream().map(IcebergColumnHandle::getType).collect(toImmutableList()); this.prefilledBlocks = new Block[size]; this.delegateIndexes = new int[size]; @@ -106,7 +115,16 @@ public Page getNextPage() blocks[i] = dataPage.getBlock(delegateIndexes[i]); } } - return new Page(batchSize, blocks); + + CloseableIterable filteredRows = deleteFilter.filter(CloseableIterable.transform( + CloseableIterable.withNoopClose(IntStream.range(0, batchSize).boxed().collect(toImmutableList())), + p -> new TrinoRow(columnTypes, blocks, p))); + int[] positionsToKeep = StreamSupport.stream(filteredRows.spliterator(), false).mapToInt(TrinoRow::getPosition).toArray(); + Block[] filteredBlocks = new Block[prefilledBlocks.length]; + for (int i = 0; i < filteredBlocks.length; i++) { + filteredBlocks[i] = blocks[i].getPositions(positionsToKeep, 0, positionsToKeep.length); + } + return new Page(positionsToKeep.length, filteredBlocks); } catch (RuntimeException e) { closeWithSuppression(e); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index 928e3cf9f353..bcd134647179 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -59,6 +59,7 @@ import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.StandardTypes; import io.trino.spi.type.Type; +import io.trino.spi.type.TypeManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -66,6 +67,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockMissingException; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; @@ -78,6 +82,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -96,6 +101,7 @@ import static io.trino.parquet.predicate.PredicateUtils.buildPredicate; import static io.trino.parquet.predicate.PredicateUtils.predicateMatches; import static io.trino.plugin.hive.parquet.ParquetColumnIOConverter.constructField; +import static io.trino.plugin.iceberg.IcebergColumnHandle.ROW_POSITION_COLUMN; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CURSOR_ERROR; @@ -111,12 +117,14 @@ import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcBloomFiltersEnabled; import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcNestedLazy; import static io.trino.plugin.iceberg.IcebergSessionProperties.isUseFileSizeFromMetadata; +import static io.trino.plugin.iceberg.IcebergUtil.getColumns; import static io.trino.plugin.iceberg.TypeConverter.ORC_ICEBERG_ID_KEY; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toCollection; import static java.util.stream.Collectors.toList; import static org.joda.time.DateTimeZone.UTC; @@ -127,18 +135,24 @@ public class IcebergPageSourceProvider private final FileFormatDataSourceStats fileFormatDataSourceStats; private final OrcReaderOptions orcReaderOptions; private final ParquetReaderOptions parquetReaderOptions; + private final FileIoProvider fileIoProvider; + private final TypeManager typeManager; @Inject public IcebergPageSourceProvider( HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats fileFormatDataSourceStats, OrcReaderConfig orcReaderConfig, - ParquetReaderConfig parquetReaderConfig) + ParquetReaderConfig parquetReaderConfig, + FileIoProvider fileIoProvider, + TypeManager typeManager) { this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null"); this.orcReaderOptions = requireNonNull(orcReaderConfig, "orcReaderConfig is null").toOrcReaderOptions(); this.parquetReaderOptions = requireNonNull(parquetReaderConfig, "parquetReaderConfig is null").toParquetReaderOptions(); + this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); } @Override @@ -159,12 +173,22 @@ public ConnectorPageSource createPageSource( Map partitionKeys = split.getPartitionKeys(); - List regularColumns = columns.stream() + LinkedHashSet regularColumns = columns.stream() .map(IcebergColumnHandle.class::cast) .filter(column -> !partitionKeys.containsKey(column.getId())) - .collect(toImmutableList()); + .collect(toCollection(LinkedHashSet::new)); HdfsContext hdfsContext = new HdfsContext(session); + FileIO fileIo = fileIoProvider.createFileIo(hdfsContext, null); + List deleteReadFields = icebergColumns.stream() + .map(column -> table.getSchema().findField(column.getId())) + .collect(toImmutableList()); + Schema deleteReadSchema = new Schema(deleteReadFields); + TrinoDeleteFilter deleteFilter = new TrinoDeleteFilter(fileIo, split.getTask(), deleteReadSchema, deleteReadSchema); + getColumns(deleteFilter.requiredSchema(), typeManager).stream() + .filter(column -> !partitionKeys.containsKey(column.getId())) + .forEachOrdered(regularColumns::add); + ConnectorPageSource dataPageSource = createDataPageSource( session, hdfsContext, @@ -173,10 +197,10 @@ public ConnectorPageSource createPageSource( split.getLength(), split.getFileSize(), split.getFileFormat(), - regularColumns, + ImmutableList.copyOf(regularColumns), table.getUnenforcedPredicate()); - return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, session.getTimeZoneKey()); + return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, deleteFilter, session.getTimeZoneKey()); } private ConnectorPageSource createDataPageSource( @@ -201,8 +225,14 @@ private ConnectorPageSource createDataPageSource( } } + List rowIndexPositions = dataColumns.stream().map(ROW_POSITION_COLUMN::equals).collect(toImmutableList()); + switch (fileFormat) { case ORC: + if (rowIndexPositions.stream().anyMatch(v -> v)) { + throw new UnsupportedOperationException("positional delete is not supported by ORC"); + } + return createOrcPageSource( hdfsEnvironment, session.getUser(), @@ -233,6 +263,7 @@ private ConnectorPageSource createDataPageSource( length, fileSize, dataColumns, + rowIndexPositions, parquetReaderOptions .withMaxReadBlockSize(getParquetMaxReadBlockSize(session)), predicate, @@ -433,6 +464,7 @@ private static ConnectorPageSource createParquetPageSource( long length, long fileSize, List regularColumns, + List rowIndexLocations, ParquetReaderOptions options, TupleDomain effectivePredicate, FileFormatDataSourceStats fileFormatDataSourceStats) @@ -507,7 +539,7 @@ private static ConnectorPageSource createParquetPageSource( } } - return new ParquetPageSource(parquetReader, trinoTypes.build(), internalFields.build()); + return new ParquetPageSource(parquetReader, trinoTypes.build(), rowIndexLocations, internalFields.build()); } catch (IOException | RuntimeException e) { try { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java index ff4f58a00f2b..d58d47f88c3b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java @@ -20,6 +20,7 @@ import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; import java.util.Collections; import java.util.List; @@ -27,33 +28,24 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.util.SerializationUtil.deserializeFromBytes; public class IcebergSplit implements ConnectorSplit { - private final String path; - private final long start; - private final long length; - private final long fileSize; - private final FileFormat fileFormat; + private final byte[] serializedTask; private final List addresses; private final Map partitionKeys; + private transient FileScanTask task; + @JsonCreator public IcebergSplit( - @JsonProperty("path") String path, - @JsonProperty("start") long start, - @JsonProperty("length") long length, - @JsonProperty("fileSize") long fileSize, - @JsonProperty("fileFormat") FileFormat fileFormat, + @JsonProperty("serializedTask") byte[] serializedTask, @JsonProperty("addresses") List addresses, @JsonProperty("partitionKeys") Map partitionKeys) { - this.path = requireNonNull(path, "path is null"); - this.start = start; - this.length = length; - this.fileSize = fileSize; - this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); + this.serializedTask = requireNonNull(serializedTask, "serializedTask is null"); this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null")); this.partitionKeys = Collections.unmodifiableMap(requireNonNull(partitionKeys, "partitionKeys is null")); } @@ -64,6 +56,20 @@ public boolean isRemotelyAccessible() return true; } + @JsonProperty + public byte[] getSerializedTask() + { + return serializedTask; + } + + public FileScanTask getTask() + { + if (task == null) { + task = deserializeFromBytes(serializedTask); + } + return task; + } + @JsonProperty @Override public List getAddresses() @@ -71,34 +77,29 @@ public List getAddresses() return addresses; } - @JsonProperty public String getPath() { - return path; + return getTask().file().path().toString(); } - @JsonProperty public long getStart() { - return start; + return getTask().start(); } - @JsonProperty public long getLength() { - return length; + return getTask().length(); } - @JsonProperty public long getFileSize() { - return fileSize; + return getTask().file().fileSizeInBytes(); } - @JsonProperty public FileFormat getFileFormat() { - return fileFormat; + return getTask().file().format(); } @JsonProperty @@ -111,9 +112,9 @@ public Map getPartitionKeys() public Object getInfo() { return ImmutableMap.builder() - .put("path", path) - .put("start", start) - .put("length", length) + .put("path", getPath()) + .put("start", getStart()) + .put("length", getLength()) .build(); } @@ -121,9 +122,9 @@ public Object getInfo() public String toString() { return toStringHelper(this) - .addValue(path) - .addValue(start) - .addValue(length) + .addValue(getPath()) + .addValue(getStart()) + .addValue(getLength()) .toString(); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index eff0b1175c7d..a52ffedb6e6e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -34,6 +34,7 @@ import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.iceberg.util.SerializationUtil.serializeToBytes; public class IcebergSplitSource implements ConnectorSplitSource @@ -89,11 +90,7 @@ private ConnectorSplit toIcebergSplit(FileScanTask task) // on reader side evaluating a condition that we know will always be true. return new IcebergSplit( - task.file().path().toString(), - task.start(), - task.length(), - task.file().fileSizeInBytes(), - task.file().format(), + serializeToBytes(task), ImmutableList.of(), getPartitionKeys(task)); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java index b0a7d0af830d..c6e7bbe6834e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java @@ -18,12 +18,14 @@ import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.TupleDomain; +import org.apache.iceberg.Schema; import java.util.Locale; import java.util.Objects; import java.util.Optional; import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.util.SerializationUtil.deserializeFromBytes; public class IcebergTableHandle implements ConnectorTableHandle @@ -32,6 +34,7 @@ public class IcebergTableHandle private final String tableName; private final TableType tableType; private final Optional snapshotId; + private final byte[] serializedSchema; // Filter used during split generation and table scan, but not required to be strictly enforced by Iceberg Connector private final TupleDomain unenforcedPredicate; @@ -39,12 +42,15 @@ public class IcebergTableHandle // Filter guaranteed to be enforced by Iceberg connector private final TupleDomain enforcedPredicate; + private transient Schema schema; + @JsonCreator public IcebergTableHandle( @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, @JsonProperty("tableType") TableType tableType, @JsonProperty("snapshotId") Optional snapshotId, + @JsonProperty("serializedSchema") byte[] serializedSchema, @JsonProperty("unenforcedPredicate") TupleDomain unenforcedPredicate, @JsonProperty("enforcedPredicate") TupleDomain enforcedPredicate) { @@ -52,6 +58,7 @@ public IcebergTableHandle( this.tableName = requireNonNull(tableName, "tableName is null"); this.tableType = requireNonNull(tableType, "tableType is null"); this.snapshotId = requireNonNull(snapshotId, "snapshotId is null"); + this.serializedSchema = requireNonNull(serializedSchema, "serializedSchema is null"); this.unenforcedPredicate = requireNonNull(unenforcedPredicate, "unenforcedPredicate is null"); this.enforcedPredicate = requireNonNull(enforcedPredicate, "enforcedPredicate is null"); } @@ -80,6 +87,20 @@ public Optional getSnapshotId() return snapshotId; } + @JsonProperty + public byte[] getSerializedSchema() + { + return serializedSchema; + } + + public Schema getSchema() + { + if (schema == null) { + schema = deserializeFromBytes(serializedSchema); + } + return schema; + } + @JsonProperty public TupleDomain getUnenforcedPredicate() { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoDeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoDeleteFilter.java new file mode 100644 index 000000000000..04f579e9fa68 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoDeleteFilter.java @@ -0,0 +1,45 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; + +public class TrinoDeleteFilter + extends DeleteFilter +{ + private final FileIO fileIO; + + public TrinoDeleteFilter(FileIO fileIO, FileScanTask task, Schema tableSchema, Schema requestedSchema) + { + super(task, tableSchema, requestedSchema); + this.fileIO = fileIO; + } + + @Override + protected StructLike asStructLike(TrinoRow trinoRow) + { + return trinoRow; + } + + @Override + protected InputFile getInputFile(String s) + { + return fileIO.newInputFile(s); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoRow.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoRow.java new file mode 100644 index 000000000000..cdaccbfef87a --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoRow.java @@ -0,0 +1,129 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import io.trino.spi.TrinoException; +import io.trino.spi.block.Block; +import io.trino.spi.type.BigintType; +import io.trino.spi.type.BooleanType; +import io.trino.spi.type.DateType; +import io.trino.spi.type.DecimalType; +import io.trino.spi.type.DoubleType; +import io.trino.spi.type.IntegerType; +import io.trino.spi.type.RealType; +import io.trino.spi.type.SmallintType; +import io.trino.spi.type.TinyintType; +import io.trino.spi.type.Type; +import io.trino.spi.type.VarbinaryType; +import io.trino.spi.type.VarcharType; +import org.apache.iceberg.StructLike; + +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.plugin.iceberg.util.Timestamps.getTimestampTz; +import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.spi.type.Decimals.readBigDecimal; +import static io.trino.spi.type.TimeType.TIME_MICROS; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS; +import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS; +import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND; +import static java.lang.Float.intBitsToFloat; +import static java.lang.Math.toIntExact; +import static java.util.Objects.requireNonNull; + +public class TrinoRow + implements StructLike +{ + private final List types; + private final Block[] blocks; + private final int position; + + public TrinoRow(List types, Block[] blocks, int position) + { + this.types = requireNonNull(types, "types list is null"); + this.blocks = requireNonNull(blocks, "blocks array is null"); + checkArgument(position >= 0, "page position must be positive: %s", position); + this.position = position; + } + + public int getPosition() + { + return position; + } + + @Override + public int size() + { + return blocks.length; + } + + @Override + public T get(int i, Class aClass) + { + Block block = blocks[i].getLoadedBlock(); + Type type = types.get(i); + T value; + // TODO: can refactor with IcebergPageSink.getIcebergValue + if (block.isNull(position)) { + value = null; + } + else if (type instanceof BigintType) { + value = aClass.cast(type.getLong(block, position)); + } + else if (type instanceof IntegerType || type instanceof SmallintType || type instanceof TinyintType || type instanceof DateType) { + value = aClass.cast(toIntExact(type.getLong(block, position))); + } + else if (type instanceof BooleanType) { + value = aClass.cast(type.getBoolean(block, position)); + } + else if (type instanceof DecimalType) { + value = aClass.cast(readBigDecimal((DecimalType) type, block, position)); + } + else if (type instanceof RealType) { + value = aClass.cast(intBitsToFloat(toIntExact(type.getLong(block, position)))); + } + else if (type instanceof DoubleType) { + value = aClass.cast(type.getDouble(block, position)); + } + else if (type.equals(TIME_MICROS)) { + value = aClass.cast(type.getLong(block, position) / PICOSECONDS_PER_MICROSECOND); + } + else if (type.equals(TIMESTAMP_MICROS)) { + value = aClass.cast(type.getLong(block, position)); + } + else if (type.equals(TIMESTAMP_TZ_MICROS)) { + value = aClass.cast(timestampTzToMicros(getTimestampTz(block, position))); + } + else if (type instanceof VarbinaryType) { + value = aClass.cast(type.getSlice(block, position).getBytes()); + } + else if (type instanceof VarcharType) { + value = aClass.cast(type.getSlice(block, position).toStringUtf8()); + } + else { + // will most likely throw unsupported exception + value = block.getObject(position, aClass); + } + + return value; + } + + @Override + public void set(int i, T t) + { + throw new TrinoException(NOT_SUPPORTED, "writing to TrinoRow is not supported"); + } +}