Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.iceberg.version>0.11.0</dep.iceberg.version>
<dep.iceberg.version>0.11.1</dep.iceberg.version>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this to make the MetadataColumns available?

</properties>

<dependencies>
Expand Down Expand Up @@ -144,6 +144,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
<version>${dep.iceberg.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-hive-metastore</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.types.Types;

import java.util.Objects;
Expand All @@ -27,11 +28,15 @@
import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity;
import static io.trino.plugin.iceberg.ColumnIdentity.primitiveColumnIdentity;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.util.Objects.requireNonNull;

public class IcebergColumnHandle
implements ColumnHandle
{
public static final IcebergColumnHandle ROW_POSITION_COLUMN = new IcebergColumnHandle(
createColumnIdentity(MetadataColumns.ROW_POSITION), BIGINT, Optional.of(MetadataColumns.ROW_POSITION.doc()));

private final ColumnIdentity columnIdentity;
private final Type type;
private final Optional<String> comment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.Transactions.createTableTransaction;
import static org.apache.iceberg.util.SerializationUtil.serializeToBytes;

public class IcebergMetadata
implements ConnectorMetadata
Expand Down Expand Up @@ -255,6 +256,7 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
name.getTableName(),
name.getTableType(),
snapshotId,
serializeToBytes(table.schema()),
TupleDomain.all(),
TupleDomain.all());
}
Expand Down Expand Up @@ -814,6 +816,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
table.getTableName(),
table.getTableType(),
table.getSnapshotId(),
serializeToBytes(icebergTable.schema()),
newUnenforcedConstraint,
newEnforcedConstraint),
newUnenforcedConstraint.transformKeys(ColumnHandle.class::cast),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
import io.trino.spi.predicate.Utils;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.Type;
import org.apache.iceberg.io.CloseableIterable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
import static java.util.Objects.requireNonNull;
Expand All @@ -38,16 +42,21 @@ public class IcebergPageSource
private final Block[] prefilledBlocks;
private final int[] delegateIndexes;
private final ConnectorPageSource delegate;
private final TrinoDeleteFilter deleteFilter;
private final List<Type> columnTypes;

public IcebergPageSource(
List<IcebergColumnHandle> columns,
Map<Integer, String> partitionKeys,
ConnectorPageSource delegate,
TrinoDeleteFilter deleteFilter,
TimeZoneKey timeZoneKey)
{
int size = requireNonNull(columns, "columns is null").size();
requireNonNull(partitionKeys, "partitionKeys is null");
this.delegate = requireNonNull(delegate, "delegate is null");
this.deleteFilter = requireNonNull(deleteFilter, "deleteFilter is null");
this.columnTypes = columns.stream().map(IcebergColumnHandle::getType).collect(toImmutableList());

this.prefilledBlocks = new Block[size];
this.delegateIndexes = new int[size];
Expand Down Expand Up @@ -106,7 +115,16 @@ public Page getNextPage()
blocks[i] = dataPage.getBlock(delegateIndexes[i]);
}
}
return new Page(batchSize, blocks);

