IcebergPageSource.java
@@ -59,7 +59,7 @@ public class IcebergPageSource
private final int[] expectedColumnIndexes;
private final ConnectorPageSource delegate;
private final Optional<ReaderProjectionsAdapter> projectionsAdapter;
private final Optional<RowPredicate> deletePredicate;
private final Supplier<Optional<RowPredicate>> deletePredicate;
private final Supplier<IcebergPositionDeletePageSink> positionDeleteSinkSupplier;
private final Supplier<IcebergPageSink> updatedRowPageSinkSupplier;
// An array with one element per field in the $row_id column. The value in the array points to the
@@ -83,7 +83,7 @@ public IcebergPageSource(
List<IcebergColumnHandle> requiredColumns,
ConnectorPageSource delegate,
Optional<ReaderProjectionsAdapter> projectionsAdapter,
Optional<RowPredicate> deletePredicate,
Supplier<Optional<RowPredicate>> deletePredicate,
Supplier<IcebergPositionDeletePageSink> positionDeleteSinkSupplier,
Supplier<IcebergPageSink> updatedRowPageSinkSupplier,
List<IcebergColumnHandle> updatedColumns)
@@ -159,8 +159,9 @@ public Page getNextPage()
return null;
}

if (deletePredicate.isPresent()) {
dataPage = deletePredicate.get().filterPage(dataPage);
Optional<RowPredicate> deleteFilterPredicate = deletePredicate.get();
if (deleteFilterPredicate.isPresent()) {
dataPage = deleteFilterPredicate.get().filterPage(dataPage);
}

if (projectionsAdapter.isPresent()) {
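The IcebergPageSource change above swaps the eagerly computed `Optional<RowPredicate>` for a `Supplier<Optional<RowPredicate>>`, so the delete files only get read once `getNextPage()` actually has rows to filter. A minimal sketch of that lazy-loading pattern with Guava's `Suppliers.memoize`; `LoadedPredicate` and `loadDeleteFilters()` are illustrative placeholders, not types from this PR:

```java
import com.google.common.base.Suppliers;

import java.util.Optional;
import java.util.function.Supplier;

class LazyDeleteFilterSketch
{
    // Memoized: loadDeleteFilters() runs at most once, the first time a page needs filtering
    private final Supplier<Optional<LoadedPredicate>> deletePredicate =
            Suppliers.memoize(this::loadDeleteFilters);

    private Optional<LoadedPredicate> loadDeleteFilters()
    {
        // Placeholder for the expensive work (opening and reading delete files)
        return Optional.empty();
    }

    void processPage(Object page)
    {
        // Every page reuses the memoized result instead of re-reading the delete files
        deletePredicate.get().ifPresent(predicate -> predicate.filter(page));
    }

    interface LoadedPredicate
    {
        void filter(Object page);
    }
}
```

A split whose pages are never read (for example, one that turns out to be empty) never pays for opening its delete files, which is the point of deferring the work behind the supplier.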
IcebergPageSourceProvider.java
@@ -13,9 +13,11 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.base.Suppliers;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.graph.Traverser;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
@@ -72,7 +74,9 @@
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.MapType;
@@ -101,6 +105,7 @@
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.types.Conversions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
@@ -117,6 +122,7 @@
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -164,7 +170,6 @@
import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
import static io.trino.plugin.iceberg.IcebergUtil.getColumns;
import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles;
@@ -257,11 +262,7 @@ public ConnectorPageSource createPageSource(

Schema tableSchema = SchemaParser.fromJson(table.getTableSchemaJson());

List<DeleteFilter> deleteFilters = readDeletes(session, tableSchema, split.getPath(), split.getDeletes());

Set<IcebergColumnHandle> deleteFilterRequiredColumns = deleteFilters.stream()
.flatMap(filter -> getColumns(filter.schema(), typeManager).stream())
.collect(toImmutableSet());
Set<IcebergColumnHandle> deleteFilterRequiredColumns = requiredColumnsForDeletes(tableSchema, split.getDeletes());

PartitionSpec partitionSpec = PartitionSpecParser.fromJson(tableSchema, split.getPartitionSpecJson());
org.apache.iceberg.types.Type[] partitionColumnTypes = partitionSpec.fields().stream()
@@ -320,7 +321,7 @@ public ConnectorPageSource createPageSource(
}
}

ReaderPageSource dataPageSource = createDataPageSource(
ReaderPageSourceWithRowPositions readerPageSourceWithRowPositions = createDataPageSource(
session,
hdfsContext,
split.getPath(),
@@ -334,6 +335,7 @@
effectivePredicate,
table.getNameMappingJson().map(NameMappingParser::fromJson),
partitionKeys);
ReaderPageSource dataPageSource = readerPageSourceWithRowPositions.getReaderPageSource();

Optional<ReaderProjectionsAdapter> projectionsAdapter = dataPageSource.getReaderColumns().map(readerColumns ->
new ReaderProjectionsAdapter(
@@ -346,9 +348,18 @@
.map(readerColumns -> readerColumns.get().stream().map(IcebergColumnHandle.class::cast).collect(toList()))
.orElse(requiredColumns);

Optional<RowPredicate> deletePredicate = deleteFilters.stream()
.map(filter -> filter.createPredicate(readColumns))
.reduce(RowPredicate::and);
Supplier<Optional<RowPredicate>> deletePredicate = Suppliers.memoize(() -> {
List<DeleteFilter> deleteFilters = readDeletes(
session,
tableSchema,
split.getPath(),
split.getDeletes(),
readerPageSourceWithRowPositions.getStartRowPosition(),
readerPageSourceWithRowPositions.getEndRowPosition());
return deleteFilters.stream()
.map(filter -> filter.createPredicate(readColumns))
.reduce(RowPredicate::and);
});

Optional<PartitionData> partition = partitionSpec.isUnpartitioned() ? Optional.empty() : Optional.of(partitionData);
LocationProvider locationProvider = getLocationProvider(table.getSchemaTableName(), table.getTableLocation(), table.getStorageProperties());
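In the memoized block above, the per-delete-file filters are folded into a single row-level predicate with `reduce(RowPredicate::and)`. A sketch of the same reduction using plain `java.util.function.Predicate` (the `RowPredicate` interface itself is not part of this diff), assuming each filter tests a row position:

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

final class PredicateReductionSketch
{
    // Fold any number of per-delete-file predicates into a single conjunction;
    // an empty list yields Optional.empty(), i.e. no delete filtering is needed
    static Optional<Predicate<Long>> combine(List<Predicate<Long>> filters)
    {
        return filters.stream().reduce(Predicate::and);
    }

    public static void main(String[] args)
    {
        Predicate<Long> notPosition3 = position -> position != 3L;
        Predicate<Long> notPosition7 = position -> position != 7L;

        Predicate<Long> combined = combine(List.of(notPosition3, notPosition7)).orElseThrow();
        System.out.println(combined.test(5L)); // true: kept by both filters
        System.out.println(combined.test(7L)); // false: removed by the second filter
    }
}
```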
@@ -400,8 +411,33 @@ public ConnectorPageSource createPageSource(
getClass().getClassLoader());
}

private List<DeleteFilter> readDeletes(ConnectorSession session, Schema schema, String dataFilePath, List<DeleteFile> deleteFiles)
private Set<IcebergColumnHandle> requiredColumnsForDeletes(Schema schema, List<DeleteFile> deletes)
{
ImmutableSet.Builder<IcebergColumnHandle> requiredColumns = ImmutableSet.builder();
for (DeleteFile deleteFile : deletes) {
if (deleteFile.content() == POSITION_DELETES) {
requiredColumns.add(getColumnHandle(ROW_POSITION, typeManager));
}
else if (deleteFile.content() == EQUALITY_DELETES) {
deleteFile.equalityFieldIds().stream()
.map(id -> getColumnHandle(schema.findField(id), typeManager))
.forEach(requiredColumns::add);
}
}

return requiredColumns.build();
}

private List<DeleteFilter> readDeletes(
ConnectorSession session,
Schema schema,
String dataFilePath,
List<DeleteFile> deleteFiles,
Optional<Long> startRowPosition,
Optional<Long> endRowPosition)
{
verify(startRowPosition.isPresent() == endRowPosition.isPresent(), "startRowPosition and endRowPosition must be specified together");

Slice targetPath = utf8Slice(dataFilePath);
List<DeleteFilter> filters = new ArrayList<>();
LongBitmapDataProvider deletedRows = new Roaring64Bitmap();
@@ -410,9 +446,29 @@ private List<DeleteFilter> readDeletes(ConnectorSession session, Schema schema,
IcebergColumnHandle deleteFilePos = getColumnHandle(DELETE_FILE_POS, typeManager);
List<IcebergColumnHandle> deleteColumns = ImmutableList.of(deleteFilePath, deleteFilePos);
TupleDomain<IcebergColumnHandle> deleteDomain = TupleDomain.fromFixedValues(ImmutableMap.of(deleteFilePath, NullableValue.of(VARCHAR, targetPath)));
if (startRowPosition.isPresent()) {
Range positionRange = Range.range(deleteFilePos.getType(), startRowPosition.get(), true, endRowPosition.get(), true);
TupleDomain<IcebergColumnHandle> positionDomain = TupleDomain.withColumnDomains(ImmutableMap.of(deleteFilePos, Domain.create(ValueSet.ofRanges(positionRange), false)));
deleteDomain = deleteDomain.intersect(positionDomain);
}

for (DeleteFile delete : deleteFiles) {
if (delete.content() == POSITION_DELETES) {
if (startRowPosition.isPresent()) {
byte[] lowerBoundBytes = delete.getLowerBounds().get(DELETE_FILE_POS.fieldId());
Optional<Long> positionLowerBound = Optional.ofNullable(lowerBoundBytes)
.map(bytes -> Conversions.fromByteBuffer(DELETE_FILE_POS.type(), ByteBuffer.wrap(bytes)));

byte[] upperBoundBytes = delete.getUpperBounds().get(DELETE_FILE_POS.fieldId());
Optional<Long> positionUpperBound = Optional.ofNullable(upperBoundBytes)
.map(bytes -> Conversions.fromByteBuffer(DELETE_FILE_POS.type(), ByteBuffer.wrap(bytes)));

if ((positionLowerBound.isPresent() && positionLowerBound.get() > endRowPosition.get()) ||
(positionUpperBound.isPresent() && positionUpperBound.get() < startRowPosition.get())) {
continue;
}
}

try (ConnectorPageSource pageSource = openDeletes(session, delete, deleteColumns, deleteDomain)) {
readPositionDeletes(pageSource, targetPath, deletedRows);
}
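The new bounds check in `readDeletes` skips a positional delete file entirely when its recorded min/max `pos` values fall outside the split's row range. A standalone sketch of that overlap test, using plain `long`s and `Optional` bounds in place of Iceberg's `DeleteFile` metadata:

```java
import java.util.Optional;

final class DeleteFilePruningSketch
{
    // A positional delete file can be skipped when its recorded position range lies
    // entirely outside the split's [startRowPosition, endRowPosition]; missing bounds
    // mean "unknown", so the file still has to be read
    static boolean canSkip(
            Optional<Long> positionLowerBound,
            Optional<Long> positionUpperBound,
            long startRowPosition,
            long endRowPosition)
    {
        return (positionLowerBound.isPresent() && positionLowerBound.get() > endRowPosition)
                || (positionUpperBound.isPresent() && positionUpperBound.get() < startRowPosition);
    }

    public static void main(String[] args)
    {
        // Split covers rows 100..200; deletes for positions 300..400 cannot apply to it
        System.out.println(canSkip(Optional.of(300L), Optional.of(400L), 100, 200)); // true
        // Deletes for positions 150..350 overlap the split and must still be read
        System.out.println(canSkip(Optional.of(150L), Optional.of(350L), 100, 200)); // false
        // Unknown bounds: be conservative and read the file
        System.out.println(canSkip(Optional.empty(), Optional.empty(), 100, 200)); // false
    }
}
```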
@@ -466,10 +522,11 @@ private ConnectorPageSource openDeletes(
tupleDomain,
Optional.empty(),
ImmutableMap.of())
.getReaderPageSource()
.get();
}

public ReaderPageSource createDataPageSource(
public ReaderPageSourceWithRowPositions createDataPageSource(
ConnectorSession session,
HdfsContext hdfsContext,
String path,
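Besides skipping whole delete files, `readDeletes` above also pushes the split's row range into the delete-file scan as a `TupleDomain` over the position column, which `openDeletes` forwards to the underlying reader. A hedged sketch of building such a domain with the Trino SPI predicate classes; the `String` column key is a stand-in for the connector's `IcebergColumnHandle`:

```java
import com.google.common.collect.ImmutableMap;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;

import static io.trino.spi.type.BigintType.BIGINT;

final class PositionDomainSketch
{
    // Restrict a BIGINT position column to the closed range [startRowPosition, endRowPosition];
    // nulls are disallowed, so the domain only admits positions inside the split
    static TupleDomain<String> positionDomain(String columnName, long startRowPosition, long endRowPosition)
    {
        Range positionRange = Range.range(BIGINT, startRowPosition, true, endRowPosition, true);
        return TupleDomain.withColumnDomains(ImmutableMap.of(
                columnName,
                Domain.create(ValueSet.ofRanges(positionRange), false)));
    }
}
```

In the diff this range domain is intersected with the fixed-value domain on the delete file's path column, so a reader that honors predicate pushdown only returns delete rows that can possibly apply to the split.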
@@ -546,7 +603,7 @@ public ReaderPageSource createDataPageSource(
}
}

private static ReaderPageSource createOrcPageSource(
private static ReaderPageSourceWithRowPositions createOrcPageSource(
HdfsEnvironment hdfsEnvironment,
ConnectorIdentity identity,
Configuration configuration,
@@ -676,17 +733,20 @@ else if (orcColumn != null) {
exception -> handleException(orcDataSourceId, exception),
new IdBasedFieldMapperFactory(readColumns));

return new ReaderPageSource(
new OrcPageSource(
recordReader,
columnAdaptations,
orcDataSource,
Optional.empty(),
Optional.empty(),
memoryUsage,
stats,
reader.getCompressionKind()),
columnProjections);
return new ReaderPageSourceWithRowPositions(
new ReaderPageSource(
new OrcPageSource(
recordReader,
columnAdaptations,
orcDataSource,
Optional.empty(),
Optional.empty(),
memoryUsage,
stats,
reader.getCompressionKind()),
columnProjections),
Optional.empty(),
Optional.empty());
}
catch (IOException | RuntimeException e) {
if (orcDataSource != null) {
@@ -894,7 +954,7 @@ public OrcColumn get(String fieldName)
}
}

private static ReaderPageSource createParquetPageSource(
private static ReaderPageSourceWithRowPositions createParquetPageSource(
HdfsEnvironment hdfsEnvironment,
ConnectorIdentity identity,
Configuration configuration,
@@ -947,6 +1007,8 @@ private static ReaderPageSource createParquetPageSource(
Predicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, UTC);

long nextStart = 0;
Optional<Long> startRowPosition = Optional.empty();
Optional<Long> endRowPosition = Optional.empty();
ImmutableList.Builder<Long> blockStarts = ImmutableList.builder();
List<BlockMetaData> blocks = new ArrayList<>();
for (BlockMetaData block : parquetMetadata.getBlocks()) {
Expand All @@ -955,6 +1017,10 @@ private static ReaderPageSource createParquetPageSource(
predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain)) {
blocks.add(block);
blockStarts.add(nextStart);
if (startRowPosition.isEmpty()) {
startRowPosition = Optional.of(nextStart);
}
endRowPosition = Optional.of(nextStart + block.getRowCount());
}
nextStart += block.getRowCount();
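The Parquet path derives the split's first and last row positions from the cumulative row counts of the row groups that survive predicate pruning; the ORC and Avro paths currently return empty positions. A minimal sketch of that bookkeeping, where `RowGroup` is a hypothetical stand-in for Parquet's `BlockMetaData`:

```java
import java.util.List;
import java.util.Optional;

final class RowGroupPositionsSketch
{
    // Hypothetical stand-in for BlockMetaData: a row count plus a "kept after pruning" flag
    record RowGroup(long rowCount, boolean matchesPredicate) {}

    // Walk the row groups with a running row offset and record the first and last
    // row positions covered by the row groups that will actually be read
    static Optional<long[]> selectedRowRange(List<RowGroup> rowGroups)
    {
        long nextStart = 0;
        Optional<Long> start = Optional.empty();
        Optional<Long> end = Optional.empty();
        for (RowGroup rowGroup : rowGroups) {
            if (rowGroup.matchesPredicate()) {
                if (start.isEmpty()) {
                    start = Optional.of(nextStart);
                }
                end = Optional.of(nextStart + rowGroup.rowCount());
            }
            nextStart += rowGroup.rowCount();
        }
        return start.isPresent()
                ? Optional.of(new long[] {start.get(), end.get()})
                : Optional.empty();
    }

    public static void main(String[] args)
    {
        // Three row groups of 100 rows each; only the middle one survives pruning,
        // so the recorded range is 100..200 (the end value is the offset just past
        // the last kept row group, mirroring the diff above)
        long[] range = selectedRowRange(List.of(
                new RowGroup(100, false),
                new RowGroup(100, true),
                new RowGroup(100, false)))
                .orElseThrow();
        System.out.println(range[0] + ".." + range[1]); // 100..200
    }
}
```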
}
Expand Down Expand Up @@ -1028,9 +1094,12 @@ else if (column.isRowPositionColumn()) {
}
}

return new ReaderPageSource(
constantPopulatingPageSourceBuilder.build(new ParquetPageSource(parquetReader, trinoTypes.build(), rowIndexChannels.build(), internalFields.build())),
columnProjections);
return new ReaderPageSourceWithRowPositions(
new ReaderPageSource(
constantPopulatingPageSourceBuilder.build(new ParquetPageSource(parquetReader, trinoTypes.build(), rowIndexChannels.build(), internalFields.build())),
columnProjections),
startRowPosition,
endRowPosition);
}
catch (IOException | RuntimeException e) {
try {
@@ -1059,7 +1128,7 @@ else if (column.isRowPositionColumn()) {
}
}

private ReaderPageSource createAvroPageSource(
private ReaderPageSourceWithRowPositions createAvroPageSource(
FileIO fileIo,
String path,
Path hadoopPath,
@@ -1125,19 +1194,22 @@ else if (field == null) {
}
}

return new ReaderPageSource(
constantPopulatingPageSourceBuilder.build(new IcebergAvroPageSource(
fileIo,
hadoopPath.toString(),
start,
length,
fileSchema,
nameMapping,
columnNames.build(),
columnTypes.build(),
rowIndexChannels.build(),
newSimpleAggregatedMemoryContext())),
columnProjections);
return new ReaderPageSourceWithRowPositions(
new ReaderPageSource(
constantPopulatingPageSourceBuilder.build(new IcebergAvroPageSource(
fileIo,
hadoopPath.toString(),
start,
length,
fileSchema,
nameMapping,
columnNames.build(),
columnTypes.build(),
rowIndexChannels.build(),
newSimpleAggregatedMemoryContext())),
columnProjections),
Optional.empty(),
Optional.empty());
}
catch (IOException e) {
throw new TrinoException(ICEBERG_CANNOT_OPEN_SPLIT, e);
@@ -1319,4 +1391,36 @@ private static String hadoopPath(String path)
}
return path;
}

private static final class ReaderPageSourceWithRowPositions
{
private final ReaderPageSource readerPageSource;
private final Optional<Long> startRowPosition;
private final Optional<Long> endRowPosition;

public ReaderPageSourceWithRowPositions(
ReaderPageSource readerPageSource,
Optional<Long> startRowPosition,
Optional<Long> endRowPosition)
{
this.readerPageSource = requireNonNull(readerPageSource, "readerPageSource is null");
this.startRowPosition = requireNonNull(startRowPosition, "startRowPosition is null");
this.endRowPosition = requireNonNull(endRowPosition, "endRowPosition is null");
}

public ReaderPageSource getReaderPageSource()
{
return readerPageSource;
}

public Optional<Long> getStartRowPosition()
{
return startRowPosition;
}

public Optional<Long> getEndRowPosition()
{
return endRowPosition;
}
}
}