diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java
index c69d0506a02c..a72c57ef6c10 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java
@@ -671,7 +671,7 @@ private static Optional<Collection<Object>> extractDiscreteValues(int domainComp
         return Optional.of(valueSet.getDiscreteSet());
     }
 
-    private FilterPredicate convertToParquetFilter(DateTimeZone timeZone)
+    public FilterPredicate convertToParquetFilter(DateTimeZone timeZone)
     {
         FilterPredicate filter = null;
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java
index 8f3a3e78e518..b5ca6c8805f5 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java
@@ -155,7 +155,7 @@ public ParquetReader(
             AggregatedMemoryContext memoryContext,
             ParquetReaderOptions options,
             Function<Exception, RuntimeException> exceptionTransform,
-            Optional<TupleDomainParquetPredicate> parquetPredicate,
+            Optional<FilterPredicate> filter,
             List<Optional<ColumnIndexStore>> columnIndexStore,
             Optional<ParquetWriteValidation> writeValidation)
             throws IOException
@@ -187,12 +187,8 @@ public ParquetReader(
         this.writeChecksumBuilder = writeValidation.map(validation -> createWriteChecksumBuilder(validation.getTypes()));
         this.rowGroupStatisticsValidation = writeValidation.map(validation -> createStatisticsValidationBuilder(validation.getTypes()));
 
-        requireNonNull(parquetPredicate, "parquetPredicate is null");
+        requireNonNull(filter, "filter is null");
         this.columnIndexStore = requireNonNull(columnIndexStore, "columnIndexStore is null");
-        Optional<FilterPredicate> filter = Optional.empty();
-        if (parquetPredicate.isPresent() && options.isUseColumnIndex()) {
-            filter = parquetPredicate.get().toParquetFilter(timeZone);
-        }
         this.blockRowRanges = calculateFilteredRowRanges(blocks, filter, columnIndexStore, primitiveFields);
 
         this.blockFactory = new ParquetBlockFactory(exceptionTransform);
@@ -536,6 +532,14 @@ public AggregatedMemoryContext getMemoryContext()
         return memoryContext;
     }
 
+    public static Optional<FilterPredicate> toParquetFilter(Optional<TupleDomainParquetPredicate> parquetPredicate, DateTimeZone timeZone, ParquetReaderOptions options)
+    {
+        if (parquetPredicate.isPresent() && options.isUseColumnIndex()) {
+            return parquetPredicate.get().toParquetFilter(timeZone);
+        }
+        return Optional.empty();
+    }
+
     private static FilteredRowRanges[] calculateFilteredRowRanges(
             List<BlockMetaData> blocks,
             Optional<FilterPredicate> filter,
diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml
index e2254f129c90..5ffb195cf258 100644
--- a/plugin/trino-delta-lake/pom.xml
+++ b/plugin/trino-delta-lake/pom.xml
@@ -144,6 +144,11 @@
       <artifactId>trino-hive</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.trino</groupId>
+      <artifactId>trino-memory-context</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>io.trino</groupId>
       <artifactId>trino-parquet</artifactId>
@@ -257,12 +262,6 @@
       <scope>runtime</scope>
     </dependency>
 
-    <dependency>
-      <groupId>io.trino</groupId>
-      <artifactId>trino-memory-context</artifactId>
-      <scope>runtime</scope>
-    </dependency>
-
     <dependency>
       <groupId>com.azure</groupId>
      <artifactId>azure-core</artifactId>
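Note on the trino-parquet changes above: ParquetReader no longer converts a TupleDomainParquetPredicate into a column-index FilterPredicate inside its constructor. The constructor now takes the Optional<FilterPredicate> directly, and the conversion is exposed as the static ParquetReader.toParquetFilter helper so each call site performs it explicitly. A minimal caller-side sketch of the new contract (the surrounding identifiers mirror the call sites updated later in this patch and are illustrative, not new API):

    // Before: the reader derived the filter internally from the predicate, i.e.
    // the constructor received Optional.of(parquetPredicate).
    // After: the caller converts up front. toParquetFilter returns
    // Optional.empty() unless options.isUseColumnIndex() is set, so the
    // resulting row-range filtering is unchanged.
    Optional<FilterPredicate> filter = toParquetFilter(Optional.of(parquetPredicate), timeZone, options);
    // ...then pass `filter` where the constructor previously took the predicate.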
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java
index b7932a2e1c67..d170ca325753 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java
@@ -20,7 +20,15 @@
 import com.google.common.math.LongMath;
 import io.airlift.log.Logger;
 import io.trino.filesystem.TrinoInputFile;
+import io.trino.memory.context.AggregatedMemoryContext;
+import io.trino.parquet.ParquetCorruptionException;
+import io.trino.parquet.ParquetDataSource;
+import io.trino.parquet.ParquetDataSourceId;
 import io.trino.parquet.ParquetReaderOptions;
+import io.trino.parquet.ParquetWriteValidation;
+import io.trino.parquet.predicate.TupleDomainParquetPredicate;
+import io.trino.parquet.reader.MetadataReader;
+import io.trino.parquet.reader.ParquetReader;
 import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
 import io.trino.plugin.deltalake.DeltaLakeColumnMetadata;
 import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
@@ -37,6 +45,7 @@
 import io.trino.plugin.hive.HiveColumnHandle.ColumnType;
 import io.trino.plugin.hive.HiveColumnProjectionInfo;
 import io.trino.plugin.hive.HiveType;
+import io.trino.plugin.hive.ReaderColumns;
 import io.trino.plugin.hive.ReaderPageSource;
 import io.trino.plugin.hive.parquet.ParquetPageSourceFactory;
 import io.trino.spi.Page;
@@ -54,11 +63,20 @@
 import io.trino.spi.type.TypeManager;
 import io.trino.spi.type.TypeSignature;
 import jakarta.annotation.Nullable;
-import org.joda.time.DateTimeZone;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.schema.MessageType;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -71,7 +89,10 @@
 import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
-import static com.google.common.collect.Iterables.getOnlyElement;
+import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
+import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
+import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
+import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
 import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
 import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA;
 import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
@@ -85,6 +106,11 @@
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.REMOVE;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.TRANSACTION;
+import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
+import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
+import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns;
+import static io.trino.plugin.hive.parquet.ParquetPageSource.handleException;
+import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createParquetPageSource;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
 import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
@@ -98,6 +124,8 @@
 import static java.math.RoundingMode.UNNECESSARY;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.joda.time.DateTimeZone.UTC;
 
 public class CheckpointEntryIterator
         extends AbstractIterator<DeltaLakeTransactionLogEntry>
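Context for the constructor hunk below: a TupleDomain expresses a conjunction of per-column constraints, so the disjunction the iterator needs (a checkpoint row carries an add entry OR a metaData entry OR ...) cannot be encoded in a single TupleDomain. The patch therefore builds one single-column domain per requested entry type and ORs the derived Parquet filters. The nullableFilterPredicate accumulation in the hunk is equivalent to this self-contained sketch (class and method names are ours, for illustration only; FilterApi.or is the real parquet-mr combinator):

    import java.util.List;
    import java.util.Optional;
    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.filter2.predicate.FilterPredicate;

    final class FilterFolding
    {
        private FilterFolding() {}

        // Folds the per-entry-type filters into one disjunction; empty when no
        // entry type produced a filter (e.g. every domain was TupleDomain.all()).
        static Optional<FilterPredicate> orAll(List<FilterPredicate> filters)
        {
            return filters.stream().reduce(FilterApi::or);
        }
    }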
@@ -182,23 +210,139 @@ public CheckpointEntryIterator(
                 .map(field -> buildColumnHandle(field, checkpointSchemaManager, this.metadataEntry, this.protocolEntry).toHiveColumnHandle())
                 .collect(toImmutableList());
 
-        TupleDomain<HiveColumnHandle> tupleDomain = columns.size() > 1 ?
-                TupleDomain.all() :
-                buildTupleDomainColumnHandle(getOnlyElement(fields), getOnlyElement(columns));
-
-        ReaderPageSource pageSource = ParquetPageSourceFactory.createPageSource(
-                checkpoint,
-                0,
-                fileSize,
-                columns,
-                tupleDomain,
-                true,
-                DateTimeZone.UTC,
-                stats,
-                parquetReaderOptions,
-                Optional.empty(),
-                domainCompactionThreshold,
-                OptionalLong.empty());
+        List<TupleDomain<HiveColumnHandle>> tupleDomains; // OR-ed condition
+        if (columns.isEmpty()) {
+            tupleDomains = ImmutableList.of(TupleDomain.all());
+        }
+        else {
+            ImmutableList.Builder<TupleDomain<HiveColumnHandle>> builder = ImmutableList.builder();
+            int i = 0;
+            for (EntryType field : fields) {
+                builder.add(buildTupleDomainColumnHandle(field, columns.get(i++)));
+            }
+            tupleDomains = builder.build();
+        }
+
+        ReaderPageSource pageSource;
+        Optional<ParquetWriteValidation> parquetWriteValidation = Optional.empty();
+        MessageType fileSchema;
+        MessageType requestedSchema;
+        MessageColumnIO messageColumn;
+        ParquetDataSource dataSource = null;
+        try {
+            AggregatedMemoryContext memoryContext = AggregatedMemoryContext.newSimpleAggregatedMemoryContext();
+            dataSource = ParquetPageSourceFactory.createDataSource(checkpoint, OptionalLong.empty(), parquetReaderOptions, memoryContext, stats);
+
+            ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, parquetWriteValidation);
+            FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+            fileSchema = fileMetaData.getSchema();
+
+            Optional<MessageType> message = ParquetPageSourceFactory.getParquetMessageType(columns, true, fileSchema);
+
+            requestedSchema = message.orElse(new MessageType(fileSchema.getName(), ImmutableList.of()));
+            messageColumn = getColumnIO(fileSchema, requestedSchema);
+
+            Map<List<String>, ColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema);
+
+            List<ParquetDomainAndPredicate> parquetDomains = new ArrayList<>();
+            FilterPredicate nullableFilterPredicate = null;
+            for (TupleDomain<HiveColumnHandle> domain : tupleDomains) {
+                TupleDomain<ColumnDescriptor> parquetTupleDomain = parquetReaderOptions.isIgnoreStatistics()
+                        ? TupleDomain.all()
+                        : ParquetPageSourceFactory.getParquetTupleDomain(descriptorsByPath, domain, fileSchema, true);
+                TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, UTC);
+                parquetDomains.add(new ParquetDomainAndPredicate(parquetTupleDomain, parquetPredicate));
+
+                FilterPredicate filter = parquetPredicate.convertToParquetFilter(UTC);
+                if (filter != null) {
+                    if (nullableFilterPredicate == null) {
+                        nullableFilterPredicate = filter;
+                    }
+                    else {
+                        nullableFilterPredicate = FilterApi.or(nullableFilterPredicate, filter);
+                    }
+                }
+            }
+
+            long nextStart = 0;
+            ImmutableList.Builder<BlockMetaData> blocks = ImmutableList.builder();
+            ImmutableList.Builder<Long> blockStarts = ImmutableList.builder();
+            ImmutableList.Builder<Optional<ColumnIndexStore>> columnIndexes = ImmutableList.builder();
+            for (BlockMetaData block : parquetMetadata.getBlocks()) {
+                long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+                boolean isBlockAdded = false;
+                for (ParquetDomainAndPredicate domain : parquetDomains) {
+                    if (isBlockAdded) {
+                        break;
+                    }
+
+                    TupleDomain<ColumnDescriptor> parquetTupleDomain = domain.tupleDomain();
+                    TupleDomainParquetPredicate parquetPredicate = domain.predicate();
+                    Optional<ColumnIndexStore> columnIndex = ParquetPageSourceFactory.getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, parquetReaderOptions);
+
+                    if (firstDataPage >= 0 && firstDataPage < fileSize &&
+                            predicateMatches(
+                                    parquetPredicate,
+                                    block,
+                                    dataSource,
+                                    descriptorsByPath,
+                                    parquetTupleDomain,
+                                    columnIndex,
+                                    Optional.empty(),
+                                    UTC,
+                                    domainCompactionThreshold)) {
+                        blocks.add(block);
+                        blockStarts.add(nextStart);
+                        columnIndexes.add(columnIndex);
+                        isBlockAdded = true;
+                    }
+                }
+                nextStart += block.getRowCount();
+            }
+
+            Optional<ReaderColumns> readerProjections = projectBaseColumns(columns, true);
+            List<HiveColumnHandle> baseColumns = readerProjections.map(projection ->
+                    projection.get().stream()
+                            .map(HiveColumnHandle.class::cast)
+                            .collect(toUnmodifiableList()))
+                    .orElse(columns);
+
+            Optional<FilterPredicate> filterPredicate = Optional.ofNullable(nullableFilterPredicate);
+            ParquetDataSourceId dataSourceId = dataSource.getId();
+            ParquetDataSource finalDataSource = dataSource;
+            ParquetPageSourceFactory.ParquetReaderProvider parquetReaderProvider = columnFields -> new ParquetReader(
+                    Optional.ofNullable(fileMetaData.getCreatedBy()),
+                    columnFields,
+                    blocks.build(),
+                    blockStarts.build(),
+                    finalDataSource,
+                    UTC,
+                    memoryContext,
+                    parquetReaderOptions,
+                    exception -> handleException(dataSourceId, exception),
+                    filterPredicate,
+                    columnIndexes.build(),
+                    parquetWriteValidation);
+            ConnectorPageSource parquetPageSource = createParquetPageSource(baseColumns, fileSchema, messageColumn, true, parquetReaderProvider);
+            pageSource = new ReaderPageSource(parquetPageSource, readerProjections);
+        }
+        catch (Exception e) {
+            try {
+                if (dataSource != null) {
+                    dataSource.close();
+                }
+            }
+            catch (IOException ignored) {
+            }
+            if (e instanceof TrinoException) {
+                throw (TrinoException) e;
+            }
+            if (e instanceof ParquetCorruptionException) {
+                throw new TrinoException(HIVE_BAD_DATA, e);
+            }
+            String message = format("Error opening Hive split %s (offset=%s, length=%s): %s", checkpoint.location(), 0, fileSize, e.getMessage());
+            throw new TrinoException(HIVE_CANNOT_OPEN_SPLIT, message, e);
+        }
 
         verify(pageSource.getReaderColumns().isEmpty(), "All columns expected to be base columns");
@@ -209,6 +353,8 @@ public CheckpointEntryIterator(
                 .collect(toImmutableList());
     }
 
+    private record ParquetDomainAndPredicate(TupleDomain<ColumnDescriptor> tupleDomain, TupleDomainParquetPredicate predicate) {}
+
     private DeltaLakeColumnHandle buildColumnHandle(EntryType entryType, CheckpointSchemaManager schemaManager, MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
     {
         Type type = switch (entryType) {
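Reading aid for the row-group loop in the constructor hunk above: a block is kept as soon as any of the OR-ed domains matches it, and nextStart advances exactly once per block, matched or not, so that blockStarts records correct absolute row positions. The double loop boils down to this sketch, where matches(domain, block) is a hypothetical stand-in for the predicateMatches(...) call:

    long nextStart = 0;
    for (BlockMetaData block : parquetMetadata.getBlocks()) {
        // Disjunctive pruning: one matching domain is enough to keep the block.
        if (parquetDomains.stream().anyMatch(domain -> matches(domain, block))) {
            blocks.add(block);
            blockStarts.add(nextStart);
        }
        nextStart += block.getRowCount();
    }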
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java
index 283eab238afa..2b656213b551 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java
@@ -189,7 +189,7 @@ private Page getColumnAdaptationsPage(Page page)
         return new Page(batchSize, blocks);
     }
 
-    static TrinoException handleException(ParquetDataSourceId dataSourceId, Exception exception)
+    public static TrinoException handleException(ParquetDataSourceId dataSourceId, Exception exception)
     {
         if (exception instanceof TrinoException) {
             return (TrinoException) exception;
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
index b21d0c1f3245..46cde719c004 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
@@ -84,6 +84,7 @@
 import static io.trino.parquet.ParquetTypeUtils.lookupColumnByName;
 import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
 import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
+import static io.trino.parquet.reader.ParquetReader.toParquetFilter;
 import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
@@ -289,7 +290,7 @@ && predicateMatches(
                 memoryContext,
                 options,
                 exception -> handleException(dataSourceId, exception),
-                Optional.of(parquetPredicate),
+                toParquetFilter(Optional.of(parquetPredicate), timeZone, options),
                 columnIndexes.build(),
                 parquetWriteValidation);
         ConnectorPageSource parquetPageSource = createParquetPageSource(baseColumns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider);
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
index 93b0e3c5af02..bda23a0e1372 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
@@ -75,6 +75,7 @@
 import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
 import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
 import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
+import static io.trino.parquet.reader.ParquetReader.toParquetFilter;
 import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns;
 import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.ParquetReaderProvider;
 import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createDataSource;
@@ -248,7 +249,7 @@ && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parq
                 memoryContext,
                 options,
                 exception -> handleException(dataSourceId, exception),
-                Optional.of(parquetPredicate),
+                toParquetFilter(Optional.of(parquetPredicate), timeZone, options),
                 columnIndexes.build(),
                 Optional.empty());
         return createParquetPageSource(baseColumns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider);
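The two call-site updates (here and in ParquetPageSourceFactory above) are mechanical: the old ParquetReader constructor only built a filter when options.isUseColumnIndex() was true, and toParquetFilter keeps exactly that guard. A hedged sketch of the invariant, assuming ParquetReaderOptions exposes a withUseColumnIndex toggle like its other with* methods:

    // With column indexes disabled, toParquetFilter yields empty, identical to
    // the previous in-constructor behavior, so row-range filtering is unaffected.
    ParquetReaderOptions noIndex = new ParquetReaderOptions().withUseColumnIndex(false);
    verify(toParquetFilter(Optional.of(parquetPredicate), timeZone, noIndex).isEmpty());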