CloseableIterable<TrinoRow> filteredRows = deleteFilter.filter(CloseableIterable.transform(
CloseableIterable.withNoopClose(IntStream.range(0, batchSize).boxed().collect(toImmutableList())),
p -> new TrinoRow(columnTypes, blocks, p)));
int[] positionsToKeep = StreamSupport.stream(filteredRows.spliterator(), false).mapToInt(TrinoRow::getPosition).toArray();
Block[] filteredBlocks = new Block[prefilledBlocks.length];
for (int i = 0; i < filteredBlocks.length; i++) {
filteredBlocks[i] = blocks[i].getPositions(positionsToKeep, 0, positionsToKeep.length);
}
return new Page(positionsToKeep.length, filteredBlocks);
}
catch (RuntimeException e) {
closeWithSuppression(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,17 @@
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
Expand All @@ -78,6 +82,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -96,6 +101,7 @@
import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
import static io.trino.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
import static io.trino.plugin.iceberg.IcebergColumnHandle.ROW_POSITION_COLUMN;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CURSOR_ERROR;
Expand All @@ -111,12 +117,14 @@
import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcBloomFiltersEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcNestedLazy;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isUseFileSizeFromMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.getColumns;
import static io.trino.plugin.iceberg.TypeConverter.ORC_ICEBERG_ID_KEY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toCollection;
import static java.util.stream.Collectors.toList;
import static org.joda.time.DateTimeZone.UTC;

Expand All @@ -127,18 +135,24 @@ public class IcebergPageSourceProvider
private final FileFormatDataSourceStats fileFormatDataSourceStats;
private final OrcReaderOptions orcReaderOptions;
private final ParquetReaderOptions parquetReaderOptions;
private final FileIoProvider fileIoProvider;
private final TypeManager typeManager;

@Inject
public IcebergPageSourceProvider(
HdfsEnvironment hdfsEnvironment,
FileFormatDataSourceStats fileFormatDataSourceStats,
OrcReaderConfig orcReaderConfig,
ParquetReaderConfig parquetReaderConfig)
ParquetReaderConfig parquetReaderConfig,
FileIoProvider fileIoProvider,
TypeManager typeManager)
{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null");
this.orcReaderOptions = requireNonNull(orcReaderConfig, "orcReaderConfig is null").toOrcReaderOptions();
this.parquetReaderOptions = requireNonNull(parquetReaderConfig, "parquetReaderConfig is null").toParquetReaderOptions();
this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}

@Override
Expand All @@ -159,12 +173,22 @@ public ConnectorPageSource createPageSource(

Map<Integer, String> partitionKeys = split.getPartitionKeys();

List<IcebergColumnHandle> regularColumns = columns.stream()
LinkedHashSet<IcebergColumnHandle> regularColumns = columns.stream()
.map(IcebergColumnHandle.class::cast)
.filter(column -> !partitionKeys.containsKey(column.getId()))
.collect(toImmutableList());
.collect(toCollection(LinkedHashSet::new));

HdfsContext hdfsContext = new HdfsContext(session);
FileIO fileIo = fileIoProvider.createFileIo(hdfsContext, null);
List<Types.NestedField> deleteReadFields = icebergColumns.stream()
.map(column -> table.getSchema().findField(column.getId()))
.collect(toImmutableList());
Schema deleteReadSchema = new Schema(deleteReadFields);
TrinoDeleteFilter deleteFilter = new TrinoDeleteFilter(fileIo, split.getTask(), deleteReadSchema, deleteReadSchema);
getColumns(deleteFilter.requiredSchema(), typeManager).stream()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly the deleteFilter.requiredSchema will always be a superset of the columns we request (the columns arg to this method). So do we need to create the initial regularColumns at all? Can we just assign the result of this stream to regularColumns?

.filter(column -> !partitionKeys.containsKey(column.getId()))
.forEachOrdered(regularColumns::add);

ConnectorPageSource dataPageSource = createDataPageSource(
session,
hdfsContext,
Expand All @@ -173,10 +197,10 @@ public ConnectorPageSource createPageSource(
split.getLength(),
split.getFileSize(),
split.getFileFormat(),
regularColumns,
ImmutableList.copyOf(regularColumns),
table.getUnenforcedPredicate());

return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, session.getTimeZoneKey());
return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, deleteFilter, session.getTimeZoneKey());
}

private ConnectorPageSource createDataPageSource(
Expand All @@ -201,8 +225,14 @@ private ConnectorPageSource createDataPageSource(
}
}

List<Boolean> rowIndexPositions = dataColumns.stream().map(ROW_POSITION_COLUMN::equals).collect(toImmutableList());

switch (fileFormat) {
case ORC:
if (rowIndexPositions.stream().anyMatch(v -> v)) {
throw new UnsupportedOperationException("positional delete is not supported by ORC");
}

return createOrcPageSource(
hdfsEnvironment,
session.getUser(),
Expand Down Expand Up @@ -233,6 +263,7 @@ private ConnectorPageSource createDataPageSource(
length,
fileSize,
dataColumns,
rowIndexPositions,
parquetReaderOptions
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session)),
predicate,
Expand Down Expand Up @@ -433,6 +464,7 @@ private static ConnectorPageSource createParquetPageSource(
long length,
long fileSize,
List<IcebergColumnHandle> regularColumns,
List<Boolean> rowIndexLocations,
ParquetReaderOptions options,
TupleDomain<IcebergColumnHandle> effectivePredicate,
FileFormatDataSourceStats fileFormatDataSourceStats)
Expand Down Expand Up @@ -507,7 +539,7 @@ private static ConnectorPageSource createParquetPageSource(
}
}

return new ParquetPageSource(parquetReader, trinoTypes.build(), internalFields.build());
return new ParquetPageSource(parquetReader, trinoTypes.build(), rowIndexLocations, internalFields.build());
}
catch (IOException | RuntimeException e) {
try {
Expand Down
Loading