diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java index a10166cf0213..9326ad38bac9 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java @@ -504,7 +504,7 @@ private ReaderPageSource createParquetPageSource(Location path) dataColumns.stream() .map(DeltaLakeColumnHandle::toHiveColumnHandle) .collect(toImmutableList()), - TupleDomain.all(), + ImmutableList.of(TupleDomain.all()), true, parquetDateTimeZone, new FileFormatDataSourceStats(), diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java index 0d848a613ede..4a56b213cd53 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java @@ -230,7 +230,7 @@ public ConnectorPageSource createPageSource( split.getStart(), split.getLength(), hiveColumnHandles.build(), - parquetPredicate, + ImmutableList.of(parquetPredicate), true, parquetDateTimeZone, fileFormatDataSourceStats, diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java index f485d3e0238e..d5cf1f482538 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java @@ -196,7 +196,7 @@ private static DeltaLakePageSource createDeltaLakePageSource( 0, split.fileSize(), splitColumns.stream().filter(column -> column.getColumnType() == REGULAR).map(DeltaLakeColumnHandle::toHiveColumnHandle).collect(toImmutableList()), - TupleDomain.all(), // TODO add predicate pushdown https://github.com/trinodb/trino/issues/16990 + ImmutableList.of(TupleDomain.all()), // TODO add predicate pushdown https://github.com/trinodb/trino/issues/16990 true, parquetDateTimeZone, fileFormatDataSourceStats, diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeTransactionLogEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeTransactionLogEntry.java index 3ddc2eff488c..cb86f1c99f4e 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeTransactionLogEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeTransactionLogEntry.java @@ -17,6 +17,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.annotation.Nullable; +import java.util.Objects; + import static java.util.Objects.requireNonNull; public class DeltaLakeTransactionLogEntry @@ -156,6 +158,31 @@ public DeltaLakeTransactionLogEntry withCommitInfo(CommitInfoEntry commitInfo) return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdcEntry); } + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DeltaLakeTransactionLogEntry that = (DeltaLakeTransactionLogEntry) o; + return Objects.equals(txn, that.txn) && + Objects.equals(add, that.add) && + Objects.equals(remove, that.remove) && + Objects.equals(metaData, that.metaData) && + Objects.equals(protocol, that.protocol) && + Objects.equals(commitInfo, that.commitInfo) && + Objects.equals(cdcEntry, that.cdcEntry); + } + + @Override + public int hashCode() + { + return Objects.hash(txn, add, remove, metaData, protocol, commitInfo, cdcEntry); + } + @Override public String toString() { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java index 9acaa296792f..1892b44643dd 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java @@ -80,7 +80,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.collect.Iterables.getOnlyElement; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; @@ -187,20 +186,20 @@ public CheckpointEntryIterator( this.columnsWithMinMaxStats = columnsWithStats(schema, this.metadataEntry.getOriginalPartitionColumns()); } - List columns = fields.stream() - .map(field -> buildColumnHandle(field, checkpointSchemaManager, this.metadataEntry, this.protocolEntry).toHiveColumnHandle()) - .collect(toImmutableList()); - - TupleDomain tupleDomain = columns.size() > 1 ? - TupleDomain.all() : - buildTupleDomainColumnHandle(getOnlyElement(fields), getOnlyElement(columns)); + ImmutableList.Builder columnsBuilder = ImmutableList.builderWithExpectedSize(fields.size()); + ImmutableList.Builder> disjunctDomainsBuilder = ImmutableList.builderWithExpectedSize(fields.size()); + for (EntryType field : fields) { + HiveColumnHandle column = buildColumnHandle(field, checkpointSchemaManager, this.metadataEntry, this.protocolEntry).toHiveColumnHandle(); + columnsBuilder.add(column); + disjunctDomainsBuilder.add(buildTupleDomainColumnHandle(field, column)); + } ReaderPageSource pageSource = ParquetPageSourceFactory.createPageSource( checkpoint, 0, fileSize, - columns, - tupleDomain, + columnsBuilder.build(), + disjunctDomainsBuilder.build(), // OR-ed condition true, DateTimeZone.UTC, stats, diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java index 0dbe85242b3a..5cd49f81ac57 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java @@ -61,6 +61,7 @@ import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS; import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; @TestInstance(PER_CLASS) @@ -82,6 +83,16 @@ public void tearDown() checkpointSchemaManager = null; } + @Test + public void testReadNoEntries() + throws Exception + { + URI checkpointUri = getResource(TEST_CHECKPOINT).toURI(); + assertThatThrownBy(() -> createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(), Optional.empty(), Optional.empty())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("fields is empty"); + } + @Test public void testReadMetadataEntry() throws Exception @@ -134,6 +145,49 @@ public void testReadProtocolEntries() Optional.empty())); } + @Test + public void testReadMetadataAndProtocolEntry() + throws Exception + { + URI checkpointUri = getResource(TEST_CHECKPOINT).toURI(); + CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(METADATA, PROTOCOL), Optional.empty(), Optional.empty()); + List entries = ImmutableList.copyOf(checkpointEntryIterator); + + assertThat(entries).hasSize(2); + assertThat(entries).containsExactlyInAnyOrder( + DeltaLakeTransactionLogEntry.metadataEntry(new MetadataEntry( + "b6aeffad-da73-4dde-b68e-937e468b1fde", + null, + null, + new MetadataEntry.Format("parquet", Map.of()), + "{\"type\":\"struct\",\"fields\":[" + + "{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"married\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}}," + + + "{\"name\":\"phones\",\"type\":{\"type\":\"array\",\"elementType\":{\"type\":\"struct\",\"fields\":[" + + "{\"name\":\"number\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"label\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}," + + "\"containsNull\":true},\"nullable\":true,\"metadata\":{}}," + + + "{\"name\":\"address\",\"type\":{\"type\":\"struct\",\"fields\":[" + + "{\"name\":\"street\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"city\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"state\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"zip\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}," + + + "{\"name\":\"income\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}", + List.of("age"), + Map.of(), + 1579190100722L)), + DeltaLakeTransactionLogEntry.protocolEntry( + new ProtocolEntry( + 1, + 2, + Optional.empty(), + Optional.empty()))); + } + @Test public void testReadAddEntries() throws Exception @@ -309,6 +363,8 @@ public void testSkipRemoveEntries() targetFile.delete(); // file must not exist when writer is called writer.write(entries, createOutputFile(targetPath)); + CheckpointEntryIterator metadataAndProtocolEntryIterator = + createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(METADATA, PROTOCOL), Optional.empty(), Optional.empty()); CheckpointEntryIterator addEntryIterator = createCheckpointEntryIterator( URI.create(targetPath), ImmutableSet.of(ADD), @@ -319,10 +375,12 @@ public void testSkipRemoveEntries() CheckpointEntryIterator txnEntryIterator = createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(TRANSACTION), Optional.empty(), Optional.empty()); + assertThat(Iterators.size(metadataAndProtocolEntryIterator)).isEqualTo(2); assertThat(Iterators.size(addEntryIterator)).isEqualTo(1); assertThat(Iterators.size(removeEntryIterator)).isEqualTo(numRemoveEntries); assertThat(Iterators.size(txnEntryIterator)).isEqualTo(0); + assertThat(metadataAndProtocolEntryIterator.getCompletedPositions().orElseThrow()).isEqualTo(3L); assertThat(addEntryIterator.getCompletedPositions().orElseThrow()).isEqualTo(2L); assertThat(removeEntryIterator.getCompletedPositions().orElseThrow()).isEqualTo(100L); assertThat(txnEntryIterator.getCompletedPositions().orElseThrow()).isEqualTo(0L); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java index b21d0c1f3245..5dd96adc51d0 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java @@ -187,7 +187,7 @@ public Optional createPageSource( start, length, columns, - effectivePredicate, + ImmutableList.of(effectivePredicate), isUseParquetColumnNames(session), timeZone, stats, @@ -210,7 +210,7 @@ public static ReaderPageSource createPageSource( long start, long length, List columns, - TupleDomain effectivePredicate, + List> disjunctTupleDomains, boolean useColumnNames, DateTimeZone timeZone, FileFormatDataSourceStats stats, @@ -237,11 +237,23 @@ public static ReaderPageSource createPageSource( messageColumn = getColumnIO(fileSchema, requestedSchema); Map, ColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema); - TupleDomain parquetTupleDomain = options.isIgnoreStatistics() - ? TupleDomain.all() - : getParquetTupleDomain(descriptorsByPath, effectivePredicate, fileSchema, useColumnNames); - - TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, timeZone); + List> parquetTupleDomains; + List parquetPredicates; + if (options.isIgnoreStatistics()) { + parquetTupleDomains = ImmutableList.of(); + parquetPredicates = ImmutableList.of(); + } + else { + ImmutableList.Builder> parquetTupleDomainsBuilder = ImmutableList.builderWithExpectedSize(disjunctTupleDomains.size()); + ImmutableList.Builder parquetPredicatesBuilder = ImmutableList.builderWithExpectedSize(disjunctTupleDomains.size()); + for (TupleDomain tupleDomain : disjunctTupleDomains) { + TupleDomain parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, tupleDomain, fileSchema, useColumnNames); + parquetTupleDomainsBuilder.add(parquetTupleDomain); + parquetPredicatesBuilder.add(buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, timeZone)); + } + parquetTupleDomains = parquetTupleDomainsBuilder.build(); + parquetPredicates = parquetPredicatesBuilder.build(); + } long nextStart = 0; ImmutableList.Builder blocks = ImmutableList.builder(); @@ -249,23 +261,27 @@ public static ReaderPageSource createPageSource( ImmutableList.Builder> columnIndexes = ImmutableList.builder(); for (BlockMetaData block : parquetMetadata.getBlocks()) { long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset(); - Optional columnIndex = getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, options); - Optional bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options); - - if (start <= firstDataPage && firstDataPage < start + length - && predicateMatches( - parquetPredicate, - block, - dataSource, - descriptorsByPath, - parquetTupleDomain, - columnIndex, - bloomFilterStore, - timeZone, - domainCompactionThreshold)) { - blocks.add(block); - blockStarts.add(nextStart); - columnIndexes.add(columnIndex); + for (int i = 0; i < disjunctTupleDomains.size(); i++) { + TupleDomain parquetTupleDomain = parquetTupleDomains.get(i); + TupleDomainParquetPredicate parquetPredicate = parquetPredicates.get(i); + Optional columnIndex = getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, options); + Optional bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options); + if (start <= firstDataPage && firstDataPage < start + length + && predicateMatches( + parquetPredicate, + block, + dataSource, + descriptorsByPath, + parquetTupleDomain, + columnIndex, + bloomFilterStore, + timeZone, + domainCompactionThreshold)) { + blocks.add(block); + blockStarts.add(nextStart); + columnIndexes.add(columnIndex); + break; + } } nextStart += block.getRowCount(); } @@ -289,7 +305,9 @@ && predicateMatches( memoryContext, options, exception -> handleException(dataSourceId, exception), - Optional.of(parquetPredicate), + // We avoid using disjuncts of parquetPredicate for page pruning in ParquetReader as currently column indexes + // are not present in the Parquet files which are read with disjunct predicates. + parquetPredicates.size() == 1 ? Optional.of(parquetPredicates.get(0)) : Optional.empty(), columnIndexes.build(), parquetWriteValidation); ConnectorPageSource parquetPageSource = createParquetPageSource(baseColumns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider);