From 70ed62d1d10bf2603af656b986246f6061fa8e1a Mon Sep 17 00:00:00 2001
From: Marius Grama
Date: Wed, 6 Dec 2023 12:38:00 +0100
Subject: [PATCH] Test checkpoint filtering on table with multiple partition fields

---
 .../TestCheckpointEntryIterator.java          | 147 ++++++++++++++++++
 1 file changed, 147 insertions(+)

diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java
index 2a8d84a65b73..dcf83f52d74a 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java
@@ -51,6 +51,7 @@
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -740,6 +741,152 @@ public void testSkipAddEntriesThroughPartitionPruning()
         assertThat(addEntryIteratorWithRangePartitionFilter.getCompletedPositions().orElseThrow()).isEqualTo(3L);
     }
 
+    /**
+     * Writes a checkpoint with 100 add entries for a table partitioned by {@code part_day} and
+     * {@code part_hour}, then reads it back with no partition constraint, with an equality
+     * constraint on {@code part_day} only, and with equality constraints on both columns,
+     * asserting the returned entry count and the reported completed positions in each case.
+     */
+    @Test
+    public void testSkipAddEntriesThroughComposedPartitionPruning()
+            throws IOException
+    {
+        MetadataEntry metadataEntry = new MetadataEntry(
+                "metadataId",
+                "metadataName",
+                "metadataDescription",
+                new MetadataEntry.Format(
+                        "metadataFormatProvider",
+                        ImmutableMap.of()),
+                "{\"type\":\"struct\",\"fields\":" +
+                        "[{\"name\":\"ts\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}," +
+                        "{\"name\":\"part_day\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," +
+                        "{\"name\":\"part_hour\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
+                ImmutableList.of(
+                        "part_day",
+                        "part_hour"),
+                ImmutableMap.of(),
+                1000);
+        ProtocolEntry protocolEntry = new ProtocolEntry(1, 1, Optional.empty(), Optional.empty());
+        // Entry i gets the partition values of 2023-12-01T00:00 plus i hours, so the 100 entries span 2023-12-01 01:00 to 2023-12-05 04:00
+        LocalDateTime date = LocalDateTime.of(2023, 12, 1, 0, 0);
+        DateTimeFormatter dayFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");
+        DateTimeFormatter hourFormatter = DateTimeFormatter.ofPattern("HH");
+        int numAddEntries = 100;
+        Set<AddFileEntry> addFileEntries = IntStream.rangeClosed(1, numAddEntries).mapToObj(index ->
+                        new AddFileEntry(
+                                "addFilePath",
+                                ImmutableMap.of(
+                                        "part_day", date.plusHours(index).format(dayFormatter),
+                                        "part_hour", date.plusHours(index).format(hourFormatter)),
+                                1000,
+                                1001,
+                                true,
+                                Optional.of("{" +
+                                        "\"numRecords\":20," +
+                                        "\"minValues\":{" +
+                                        "\"ts\":\"1990-10-31T01:00:00.000Z\"" +
+                                        "}," +
+                                        "\"maxValues\":{" +
+                                        "\"ts\":\"1990-10-31T02:00:00.000Z\"" +
+                                        "}," +
+                                        "\"nullCount\":{" +
+                                        "\"ts\":1" +
+                                        "}}"),
+                                Optional.empty(),
+                                ImmutableMap.of(),
+                                Optional.empty()))
+                .collect(toImmutableSet());
+
+        CheckpointEntries entries = new CheckpointEntries(
+                metadataEntry,
+                protocolEntry,
+                ImmutableSet.of(),
+                addFileEntries,
+                ImmutableSet.of());
+
+        CheckpointWriter writer = new CheckpointWriter(
+                TESTING_TYPE_MANAGER,
+                checkpointSchemaManager,
+                "test",
+                ParquetWriterOptions.builder() // approximately 2 rows per row group
+                        .setMaxBlockSize(DataSize.ofBytes(128L))
+                        .setMaxPageSize(DataSize.ofBytes(128L))
+                        .build());
+
+        File targetFile = File.createTempFile("testSkipAddEntries-", ".checkpoint.parquet");
+        targetFile.deleteOnExit();
+
+        String targetPath = "file://" + targetFile.getAbsolutePath();
+        targetFile.delete(); // file must not exist when writer is called
+        writer.write(entries, createOutputFile(targetPath));
+
+        CheckpointEntryIterator metadataAndProtocolEntryIterator = createCheckpointEntryIterator(
+                URI.create(targetPath),
+                ImmutableSet.of(METADATA, PROTOCOL),
+                Optional.empty(),
+                Optional.empty(),
+                TupleDomain.all(),
+                Optional.empty());
+
+        DeltaLakeColumnHandle partitionDayField = new DeltaLakeColumnHandle(
+                "part_day",
+                VARCHAR,
+                OptionalInt.empty(),
+                "part_day",
+                VARCHAR,
+                REGULAR,
+                Optional.empty());
+        DeltaLakeColumnHandle partitionHourField = new DeltaLakeColumnHandle(
+                "part_hour",
+                VARCHAR,
+                OptionalInt.empty(),
+                "part_hour",
+                VARCHAR,
+                REGULAR,
+                Optional.empty());
+
+        CheckpointEntryIterator addEntryIterator = createCheckpointEntryIterator(
+                URI.create(targetPath),
+                ImmutableSet.of(ADD),
+                Optional.of(metadataEntry),
+                Optional.of(protocolEntry),
+                TupleDomain.all(),
+                Optional.of(alwaysTrue()));
+
+        CheckpointEntryIterator addEntryIteratorEqualityDayPartitionFilter = createCheckpointEntryIterator(
+                URI.create(targetPath),
+                ImmutableSet.of(ADD),
+                Optional.of(metadataEntry),
+                Optional.of(protocolEntry),
+                TupleDomain.withColumnDomains(ImmutableMap.of(partitionDayField, singleValue(VARCHAR, utf8Slice("20231202")))),
+                Optional.of(alwaysTrue()));
+
+        CheckpointEntryIterator addEntryIteratorWithDayAndHourEqualityPartitionFilter = createCheckpointEntryIterator(
+                URI.create(targetPath),
+                ImmutableSet.of(ADD),
+                Optional.of(metadataEntry),
+                Optional.of(protocolEntry),
+                TupleDomain.withColumnDomains(ImmutableMap.of(
+                        partitionDayField, singleValue(VARCHAR, utf8Slice("20231202")),
+                        partitionHourField, singleValue(VARCHAR, utf8Slice("10")))),
+                Optional.of(alwaysTrue()));
+
+        assertThat(Iterators.size(metadataAndProtocolEntryIterator)).isEqualTo(2);
+        assertThat(metadataAndProtocolEntryIterator.getCompletedPositions().orElseThrow()).isEqualTo(3L);
+
+        assertThat(Iterators.size(addEntryIterator)).isEqualTo(numAddEntries);
+        assertThat(addEntryIterator.getCompletedPositions().orElseThrow()).isEqualTo(101L);
+
+        // Hour offsets 24..47 are the 24 entries whose part_day is 20231202
+        assertThat(Iterators.size(addEntryIteratorEqualityDayPartitionFilter)).isEqualTo(24);
+        assertThat(addEntryIteratorEqualityDayPartitionFilter.getCompletedPositions().orElseThrow()).isEqualTo(24L);
+
+        // Only hour offset 34 (2023-12-02 10:00) has part_day 20231202 together with part_hour 10
+        assertThat(Iterators.size(addEntryIteratorWithDayAndHourEqualityPartitionFilter)).isEqualTo(1);
+        assertThat(addEntryIteratorWithDayAndHourEqualityPartitionFilter.getCompletedPositions().orElseThrow()).isEqualTo(1L);
+    }
+
     @Test
     public void testSkipRemoveEntries()
             throws IOException