From 6df8a8b49a4d63e75ce44f913f90cb5be215f925 Mon Sep 17 00:00:00 2001 From: Alex Jo Date: Thu, 28 Jul 2022 17:05:43 -0400 Subject: [PATCH 1/2] Avoid reading Iceberg delete files when not needed Parquet only. Skip reading the delete files associated with a data file if the deletes are not relevant. This can happen when the statistics from the data file already show the split can be skipped. Additionally, this can happen when the line numbers read by the split are known and can be used to filter positional deletes. --- .../plugin/iceberg/IcebergPageSource.java | 9 +- .../iceberg/IcebergPageSourceProvider.java | 188 ++++++++++++++---- .../plugin/iceberg/delete/DeleteFile.java | 39 +++- .../plugin/iceberg/delete/DeleteFilter.java | 3 - .../iceberg/delete/EqualityDeleteFilter.java | 6 - .../iceberg/delete/PositionDeleteFilter.java | 8 - 6 files changed, 187 insertions(+), 66 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java index c443fc8328c7..01983ceac710 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java @@ -59,7 +59,7 @@ public class IcebergPageSource private final int[] expectedColumnIndexes; private final ConnectorPageSource delegate; private final Optional projectionsAdapter; - private final Optional deletePredicate; + private final Supplier> deletePredicate; private final Supplier positionDeleteSinkSupplier; private final Supplier updatedRowPageSinkSupplier; // An array with one element per field in the $row_id column. 
The value in the array points to the @@ -83,7 +83,7 @@ public IcebergPageSource( List requiredColumns, ConnectorPageSource delegate, Optional projectionsAdapter, - Optional deletePredicate, + Supplier> deletePredicate, Supplier positionDeleteSinkSupplier, Supplier updatedRowPageSinkSupplier, List updatedColumns) @@ -159,8 +159,9 @@ public Page getNextPage() return null; } - if (deletePredicate.isPresent()) { - dataPage = deletePredicate.get().filterPage(dataPage); + Optional deleteFilterPredicate = deletePredicate.get(); + if (deleteFilterPredicate.isPresent()) { + dataPage = deleteFilterPredicate.get().filterPage(dataPage); } if (projectionsAdapter.isPresent()) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index f3cfaeacd6c3..8f20d8e30dec 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -13,9 +13,11 @@ */ package io.trino.plugin.iceberg; +import com.google.common.base.Suppliers; import com.google.common.base.VerifyException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.graph.Traverser; import io.airlift.json.JsonCodec; import io.airlift.slice.Slice; @@ -72,7 +74,9 @@ import io.trino.spi.connector.EmptyPageSource; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.NullableValue; +import io.trino.spi.predicate.Range; import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.predicate.ValueSet; import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.type.ArrayType; import io.trino.spi.type.MapType; @@ -101,6 +105,7 @@ import org.apache.iceberg.mapping.NameMapping; import 
org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.parquet.ParquetSchemaUtil; +import org.apache.iceberg.types.Conversions; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; @@ -117,6 +122,7 @@ import java.io.UncheckedIOException; import java.net.URI; import java.net.URLEncoder; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -164,7 +170,6 @@ import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD; import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue; import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle; -import static io.trino.plugin.iceberg.IcebergUtil.getColumns; import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider; import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys; import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles; @@ -257,11 +262,7 @@ public ConnectorPageSource createPageSource( Schema tableSchema = SchemaParser.fromJson(table.getTableSchemaJson()); - List deleteFilters = readDeletes(session, tableSchema, split.getPath(), split.getDeletes()); - - Set deleteFilterRequiredColumns = deleteFilters.stream() - .flatMap(filter -> getColumns(filter.schema(), typeManager).stream()) - .collect(toImmutableSet()); + Set deleteFilterRequiredColumns = requiredColumnsForDeletes(tableSchema, split.getDeletes()); PartitionSpec partitionSpec = PartitionSpecParser.fromJson(tableSchema, split.getPartitionSpecJson()); org.apache.iceberg.types.Type[] partitionColumnTypes = partitionSpec.fields().stream() @@ -320,7 +321,7 @@ public ConnectorPageSource createPageSource( } } - ReaderPageSource dataPageSource = createDataPageSource( + ReaderPageSourceWithRowPositions readerPageSourceWithRowPositions = createDataPageSource( session, hdfsContext, split.getPath(), @@ 
-334,6 +335,7 @@ public ConnectorPageSource createPageSource( effectivePredicate, table.getNameMappingJson().map(NameMappingParser::fromJson), partitionKeys); + ReaderPageSource dataPageSource = readerPageSourceWithRowPositions.getReaderPageSource(); Optional projectionsAdapter = dataPageSource.getReaderColumns().map(readerColumns -> new ReaderProjectionsAdapter( @@ -346,9 +348,18 @@ public ConnectorPageSource createPageSource( .map(readerColumns -> readerColumns.get().stream().map(IcebergColumnHandle.class::cast).collect(toList())) .orElse(requiredColumns); - Optional deletePredicate = deleteFilters.stream() - .map(filter -> filter.createPredicate(readColumns)) - .reduce(RowPredicate::and); + Supplier> deletePredicate = Suppliers.memoize(() -> { + List deleteFilters = readDeletes( + session, + tableSchema, + split.getPath(), + split.getDeletes(), + readerPageSourceWithRowPositions.getStartRowPosition(), + readerPageSourceWithRowPositions.getEndRowPosition()); + return deleteFilters.stream() + .map(filter -> filter.createPredicate(readColumns)) + .reduce(RowPredicate::and); + }); Optional partition = partitionSpec.isUnpartitioned() ? 
Optional.empty() : Optional.of(partitionData); LocationProvider locationProvider = getLocationProvider(table.getSchemaTableName(), table.getTableLocation(), table.getStorageProperties()); @@ -400,8 +411,33 @@ public ConnectorPageSource createPageSource( getClass().getClassLoader()); } - private List readDeletes(ConnectorSession session, Schema schema, String dataFilePath, List deleteFiles) + private Set requiredColumnsForDeletes(Schema schema, List deletes) + { + ImmutableSet.Builder requiredColumns = ImmutableSet.builder(); + for (DeleteFile deleteFile : deletes) { + if (deleteFile.content() == POSITION_DELETES) { + requiredColumns.add(getColumnHandle(ROW_POSITION, typeManager)); + } + else if (deleteFile.content() == EQUALITY_DELETES) { + deleteFile.equalityFieldIds().stream() + .map(id -> getColumnHandle(schema.findField(id), typeManager)) + .forEach(requiredColumns::add); + } + } + + return requiredColumns.build(); + } + + private List readDeletes( + ConnectorSession session, + Schema schema, + String dataFilePath, + List deleteFiles, + Optional startRowPosition, + Optional endRowPosition) { + verify(startRowPosition.isPresent() == endRowPosition.isPresent(), "startRowPosition and endRowPosition must be specified together"); + Slice targetPath = utf8Slice(dataFilePath); List filters = new ArrayList<>(); LongBitmapDataProvider deletedRows = new Roaring64Bitmap(); @@ -410,9 +446,29 @@ private List readDeletes(ConnectorSession session, Schema schema, IcebergColumnHandle deleteFilePos = getColumnHandle(DELETE_FILE_POS, typeManager); List deleteColumns = ImmutableList.of(deleteFilePath, deleteFilePos); TupleDomain deleteDomain = TupleDomain.fromFixedValues(ImmutableMap.of(deleteFilePath, NullableValue.of(VARCHAR, targetPath))); + if (startRowPosition.isPresent()) { + Range positionRange = Range.range(deleteFilePos.getType(), startRowPosition.get(), true, endRowPosition.get(), true); + TupleDomain positionDomain = 
TupleDomain.withColumnDomains(ImmutableMap.of(deleteFilePos, Domain.create(ValueSet.ofRanges(positionRange), false))); + deleteDomain = deleteDomain.intersect(positionDomain); + } for (DeleteFile delete : deleteFiles) { if (delete.content() == POSITION_DELETES) { + if (startRowPosition.isPresent()) { + byte[] lowerBoundBytes = delete.getLowerBounds().get(DELETE_FILE_POS.fieldId()); + Optional positionLowerBound = Optional.ofNullable(lowerBoundBytes) + .map(bytes -> Conversions.fromByteBuffer(DELETE_FILE_POS.type(), ByteBuffer.wrap(bytes))); + + byte[] upperBoundBytes = delete.getUpperBounds().get(DELETE_FILE_POS.fieldId()); + Optional positionUpperBound = Optional.ofNullable(upperBoundBytes) + .map(bytes -> Conversions.fromByteBuffer(DELETE_FILE_POS.type(), ByteBuffer.wrap(bytes))); + + if ((positionLowerBound.isPresent() && positionLowerBound.get() > endRowPosition.get()) || + (positionUpperBound.isPresent() && positionUpperBound.get() < startRowPosition.get())) { + continue; + } + } + try (ConnectorPageSource pageSource = openDeletes(session, delete, deleteColumns, deleteDomain)) { readPositionDeletes(pageSource, targetPath, deletedRows); } @@ -466,10 +522,11 @@ private ConnectorPageSource openDeletes( tupleDomain, Optional.empty(), ImmutableMap.of()) + .getReaderPageSource() .get(); } - public ReaderPageSource createDataPageSource( + public ReaderPageSourceWithRowPositions createDataPageSource( ConnectorSession session, HdfsContext hdfsContext, String path, @@ -546,7 +603,7 @@ public ReaderPageSource createDataPageSource( } } - private static ReaderPageSource createOrcPageSource( + private static ReaderPageSourceWithRowPositions createOrcPageSource( HdfsEnvironment hdfsEnvironment, ConnectorIdentity identity, Configuration configuration, @@ -676,17 +733,20 @@ else if (orcColumn != null) { exception -> handleException(orcDataSourceId, exception), new IdBasedFieldMapperFactory(readColumns)); - return new ReaderPageSource( - new OrcPageSource( - recordReader, - 
columnAdaptations, - orcDataSource, - Optional.empty(), - Optional.empty(), - memoryUsage, - stats, - reader.getCompressionKind()), - columnProjections); + return new ReaderPageSourceWithRowPositions( + new ReaderPageSource( + new OrcPageSource( + recordReader, + columnAdaptations, + orcDataSource, + Optional.empty(), + Optional.empty(), + memoryUsage, + stats, + reader.getCompressionKind()), + columnProjections), + Optional.empty(), + Optional.empty()); } catch (IOException | RuntimeException e) { if (orcDataSource != null) { @@ -894,7 +954,7 @@ public OrcColumn get(String fieldName) } } - private static ReaderPageSource createParquetPageSource( + private static ReaderPageSourceWithRowPositions createParquetPageSource( HdfsEnvironment hdfsEnvironment, ConnectorIdentity identity, Configuration configuration, @@ -947,6 +1007,8 @@ private static ReaderPageSource createParquetPageSource( Predicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, UTC); long nextStart = 0; + Optional startRowPosition = Optional.empty(); + Optional endRowPosition = Optional.empty(); ImmutableList.Builder blockStarts = ImmutableList.builder(); List blocks = new ArrayList<>(); for (BlockMetaData block : parquetMetadata.getBlocks()) { @@ -955,6 +1017,10 @@ private static ReaderPageSource createParquetPageSource( predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain)) { blocks.add(block); blockStarts.add(nextStart); + if (startRowPosition.isEmpty()) { + startRowPosition = Optional.of(nextStart); + } + endRowPosition = Optional.of(nextStart + block.getRowCount()); } nextStart += block.getRowCount(); } @@ -1028,9 +1094,12 @@ else if (column.isRowPositionColumn()) { } } - return new ReaderPageSource( - constantPopulatingPageSourceBuilder.build(new ParquetPageSource(parquetReader, trinoTypes.build(), rowIndexChannels.build(), internalFields.build())), - columnProjections); + return new 
ReaderPageSourceWithRowPositions( + new ReaderPageSource( + constantPopulatingPageSourceBuilder.build(new ParquetPageSource(parquetReader, trinoTypes.build(), rowIndexChannels.build(), internalFields.build())), + columnProjections), + startRowPosition, + endRowPosition); } catch (IOException | RuntimeException e) { try { @@ -1059,7 +1128,7 @@ else if (column.isRowPositionColumn()) { } } - private ReaderPageSource createAvroPageSource( + private ReaderPageSourceWithRowPositions createAvroPageSource( FileIO fileIo, String path, Path hadoopPath, @@ -1125,19 +1194,22 @@ else if (field == null) { } } - return new ReaderPageSource( - constantPopulatingPageSourceBuilder.build(new IcebergAvroPageSource( - fileIo, - hadoopPath.toString(), - start, - length, - fileSchema, - nameMapping, - columnNames.build(), - columnTypes.build(), - rowIndexChannels.build(), - newSimpleAggregatedMemoryContext())), - columnProjections); + return new ReaderPageSourceWithRowPositions( + new ReaderPageSource( + constantPopulatingPageSourceBuilder.build(new IcebergAvroPageSource( + fileIo, + hadoopPath.toString(), + start, + length, + fileSchema, + nameMapping, + columnNames.build(), + columnTypes.build(), + rowIndexChannels.build(), + newSimpleAggregatedMemoryContext())), + columnProjections), + Optional.empty(), + Optional.empty()); } catch (IOException e) { throw new TrinoException(ICEBERG_CANNOT_OPEN_SPLIT, e); @@ -1319,4 +1391,36 @@ private static String hadoopPath(String path) } return path; } + + private static final class ReaderPageSourceWithRowPositions + { + private final ReaderPageSource readerPageSource; + private final Optional startRowPosition; + private final Optional endRowPosition; + + public ReaderPageSourceWithRowPositions( + ReaderPageSource readerPageSource, + Optional startRowPosition, + Optional endRowPosition) + { + this.readerPageSource = requireNonNull(readerPageSource, "readerPageSource is null"); + this.startRowPosition = requireNonNull(startRowPosition, 
"startRowPosition is null"); + this.endRowPosition = requireNonNull(endRowPosition, "endRowPosition is null"); + } + + public ReaderPageSource getReaderPageSource() + { + return readerPageSource; + } + + public Optional getStartRowPosition() + { + return startRowPosition; + } + + public Optional getEndRowPosition() + { + return endRowPosition; + } + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java index eea89a9df9a0..0de94ed30d2f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java @@ -16,14 +16,20 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.airlift.slice.SizeOf; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileFormat; import org.openjdk.jol.info.ClassLayout; +import java.nio.ByteBuffer; import java.util.List; +import java.util.Map; import java.util.Optional; +import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.airlift.slice.SizeOf.SIZE_OF_INT; import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; @@ -38,16 +44,25 @@ public final class DeleteFile private final long recordCount; private final long fileSizeInBytes; private final List equalityFieldIds; + private final Map lowerBounds; + private final Map upperBounds; public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile) { + Map lowerBounds = firstNonNull(deleteFile.lowerBounds(), ImmutableMap.of()) + 
.entrySet().stream().collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().array().clone())); + Map upperBounds = firstNonNull(deleteFile.upperBounds(), ImmutableMap.of()) + .entrySet().stream().collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().array().clone())); + return new DeleteFile( deleteFile.content(), deleteFile.path().toString(), deleteFile.format(), deleteFile.recordCount(), deleteFile.fileSizeInBytes(), - Optional.ofNullable(deleteFile.equalityFieldIds()).orElseGet(ImmutableList::of)); + Optional.ofNullable(deleteFile.equalityFieldIds()).orElseGet(ImmutableList::of), + lowerBounds, + upperBounds); } @JsonCreator @@ -57,7 +72,9 @@ public DeleteFile( FileFormat format, long recordCount, long fileSizeInBytes, - List equalityFieldIds) + List equalityFieldIds, + Map lowerBounds, + Map upperBounds) { this.content = requireNonNull(content, "content is null"); this.path = requireNonNull(path, "path is null"); @@ -65,6 +82,8 @@ public DeleteFile( this.recordCount = recordCount; this.fileSizeInBytes = fileSizeInBytes; this.equalityFieldIds = ImmutableList.copyOf(requireNonNull(equalityFieldIds, "equalityFieldIds is null")); + this.lowerBounds = ImmutableMap.copyOf(requireNonNull(lowerBounds, "lowerBounds is null")); + this.upperBounds = ImmutableMap.copyOf(requireNonNull(upperBounds, "upperBounds is null")); } @JsonProperty @@ -103,11 +122,25 @@ public List equalityFieldIds() return equalityFieldIds; } + @JsonProperty + public Map getLowerBounds() + { + return lowerBounds; + } + + @JsonProperty + public Map getUpperBounds() + { + return upperBounds; + } + public long getRetainedSizeInBytes() { return INSTANCE_SIZE + estimatedSizeOf(path) - + estimatedSizeOf(equalityFieldIds, ignored -> SIZE_OF_INT); + + estimatedSizeOf(equalityFieldIds, ignored -> SIZE_OF_INT) + + estimatedSizeOf(lowerBounds, entry -> SIZE_OF_INT, SizeOf::sizeOf) + + estimatedSizeOf(upperBounds, entry -> SIZE_OF_INT, SizeOf::sizeOf); } @Override diff --git 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java index a911c948e62c..b061ec276efc 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java @@ -14,13 +14,10 @@ package io.trino.plugin.iceberg.delete; import io.trino.plugin.iceberg.IcebergColumnHandle; -import org.apache.iceberg.Schema; import java.util.List; public interface DeleteFilter { - Schema schema(); - RowPredicate createPredicate(List columns); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java index 87579c5ef43f..90dccceb997e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java @@ -42,12 +42,6 @@ private EqualityDeleteFilter(Schema schema, StructLikeSet deleteSet) this.deleteSet = requireNonNull(deleteSet, "deleteSet is null"); } - @Override - public Schema schema() - { - return schema; - } - @Override public RowPredicate createPredicate(List columns) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java index 303c85875f18..952f215fb16b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java @@ -18,8 +18,6 @@ import io.trino.spi.Page; import io.trino.spi.block.Block; import io.trino.spi.connector.ConnectorPageSource; -import org.apache.iceberg.MetadataColumns; -import 
org.apache.iceberg.Schema; import org.roaringbitmap.longlong.ImmutableLongBitmapDataProvider; import org.roaringbitmap.longlong.LongBitmapDataProvider; @@ -40,12 +38,6 @@ public PositionDeleteFilter(ImmutableLongBitmapDataProvider deletedRows) this.deletedRows = requireNonNull(deletedRows, "deletedRows is null"); } - @Override - public Schema schema() - { - return new Schema(MetadataColumns.ROW_POSITION); - } - @Override public RowPredicate createPredicate(List columns) { From 77bc71eda70a3cc775e296a9386b1f62b6a81ca2 Mon Sep 17 00:00:00 2001 From: Alex Jo Date: Thu, 4 Aug 2022 12:17:27 -0400 Subject: [PATCH 2/2] empty