diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml
index 450cdcc79de2..a377b281526e 100644
--- a/plugin/trino-iceberg/pom.xml
+++ b/plugin/trino-iceberg/pom.xml
@@ -15,7 +15,7 @@
${project.parent.basedir}
- 0.11.0
+ 0.11.1
@@ -144,6 +144,12 @@
+
+ org.apache.iceberg
+ iceberg-data
+ ${dep.iceberg.version}
+
+
org.apache.iceberg
iceberg-hive-metastore
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
index 4b0fd392ad77..cbeed35ff956 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
@@ -19,6 +19,7 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.types.Types;
import java.util.Objects;
@@ -27,11 +28,15 @@
import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity;
import static io.trino.plugin.iceberg.ColumnIdentity.primitiveColumnIdentity;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
+import static io.trino.spi.type.BigintType.BIGINT;
import static java.util.Objects.requireNonNull;
public class IcebergColumnHandle
implements ColumnHandle
{
+ public static final IcebergColumnHandle ROW_POSITION_COLUMN = new IcebergColumnHandle(
+ createColumnIdentity(MetadataColumns.ROW_POSITION), BIGINT, Optional.of(MetadataColumns.ROW_POSITION.doc()));
+
private final ColumnIdentity columnIdentity;
private final Type type;
private final Optional comment;
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index 903ddeaf17fe..4e7fc016593b 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -166,6 +166,7 @@
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.Transactions.createTableTransaction;
+import static org.apache.iceberg.util.SerializationUtil.serializeToBytes;
public class IcebergMetadata
implements ConnectorMetadata
@@ -255,6 +256,7 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
name.getTableName(),
name.getTableType(),
snapshotId,
+ serializeToBytes(table.schema()),
TupleDomain.all(),
TupleDomain.all());
}
@@ -814,6 +816,7 @@ public Optional> applyFilter(C
table.getTableName(),
table.getTableType(),
table.getSnapshotId(),
+ serializeToBytes(icebergTable.schema()),
newUnenforcedConstraint,
newEnforcedConstraint),
newUnenforcedConstraint.transformKeys(ColumnHandle.class::cast),
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java
index b8b32e5ebf3a..de9468ab331b 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java
@@ -21,13 +21,17 @@
import io.trino.spi.predicate.Utils;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.Type;
+import org.apache.iceberg.io.CloseableIterable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
import static com.google.common.base.Throwables.throwIfInstanceOf;
+import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
import static java.util.Objects.requireNonNull;
@@ -38,16 +42,21 @@ public class IcebergPageSource
private final Block[] prefilledBlocks;
private final int[] delegateIndexes;
private final ConnectorPageSource delegate;
+ private final TrinoDeleteFilter deleteFilter;
+ private final List columnTypes;
public IcebergPageSource(
List columns,
Map partitionKeys,
ConnectorPageSource delegate,
+ TrinoDeleteFilter deleteFilter,
TimeZoneKey timeZoneKey)
{
int size = requireNonNull(columns, "columns is null").size();
requireNonNull(partitionKeys, "partitionKeys is null");
this.delegate = requireNonNull(delegate, "delegate is null");
+ this.deleteFilter = requireNonNull(deleteFilter, "deleteFilter is null");
+ this.columnTypes = columns.stream().map(IcebergColumnHandle::getType).collect(toImmutableList());
this.prefilledBlocks = new Block[size];
this.delegateIndexes = new int[size];
@@ -106,7 +115,16 @@ public Page getNextPage()
blocks[i] = dataPage.getBlock(delegateIndexes[i]);
}
}
- return new Page(batchSize, blocks);
+
+ CloseableIterable filteredRows = deleteFilter.filter(CloseableIterable.transform(
+ CloseableIterable.withNoopClose(IntStream.range(0, batchSize).boxed().collect(toImmutableList())),
+ p -> new TrinoRow(columnTypes, blocks, p)));
+ int[] positionsToKeep = StreamSupport.stream(filteredRows.spliterator(), false).mapToInt(TrinoRow::getPosition).toArray();
+ Block[] filteredBlocks = new Block[prefilledBlocks.length];
+ for (int i = 0; i < filteredBlocks.length; i++) {
+ filteredBlocks[i] = blocks[i].getPositions(positionsToKeep, 0, positionsToKeep.length);
+ }
+ return new Page(positionsToKeep.length, filteredBlocks);
}
catch (RuntimeException e) {
closeWithSuppression(e);
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index 928e3cf9f353..bcd134647179 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -59,6 +59,7 @@
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -66,6 +67,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
@@ -78,6 +82,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -96,6 +101,7 @@
import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
import static io.trino.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
+import static io.trino.plugin.iceberg.IcebergColumnHandle.ROW_POSITION_COLUMN;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CURSOR_ERROR;
@@ -111,12 +117,14 @@
import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcBloomFiltersEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcNestedLazy;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isUseFileSizeFromMetadata;
+import static io.trino.plugin.iceberg.IcebergUtil.getColumns;
import static io.trino.plugin.iceberg.TypeConverter.ORC_ICEBERG_ID_KEY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toCollection;
import static java.util.stream.Collectors.toList;
import static org.joda.time.DateTimeZone.UTC;
@@ -127,18 +135,24 @@ public class IcebergPageSourceProvider
private final FileFormatDataSourceStats fileFormatDataSourceStats;
private final OrcReaderOptions orcReaderOptions;
private final ParquetReaderOptions parquetReaderOptions;
+ private final FileIoProvider fileIoProvider;
+ private final TypeManager typeManager;
@Inject
public IcebergPageSourceProvider(
HdfsEnvironment hdfsEnvironment,
FileFormatDataSourceStats fileFormatDataSourceStats,
OrcReaderConfig orcReaderConfig,
- ParquetReaderConfig parquetReaderConfig)
+ ParquetReaderConfig parquetReaderConfig,
+ FileIoProvider fileIoProvider,
+ TypeManager typeManager)
{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null");
this.orcReaderOptions = requireNonNull(orcReaderConfig, "orcReaderConfig is null").toOrcReaderOptions();
this.parquetReaderOptions = requireNonNull(parquetReaderConfig, "parquetReaderConfig is null").toParquetReaderOptions();
+ this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
}
@Override
@@ -159,12 +173,22 @@ public ConnectorPageSource createPageSource(
Map partitionKeys = split.getPartitionKeys();
- List regularColumns = columns.stream()
+ LinkedHashSet regularColumns = columns.stream()
.map(IcebergColumnHandle.class::cast)
.filter(column -> !partitionKeys.containsKey(column.getId()))
- .collect(toImmutableList());
+ .collect(toCollection(LinkedHashSet::new));
HdfsContext hdfsContext = new HdfsContext(session);
+ FileIO fileIo = fileIoProvider.createFileIo(hdfsContext, null);
+ List deleteReadFields = icebergColumns.stream()
+ .map(column -> table.getSchema().findField(column.getId()))
+ .collect(toImmutableList());
+ Schema deleteReadSchema = new Schema(deleteReadFields);
+ TrinoDeleteFilter deleteFilter = new TrinoDeleteFilter(fileIo, split.getTask(), deleteReadSchema, deleteReadSchema);
+ getColumns(deleteFilter.requiredSchema(), typeManager).stream()
+ .filter(column -> !partitionKeys.containsKey(column.getId()))
+ .forEachOrdered(regularColumns::add);
+
ConnectorPageSource dataPageSource = createDataPageSource(
session,
hdfsContext,
@@ -173,10 +197,10 @@ public ConnectorPageSource createPageSource(
split.getLength(),
split.getFileSize(),
split.getFileFormat(),
- regularColumns,
+ ImmutableList.copyOf(regularColumns),
table.getUnenforcedPredicate());
- return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, session.getTimeZoneKey());
+ return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, deleteFilter, session.getTimeZoneKey());
}
private ConnectorPageSource createDataPageSource(
@@ -201,8 +225,14 @@ private ConnectorPageSource createDataPageSource(
}
}
+ List rowIndexPositions = dataColumns.stream().map(ROW_POSITION_COLUMN::equals).collect(toImmutableList());
+
switch (fileFormat) {
case ORC:
+ if (rowIndexPositions.stream().anyMatch(v -> v)) {
+ throw new UnsupportedOperationException("positional delete is not supported by ORC");
+ }
+
return createOrcPageSource(
hdfsEnvironment,
session.getUser(),
@@ -233,6 +263,7 @@ private ConnectorPageSource createDataPageSource(
length,
fileSize,
dataColumns,
+ rowIndexPositions,
parquetReaderOptions
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session)),
predicate,
@@ -433,6 +464,7 @@ private static ConnectorPageSource createParquetPageSource(
long length,
long fileSize,
List regularColumns,
+ List rowIndexLocations,
ParquetReaderOptions options,
TupleDomain effectivePredicate,
FileFormatDataSourceStats fileFormatDataSourceStats)
@@ -507,7 +539,7 @@ private static ConnectorPageSource createParquetPageSource(
}
}
- return new ParquetPageSource(parquetReader, trinoTypes.build(), internalFields.build());
+ return new ParquetPageSource(parquetReader, trinoTypes.build(), rowIndexLocations, internalFields.build());
}
catch (IOException | RuntimeException e) {
try {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
index ff4f58a00f2b..d58d47f88c3b 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
@@ -20,6 +20,7 @@
import io.trino.spi.HostAddress;
import io.trino.spi.connector.ConnectorSplit;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
import java.util.Collections;
import java.util.List;
@@ -27,33 +28,24 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.util.SerializationUtil.deserializeFromBytes;
public class IcebergSplit
implements ConnectorSplit
{
- private final String path;
- private final long start;
- private final long length;
- private final long fileSize;
- private final FileFormat fileFormat;
+ private final byte[] serializedTask;
private final List addresses;
private final Map partitionKeys;
+ private transient FileScanTask task;
+
@JsonCreator
public IcebergSplit(
- @JsonProperty("path") String path,
- @JsonProperty("start") long start,
- @JsonProperty("length") long length,
- @JsonProperty("fileSize") long fileSize,
- @JsonProperty("fileFormat") FileFormat fileFormat,
+ @JsonProperty("serializedTask") byte[] serializedTask,
@JsonProperty("addresses") List addresses,
@JsonProperty("partitionKeys") Map partitionKeys)
{
- this.path = requireNonNull(path, "path is null");
- this.start = start;
- this.length = length;
- this.fileSize = fileSize;
- this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
+ this.serializedTask = requireNonNull(serializedTask, "serializedTask is null");
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.partitionKeys = Collections.unmodifiableMap(requireNonNull(partitionKeys, "partitionKeys is null"));
}
@@ -64,6 +56,20 @@ public boolean isRemotelyAccessible()
return true;
}
+ @JsonProperty
+ public byte[] getSerializedTask()
+ {
+ return serializedTask;
+ }
+
+ public FileScanTask getTask()
+ {
+ if (task == null) {
+ task = deserializeFromBytes(serializedTask);
+ }
+ return task;
+ }
+
@JsonProperty
@Override
public List getAddresses()
@@ -71,34 +77,29 @@ public List getAddresses()
return addresses;
}
- @JsonProperty
public String getPath()
{
- return path;
+ return getTask().file().path().toString();
}
- @JsonProperty
public long getStart()
{
- return start;
+ return getTask().start();
}
- @JsonProperty
public long getLength()
{
- return length;
+ return getTask().length();
}
- @JsonProperty
public long getFileSize()
{
- return fileSize;
+ return getTask().file().fileSizeInBytes();
}
- @JsonProperty
public FileFormat getFileFormat()
{
- return fileFormat;
+ return getTask().file().format();
}
@JsonProperty
@@ -111,9 +112,9 @@ public Map getPartitionKeys()
public Object getInfo()
{
return ImmutableMap.builder()
- .put("path", path)
- .put("start", start)
- .put("length", length)
+ .put("path", getPath())
+ .put("start", getStart())
+ .put("length", getLength())
.build();
}
@@ -121,9 +122,9 @@ public Object getInfo()
public String toString()
{
return toStringHelper(this)
- .addValue(path)
- .addValue(start)
- .addValue(length)
+ .addValue(getPath())
+ .addValue(getStart())
+ .addValue(getLength())
.toString();
}
}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
index eff0b1175c7d..a52ffedb6e6e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
@@ -34,6 +34,7 @@
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.iceberg.util.SerializationUtil.serializeToBytes;
public class IcebergSplitSource
implements ConnectorSplitSource
@@ -89,11 +90,7 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
// on reader side evaluating a condition that we know will always be true.
return new IcebergSplit(
- task.file().path().toString(),
- task.start(),
- task.length(),
- task.file().fileSizeInBytes(),
- task.file().format(),
+ serializeToBytes(task),
ImmutableList.of(),
getPartitionKeys(task));
}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java
index b0a7d0af830d..c6e7bbe6834e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java
@@ -18,12 +18,14 @@
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;
+import org.apache.iceberg.Schema;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.util.SerializationUtil.deserializeFromBytes;
public class IcebergTableHandle
implements ConnectorTableHandle
@@ -32,6 +34,7 @@ public class IcebergTableHandle
private final String tableName;
private final TableType tableType;
private final Optional snapshotId;
+ private final byte[] serializedSchema;
// Filter used during split generation and table scan, but not required to be strictly enforced by Iceberg Connector
private final TupleDomain unenforcedPredicate;
@@ -39,12 +42,15 @@ public class IcebergTableHandle
// Filter guaranteed to be enforced by Iceberg connector
private final TupleDomain enforcedPredicate;
+ private transient Schema schema;
+
@JsonCreator
public IcebergTableHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("tableType") TableType tableType,
@JsonProperty("snapshotId") Optional snapshotId,
+ @JsonProperty("serializedSchema") byte[] serializedSchema,
@JsonProperty("unenforcedPredicate") TupleDomain unenforcedPredicate,
@JsonProperty("enforcedPredicate") TupleDomain enforcedPredicate)
{
@@ -52,6 +58,7 @@ public IcebergTableHandle(
this.tableName = requireNonNull(tableName, "tableName is null");
this.tableType = requireNonNull(tableType, "tableType is null");
this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
+ this.serializedSchema = requireNonNull(serializedSchema, "serializedSchema is null");
this.unenforcedPredicate = requireNonNull(unenforcedPredicate, "unenforcedPredicate is null");
this.enforcedPredicate = requireNonNull(enforcedPredicate, "enforcedPredicate is null");
}
@@ -80,6 +87,20 @@ public Optional getSnapshotId()
return snapshotId;
}
+ @JsonProperty
+ public byte[] getSerializedSchema()
+ {
+ return serializedSchema;
+ }
+
+ public Schema getSchema()
+ {
+ if (schema == null) {
+ schema = deserializeFromBytes(serializedSchema);
+ }
+ return schema;
+ }
+
@JsonProperty
public TupleDomain getUnenforcedPredicate()
{
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoDeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoDeleteFilter.java
new file mode 100644
index 000000000000..04f579e9fa68
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoDeleteFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg;
+
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Iceberg {@link DeleteFilter} that resolves delete file locations through a
+ * Trino-provided {@link FileIO} and consumes data rows as {@link TrinoRow}.
+ */
+public class TrinoDeleteFilter
+        extends DeleteFilter<TrinoRow>
+{
+    private final FileIO fileIO;
+
+    public TrinoDeleteFilter(FileIO fileIO, FileScanTask task, Schema tableSchema, Schema requestedSchema)
+    {
+        super(task, tableSchema, requestedSchema);
+        this.fileIO = requireNonNull(fileIO, "fileIO is null");
+    }
+
+    @Override
+    protected StructLike asStructLike(TrinoRow trinoRow)
+    {
+        // TrinoRow already implements StructLike, so no conversion is needed
+        return trinoRow;
+    }
+
+    @Override
+    protected InputFile getInputFile(String path)
+    {
+        // resolve delete file paths through Trino's FileIO so the connector's
+        // filesystem configuration and credentials apply
+        return fileIO.newInputFile(path);
+    }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoRow.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoRow.java
new file mode 100644
index 000000000000..cdaccbfef87a
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoRow.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg;
+
+import io.trino.spi.TrinoException;
+import io.trino.spi.block.Block;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.BooleanType;
+import io.trino.spi.type.DateType;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.DoubleType;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.RealType;
+import io.trino.spi.type.SmallintType;
+import io.trino.spi.type.TinyintType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.VarbinaryType;
+import io.trino.spi.type.VarcharType;
+import org.apache.iceberg.StructLike;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.trino.plugin.iceberg.util.Timestamps.getTimestampTz;
+import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.trino.spi.type.Decimals.readBigDecimal;
+import static io.trino.spi.type.TimeType.TIME_MICROS;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
+import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
+import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND;
+import static java.lang.Float.intBitsToFloat;
+import static java.lang.Math.toIntExact;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Read-only {@link StructLike} view of a single position across a page's blocks,
+ * converting Trino block values into the Java types Iceberg delete filters expect.
+ */
+public class TrinoRow
+        implements StructLike
+{
+    private final List<Type> types;
+    private final Block[] blocks;
+    private final int position;
+
+    public TrinoRow(List<Type> types, Block[] blocks, int position)
+    {
+        this.types = requireNonNull(types, "types list is null");
+        this.blocks = requireNonNull(blocks, "blocks array is null");
+        checkArgument(position >= 0, "page position must be non-negative: %s", position);
+        this.position = position;
+    }
+
+    /**
+     * Returns the page position this row wraps.
+     */
+    public int getPosition()
+    {
+        return position;
+    }
+
+    @Override
+    public int size()
+    {
+        return blocks.length;
+    }
+
+    @Override
+    public <T> T get(int i, Class<T> aClass)
+    {
+        // force lazy blocks to load before reading the value
+        Block block = blocks[i].getLoadedBlock();
+        Type type = types.get(i);
+        T value;
+        // TODO: can refactor with IcebergPageSink.getIcebergValue
+        if (block.isNull(position)) {
+            value = null;
+        }
+        else if (type instanceof BigintType) {
+            value = aClass.cast(type.getLong(block, position));
+        }
+        else if (type instanceof IntegerType || type instanceof SmallintType || type instanceof TinyintType || type instanceof DateType) {
+            value = aClass.cast(toIntExact(type.getLong(block, position)));
+        }
+        else if (type instanceof BooleanType) {
+            value = aClass.cast(type.getBoolean(block, position));
+        }
+        else if (type instanceof DecimalType) {
+            value = aClass.cast(readBigDecimal((DecimalType) type, block, position));
+        }
+        else if (type instanceof RealType) {
+            value = aClass.cast(intBitsToFloat(toIntExact(type.getLong(block, position))));
+        }
+        else if (type instanceof DoubleType) {
+            value = aClass.cast(type.getDouble(block, position));
+        }
+        else if (type.equals(TIME_MICROS)) {
+            // Trino TIME values are stored in picoseconds; Iceberg expects microseconds
+            value = aClass.cast(type.getLong(block, position) / PICOSECONDS_PER_MICROSECOND);
+        }
+        else if (type.equals(TIMESTAMP_MICROS)) {
+            value = aClass.cast(type.getLong(block, position));
+        }
+        else if (type.equals(TIMESTAMP_TZ_MICROS)) {
+            value = aClass.cast(timestampTzToMicros(getTimestampTz(block, position)));
+        }
+        else if (type instanceof VarbinaryType) {
+            value = aClass.cast(type.getSlice(block, position).getBytes());
+        }
+        else if (type instanceof VarcharType) {
+            value = aClass.cast(type.getSlice(block, position).toStringUtf8());
+        }
+        else {
+            // will most likely throw unsupported exception
+            value = block.getObject(position, aClass);
+        }
+
+        return value;
+    }
+
+    @Override
+    public <T> void set(int i, T t)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "writing to TrinoRow is not supported");
+    }
+}