From 00685109df3fcfa19d85ea70c2583a838b47e2ee Mon Sep 17 00:00:00 2001
From: Yingjie Luan <1275963@gmail.com>
Date: Sun, 23 Apr 2023 09:54:12 -0700
Subject: [PATCH 1/3] Move getBloomFilterStore() to Parquet library

---
 .../io/trino/parquet/BloomFilterStore.java    | 28 +++++++++++++++++++
 .../parquet/ParquetPageSourceFactory.java     | 25 +----------------
 2 files changed, 29 insertions(+), 24 deletions(-)

diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/BloomFilterStore.java b/lib/trino-parquet/src/main/java/io/trino/parquet/BloomFilterStore.java
index 534e8eae417b..85c3dc616eb4 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/BloomFilterStore.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/BloomFilterStore.java
@@ -16,6 +16,9 @@
 import com.google.common.collect.ImmutableMap;
 import io.airlift.slice.BasicSliceInput;
 import io.airlift.slice.Slice;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.TupleDomain;
+import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.format.BloomFilterHeader;
@@ -32,6 +35,7 @@
 import java.util.Set;
 
 import static com.google.common.base.Verify.verify;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
@@ -92,6 +96,30 @@ public Optional<BloomFilter> getBloomFilter(ColumnPath columnPath)
         }
     }
 
+    public static Optional<BloomFilterStore> getBloomFilterStore(
+            ParquetDataSource dataSource,
+            BlockMetaData blockMetadata,
+            TupleDomain<ColumnDescriptor> parquetTupleDomain,
+            ParquetReaderOptions options)
+    {
+        if (!options.useBloomFilter() || parquetTupleDomain.isAll() || parquetTupleDomain.isNone()) {
+            return Optional.empty();
+        }
+
+        boolean hasBloomFilter = blockMetadata.getColumns().stream().anyMatch(BloomFilterStore::hasBloomFilter);
+        if (!hasBloomFilter) {
+            return Optional.empty();
+        }
+
+        Map<ColumnDescriptor, Domain> parquetDomains = parquetTupleDomain.getDomains()
+                .orElseThrow(() -> new IllegalStateException("Predicate other than none should have domains"));
+        Set<ColumnPath> columnsFilteredPaths = parquetDomains.keySet().stream()
+                .map(column -> ColumnPath.get(column.getPath()))
+                .collect(toImmutableSet());
+
+        return Optional.of(new BloomFilterStore(dataSource, blockMetadata, columnsFilteredPaths));
+    }
+
     public static boolean hasBloomFilter(ColumnChunkMetaData columnMetaData)
     {
         return columnMetaData.getBloomFilterOffset() > 0;
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
index ac7384a9bd90..2b7139b49a97 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
@@ -73,6 +73,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
+import static io.trino.parquet.BloomFilterStore.getBloomFilterStore;
 import static io.trino.parquet.ParquetTypeUtils.constructField;
 import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
 import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
@@ -394,30 +395,6 @@ public static Optional<ColumnIndexStore> getColumnIndexStore(
         return Optional.of(new TrinoColumnIndexStore(dataSource, blockMetadata, columnsReadPaths, columnsFilteredPaths));
     }
 
-    public static Optional<BloomFilterStore> getBloomFilterStore(
-            ParquetDataSource dataSource,
-            BlockMetaData blockMetadata,
-            TupleDomain<ColumnDescriptor> parquetTupleDomain,
-            ParquetReaderOptions options)
-    {
-        if (!options.useBloomFilter() || parquetTupleDomain.isAll() || parquetTupleDomain.isNone()) {
-            return Optional.empty();
-        }
-
-        boolean hasBloomFilter = blockMetadata.getColumns().stream().anyMatch(BloomFilterStore::hasBloomFilter);
-        if (!hasBloomFilter) {
-            return Optional.empty();
-        }
-
-        Map<ColumnDescriptor, Domain> parquetDomains = parquetTupleDomain.getDomains()
-                .orElseThrow(() -> new IllegalStateException("Predicate other than none should have domains"));
-        Set<ColumnPath> columnsFilteredPaths = parquetDomains.keySet().stream()
-                .map(column -> ColumnPath.get(column.getPath()))
-                .collect(toImmutableSet());
-
-        return Optional.of(new BloomFilterStore(dataSource, blockMetadata, columnsFilteredPaths));
-    }
-
     public static TupleDomain<ColumnDescriptor> getParquetTupleDomain(
             Map<List<String>, ColumnDescriptor> descriptorsByPath,
             TupleDomain<HiveColumnHandle> effectivePredicate,
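
Note (not part of the patch): row-group pruning via the relocated helper is only sound because Parquet bloom filters can return false positives but never false negatives. A minimal, self-contained sketch of that guarantee, using the same parquet-mr BlockSplitBloomFilter that BloomFilterStore deserializes -- the 1 MB size and the probe values are arbitrary illustration choices:

    import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
    import org.apache.parquet.column.values.bloomfilter.BloomFilter;

    public class BloomFilterPruningDemo
    {
        public static void main(String[] args)
        {
            BloomFilter filter = new BlockSplitBloomFilter(1024 * 1024); // 1 MB bitset
            filter.insertHash(filter.hash(42));

            // A value that was written always hits: no false negatives, so a miss
            // proves the row group cannot contain the value and may be skipped.
            System.out.println(filter.findHash(filter.hash(42))); // true
            // A value that was never written usually misses; a rare false positive
            // only costs reading a row group min/max statistics could not eliminate.
            System.out.println(filter.findHash(filter.hash(0))); // almost certainly false
        }
    }

This is also why getBloomFilterStore() returns Optional.empty() when bloom filter reading is disabled or the predicate is trivial: with nothing to consult, predicateMatches falls back to the remaining statistics, exactly as before this series.
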
From c4f0e7efe9c6858f1c92d0f8d88762d226fdaf5b Mon Sep 17 00:00:00 2001
From: Yingjie Luan <1275963@gmail.com>
Date: Sun, 23 Apr 2023 09:56:17 -0700
Subject: [PATCH 2/3] Move TestHiveParquetWithBloomFilters to io.trino.testing

---
 .../TestHiveParquetWithBloomFilters.java      | 129 +++++-------------
 .../BaseTestParquetWithBloomFilters.java      | 102 ++++++++++++++
 2 files changed, 138 insertions(+), 93 deletions(-)
 create mode 100644 testing/trino-testing/src/main/java/io/trino/testing/BaseTestParquetWithBloomFilters.java

diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestHiveParquetWithBloomFilters.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestHiveParquetWithBloomFilters.java
index 27043c39c00e..b06884f79682 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestHiveParquetWithBloomFilters.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestHiveParquetWithBloomFilters.java
@@ -14,24 +14,22 @@
 package io.trino.plugin.hive.parquet;
 
 import com.google.common.collect.ImmutableList;
-import io.trino.Session;
 import io.trino.plugin.hive.HiveQueryRunner;
-import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.spi.connector.CatalogSchemaTableName;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.testing.BaseTestParquetWithBloomFilters;
+import io.trino.testing.DistributedQueryRunner;
 import io.trino.testing.QueryRunner;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
 import org.joda.time.DateTimeZone;
-import org.testng.annotations.Test;
 
 import java.io.File;
-import java.nio.file.Files;
-import java.util.Arrays;
+import java.nio.file.Path;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 
-import static com.google.common.io.MoreFiles.deleteRecursively;
-import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
 import static io.trino.hadoop.ConfigurationInstantiator.newEmptyConfiguration;
 import static io.trino.testing.TestingNames.randomNameSuffix;
 import static java.lang.String.format;
@@ -42,96 +40,36 @@
 import static org.apache.parquet.format.CompressionCodec.SNAPPY;
 import static org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED;
 import static org.apache.parquet.hadoop.ParquetOutputFormat.WRITER_VERSION;
-import static org.assertj.core.api.Assertions.assertThat;
 
 public class TestHiveParquetWithBloomFilters
-        extends AbstractTestQueryFramework
+        extends BaseTestParquetWithBloomFilters
 {
-    private static final String COLUMN_NAME = "dataColumn";
-    // containing extreme values, so the row group cannot be eliminated by the column chunk's min/max statistics
-    private static final List<Integer> TEST_VALUES = Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE, 1, 3, 7, 10, 15);
-    private static final int MISSING_VALUE = 0;
-
     @Override
     protected QueryRunner createQueryRunner()
             throws Exception
     {
-        return HiveQueryRunner.builder().build();
+        DistributedQueryRunner queryRunner = HiveQueryRunner.builder().build();
+        dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("hive_data");
+        return queryRunner;
     }
 
-    @Test
-    public void verifyBloomFilterEnabled()
-    {
-        assertThat(query(format("SHOW SESSION LIKE '%s.parquet_use_bloom_filter'", getSession().getCatalog().orElseThrow())))
-                .skippingTypesCheck()
-                .matches(result -> result.getRowCount() == 1)
-                .matches(result -> {
-                    String value = (String) result.getMaterializedRows().get(0).getField(1);
-                    return value.equals("true");
-                });
-    }
-
-    @Test
-    public void testBloomFilterRowGroupPruning()
-            throws Exception
+    @Override
+    protected CatalogSchemaTableName createParquetTableWithBloomFilter(String columnName, List<Integer> testValues)
     {
-        File tmpDir = Files.createTempDirectory("testBloomFilterRowGroupPruning").toFile();
-        try {
-            File parquetFile = new File(tmpDir, randomNameSuffix());
-
-            String tableName = "parquet_with_bloom_filters_" + randomNameSuffix();
-            createParquetBloomFilterSource(parquetFile, COLUMN_NAME, TEST_VALUES);
-            assertUpdate(
-                    format(
-                            "CREATE TABLE %s (%s INT) WITH (format = 'PARQUET', external_location = '%s')",
-                            tableName,
-                            COLUMN_NAME,
-                            tmpDir.getAbsolutePath()));
-
-            // When reading bloom filter is enabled, row groups are pruned when searching for a missing value
-            assertQueryStats(
-                    getSession(),
-                    "SELECT * FROM " + tableName + " WHERE " + COLUMN_NAME + " = " + MISSING_VALUE,
-                    queryStats -> {
-                        assertThat(queryStats.getPhysicalInputPositions()).isEqualTo(0);
-                        assertThat(queryStats.getProcessedInputPositions()).isEqualTo(0);
-                    },
-                    results -> assertThat(results.getRowCount()).isEqualTo(0));
+        // create the managed table
+        String tableName = "parquet_with_bloom_filters_" + randomNameSuffix();
+        CatalogSchemaTableName catalogSchemaTableName = new CatalogSchemaTableName("hive", new SchemaTableName("tpch", tableName));
+        assertUpdate(format("CREATE TABLE %s (%s INT) WITH (format = 'PARQUET')", catalogSchemaTableName, columnName));
 
-            // When reading bloom filter is enabled, row groups are not pruned when searching for a value present in the file
-            assertQueryStats(
-                    getSession(),
-                    "SELECT * FROM " + tableName + " WHERE " + COLUMN_NAME + " = " + TEST_VALUES.get(0),
-                    queryStats -> {
-                        assertThat(queryStats.getPhysicalInputPositions()).isGreaterThan(0);
-                        assertThat(queryStats.getProcessedInputPositions()).isEqualTo(queryStats.getPhysicalInputPositions());
-                    },
-                    results -> assertThat(results.getRowCount()).isEqualTo(1));
+        // directly write data to the managed table
+        Path tableLocation = Path.of("%s/tpch/%s".formatted(dataDirectory, tableName));
+        Path fileLocation = tableLocation.resolve("bloomFilterFile.parquet");
+        writeParquetFileWithBloomFilter(fileLocation.toFile(), columnName, testValues);
-            // When reading bloom filter is disabled, row groups are not pruned when searching for a missing value
-            assertQueryStats(
-                    bloomFiltersDisabled(getSession()),
-                    "SELECT * FROM " + tableName + " WHERE " + COLUMN_NAME + " = " + MISSING_VALUE,
-                    queryStats -> {
-                        assertThat(queryStats.getPhysicalInputPositions()).isGreaterThan(0);
-                        assertThat(queryStats.getProcessedInputPositions()).isEqualTo(queryStats.getPhysicalInputPositions());
-                    },
-                    results -> assertThat(results.getRowCount()).isEqualTo(0));
-        }
-        finally {
-            deleteRecursively(tmpDir.toPath(), ALLOW_INSECURE);
-        }
+
+        return catalogSchemaTableName;
     }
 
-    private static Session bloomFiltersDisabled(Session session)
-    {
-        return Session.builder(session)
-                .setCatalogSessionProperty(session.getCatalog().orElseThrow(), "parquet_use_bloom_filter", "false")
-                .build();
-    }
-
-    private static void createParquetBloomFilterSource(File tempFile, String columnName, List<Integer> testValues)
-            throws Exception
+    public static void writeParquetFileWithBloomFilter(File tempFile, String columnName, List<Integer> testValues)
     {
         List<ObjectInspector> objectInspectors = singletonList(javaIntObjectInspector);
         List<String> columnNames = ImmutableList.of(columnName);
@@ -140,15 +78,20 @@ private static void createParquetBloomFilterSource(File tempFile, String columnN
         jobConf.setEnum(WRITER_VERSION, PARQUET_1_0);
         jobConf.setBoolean(BLOOM_FILTER_ENABLED, true);
 
-        ParquetTester.writeParquetColumn(
-                jobConf,
-                tempFile,
-                SNAPPY,
-                ParquetTester.createTableProperties(columnNames, objectInspectors),
-                getStandardStructObjectInspector(columnNames, objectInspectors),
-                new Iterator<?>[] {testValues.iterator()},
-                Optional.empty(),
-                false,
-                DateTimeZone.getDefault());
+        try {
+            ParquetTester.writeParquetColumn(
+                    jobConf,
+                    tempFile,
+                    SNAPPY,
+                    ParquetTester.createTableProperties(columnNames, objectInspectors),
+                    getStandardStructObjectInspector(columnNames, objectInspectors),
+                    new Iterator<?>[] {testValues.iterator()},
+                    Optional.empty(),
+                    false,
+                    DateTimeZone.getDefault());
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 }
diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseTestParquetWithBloomFilters.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseTestParquetWithBloomFilters.java
new file mode 100644
index 000000000000..76eb9f57f0b3
--- /dev/null
+++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseTestParquetWithBloomFilters.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.testing;
+
+import com.google.common.collect.ImmutableSet;
+import io.trino.Session;
+import io.trino.spi.connector.CatalogSchemaTableName;
+import org.testng.annotations.Test;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+
+import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public abstract class BaseTestParquetWithBloomFilters
+        extends AbstractTestQueryFramework
+{
+    protected Path dataDirectory;
+    private static final String COLUMN_NAME = "dataColumn";
+    // containing extreme values, so the row group cannot be eliminated by the column chunk's min/max statistics
+    private static final List<Integer> TEST_VALUES = Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE, 1, 3, 7, 10, 15);
+    private static final int MISSING_VALUE = 0;
+
+    @Test
+    public void verifyBloomFilterEnabled()
+    {
+        assertThat(query(format("SHOW SESSION LIKE '%s.parquet_use_bloom_filter'", getSession().getCatalog().orElseThrow())))
+                .skippingTypesCheck()
+                .matches(result -> result.getRowCount() == 1)
+                .matches(result -> {
+                    String value = (String) result.getMaterializedRows().get(0).getField(1);
+                    return value.equals("true");
+                });
+    }
+
+    @Test
+    public void testBloomFilterRowGroupPruning()
+    {
+        CatalogSchemaTableName tableName = createParquetTableWithBloomFilter(COLUMN_NAME, TEST_VALUES);
+
+        // assert table is populated with data
+        assertQueryStats(
+                getSession(),
+                "SELECT * FROM " + tableName,
+                queryStats -> {},
+                results -> assertThat(results.getOnlyColumnAsSet()).isEqualTo(ImmutableSet.copyOf(TEST_VALUES)));
+
+        // When reading bloom filter is enabled, row groups are pruned when searching for a missing value
+        assertQueryStats(
+                getSession(),
+                "SELECT * FROM " + tableName + " WHERE " + COLUMN_NAME + " = " + MISSING_VALUE,
+                queryStats -> {
+                    assertThat(queryStats.getPhysicalInputPositions()).isEqualTo(0);
+                    assertThat(queryStats.getProcessedInputPositions()).isEqualTo(0);
+                },
+                results -> assertThat(results.getRowCount()).isEqualTo(0));
+
+        // When reading bloom filter is enabled, row groups are not pruned when searching for a value present in the file
+        assertQueryStats(
+                getSession(),
+                "SELECT * FROM " + tableName + " WHERE " + COLUMN_NAME + " = " + TEST_VALUES.get(0),
+                queryStats -> {
+                    assertThat(queryStats.getPhysicalInputPositions()).isGreaterThan(0);
+                    assertThat(queryStats.getProcessedInputPositions()).isEqualTo(queryStats.getPhysicalInputPositions());
+                },
+                results -> assertThat(results.getRowCount()).isEqualTo(1));
+
+        // When reading bloom filter is disabled, row groups are not pruned when searching for a missing value
+        assertQueryStats(
+                bloomFiltersDisabled(getSession()),
+                "SELECT * FROM " + tableName + " WHERE " + COLUMN_NAME + " = " + MISSING_VALUE,
+                queryStats -> {
+                    assertThat(queryStats.getPhysicalInputPositions()).isGreaterThan(0);
+                    assertThat(queryStats.getProcessedInputPositions()).isEqualTo(queryStats.getPhysicalInputPositions());
+                },
+                results -> assertThat(results.getRowCount()).isEqualTo(0));
+
+        assertUpdate("DROP TABLE " + tableName);
+    }
+
+    private static Session bloomFiltersDisabled(Session session)
+    {
+        return Session.builder(session)
+                .setCatalogSessionProperty(session.getCatalog().orElseThrow(), "parquet_use_bloom_filter", "false")
+                .build();
+    }
+
+    protected abstract CatalogSchemaTableName createParquetTableWithBloomFilter(String columnName, List<Integer> testValues);
+}
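
Note (not part of the patch): the new base class leaves each connector exactly two hooks -- createQueryRunner() must also set the protected dataDirectory field, and createParquetTableWithBloomFilter(...) must return a Parquet table whose data file carries bloom filters. Patch 3 below supplies the Iceberg implementation; for any other connector the contract boils down to the same two overrides. A skeleton of what a new subclass could look like -- ExampleConnectorQueryRunner and the "example" catalog are hypothetical placeholders, not real Trino classes, and the table-creation details vary by connector:

    public class TestExampleParquetWithBloomFilters
            extends BaseTestParquetWithBloomFilters
    {
        @Override
        protected QueryRunner createQueryRunner()
                throws Exception
        {
            // hypothetical factory; substitute the connector's real query runner builder
            DistributedQueryRunner queryRunner = ExampleConnectorQueryRunner.builder().build();
            // the base class reads dataDirectory to locate table files on disk
            dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("example_data");
            return queryRunner;
        }

        @Override
        protected CatalogSchemaTableName createParquetTableWithBloomFilter(String columnName, List<Integer> testValues)
        {
            String tableName = "parquet_with_bloom_filters_" + randomNameSuffix();
            CatalogSchemaTableName table = new CatalogSchemaTableName("example", new SchemaTableName("tpch", tableName));
            assertUpdate(format("CREATE TABLE %s (%s INT) WITH (format = 'PARQUET')", table, columnName));

            // write a bloom-filtered Parquet file directly into the table location
            Path fileLocation = Path.of("%s/tpch/%s".formatted(dataDirectory, tableName)).resolve("data.parquet");
            writeParquetFileWithBloomFilter(fileLocation.toFile(), columnName, testValues);

            return table;
        }
    }
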
From b1d730e41a7f222b0781a269a141d8b5dcfa1c64 Mon Sep 17 00:00:00 2001
From: Yingjie Luan <1275963@gmail.com>
Date: Sun, 23 Apr 2023 09:59:46 -0700
Subject: [PATCH 3/3] Enable reading Parquet's bloomfilter statistics for
 Iceberg connector

---
 docs/src/main/sphinx/connector/iceberg.rst    |  6 ++
 .../iceberg/IcebergPageSourceProvider.java    |  8 ++-
 .../iceberg/IcebergSessionProperties.java     | 11 +++
 .../TestIcebergParquetWithBloomFilters.java   | 71 +++++++++++++++++++
 4 files changed, 95 insertions(+), 1 deletion(-)
 create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetWithBloomFilters.java

diff --git a/docs/src/main/sphinx/connector/iceberg.rst b/docs/src/main/sphinx/connector/iceberg.rst
index 6afaa0f5d01c..d75d6304faca 100644
--- a/docs/src/main/sphinx/connector/iceberg.rst
+++ b/docs/src/main/sphinx/connector/iceberg.rst
@@ -1521,3 +1521,9 @@ with Parquet files performed by the Iceberg connector.
       for structural data types. The equivalent catalog session property is
       ``parquet_optimized_nested_reader_enabled``.
     - ``true``
+   * - ``parquet.use-bloom-filter``
+     - Whether bloom filters are used for predicate pushdown when reading
+       Parquet files. Set this property to ``false`` to disable the usage of
+       bloom filters by default. The equivalent catalog session property is
+       ``parquet_use_bloom_filter``.
+     - ``true``
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index 6d46b71f22c8..eb1430bb129f 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -34,6 +34,7 @@
 import io.trino.orc.TupleDomainOrcPredicate;
 import io.trino.orc.TupleDomainOrcPredicate.TupleDomainOrcPredicateBuilder;
 import io.trino.orc.metadata.OrcType;
+import io.trino.parquet.BloomFilterStore;
 import io.trino.parquet.Field;
 import io.trino.parquet.ParquetCorruptionException;
 import io.trino.parquet.ParquetDataSource;
@@ -131,6 +132,7 @@
 import static io.trino.orc.OrcReader.INITIAL_BATCH_SIZE;
 import static io.trino.orc.OrcReader.ProjectedLayout;
 import static io.trino.orc.OrcReader.fullyProjectedLayout;
+import static io.trino.parquet.BloomFilterStore.getBloomFilterStore;
 import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
 import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
 import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
@@ -156,6 +158,7 @@
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isParquetOptimizedNestedReaderEnabled;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isParquetOptimizedReaderEnabled;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isUseFileSizeFromMetadata;
+import static io.trino.plugin.iceberg.IcebergSessionProperties.useParquetBloomFilter;
 import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
 import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
 import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
@@ -510,6 +513,7 @@ public ReaderPageSourceWithRowPositions createDataPageSource(
                             .withMaxReadBlockSize(getParquetMaxReadBlockSize(session))
                             .withMaxReadBlockRowCount(getParquetMaxReadBlockRowCount(session))
                             .withBatchColumnReaders(isParquetOptimizedReaderEnabled(session))
+                            .withBloomFilter(useParquetBloomFilter(session))
                             .withBatchNestedColumnReaders(isParquetOptimizedNestedReaderEnabled(session)),
                     predicate,
                     fileFormatDataSourceStats,
@@ -927,8 +931,10 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource(
         List<BlockMetaData> blocks = new ArrayList<>();
         for (BlockMetaData block : parquetMetadata.getBlocks()) {
             long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+            Optional<BloomFilterStore> bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options);
+
             if (start <= firstDataPage && firstDataPage < start + length &&
-                    predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, Optional.empty(), Optional.empty(), UTC, ICEBERG_DOMAIN_COMPACTION_THRESHOLD)) {
+                    predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, Optional.empty(), bloomFilterStore, UTC, ICEBERG_DOMAIN_COMPACTION_THRESHOLD)) {
                 blocks.add(block);
                 blockStarts.add(nextStart);
                 if (startRowPosition.isEmpty()) {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
index 498acb0c9afc..7e723d6d8917 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
@@ -67,6 +67,7 @@ public final class IcebergSessionProperties
     private static final String ORC_WRITER_MAX_STRIPE_ROWS = "orc_writer_max_stripe_rows";
     private static final String ORC_WRITER_MAX_DICTIONARY_MEMORY = "orc_writer_max_dictionary_memory";
     private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
+    private static final String PARQUET_USE_BLOOM_FILTER = "parquet_use_bloom_filter";
     private static final String PARQUET_MAX_READ_BLOCK_ROW_COUNT = "parquet_max_read_block_row_count";
     private static final String PARQUET_OPTIMIZED_READER_ENABLED = "parquet_optimized_reader_enabled";
     private static final String PARQUET_OPTIMIZED_NESTED_READER_ENABLED = "parquet_optimized_nested_reader_enabled";
@@ -197,6 +198,11 @@ public IcebergSessionProperties(
                         "Parquet: Maximum size of a block to read",
                         parquetReaderConfig.getMaxReadBlockSize(),
                         false))
+                .add(booleanProperty(
+                        PARQUET_USE_BLOOM_FILTER,
+                        "Parquet: Enable using Parquet bloom filter",
+                        parquetReaderConfig.isUseBloomFilter(),
+                        false))
                 .add(integerProperty(
                         PARQUET_MAX_READ_BLOCK_ROW_COUNT,
                         "Parquet: Maximum number of rows read in a batch",
@@ -432,6 +438,11 @@ public static int getParquetWriterBatchSize(ConnectorSession session)
         return session.getProperty(PARQUET_WRITER_BATCH_SIZE, Integer.class);
     }
 
+    public static boolean useParquetBloomFilter(ConnectorSession session)
+    {
+        return session.getProperty(PARQUET_USE_BLOOM_FILTER, Boolean.class);
+    }
+
     public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session)
     {
         return session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class);
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetWithBloomFilters.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetWithBloomFilters.java
new file mode 100644
index 000000000000..54b3c936b1d1
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetWithBloomFilters.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.plugin.hive.TestingHivePlugin;
+import io.trino.spi.connector.CatalogSchemaTableName;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.testing.BaseTestParquetWithBloomFilters;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.QueryRunner;
+
+import java.nio.file.Path;
+import java.util.List;
+
+import static io.trino.plugin.hive.parquet.TestHiveParquetWithBloomFilters.writeParquetFileWithBloomFilter;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static java.lang.String.format;
+
+public class TestIcebergParquetWithBloomFilters
+        extends BaseTestParquetWithBloomFilters
+{
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        DistributedQueryRunner queryRunner = IcebergQueryRunner.builder().build();
+        dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data");
+
+        // create hive catalog
+        queryRunner.installPlugin(new TestingHivePlugin());
+        queryRunner.createCatalog("hive", "hive", ImmutableMap.<String, String>builder()
+                .put("hive.metastore", "file")
+                .put("hive.metastore.catalog.dir", dataDirectory.toString())
+                .put("hive.security", "allow-all")
+                .buildOrThrow());
+
+        return queryRunner;
+    }
+
+    @Override
+    protected CatalogSchemaTableName createParquetTableWithBloomFilter(String columnName, List<Integer> testValues)
+    {
+        // create the managed table
+        String tableName = "parquet_with_bloom_filters_" + randomNameSuffix();
+        CatalogSchemaTableName hiveCatalogSchemaTableName = new CatalogSchemaTableName("hive", new SchemaTableName("tpch", tableName));
+        CatalogSchemaTableName icebergCatalogSchemaTableName = new CatalogSchemaTableName("iceberg", new SchemaTableName("tpch", tableName));
+        assertUpdate(format("CREATE TABLE %s (%s INT) WITH (format = 'PARQUET')", hiveCatalogSchemaTableName, columnName));
+
+        // directly write data to the managed table
+        Path tableLocation = Path.of("%s/tpch/%s".formatted(dataDirectory, tableName));
+        Path fileLocation = tableLocation.resolve("bloomFilterFile.parquet");
+        writeParquetFileWithBloomFilter(fileLocation.toFile(), columnName, testValues);
+
+        // migrate the hive table to the iceberg table
+        assertUpdate("CALL iceberg.system.migrate('tpch', '" + tableName + "', 'false')");
+
+        return icebergCatalogSchemaTableName;
+    }
+}
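
Note (not part of the patch): the new parquet.use-bloom-filter catalog property defaults to true, and the matching parquet_use_bloom_filter catalog session property turns the feature off per session, which is exactly what bloomFiltersDisabled() in the shared base test does. A sketch of the same toggle from test code, assuming a catalog named "iceberg":

    // SQL equivalent: SET SESSION iceberg.parquet_use_bloom_filter = false;
    Session noBloomFilter = Session.builder(getSession())
            .setCatalogSessionProperty("iceberg", "parquet_use_bloom_filter", "false")
            .build();
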