From 589469535c78e1e87ee2e1012e821c1a61c4ea1c Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Tue, 3 Oct 2023 11:30:37 +0900 Subject: [PATCH 1/3] Inline ParquetPageSourceFactory.createPageSource in Delta Lake --- plugin/trino-delta-lake/pom.xml | 11 +- .../checkpoint/CheckpointEntryIterator.java | 135 ++++++++++++++++-- .../hive/parquet/ParquetPageSource.java | 2 +- 3 files changed, 127 insertions(+), 21 deletions(-) diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml index e2254f129c90..5ffb195cf258 100644 --- a/plugin/trino-delta-lake/pom.xml +++ b/plugin/trino-delta-lake/pom.xml @@ -144,6 +144,11 @@ trino-hive + + io.trino + trino-memory-context + + io.trino trino-parquet @@ -257,12 +262,6 @@ runtime - - io.trino - trino-memory-context - runtime - - com.azure azure-core diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java index b7932a2e1c67..5a792f3010de 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java @@ -20,7 +20,15 @@ import com.google.common.math.LongMath; import io.airlift.log.Logger; import io.trino.filesystem.TrinoInputFile; +import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.parquet.ParquetCorruptionException; +import io.trino.parquet.ParquetDataSource; +import io.trino.parquet.ParquetDataSourceId; import io.trino.parquet.ParquetReaderOptions; +import io.trino.parquet.ParquetWriteValidation; +import io.trino.parquet.predicate.TupleDomainParquetPredicate; +import io.trino.parquet.reader.MetadataReader; +import io.trino.parquet.reader.ParquetReader; import io.trino.plugin.deltalake.DeltaLakeColumnHandle; 
import io.trino.plugin.deltalake.DeltaLakeColumnMetadata; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; @@ -37,6 +45,7 @@ import io.trino.plugin.hive.HiveColumnHandle.ColumnType; import io.trino.plugin.hive.HiveColumnProjectionInfo; import io.trino.plugin.hive.HiveType; +import io.trino.plugin.hive.ReaderColumns; import io.trino.plugin.hive.ReaderPageSource; import io.trino.plugin.hive.parquet.ParquetPageSourceFactory; import io.trino.spi.Page; @@ -54,7 +63,13 @@ import io.trino.spi.type.TypeManager; import io.trino.spi.type.TypeSignature; import jakarta.annotation.Nullable; -import org.joda.time.DateTimeZone; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.schema.MessageType; import java.io.IOException; import java.io.UncheckedIOException; @@ -72,6 +87,10 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.parquet.ParquetTypeUtils.getColumnIO; +import static io.trino.parquet.ParquetTypeUtils.getDescriptors; +import static io.trino.parquet.predicate.PredicateUtils.buildPredicate; +import static io.trino.parquet.predicate.PredicateUtils.predicateMatches; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; @@ -85,6 +104,11 @@ import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL; import static 
io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.REMOVE; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.TRANSACTION; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; +import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns; +import static io.trino.plugin.hive.parquet.ParquetPageSource.handleException; +import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createParquetPageSource; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone; import static io.trino.spi.type.TimeZoneKey.UTC_KEY; @@ -98,6 +122,8 @@ import static java.math.RoundingMode.UNNECESSARY; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toUnmodifiableList; +import static org.joda.time.DateTimeZone.UTC; public class CheckpointEntryIterator extends AbstractIterator @@ -186,19 +212,100 @@ public CheckpointEntryIterator( TupleDomain.all() : buildTupleDomainColumnHandle(getOnlyElement(fields), getOnlyElement(columns)); - ReaderPageSource pageSource = ParquetPageSourceFactory.createPageSource( - checkpoint, - 0, - fileSize, - columns, - tupleDomain, - true, - DateTimeZone.UTC, - stats, - parquetReaderOptions, - Optional.empty(), - domainCompactionThreshold, - OptionalLong.empty()); + ReaderPageSource pageSource; + Optional parquetWriteValidation = Optional.empty(); + MessageType fileSchema; + MessageType requestedSchema; + MessageColumnIO messageColumn; + ParquetDataSource dataSource = null; + try { + AggregatedMemoryContext memoryContext = AggregatedMemoryContext.newSimpleAggregatedMemoryContext(); + dataSource = ParquetPageSourceFactory.createDataSource(checkpoint, OptionalLong.empty(), parquetReaderOptions, memoryContext, stats); + 
+ ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, parquetWriteValidation); + FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); + fileSchema = fileMetaData.getSchema(); + + Optional message = ParquetPageSourceFactory.getParquetMessageType(columns, true, fileSchema); + + requestedSchema = message.orElse(new MessageType(fileSchema.getName(), ImmutableList.of())); + messageColumn = getColumnIO(fileSchema, requestedSchema); + + Map, ColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema); + + TupleDomain parquetTupleDomain = parquetReaderOptions.isIgnoreStatistics() + ? TupleDomain.all() + : ParquetPageSourceFactory.getParquetTupleDomain(descriptorsByPath, tupleDomain, fileSchema, true); + TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, UTC); + + long nextStart = 0; + ImmutableList.Builder blocks = ImmutableList.builder(); + ImmutableList.Builder blockStarts = ImmutableList.builder(); + ImmutableList.Builder> columnIndexes = ImmutableList.builder(); + for (BlockMetaData block : parquetMetadata.getBlocks()) { + long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset(); + Optional columnIndex = ParquetPageSourceFactory.getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, parquetReaderOptions); + + if ((long) 0 <= firstDataPage && firstDataPage < fileSize && + predicateMatches( + parquetPredicate, + block, + dataSource, + descriptorsByPath, + parquetTupleDomain, + columnIndex, + Optional.empty(), + UTC, + domainCompactionThreshold)) { + blocks.add(block); + blockStarts.add(nextStart); + columnIndexes.add(columnIndex); + } + nextStart += block.getRowCount(); + } + + Optional readerProjections = projectBaseColumns(columns, true); + List baseColumns = readerProjections.map(projection -> + projection.get().stream() + .map(HiveColumnHandle.class::cast) + .collect(toUnmodifiableList())) + .orElse(columns); + + 
ParquetDataSourceId dataSourceId = dataSource.getId(); + ParquetDataSource finalDataSource = dataSource; + ParquetPageSourceFactory.ParquetReaderProvider parquetReaderProvider = columnFields -> new ParquetReader( + Optional.ofNullable(fileMetaData.getCreatedBy()), + columnFields, + blocks.build(), + blockStarts.build(), + finalDataSource, + UTC, + memoryContext, + parquetReaderOptions, + exception -> handleException(dataSourceId, exception), + Optional.of(parquetPredicate), + columnIndexes.build(), + parquetWriteValidation); + ConnectorPageSource parquetPageSource = createParquetPageSource(baseColumns, fileSchema, messageColumn, true, parquetReaderProvider); + pageSource = new ReaderPageSource(parquetPageSource, readerProjections); + } + catch (Exception e) { + try { + if (dataSource != null) { + dataSource.close(); + } + } + catch (IOException ignored) { + } + if (e instanceof TrinoException) { + throw (TrinoException) e; + } + if (e instanceof ParquetCorruptionException) { + throw new TrinoException(HIVE_BAD_DATA, e); + } + String message = format("Error opening Hive split %s (offset=%s, length=%s): %s", checkpoint.location(), (long) 0, fileSize, e.getMessage()); + throw new TrinoException(HIVE_CANNOT_OPEN_SPLIT, message, e); + } verify(pageSource.getReaderColumns().isEmpty(), "All columns expected to be base columns"); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java index 283eab238afa..2b656213b551 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java @@ -189,7 +189,7 @@ private Page getColumnAdaptationsPage(Page page) return new Page(batchSize, blocks); } - static TrinoException handleException(ParquetDataSourceId dataSourceId, Exception exception) + public static TrinoException 
handleException(ParquetDataSourceId dataSourceId, Exception exception) { if (exception instanceof TrinoException) { return (TrinoException) exception; From ce4c9eb40a0ffc39247b18525f3fbb9c9c192017 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Tue, 3 Oct 2023 11:31:05 +0900 Subject: [PATCH 2/3] Accept FilterPredicate in ParquetReader --- .../predicate/TupleDomainParquetPredicate.java | 2 +- .../io/trino/parquet/reader/ParquetReader.java | 16 ++++++++++------ .../checkpoint/CheckpointEntryIterator.java | 3 ++- .../hive/parquet/ParquetPageSourceFactory.java | 3 ++- .../plugin/hudi/HudiPageSourceProvider.java | 3 ++- 5 files changed, 17 insertions(+), 10 deletions(-) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java index c69d0506a02c..a72c57ef6c10 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java @@ -671,7 +671,7 @@ private static Optional> extractDiscreteValues(int domainComp return Optional.of(valueSet.getDiscreteSet()); } - private FilterPredicate convertToParquetFilter(DateTimeZone timeZone) + public FilterPredicate convertToParquetFilter(DateTimeZone timeZone) { FilterPredicate filter = null; diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java index 8f3a3e78e518..b5ca6c8805f5 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java @@ -155,7 +155,7 @@ public ParquetReader( AggregatedMemoryContext memoryContext, ParquetReaderOptions options, Function exceptionTransform, - Optional parquetPredicate, + Optional filter, List> columnIndexStore, 
Optional writeValidation) throws IOException @@ -187,12 +187,8 @@ public ParquetReader( this.writeChecksumBuilder = writeValidation.map(validation -> createWriteChecksumBuilder(validation.getTypes())); this.rowGroupStatisticsValidation = writeValidation.map(validation -> createStatisticsValidationBuilder(validation.getTypes())); - requireNonNull(parquetPredicate, "parquetPredicate is null"); + requireNonNull(filter, "filter is null"); this.columnIndexStore = requireNonNull(columnIndexStore, "columnIndexStore is null"); - Optional filter = Optional.empty(); - if (parquetPredicate.isPresent() && options.isUseColumnIndex()) { - filter = parquetPredicate.get().toParquetFilter(timeZone); - } this.blockRowRanges = calculateFilteredRowRanges(blocks, filter, columnIndexStore, primitiveFields); this.blockFactory = new ParquetBlockFactory(exceptionTransform); @@ -536,6 +532,14 @@ public AggregatedMemoryContext getMemoryContext() return memoryContext; } + public static Optional toParquetFilter(Optional parquetPredicate, DateTimeZone timeZone, ParquetReaderOptions options) + { + if (parquetPredicate.isPresent() && options.isUseColumnIndex()) { + return parquetPredicate.get().toParquetFilter(timeZone); + } + return Optional.empty(); + } + private static FilteredRowRanges[] calculateFilteredRowRanges( List blocks, Optional filter, diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java index 5a792f3010de..c97653b815c8 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java @@ -91,6 +91,7 @@ import static io.trino.parquet.ParquetTypeUtils.getDescriptors; import static 
io.trino.parquet.predicate.PredicateUtils.buildPredicate; import static io.trino.parquet.predicate.PredicateUtils.predicateMatches; +import static io.trino.parquet.reader.ParquetReader.toParquetFilter; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; @@ -283,7 +284,7 @@ public CheckpointEntryIterator( memoryContext, parquetReaderOptions, exception -> handleException(dataSourceId, exception), - Optional.of(parquetPredicate), + toParquetFilter(Optional.of(parquetPredicate), UTC, parquetReaderOptions), columnIndexes.build(), parquetWriteValidation); ConnectorPageSource parquetPageSource = createParquetPageSource(baseColumns, fileSchema, messageColumn, true, parquetReaderProvider); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java index b21d0c1f3245..46cde719c004 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java @@ -84,6 +84,7 @@ import static io.trino.parquet.ParquetTypeUtils.lookupColumnByName; import static io.trino.parquet.predicate.PredicateUtils.buildPredicate; import static io.trino.parquet.predicate.PredicateUtils.predicateMatches; +import static io.trino.parquet.reader.ParquetReader.toParquetFilter; import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; @@ -289,7 +290,7 @@ && predicateMatches( memoryContext, options, exception -> handleException(dataSourceId, exception), - Optional.of(parquetPredicate), + 
toParquetFilter(Optional.of(parquetPredicate), timeZone, options), columnIndexes.build(), parquetWriteValidation); ConnectorPageSource parquetPageSource = createParquetPageSource(baseColumns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java index 93b0e3c5af02..bda23a0e1372 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java @@ -75,6 +75,7 @@ import static io.trino.parquet.ParquetTypeUtils.getDescriptors; import static io.trino.parquet.predicate.PredicateUtils.buildPredicate; import static io.trino.parquet.predicate.PredicateUtils.predicateMatches; +import static io.trino.parquet.reader.ParquetReader.toParquetFilter; import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns; import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.ParquetReaderProvider; import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createDataSource; @@ -248,7 +249,7 @@ && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parq memoryContext, options, exception -> handleException(dataSourceId, exception), - Optional.of(parquetPredicate), + toParquetFilter(Optional.of(parquetPredicate), timeZone, options), columnIndexes.build(), Optional.empty()); return createParquetPageSource(baseColumns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider); From 644fb08607a6ec0f6e65cfc7c39d992f933fe5ff Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Tue, 3 Oct 2023 12:39:27 +0900 Subject: [PATCH 3/3] Support OR-ed condition in Delta checkpoint iterator --- .../checkpoint/CheckpointEntryIterator.java | 92 +++++++++++++------ 1 file changed, 65 insertions(+), 27 deletions(-) diff --git 
a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java index c97653b815c8..d170ca325753 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java @@ -64,6 +64,8 @@ import io.trino.spi.type.TypeSignature; import jakarta.annotation.Nullable; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -74,6 +76,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -86,12 +89,10 @@ import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; -import static com.google.common.collect.Iterables.getOnlyElement; import static io.trino.parquet.ParquetTypeUtils.getColumnIO; import static io.trino.parquet.ParquetTypeUtils.getDescriptors; import static io.trino.parquet.predicate.PredicateUtils.buildPredicate; import static io.trino.parquet.predicate.PredicateUtils.predicateMatches; -import static io.trino.parquet.reader.ParquetReader.toParquetFilter; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA; import static 
io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; @@ -209,9 +210,18 @@ public CheckpointEntryIterator( .map(field -> buildColumnHandle(field, checkpointSchemaManager, this.metadataEntry, this.protocolEntry).toHiveColumnHandle()) .collect(toImmutableList()); - TupleDomain tupleDomain = columns.size() > 1 ? - TupleDomain.all() : - buildTupleDomainColumnHandle(getOnlyElement(fields), getOnlyElement(columns)); + List> tupleDomains; // OR-ed condition + if (columns.isEmpty()) { + tupleDomains = ImmutableList.of(TupleDomain.all()); + } + else { + ImmutableList.Builder> builder = ImmutableList.builder(); + int i = 0; + for (EntryType field : fields) { + builder.add(buildTupleDomainColumnHandle(field, columns.get(i++))); + } + tupleDomains = builder.build(); + } ReaderPageSource pageSource; Optional parquetWriteValidation = Optional.empty(); @@ -234,10 +244,25 @@ public CheckpointEntryIterator( Map, ColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema); - TupleDomain parquetTupleDomain = parquetReaderOptions.isIgnoreStatistics() - ? TupleDomain.all() - : ParquetPageSourceFactory.getParquetTupleDomain(descriptorsByPath, tupleDomain, fileSchema, true); - TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, UTC); + List parquetDomains = new ArrayList<>(); + FilterPredicate nullableFilterPredicate = null; + for (TupleDomain domain : tupleDomains) { + TupleDomain parquetTupleDomain = parquetReaderOptions.isIgnoreStatistics() + ? 
TupleDomain.all() + : ParquetPageSourceFactory.getParquetTupleDomain(descriptorsByPath, domain, fileSchema, true); + TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, UTC); + parquetDomains.add(new ParquetDomainAndPredicate(parquetTupleDomain, parquetPredicate)); + + FilterPredicate filter = parquetPredicate.convertToParquetFilter(UTC); + if (filter != null) { + if (nullableFilterPredicate == null) { + nullableFilterPredicate = filter; + } + else { + nullableFilterPredicate = FilterApi.or(nullableFilterPredicate, filter); + } + } + } long nextStart = 0; ImmutableList.Builder blocks = ImmutableList.builder(); @@ -245,24 +270,34 @@ public CheckpointEntryIterator( ImmutableList.Builder> columnIndexes = ImmutableList.builder(); for (BlockMetaData block : parquetMetadata.getBlocks()) { long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset(); - Optional columnIndex = ParquetPageSourceFactory.getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, parquetReaderOptions); - - if ((long) 0 <= firstDataPage && firstDataPage < fileSize && - predicateMatches( - parquetPredicate, - block, - dataSource, - descriptorsByPath, - parquetTupleDomain, - columnIndex, - Optional.empty(), - UTC, - domainCompactionThreshold)) { - blocks.add(block); - blockStarts.add(nextStart); - columnIndexes.add(columnIndex); + boolean isBlockAdded = false; + for (ParquetDomainAndPredicate domain : parquetDomains) { + if (isBlockAdded) { + break; + } + + TupleDomain parquetTupleDomain = domain.tupleDomain; + TupleDomainParquetPredicate parquetPredicate = domain.predicate; + Optional columnIndex = ParquetPageSourceFactory.getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, parquetReaderOptions); + + if ((long) 0 <= firstDataPage && firstDataPage < fileSize && + predicateMatches( + parquetPredicate, + block, + dataSource, + descriptorsByPath, + parquetTupleDomain, + 
columnIndex, + Optional.empty(), + UTC, + domainCompactionThreshold)) { + blocks.add(block); + blockStarts.add(nextStart); + columnIndexes.add(columnIndex); + isBlockAdded = true; // first matching domain wins; the row offset is advanced once per block after this loop + } } nextStart += block.getRowCount(); } Optional readerProjections = projectBaseColumns(columns, true); @@ -272,6 +307,7 @@ public CheckpointEntryIterator( .collect(toUnmodifiableList())) .orElse(columns); + Optional filterPredicate = Optional.ofNullable(nullableFilterPredicate); ParquetDataSourceId dataSourceId = dataSource.getId(); ParquetDataSource finalDataSource = dataSource; ParquetPageSourceFactory.ParquetReaderProvider parquetReaderProvider = columnFields -> new ParquetReader( @@ -284,7 +320,7 @@ public CheckpointEntryIterator( memoryContext, parquetReaderOptions, exception -> handleException(dataSourceId, exception), - toParquetFilter(Optional.of(parquetPredicate), UTC, parquetReaderOptions), + filterPredicate, columnIndexes.build(), parquetWriteValidation); ConnectorPageSource parquetPageSource = createParquetPageSource(baseColumns, fileSchema, messageColumn, true, parquetReaderProvider); @@ -317,6 +353,8 @@ public CheckpointEntryIterator( .collect(toImmutableList()); } + private record ParquetDomainAndPredicate(TupleDomain tupleDomain, TupleDomainParquetPredicate predicate) {} + private DeltaLakeColumnHandle buildColumnHandle(EntryType entryType, CheckpointSchemaManager schemaManager, MetadataEntry metadataEntry, ProtocolEntry protocolEntry) { Type type = switch (entryType) {